Building a real-time NYC subway tracker with Apache Kafka

Written by leihetito | Published 2018/07/20
Tech Story Tags: big-data | apache-kafka | api | python | nyc-subway-tracker


A practical use case of Apache Kafka

Data comes in different shapes and forms. In the real world, a lot of data is generated in streams. Streaming data is a continuous flow of events generated by various sources. With the ubiquity of mobile devices, social media, IoT and big data platforms, more and more data sources offer streaming data for analytics consumption. Real-time analytics on streaming data differs from batch analytics in that results are updated incrementally as new events arrive, instead of by reprocessing the entire dataset.

Some examples of streaming data are

  1. Facebook/Instagram/Twitter posts
  2. Sensor data sent from airplane engines
  3. Time series data from financial stocks trading on exchanges

In recent years, Apache Kafka has become the technology of choice when it comes to working with streaming data. It offers persistent storage that guarantees message ordering within a partition, and pub-sub semantics that are easy to reason about. It handles terabyte-scale throughput with ease and is a popular choice for any company that works with massive amounts of event-driven data.

The MTA, the agency operating the NYC subway, publishes real-time subway trip updates via a set of HTTP APIs that are available to the public.
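
For instance, the feed used by the producer later in this post (feed_id=1) can be pulled with a plain HTTP GET; a quick way to sanity-check your API key is a curl call like the following (YOUR_API_KEY is a placeholder):

curl -o trip_updates.pb "http://datamine.mta.info/mta_esi.php?key=YOUR_API_KEY&feed_id=1"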

In this post we will use Apache Kafka to build a real-time NYC subway tracker that shows you when the next train will arrive at a station. Here at Cloudbox Labs, building cloud-native applications is our business. In that spirit, we will host our Kafka cluster on Amazon Web Services (AWS) and our application on EC2.

The overall architecture of our system looks like this:

In order to interact with Kafka's pub-sub model, we will write a message producer that generates message streams and publishes them to Kafka. The MTA's data stream refreshes every 30 seconds, so our producer will query the API for the latest subway trip updates and publish them to a Kafka topic. On the other end we will spin up a message consumer that listens on that topic and processes the update messages as they come through. The consumer keeps an in-memory data structure that stores the timing of the next arriving train for each station, by subway line.
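
Conceptually, that in-memory structure is just a nested mapping keyed first by route, then by (station, direction); the sketch below uses made-up stop IDs and Unix timestamps purely for illustration:

    # route_id -> (stop_id, direction) -> Unix timestamp of the next arrival
    arrival_times = {
        '6': {('635', 'N'): 1532102400},   # hypothetical values
        '1': {('139', 'S'): 1532102280},
    }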

For demo purposes our Kafka cluster setup is fairly simple. We set up a two-node Kafka cluster so we can take advantage of its distributed and replicated storage. We created a single topic that will hold our message queue, as shown below. With that we are ready to interact with the Kafka cluster.
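
As a rough sketch, a topic like the test topic referenced in the code below could be created with the standard Kafka CLI; the ZooKeeper address and the single-partition choice here are assumptions, so adjust them for your own cluster:

bin/kafka-topics.sh --create --zookeeper zk-host:2181 --replication-factor 2 --partitions 1 --topic test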

The MTA's trip update API serves protocol buffers that follow the GTFS-realtime feed specification, so the data has a standard format. In order to parse the API response, we first need to auto-generate the Python protobuf classes from gtfs-realtime.proto:

python -m grpc_tools.protoc -Iprotos/ --python_out=. --grpc_python_out=. protos/gtfs-realtime.proto

Using the Confluent Kafka Python client, writing a Kafka producer and consumer is fairly easy. Here is the code for our trip update producer:

import time

import requests
from google.protobuf.json_format import MessageToJson
from confluent_kafka import Producer

import gtfs_realtime_pb2


class MTARealTime(object):

    def __init__(self):
        with open('.mta_api_key', 'r') as key_in:
            self.api_key = key_in.read().strip()

        self.mta_api_url = 'http://datamine.mta.info/mta_esi.php?key={}&feed_id=1'.format(
            self.api_key)
        self.kafka_topic = 'test'
        self.kafka_producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def produce_trip_updates(self):
        feed = gtfs_realtime_pb2.FeedMessage()
        response = requests.get(self.mta_api_url)
        feed.ParseFromString(response.content)

        for entity in feed.entity:
            if entity.HasField('trip_update'):
                update_json = MessageToJson(entity.trip_update)
                self.kafka_producer.produce(
                    self.kafka_topic, update_json.encode('utf-8'))

        self.kafka_producer.flush()

    def run(self):
        while True:
            self.produce_trip_updates()
            time.sleep(30)
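
To run the producer as its own process, a minimal entry point (not part of the original snippet) would be:

    if __name__ == '__main__':
        MTARealTime().run()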

And here is our trip update consumer:

import csv
from collections import defaultdict
import json

import arrow
from confluent_kafka import Consumer, KafkaError


class MTATrainTracker(object):

    def __init__(self):
        self.kafka_consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'test_consumer_group',
            'default.topic.config': {
                'auto.offset.reset': 'smallest'
            }
        })
        self.kafka_topic = 'test'

        # subway line number -> (stop_id, direction) -> next arrival time
        self.arrival_times = defaultdict(lambda: defaultdict(lambda: -1))

        self.stations = {}
        with open('static/mta_stations.csv') as csvf:
            reader = csv.DictReader(csvf)
            for row in reader:
                self.stations[row['GTFS Stop ID']] = row['Stop Name']

    def process_message(self, message):
        trip_update = json.loads(message)

        trip_header = trip_update.get('trip')
        if not trip_header:
            return

        route_id = trip_header['routeId']
        stop_time_updates = trip_update.get('stopTimeUpdate')
        if not stop_time_updates:
            return

        for update in stop_time_updates:
            if 'arrival' not in update or 'stopId' not in update:
                continue

            stop_id, direction = update['stopId'][0:3], update['stopId'][3:]
            new_arrival_ts = int(update['arrival']['time'])

            next_arrival_ts = self.arrival_times[route_id][(stop_id, direction)]
            now = arrow.now(tz='US/Eastern')

            if new_arrival_ts >= now.timestamp and \
                    (next_arrival_ts == -1 or new_arrival_ts < next_arrival_ts):
                self.arrival_times[route_id][(stop_id, direction)] = new_arrival_ts

                # convert time delta to minutes
                time_delta = arrow.get(new_arrival_ts) - now
                minutes = divmod(divmod(time_delta.seconds, 3600)[1], 60)[0]
                print('Next {} bound {} train will arrive at station {} in {} minutes'.format(
                    direction, route_id, self.stations[stop_id], minutes))

    def run(self):
        self.kafka_consumer.subscribe([self.kafka_topic])

        while True:
            msg = self.kafka_consumer.poll(1.0)

            if msg is None or not msg.value():
                continue
            if msg.error() and msg.error().code() != KafkaError._PARTITION_EOF:
                raise ValueError('Kafka consumer exception: {}'.format(msg.error()))

            msg = msg.value()
            self.process_message(msg.decode('utf-8'))
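
Likewise, the consumer can be started with its own small entry point (again an addition for illustration, not part of the original snippet):

    if __name__ == '__main__':
        MTATrainTracker().run()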

Both the Kafka cluster and the application components run on separate EC2 instances on AWS. Once we spin up all the pieces, we can see the consumer printing the next train arrival time for each station:

Next S bound 1 train will arrive at station Rector St in 0 minutes

Next S bound 1 train will arrive at station South Ferry in 2 minutes

Next S bound 1 train will arrive at station Cortlandt St in 0 minutes

Next N bound 6 train will arrive at station Spring St in 4 minutes

Next N bound 6 train will arrive at station Bleecker St in 5 minutes

Next N bound 6 train will arrive at station Astor Pl in 6 minutes
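
One practical note on the deployment above: because the brokers and the application live on separate EC2 instances, the bootstrap.servers setting in both snippets would point at the brokers' addresses rather than localhost. A sketch, with hypothetical hostnames:

    # Replace with your brokers' EC2 hostnames or private DNS names
    kafka_conf = {'bootstrap.servers': 'ec2-kafka-1.internal:9092,ec2-kafka-2.internal:9092'}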

Streaming data has major applications in modern data analytics. At Cloudbox Labs, we love building stream processing platforms so our clients can analyze data faster and at scale.

As always, you can find the full code discussed in this post on the Cloudbox Labs GitHub.


