This is a self-contained demo using Materialize to process IoT device data directly from a PostgreSQL server.
The demo builds on the "How to build AdonisJS API to store your Raspberry Pi Temperature" tutorial.
The data is generated by a Raspberry Pi temperature mock service simulating 50 devices reporting to an AdonisJS API mock service.
Finally, we will create a sink to let us stream the data out of Materialize to a Redpanda topic.
Before you get started, you need to make sure that you have Docker and Docker Compose installed.
You can follow the steps here on how to install Docker:
In this demo, we’ll monitor the temperature of a set of Raspberry Pi devices, extract some insights from the readings, and stream the data out to an external system.
The main source of data is a Raspberry Pi mock service that simulates 50 devices reporting their CPU temperature to a mock API service built with AdonisJS.
The mock service generates about 25 new requests to the mock API service every second.
For more information on how the mock services work along with the AdonisJS API, you can follow the "How to build AdonisJS API to store your Raspberry Pi Temperature" tutorial.
The API mock service receives the data from the 50 simulated Raspberry Pi devices and stores the data from each request in a PostgreSQL instance.
The data that is being received with each request is:
The mock API saves all data in a table called sensors. The columns of the sensors table are:
name
timestamp
temperature
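To make the shape of this data concrete, here is a minimal Python sketch of one reporting round from the simulated devices. It is not the demo's actual mock service: the field names follow the sensors columns above, while the raspberry-N naming, the ISO 8601 timestamp format, and the 50 to 70 degree temperature range are assumptions chosen for illustration.

```python
import random
from datetime import datetime, timezone


def make_reading(device_id, rng):
    """Build one reading with the same fields as the sensors table."""
    return {
        # Assumed naming scheme: raspberry-1 ... raspberry-50
        "name": f"raspberry-{device_id}",
        # Assumed timestamp format: ISO 8601 in UTC
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # Assumed temperature range, in degrees Celsius
        "temperature": round(rng.uniform(50.0, 70.0), 2),
    }


def simulate_round(num_devices=50, seed=None):
    """Return one reading per simulated device."""
    rng = random.Random(seed)
    return [make_reading(i, rng) for i in range(1, num_devices + 1)]


readings = simulate_round(seed=42)
print(len(readings), readings[0]["name"])
```

In the demo itself, each reading would be sent to the mock API as an HTTP request and stored as one row in the sensors table.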
Materialize presents an interface to ingest the temperature data from the PostgreSQL database.
In this demo, we are going to use Materialize to:
pgcli.
Clone the repository:
git clone https://github.com/bobbyiliev/mz-raspberry-pi-temperature.git
Access the directory:
cd mz-raspberry-pi-temperature
Build the Raspberry Pi Mock images:
docker-compose build
Start all of the services:
docker-compose up -d
Launch the Materialize CLI (mzcli):
docker-compose run mzcli
To create a PostgreSQL source in Materialize, execute the following statement:
CREATE MATERIALIZED SOURCE "mz_source" FROM POSTGRES
CONNECTION 'user=postgres port=5432 host=postgres dbname=postgres password=postgres'
PUBLICATION 'mz_source';
A quick rundown of the above statement:
MATERIALIZED: Materializes the PostgreSQL source’s data. All of the data is retained in memory, which makes the source directly selectable.
mz_source: The name for the PostgreSQL source.
CONNECTION: The PostgreSQL connection parameters.
PUBLICATION: The PostgreSQL publication, containing the tables to be streamed to Materialize.
Once we've created the PostgreSQL source, we need to create views that represent the upstream publication’s original tables before we can query them. In our case, we only have one table called sensors, so the statement that we need to execute is:
CREATE VIEWS FROM SOURCE mz_source (sensors);
To see the available views, execute the following statement:
SHOW FULL VIEWS;
Once that is done, you can query the new view directly:
SELECT * FROM sensors;
Next, let's go ahead and create a few more views.
If you wish, you can enable timing to see how long each statement takes to execute:
\timing
Example 1: Create a materialized view to show the total number of sensor readings:
CREATE MATERIALIZED VIEW mz_count AS SELECT count(*) FROM sensors;
Query the mz_count view:
SELECT * FROM mz_count;
Output:
count
-------
34565
(1 row)
Time: 2.299 ms
Example 2: Create a materialized view to show the average temperature of all sensors:
CREATE MATERIALIZED VIEW mz_total_avg AS SELECT avg(temperature::float) FROM sensors;
Query the mz_total_avg view:
SELECT * FROM mz_total_avg;
Output:
avg
-------------------
59.02989081226408
(1 row)
Time: 2.984 ms
Example 3: Create a materialized view to show the average temperature of each individual sensor:
CREATE MATERIALIZED VIEW average AS
SELECT name::text, avg(temperature::float) AS temp
FROM sensors
GROUP BY (name);
Let's query the average view:
SELECT * FROM average LIMIT 10;
Output:
name | temp
--------------+--------------------
raspberry-1 | 58.60756530123859
raspberry-2 | 58.95694631912029
raspberry-3 | 58.628198038515066
raspberry-4 | 59.40673999174753
raspberry-5 | 59.079367226960734
raspberry-6 | 58.96244838239402
raspberry-7 | 58.4658871719401
raspberry-8 | 58.9830811196705
raspberry-9 | 59.398486896836936
raspberry-10 | 59.669463513068024
(10 rows)
Time: 2.353 ms
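If you want to check what these three queries compute without starting the demo, the sketch below runs equivalent SELECT statements against a tiny in-memory SQLite table. SQLite is only a stand-in here: it has no materialized views and no PostgreSQL-style ::float casts, so the casts are written with CAST. The four sample rows, including the choice to store temperature as text, are made up for illustration.

```python
import sqlite3

# In-memory stand-in for the sensors table (sample rows are invented).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensors (name TEXT, timestamp TEXT, temperature TEXT)")
conn.executemany(
    "INSERT INTO sensors VALUES (?, ?, ?)",
    [
        ("raspberry-1", "2021-11-22T10:00:00", "58.0"),
        ("raspberry-1", "2021-11-22T10:00:02", "60.0"),
        ("raspberry-2", "2021-11-22T10:00:00", "61.0"),
        ("raspberry-2", "2021-11-22T10:00:02", "63.0"),
    ],
)

# Example 1: total number of readings.
total = conn.execute("SELECT count(*) FROM sensors").fetchone()[0]

# Example 2: average temperature across all sensors.
overall_avg = conn.execute(
    "SELECT avg(CAST(temperature AS REAL)) FROM sensors"
).fetchone()[0]

# Example 3: average temperature per sensor.
per_device = dict(
    conn.execute(
        "SELECT name, avg(CAST(temperature AS REAL)) AS temp "
        "FROM sensors GROUP BY name"
    )
)

print(total, overall_avg, per_device)
conn.close()
```

The difference in Materialize is that the results of these queries are maintained continuously as new rows arrive, which is why the SELECT statements above return in a few milliseconds.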
Feel free to experiment by creating more materialized views.
Sinks let you send data from Materialize to an external system.
For this demo, we will be using Redpanda.
Redpanda is Kafka API-compatible, so Materialize can process data from it just as it would process data from a Kafka source.
Let's create a materialized view that will hold all of the devices with an average temperature of more than 60 degrees Celsius:
CREATE MATERIALIZED VIEW mz_high_temperature AS
SELECT * FROM average WHERE temp > 60;
If you were to run a SELECT on this new materialized view, it would return only the devices with an average temperature above 60 degrees Celsius:
SELECT * FROM mz_high_temperature;
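Conceptually, a materialized view like this one is kept up to date as new readings arrive rather than recomputed from scratch on every SELECT. The Python sketch below mimics that idea with a running sum and count per device. It is a simplified illustration, not how Materialize is implemented, and the class name and sample readings are hypothetical.

```python
from collections import defaultdict


class HighTemperatureView:
    """Toy model of a view holding devices whose average exceeds a threshold."""

    def __init__(self, threshold=60.0):
        self.threshold = threshold
        self._sum = defaultdict(float)
        self._count = defaultdict(int)

    def apply(self, name, temperature):
        """Fold one new reading into the running totals for its device."""
        self._sum[name] += temperature
        self._count[name] += 1

    def rows(self):
        """Return {device: average} for devices above the threshold."""
        averages = {n: self._sum[n] / self._count[n] for n in self._count}
        return {n: avg for n, avg in averages.items() if avg > self.threshold}


view = HighTemperatureView()
for name, temp in [("raspberry-1", 58.0), ("raspberry-2", 61.0), ("raspberry-2", 63.0)]:
    view.apply(name, temp)
before = view.rows()

# A hot reading pushes raspberry-1's average from 58.0 to 62.0.
view.apply("raspberry-1", 66.0)
after = view.rows()

print(before)
print(after)
```

Each new reading only touches the totals for its own device, so the cost of keeping the result current does not grow with the number of rows already processed.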
Let's create a sink that sends the data from the above materialized view to Redpanda:
CREATE SINK high_temperature_sink
FROM mz_high_temperature
INTO KAFKA BROKER 'redpanda:9092' TOPIC 'high-temperature-sink'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081';
Now if you connect to the Redpanda container and use the rpk topic consume command, you will be able to read the records from the topic.
However, for the time being, we won’t be able to preview the results with rpk because the records are Avro-formatted. Redpanda will most likely implement this in the future, but for the moment, we can stream the topic back into Materialize to confirm the format.
First, get the name of the topic that has been automatically generated:
SELECT topic FROM mz_kafka_sinks;
Output:
topic
-----------------------------------------------------------------
high-temperature-sink-u12-1637586945-13670686352905873426
For more information on how the topic names are generated check out the documentation here.
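Because the generated suffix changes from run to run, it can help to split the topic name into its parts before using it elsewhere. The sketch below does that for the sample name above. Reading the three trailing segments as a sink ID, a startup timestamp, and a nonce is an assumption based on that sample, and the function name is hypothetical, so check the documentation before relying on this layout.

```python
def split_sink_topic(topic):
    """Split a generated sink topic name into its assumed components."""
    # The prefix itself contains dashes, so split from the right.
    prefix, sink_id, startup_time, nonce = topic.rsplit("-", 3)
    return {
        "prefix": prefix,
        "sink_id": sink_id,                 # assumed: Materialize sink ID
        "startup_time": int(startup_time),  # assumed: Unix timestamp
        "nonce": nonce,                     # assumed: random suffix
    }


parts = split_sink_topic(
    "high-temperature-sink-u12-1637586945-13670686352905873426"
)
print(parts["prefix"], parts["sink_id"])
```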
Then create a new materialized source from this Redpanda topic:
CREATE MATERIALIZED SOURCE high_temp_test
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'high-temperature-sink-u12-1637586945-13670686352905873426'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081';
Make sure to change the topic name accordingly!
Finally, query this new materialized source:
SELECT * FROM high_temp_test LIMIT 2;
Now that you have the data in the topic, other services can connect to it, consume it, and trigger emails or alerts, for example.
To access the Metabase instance, visit http://localhost:3030 if you are running the demo locally or http://your_server_ip:3030 if you are running the demo on a server. Then follow the steps to complete the Metabase setup.
To connect to your Materialize database, specify the following connection properties:
Field | Value
----------------- | ----------------
Database | PostgreSQL
Name | user_reviews
Host | materialized
Port | 6875
Database name | materialize
Database username | materialize
Database password | Leave empty
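The same properties can also be written as a PostgreSQL-style connection URI, which is handy when connecting from a script or another client instead of Metabase. The helper below is a hypothetical sketch that only assembles the string from the values in the table. Note that the materialized host name is expected to resolve only inside the demo's Docker network.

```python
from urllib.parse import quote


def build_uri(host, port, database, user, password=""):
    """Assemble a PostgreSQL connection URI, omitting an empty password."""
    credentials = quote(user, safe="")
    if password:
        credentials += ":" + quote(password, safe="")
    return f"postgresql://{credentials}@{host}:{port}/{database}"


# Values taken from the connection properties table above.
uri = build_uri("materialized", 6875, "materialize", "materialize")
print(uri)
```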
Once ready, you will be able to visualize your data just as you would with a standard PostgreSQL database.
This is a simple example of how to use the direct PostgreSQL connection with Materialize and stream data into a Kafka/Redpanda topic.
In most cases, you would not store your IoT device data in a PostgreSQL database, but in an S3 bucket or a Kafka topic. So the setup could be similar to the following:
S3 Bucket example:
Redpanda/Kafka example:
To stop all of the services, run the following command:
docker-compose down
CREATE SOURCE: PostgreSQL
CREATE SOURCE
CREATE VIEWS
SELECT