paint-brush
FastAPI with Materialize for Real-Time Data Processing: An Essential Guideby@bobbyiliev
243 reads

FastAPI with Materialize for Real-Time Data Processing: An Essential Guide

by Bobby IlievFebruary 17th, 2022
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

This is a self-contained demo of FastAPI and [Materialize.com. The demo project contains the following components: [FastAPI] [Redpanda] and [materialize]. A mock service is written in [BASH for producing records to a Redpanda topic. The mock service simulates data of air quality readings of IoT devices. We are using the 'mz-fastapi-demo' function rather than the `mz_logical' function.

Company Mentioned

Mention Thumbnail
featured image - FastAPI with Materialize for Real-Time Data Processing: An Essential Guide
Bobby Iliev HackerNoon profile picture

This is a self-contained demo of FastAPI and Materialize.


This demo project contains the following components:


Diagram

FastAPI and Materialize Demo

Running the demo

Clone the repository:

git clone https://github.com/bobbyiliev/materialize-tutorials.git

Access the FastAPI demo project directory:

cd mz-fastapi-demo

Pull all Docker images:

docker-compose pull

Build the project:

docker-compose build

Finally, run all containers:

docker-compose up

Create the Materialize sources and views

Once the demo is running, you can create the Materialize sources and views.

Let's start by creating a Redpanda/Kafka source:

CREATE SOURCE sensors
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'sensors'
FORMAT BYTES;


Then create a non-materialized view which you can think of essentially an alias that we will use to create our materialized views. The non-materialized views do not store the results of the query:

CREATE VIEW sensors_data AS
    SELECT
        *
    FROM (
        SELECT
            (data->>'id')::int AS id,
            (data->>'pm25')::double AS pm25,
            (data->>'pm10')::double AS pm10,
            (data->>'geo_lat')::double AS geo_lat,
            (data->>'geo_lon')::double AS geo_lon,
            (data->>'timestamp')::double AS timestamp
        FROM (
            SELECT CAST(data AS jsonb) AS data
            FROM (
                SELECT convert_from(data, 'utf8') AS data
                FROM sensors
            )
        )
    );


After that, create a materialized view that will hold all records in the last 10 minutes:

CREATE MATERIALIZED VIEW sensors_view AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 100000)::numeric;

Note that we are using the mz_logical_timestamp() function rather than the now() function. This is because in Materialize now() doesn’t represent the system time, as it does in most systems; it represents the time with timezone when the query was executed. It cannot be used when creating views. For more information, see the documentation here.


Next, let's create materialized view that will only include data from the last second so we can see the dataflow and use it for our Server-Sent Events (SSE) demo later on:

CREATE MATERIALIZED VIEW sensors_view_1s AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 6000)::numeric;


With that our materialized views are ready and we can visit the FastAPI demo project in the browser!

FastAPI Demo

Finally, visit the FastAPI demo app via your browser:


  • Endpoint for all records in the last 10 minutes:

http://localhost/sensors


  • SSE Endpoint streaming the latest records as they are generated using TAIL:

http://localhost/stream


Example response:

SSE FastAPI with Materialize

Materialize Cloud

If you want to run the demo on the cloud, you would need the following:


  • A publicly accessible Redpanda/Kafka instance so that you can connect to it.
  • A Materialize Cloud account. You can sign up for a free Materialize Cloud account to get started with Materialize Cloud.


If you already have that setup, you would need to make the following changes to the demo project:


  • When creating the source, change the redpanda:9092 to your Redpanda/Kafka instance:
CREATE SOURCE sensors
FROM KAFKA BROKER 'your_redpanda_instance:9092' TOPIC 'sensors'
FORMAT BYTES;
  • Change the DATABASE_URL environment variable to your Materialize Cloud database URL and uncomment the certificate-specific environment variables in the docker-compose.yml file. in the docker-compose.yml file.
  • Download the Materialize instance certificate files from your Materialize Cloud dashboard.

Stop the demo

To stop the demo, run the following command:

docker-compose down -v

You can also stop only the data generation container:

docker-compose stop datagen

Helpful Links