Spring Boot with Apache Pulsar 101

by Tim Spann

Tim Spann is a Developer Advocate for StreamNative

October 13th, 2022

Too Long; Didn't Read

Using the new Spring Boot and Apache Pulsar integration. Code: https://github.com/tspannhw/spring-pulsar-airquality. The new Spring module for Apache Pulsar, used here with Spring Boot 3.0.0, provides a Spring-friendly API for developing Apache Pulsar applications. It also makes Pulsar administration tasks, such as creating topics, easy to do from Spring.


Using the New Spring Boot - Apache Pulsar Integration


Code: https://github.com/tspannhw/spring-pulsar-airquality


New Spring Module for Apache Pulsar

This uses the experimental Spring for Apache Pulsar project: https://github.com/spring-projects-experimental/spring-pulsar

Here is how easily the new Spring library can be used to build Pulsar applications.




Setup


  • Visual Studio Code with Spring Boot 3.0.0 and Java JDK 17
  • Apache Pulsar version 2.10.1 (version 2.9.1 and later also works)
  • Set an environment variable with your AirNow API key
  • Point the application at your Apache Pulsar cluster. If you are using StreamNative Cloud, the SSL and connection configuration is in src/main/resources/application.yml
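The API key step can be sketched in plain Java. This is a minimal, hypothetical example rather than code from the repository: it assumes the key lives in an environment variable named `AIRNOWAPIKEY` and that the app calls AirNow's current-observations-by-ZIP-code endpoint. Check the repository and the AirNow API documentation for the actual variable name and endpoint.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds an AirNow request for current observations near a ZIP code.
// ASSUMPTIONS: the AIRNOWAPIKEY variable name and the endpoint below are illustrative,
// not taken from the spring-pulsar-airquality repository.
final class AirNowRequest {
    static final String API_KEY_ENV = "AIRNOWAPIKEY";
    static final String BASE_URL =
            "https://www.airnowapi.org/aq/observation/zipCode/current/";

    private AirNowRequest() {}

    // Reads the API key from the environment and fails fast if it is missing.
    static String apiKeyFromEnv() {
        String key = System.getenv(API_KEY_ENV);
        if (key == null || key.isBlank()) {
            throw new IllegalStateException("Set the " + API_KEY_ENV + " environment variable");
        }
        return key;
    }

    // Builds the request URI for a ZIP code and search radius in miles,
    // URL-encoding the caller-supplied values.
    static URI buildUri(String zipCode, int distanceMiles, String apiKey) {
        String query = "format=application/json"
                + "&zipCode=" + encode(zipCode)
                + "&distance=" + distanceMiles
                + "&API_KEY=" + encode(apiKey);
        return URI.create(BASE_URL + "?" + query);
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

Reading the key from the environment keeps it out of source control and out of `application.yml`, and `apiKeyFromEnv()` stops the app early with a clear message if the variable was never set.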


Basic Java Source Code


import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;

// Injected by Spring
@Autowired
private PulsarProducerFactory<Observation> producerFactory;

// Inside the sending method (observation2, uuidKey and topicName are defined elsewhere):
// build a template that serializes Observation objects with a JSON schema
PulsarTemplate<Observation> pulsarTemplate = new PulsarTemplate<>(producerFactory);
pulsarTemplate.setSchema(Schema.JSON(Observation.class));

// Send one observation, keyed by a UUID, to the target topic
MessageId msgid = pulsarTemplate.newMessage(observation2)
        .withMessageCustomizer((mb) -> mb.key(uuidKey.toString()))
        .withTopic(topicName)
        .send();
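The snippet takes `topicName` and `uuidKey` as given. As a minimal sketch, assuming the app targets the same `persistent://public/default/airquality` topic that the Spark job reads, both values can be built with the JDK alone. `PulsarNames` and its methods are hypothetical, and the repository may construct these values differently.

```java
import java.util.UUID;

// Hypothetical helpers (not from the repository) for the two values the producer
// snippet takes as given: the fully qualified topic name and the message key.
final class PulsarNames {
    private PulsarNames() {}

    // Pulsar topic names take the form {persistent|non-persistent}://tenant/namespace/topic.
    static String topicName(boolean persistent, String tenant, String namespace, String topic) {
        for (String part : new String[] {tenant, namespace, topic}) {
            if (part == null || part.isBlank() || part.contains("/")) {
                throw new IllegalArgumentException("Invalid topic name part: " + part);
            }
        }
        String domain = persistent ? "persistent" : "non-persistent";
        return domain + "://" + tenant + "/" + namespace + "/" + topic;
    }

    // A random (version 4) UUID rendered as text, the same format as the
    // keys shown in the command line consumer output.
    static String newMessageKey() {
        return UUID.randomUUID().toString();
    }
}
```

Keying each message by a fresh UUID gives every observation a unique key; validating the name parts up front catches a stray `/` before it produces a topic in the wrong namespace.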


That's all that is required to send messages with a JSON schema from Spring to Apache Pulsar.




Spark Run


Once the data is in Apache Pulsar, we can easily query it with Apache Spark. An example run is included below.


val dfPulsar = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://pulsar1:6650")
  .option("admin.url", "http://pulsar1:8080")
  .option("topic", "persistent://public/default/airquality")
  .load()

dfPulsar.printSchema()

root
 |-- additionalProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = false)
 |-- aqi: integer (nullable = true)
 |-- category: struct (nullable = true)
 |    |-- additionalProperties: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = false)
 |    |-- name: string (nullable = true)
 |    |-- number: integer (nullable = true)
 |-- dateObserved: string (nullable = true)
 |-- hourObserved: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- localTimeZone: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- parameterName: string (nullable = true)
 |-- reportingArea: string (nullable = true)
 |-- stateCode: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)


// Print every column of the stream to the console
val consoleQuery = dfPulsar.selectExpr("*")
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()

// Write selected, typed columns as CSV files (the tuple encoder needs spark.implicits._)
import spark.implicits._

val pQuery = dfPulsar
  .selectExpr("CAST(__key AS STRING)", "CAST(aqi AS INTEGER)", "CAST(dateObserved AS STRING)",
    "CAST(hourObserved AS INTEGER)", "CAST(latitude AS DOUBLE)", "CAST(localTimeZone AS STRING)",
    "CAST(longitude AS DOUBLE)", "CAST(parameterName AS STRING)", "CAST(reportingArea AS STRING)",
    "CAST(stateCode AS STRING)")
  .as[(String, Integer, String, Integer, Double, String, Double, String, String, String)]
  .writeStream
  .format("csv")
  .option("header", true)
  .option("path", "/opt/demo/airquality")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

pQuery.explain()
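The CSV query casts `__key` from binary to a string, and Spark interprets the bytes as UTF-8 when it does so. A small Java sketch of the same decoding, assuming the producer set the key as a plain string (as `mb.key(uuidKey.toString())` does) so that the raw key bytes are the UTF-8 encoding of the UUID text. `KeyDecoding` is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper showing what CAST(__key AS STRING) does to the raw key bytes.
// ASSUMPTION: the key was set as a plain string, so its bytes are UTF-8 text.
final class KeyDecoding {
    private KeyDecoding() {}

    // Interpret the raw key bytes as UTF-8 text.
    static String decodeKey(byte[] rawKey) {
        return new String(rawKey, StandardCharsets.UTF_8);
    }

    // Parse the decoded key back into a UUID; throws IllegalArgumentException
    // if the text is not a valid UUID.
    static UUID decodeUuidKey(byte[] rawKey) {
        return UUID.fromString(decodeKey(rawKey));
    }
}
```

Without the cast, the `__key` column would show up in the output as raw binary instead of the readable UUID.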


Apache Flink Continuous SQL over Apache Pulsar Topics


As the queries below show, it is easy to write SQL that runs continuously against our live streaming data.


select aqi, parameterName, dateObserved, hourObserved, latitude, longitude, localTimeZone, stateCode, reportingArea
from airquality;

select max(aqi) as MaxAQI, parameterName, reportingArea
from airquality
group by parameterName, reportingArea;

select max(aqi) as MaxAQI, min(aqi) as MinAQI, avg(aqi) as AvgAQI, count(aqi) as RowCount, parameterName, reportingArea
from airquality
group by parameterName, reportingArea;


Command Line Consumer


----- got message -----
key:[2e725251-c977-46b4-ae81-45a675a1a473], properties:[], content:{"dateObserved":"2022-04-07 ","hourObserved":13,"localTimeZone":"EST","reportingArea":"Atlanta","stateCode":"GA","latitude":33.65,"longitude":-84.43,"parameterName":"O3","aqi":40,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
----- got message -----
key:[c0ec765d-38b9-4416-bb27-daa80e7654ff], properties:[], content:{"dateObserved":"2022-04-07 ","hourObserved":13,"localTimeZone":"EST","reportingArea":"Atlanta","stateCode":"GA","latitude":33.65,"longitude":-84.43,"parameterName":"PM2.5","aqi":18,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
----- got message -----
key:[7a7f567a-b9d7-470e-992e-86a2c24c9ce8], properties:[], content:{"dateObserved":"2022-04-07 ","hourObserved":13,"localTimeZone":"EST","reportingArea":"Atlanta","stateCode":"GA","latitude":33.65,"longitude":-84.43,"parameterName":"PM10","aqi":17,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}
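To make the third query's semantics concrete, here is a plain-Java sketch of what it computes: the maximum, minimum, average and count of `aqi` for each (`parameterName`, `reportingArea`) pair. It works on an in-memory list instead of the live topic, and the `AqiStats`, `Reading` and `Group` names are hypothetical.

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// A one-off, in-memory equivalent of the third Flink query's aggregates.
// Hypothetical names; the real query runs continuously over the Pulsar topic.
final class AqiStats {
    private AqiStats() {}

    // The fields the aggregate query reads from each observation
    record Reading(String parameterName, String reportingArea, int aqi) {}

    // The GROUP BY key: one group per (parameterName, reportingArea) pair
    record Group(String parameterName, String reportingArea) {}

    // Max, min, average and count of aqi per group (MaxAQI, MinAQI, AvgAQI, RowCount)
    static Map<Group, IntSummaryStatistics> summarize(List<Reading> readings) {
        return readings.stream().collect(Collectors.groupingBy(
                r -> new Group(r.parameterName(), r.reportingArea()),
                Collectors.summarizingInt(Reading::aqi)));
    }
}
```

Fed only the three sample messages above, every (`parameterName`, `reportingArea`) pair forms its own group with a row count of 1. The continuous Flink query, by contrast, keeps updating these results as new readings arrive on the topic.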





For more resources see below.


Connecting to Data for Updates/Lookup - ScyllaDB

https://github.com/tspannhw/airquality-datastore



