Code: https://github.com/tspannhw/spring-pulsar-airquality
This example uses the experimental Spring for Apache Pulsar project: https://github.com/spring-projects-experimental/spring-pulsar
Here is how easily we can use the new Spring for Apache Pulsar library in our applications.
// Imports used by the snippet below.
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;

// Spring injects a producer factory for Observation messages.
@Autowired
private PulsarProducerFactory<Observation> producerFactory;

// NOTE(review): the statements below must run inside a method (the CommandLineRunner
// import suggests run(...)); observation2, uuidKey and topicName are defined elsewhere.

// Build a template over the injected factory and give it a JSON schema derived
// from the Observation class, so messages are published with a schema.
PulsarTemplate<Observation> pulsarTemplate = new PulsarTemplate<>(producerFactory);
pulsarTemplate.setSchema(Schema.JSON(Observation.class));

// Send one observation, keyed by a UUID string, to the target topic.
MessageId sentMessageId = pulsarTemplate
        .newMessage(observation2)
        .withMessageCustomizer(builder -> builder.key(uuidKey.toString()))
        .withTopic(topicName)
        .send();
That's all it takes to send schema-typed messages from Spring to Apache Pulsar — very easy.
Once the data is in Apache Pulsar, we can easily query it with Apache Spark. An example run is included below.
// Subscribe to the airquality topic as a streaming DataFrame via the Pulsar Spark connector.
// If you run this in spark-shell, enter it with :paste so the multi-line chain is read as one expression.
val dfPulsar = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://pulsar1:6650")   // Pulsar broker service URL
  .option("admin.url", "http://pulsar1:8080")       // Pulsar admin (HTTP) URL
  .option("topic", "persistent://public/default/airquality")
  .load()

// Print the inferred schema: the Observation fields plus the connector's
// __-prefixed metadata columns (__key, __topic, __messageId, ...), as shown below.
dfPulsar.printSchema()
root
 |-- additionalProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = false)
 |-- aqi: integer (nullable = true)
 |-- category: struct (nullable = true)
 |    |-- additionalProperties: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = false)
 |    |-- name: string (nullable = true)
 |    |-- number: integer (nullable = true)
 |-- dateObserved: string (nullable = true)
 |-- hourObserved: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- localTimeZone: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- parameterName: string (nullable = true)
 |-- reportingArea: string (nullable = true)
 |-- stateCode: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
// Query 1: echo every column of each micro-batch to the console, without truncating values.
// Named separately so it does not clash with pQuery below (a duplicate `val` fails to
// compile outside line-by-line REPL entry).
val consoleQuery = dfPulsar
  .selectExpr("*")
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()

// Query 2: project the message key and the observation fields to typed columns and
// write them as CSV files (with a header row) under /opt/demo/airquality.
// `.as[...]` needs an Encoder: spark-shell imports spark.implicits._ automatically;
// in a compiled application add `import spark.implicits._` first.
val pQuery = dfPulsar
  .selectExpr(
    "CAST(__key AS STRING)",
    "CAST(aqi AS INTEGER)",
    "CAST(dateObserved AS STRING)",
    "CAST(hourObserved AS INTEGER)",
    "CAST(latitude AS DOUBLE)",
    "CAST(localTimeZone AS STRING)",
    "CAST(longitude AS DOUBLE)",
    "CAST(parameterName AS STRING)",
    "CAST(reportingArea AS STRING)",
    "CAST(stateCode AS STRING)")
  // java.lang.Integer (not Int) so null aqi / hourObserved values are representable.
  .as[(String, Integer, String, Integer, Double, String, Double, String, String, String)]
  .writeStream
  .format("csv")
  // "truncate" is a console-sink option, so it is not set on this file sink.
  .option("header", true)
  .option("path", "/opt/demo/airquality")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

// Print the physical plan of the CSV query for debugging
// (Spark reports that it is waiting for data until the first micro-batch has run).
pQuery.explain()
As you can see below, it is very easy to build SQL queries against our live streaming data.
-- NOTE(review): `airquality` must be a table/view over the Pulsar topic in whichever SQL
-- engine runs these queries; the Spark code above does not register such a view.

-- Every observation, one row per message.
select aqi, parameterName, dateObserved, hourObserved, latitude, longitude, localTimeZone, stateCode, reportingArea
from airquality;

-- Highest AQI per pollutant (parameterName, e.g. O3, PM2.5, PM10) and reporting area.
select max(aqi) as MaxAQI, parameterName, reportingArea
from airquality
group by parameterName, reportingArea;

-- AQI max / min / average and row count per pollutant and reporting area.
select max(aqi) as MaxAQI, min(aqi) as MinAQI, avg(aqi) as AvgAQI, count(aqi) as RowCount, parameterName, reportingArea
from airquality
group by parameterName, reportingArea;

Command Line Consumer

----- got message -----
key:[2e725251-c977-46b4-ae81-45a675a1a473], properties:[], content:{"dateObserved":"2022-04-07 ","hourObserved":13,"localTimeZone":"EST","reportingArea":"Atlanta","stateCode":"GA","latitude":33.65,"longitude":-84.43,"parameterName":"O3","aqi":40,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
----- got message -----
key:[c0ec765d-38b9-4416-bb27-daa80e7654ff], properties:[], content:{"dateObserved":"2022-04-07 ","hourObserved":13,"localTimeZone":"EST","reportingArea":"Atlanta","stateCode":"GA","latitude":33.65,"longitude":-84.43,"parameterName":"PM2.5","aqi":18,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
----- got message -----
key:[7a7f567a-b9d7-470e-992e-86a2c24c9ce8], properties:[], content:{"dateObserved":"2022-04-07 ","hourObserved":13,"localTimeZone":"EST","reportingArea":"Atlanta","stateCode":"GA","latitude":33.65,"longitude":-84.43,"parameterName":"PM10","aqi":17,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
For more resources, see below.
Connecting to Data for Updates/Lookup - ScyllaDB
https://github.com/tspannhw/airquality-datastore
Resources
Also Published Here