Data come in different shapes and forms. In the real world, a lot of data is generated in streams: a continuous flow of events produced by various sources. With the ubiquity of mobile devices, social media, IoT, and big data platforms, more and more data sources offer streaming data for analytics consumption. Real-time analytics on streaming data differs from batch analytics in that results are updated incrementally as new events arrive, instead of being recomputed over the entire dataset.
Some examples of streaming data are clickstreams from mobile and web apps, social media feeds, IoT sensor readings, and application logs.
In recent years, Apache Kafka has become the technology of choice when it comes to working with streaming data. It offers persistent storage that guarantees message ordering within a partition, and pub-sub semantics that are easy to reason about. It can easily handle terabyte-scale throughput and is a popular choice for any company that works with massive amounts of event-driven data.
The MTA, the agency operating the NYC subway, publishes real-time subway trip updates via a set of RESTful APIs that are available to the public.
In this post we will use Apache Kafka to build a real-time NYC subway tracker that shows you when the next train will arrive at a station. Here at Cloudbox Labs, building cloud-native applications is our business. In that spirit, we will host both our Kafka cluster and our application on EC2 instances on Amazon Web Services (AWS).
The overall architecture of our system is as follows.
In order to interact with Kafka's pub-sub model, we will write a message producer that generates message streams and publishes them to Kafka. The MTA's data feed refreshes every 30 seconds, so our producer will query the API for the latest subway trip updates and publish them to a Kafka topic. On the other end, we will spin up a message consumer that listens on that topic and processes the update messages as they come through. The consumer keeps an in-memory data structure that stores the timing of the next arriving train at each station, by subway line.
For demo purposes our Kafka cluster setup is fairly simple. We set up a two-node Kafka cluster so we can take advantage of its distributed and replicated storage capabilities, and created one topic to hold our message stream. With that we are ready to interact with the Kafka cluster.
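The topic can be created with whichever Kafka tooling you prefer; as one sketch, the confluent-kafka AdminClient (the same client library used in the snippets below) can create it programmatically. The broker address, topic name, and partition count here are assumptions chosen to match the rest of the post.

from confluent_kafka.admin import AdminClient, NewTopic

# Broker address and topic name are assumed to match the producer/consumer below.
admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Replication factor 2 spreads each partition across both brokers;
# the partition count is an arbitrary choice for the demo.
futures = admin.create_topics([NewTopic('test', num_partitions=2, replication_factor=2)])
for topic, future in futures.items():
    future.result()  # raises if the topic could not be created
    print('Created topic {}'.format(topic))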
MTA’s trip update API returns protocol buffer messages following the GTFS-realtime feed specification, so the data has a standard format. In order to parse the response from the API, we first need to auto-generate the Python protobuf classes from gtfs-realtime.proto
python -m grpc_tools.protoc -Iprotos/ --python_out=. --grpc_python_out=. protos/gtfs-realtime.proto
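As a quick sanity check that the generated gtfs_realtime_pb2 module works, we can fetch and parse one feed snapshot directly; the URL below is the same numbered-lines feed the producer uses, with your own API key substituted in.

import requests

import gtfs_realtime_pb2

# Fetch one snapshot of the feed and parse the protobuf payload.
feed = gtfs_realtime_pb2.FeedMessage()
response = requests.get(
    'http://datamine.mta.info/mta_esi.php?key=<YOUR_API_KEY>&feed_id=1')
feed.ParseFromString(response.content)
print('Parsed {} feed entities'.format(len(feed.entity)))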
Using the Confluent Kafka Python client, writing a Kafka producer and consumer is fairly easy. Here is the code snippet for our trip update producer
import time

import requests
from google.protobuf.json_format import MessageToJson
from confluent_kafka import Producer

import gtfs_realtime_pb2


class MTARealTime(object):

    def __init__(self):
        # Read the MTA API key from a local file kept out of version control.
        with open('.mta_api_key', 'r') as key_in:
            self.api_key = key_in.read().strip()

        self.mta_api_url = 'http://datamine.mta.info/mta_esi.php?key={}&feed_id=1'.format(
            self.api_key)

        self.kafka_topic = 'test'
        self.kafka_producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def produce_trip_updates(self):
        # Fetch the GTFS-realtime feed and parse the protobuf payload.
        feed = gtfs_realtime_pb2.FeedMessage()
        response = requests.get(self.mta_api_url)
        feed.ParseFromString(response.content)

        # Publish each trip update to the Kafka topic as a JSON message.
        for entity in feed.entity:
            if entity.HasField('trip_update'):
                update_json = MessageToJson(entity.trip_update)
                self.kafka_producer.produce(
                    self.kafka_topic, update_json.encode('utf-8'))

        self.kafka_producer.flush()

    def run(self):
        # The MTA feed refreshes every 30 seconds, so poll at that cadence.
        while True:
            self.produce_trip_updates()
            time.sleep(30)
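To run the producer as a standalone script, a minimal entry point might look like this:

if __name__ == '__main__':
    # Poll the MTA feed and publish trip updates until interrupted.
    MTARealTime().run()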
And here is our trip update consumer
import csv
import json
from collections import defaultdict

import arrow
from confluent_kafka import Consumer, KafkaError


class MTATrainTracker(object):

    def __init__(self):
        self.kafka_consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'test_consumer_group',
            'auto.offset.reset': 'earliest'
        })
        self.kafka_topic = 'test'

        # subway line number -> (stop_id, direction) -> next arrival time
        self.arrival_times = defaultdict(lambda: defaultdict(lambda: -1))

        # GTFS stop id -> human readable station name
        self.stations = {}
        with open('static/mta_stations.csv') as csvf:
            reader = csv.DictReader(csvf)
            for row in reader:
                self.stations[row['GTFS Stop ID']] = row['Stop Name']

    def process_message(self, message):
        trip_update = json.loads(message)

        trip_header = trip_update.get('trip')
        if not trip_header:
            return
        route_id = trip_header['routeId']

        stop_time_updates = trip_update.get('stopTimeUpdate')
        if not stop_time_updates:
            return

        for update in stop_time_updates:
            if 'arrival' not in update or 'stopId' not in update:
                continue

            # Stop ids look like '635N': the first three characters identify
            # the station, the last one the direction.
            stop_id, direction = update['stopId'][0:3], update['stopId'][3:]
            new_arrival_ts = int(update['arrival']['time'])
            next_arrival_ts = self.arrival_times[route_id][(stop_id, direction)]

            now = arrow.now(tz='US/Eastern')
            # Keep the earliest arrival that is still in the future.
            if new_arrival_ts >= now.int_timestamp and \
                    (next_arrival_ts == -1 or new_arrival_ts < next_arrival_ts):
                self.arrival_times[route_id][(stop_id, direction)] = new_arrival_ts

                # convert time delta to minutes
                time_delta = arrow.get(new_arrival_ts) - now
                minutes = divmod(divmod(time_delta.seconds, 3600)[1], 60)[0]
                print('Next {} bound {} train will arrive at station {} in {} minutes'.format(
                    direction, route_id, self.stations[stop_id], minutes))

    def run(self):
        self.kafka_consumer.subscribe([self.kafka_topic])

        while True:
            msg = self.kafka_consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                # End-of-partition events are informational; anything else is fatal.
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise ValueError('Kafka consumer exception: {}'.format(msg.error()))
            if not msg.value():
                continue
            self.process_message(msg.value().decode('utf-8'))
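The consumer can be launched the same way:

if __name__ == '__main__':
    # Consume trip updates and print upcoming arrivals as they stream in.
    MTATrainTracker().run()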
Both the Kafka cluster and the application components run on separate EC2 instances on AWS. Once we spin up all the pieces, we can see the consumer printing the next train arrival time for each station
Next S bound 1 train will arrive at station Rector St in 0 minutes
Next S bound 1 train will arrive at station South Ferry in 2 minutes
Next S bound 1 train will arrive at station Cortlandt St in 0 minutes
Next N bound 6 train will arrive at station Spring St in 4 minutes
Next N bound 6 train will arrive at station Bleecker St in 5 minutes
Next N bound 6 train will arrive at station Astor Pl in 6 minutes
Streaming data has major applications in modern data analytics. At Cloudbox Labs, we love building stream processing platforms so our clients can analyze data faster and at scale.
As always, you can find the full code discussed in this post on the Cloudbox Labs GitHub.
Tags: Amazon Web Services, Apache Kafka, gtfs-realtime, NYC subway tracker, protocol buffer, rest API, streaming data