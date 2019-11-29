Getting Started with Spring Cloud Stream

@ mcclain-pivotal Brian McClain

This post was co-written with Ben Wilcock, Product and Technical Marketing Manager for Spring at Pivotal.

🔔 A file has been uploaded! 🔔

🔔 A new user was registered! 🔔

🔔 An order was placed! 🔔

These sound like events that many parts of our application architecture might be interested in, right? For example, when an order is placed on our website, we’ll need a call to process the payment, a call to reserve inventory, and a call to begin the process of picking, packaging and shipping the product.

For a single order, this isn’t too bad. Our store can make a few requests to these backend services directly and it shouldn’t introduce too much overhead.But what happens if we’re really good at selling our product? Processing 100 orders a second suddenly means our frontend is making three hundred calls per second to our backend services.

If we add one more service to that—say, to report to an internal sales dashboard— now that’s four hundred calls per second. That’s a lot of overhead!

What if instead, we can simply have our website alert our whole architecture at once? It can yell, “Hey! I made a sale” to our whole stack, and any component that’s interested can take the appropriate action. This means we don’t need to update our frontend as we add additional services, and our new services just need to know what to listen for.

Why Spring Cloud Stream?

The above is an example of an event-driven architecture, where instead of reaching out to each service one by one, our services instead emit a change of state. If a file is uploaded, our file service can emit it out to a messaging platform, and then our Super Duper Image Resizer 3000 service can listen for that and automatically generate differently sized profile images. Pivotal’s own Richard Seroter wrote about this very topic in detail, and it’s a great read. In his blog post, Richard talks about messaging as a way of reliably delivering events to many consumers quickly and in volume.

He also touches on something we want to talk about today: Spring Cloud Stream

We’re big fans of both Kafka and RabbitMQ as event streaming platforms, so for this demo we’ll use Kafka. No matter which you choose to use, making it easy to produce and consume events is important for your developers. I’ve used a lot of frameworks that abstract away from the underlying message queue, but none quite as easy and flexible as Spring Cloud Stream.

My teammate Ben Wilcock put together a demo that really shows just how easy it is to get up and running. Let’s take it for a spin—and to follow along, you can download the full source code here

Prepping For The Demo

We only need a couple of things for our demo, which are Docker and Docker Compose, and of course your favorite distribution of the JDK (perhaps even AdoptOpenJDK , which we sponsor). To keep things easy, the demo includes a Docker Compose config that will set up both Kafka and RabbitMQ, though for our purposes we’ll only be using Kafka. We can spin this up with a simple command:

docker-compose up

This will read our docker-compose.yml file, download the necessary container images, run them, and configure them. After just a few moments, Kafka should be up and running and ready to go.

Sending Events

Our demo is made up of two Spring microservices, one to produce events and one to consume them. In our fictional scenario, the message producer will create a stream of applications for bank loans, and our processor will check if those applications should be approved or declined.

loansource directory. Let’s start by producing some messages that will be sent to Kafka, the code for which is in thedirectory.

Loan.java file defines a loan object and the Statuses.java file defines all the states a loan can be in. What’s interesting, though, is the LoansourceApplication.java file, which is what’s actually producing our messages. There are a few files of code here. Thefile defines aobject and thefile defines all the states a loan can be in. What’s interesting, though, is thefile, which is what’s actually producing our messages.

LoansourceApplication.java to see how this works. As you can imagine, Spring and its dependencies handle a lot of the wiring up of components for us automatically. Let’s take a look atto see how this works.

@Bean public Supplier<Loan> supplyLoan () { return () -> { String rName = names.get( new Random().nextInt(names.size())); Long rAmount = amounts.get( new Random().nextInt(amounts.size())); Loan loan = new Loan(UUID.randomUUID().toString(), rName, rAmount); log.info( "{} {} for ${} for {}" , loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName()); return loan; }; }

Supplier<> is a Java function data type. Because there is only one @Bean method that returns this type, Spring Cloud Stream knows exactly what to do next. By default, it will trigger this function once every second and send the result to the default MessageChannel named output . What’s nice about this function method is that it only contains business logic, so you can test it using your favorite testing methods. is a Java function data type. Because there is only onemethod that returns this type, Spring Cloud Stream knows exactly what to do next. By default, it will trigger this function once every second and send the result to the defaultnamed. What’s nice about this function method is that it only contains business logic, so you can test it using your favorite testing methods.

spring.cloud.function.definition property in the application.properties file to explicitly declare which function bean we want to be bound to binding destinations, but for cases when you only have a single @Bean defined, this is not necessary. We could use theproperty in the application.properties file to explicitly declare which function bean we want to be bound to binding destinations, but for cases when you only have a singledefined, this is not necessary.

spring.integration.poller.fixed-delay property in the application.properties file. The only question that remains is, “How does Spring know it’s Kafka we’re writing to?” For that, we take a look at our pom.xml : Likewise, if we wanted to use a different poller interval, we can use theproperty in thefile. The only question that remains is, “How does Spring know it’s Kafka we’re writing to?” For that, we take a look at our

< dependency > < groupId > org.springframework.cloud </ groupId > < artifactId > spring-cloud-stream-binder-kafka </ artifactId > </ dependency >

localhost on the default port, we don’t need to provide any additional configuration in our application.properties file, but Providing this dependency in our code tells Spring, “I’d like to send these messages to Kafka”. Since our Kafka server is listening onon the default port, we don’t need to provide any additional configuration in ourfile, but we can of course do so if that’s not the case, providing information such as hostname, port, authentication, etc.

kafka profile, which we’ve configured to be the profile that includes the Kafka SCS binding, and we should see it start producing messages: We can run our code and activate theprofile, which we’ve configured to be the profile that includes the Kafka SCS binding, and we should see it start producing messages:

cd loansource ./mvnw package spring-boot:run -DskipTests= true -Pkafka

After a few moments, we’ll see our application start creating new loans and sending them to Kafka:

2019 -10 -15. ..LoansourceApplication : PENDING 9 eff9b58-e1f1 -474 d -8 f1d-aa4db8dbb75a for $ 10000000 for Donald 2019 -10 -15. ..LoansourceApplication : PENDING d507c06c -81 bb -4 a98 -8 f85 -38 f74af36984 for $ 100 for Jacinda 2019 -10 -15. ..LoansourceApplication : PENDING 19 fc86a4-d461 -470 c -8005 -423 ce1a258e7 for $ 100 for Jacinda 2019 -10 -15. ..LoansourceApplication : PENDING 33 f3756c-ea9b -472 f-bad2 -73 f1647188b1 for $ 10000 for Vladimir 2019 -10 -15. ..LoansourceApplication : PENDING 1625 d10f-c1c8 -4e75 -8 fe8 -10 ce363ef56f for $ 10000000 for Theresa

localhost:9000 and you should see a UI that allows you to look at the messages stored in Kafka: If you prefer, you can also see the messages in your browser using KafDrop . Simply point your browser toand you should see a UI that allows you to look at the messages stored in Kafka:

Receiving Events

loancheck directory. For this half of the demo, our loan checker will observe every application and approve or decline it. If approved, an approval message will be sent to the approved topic otherwise, a denial message will be sent to the declined topic. We’ve got half of the equation here, but we also need something to consume and process these events. For this, we’ll look in thedirectory. For this half of the demo, our loan checker will observe every application and approve or decline it. If approved, an approval message will be sent to thetopic otherwise, a denial message will be sent to thetopic.

You can extrapolate from here that other systems down the line could listen for and pick up these messages for further processing. For example, maybe a payout system listens for an approved loan to start processing it.

LoanCheckApplication.java , we have the @EnableBinding(LoanProcessor.class) annotation, meaning that all of our definitions for channel bindings are found in the LoanProcessor class. We’ll see the code here is a little different, just pointing to different topics. We see that in, we have theannotation, meaning that all of our definitions for channel bindings are found in theclass.

LoanProcessor.java file, we’ll see we define the MessageChannel we’re listening on is named output, matching the default topic our producer writes to. Additionally, we define two other MessageChannels that we’ll be writing to, approved and declined . For each of these, we also define which method to invoke when a message is received on those channels. In ourfile, we’ll see we define thewe’re listening on is named output, matching the default topic our producer writes to. Additionally, we define two other MessageChannels that we’ll be writing to,and. For each of these, we also define which method to invoke when a message is received on those channels.

@Component public interface LoanProcessor { String APPLICATIONS_IN = "output" ; String APPROVED_OUT = "approved" ; String DECLINED_OUT = "declined" ; @Input (APPLICATIONS_IN) SubscribableChannel sourceOfLoanApplications () ; @Output (APPROVED_OUT) MessageChannel approved () ; @Output (DECLINED_OUT) MessageChannel declined () ; }

LoanChecker.java file. We’ll see we have a method checkAndSortLoans with the @StreamListener annotation that matches our Input we defined previously: Finally, we can see how this ties into which method is invoked if we take a look at thefile. We’ll see we have a methodwith theannotation that matches our Input we defined previously:

@StreamListener (LoanProcessor.APPLICATIONS_IN) public void checkAndSortLoans (Loan loan) { log.info( "{} {} for ${} for {}" , loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName()); if (loan.getAmount() > MAX_AMOUNT) { loan.setStatus(Statuses.DECLINED.name()); processor.declined().send(message(loan)); } else { loan.setStatus(Statuses.APPROVED.name()); processor.approved().send(message(loan)); } }

loansource , by opening up a separate terminal and running the following: We can start this code up much like we did our, by opening up a separate terminal and running the following:

cd loancheck ./mvnw package spring-boot:run -DskipTests= true -Pkafka

approved or declined : After a few moments, we’ll start seeing our pending messages come through and then get sorted intoor

2019 -10 -15. ..LoanChecker : PENDING 95 a887cf-ab5f -48 c4-b03b -556675446 cfc for $ 1000 for Kim 2019 -10 -15. ..LoanChecker : APPROVED 95 a887cf-ab5f -48 c4-b03b -556675446 cfc for $ 1000 for Kim 2019 -10 -15. ..LoanChecker : PENDING a15f13fe-fc9a -40 fb-b6f0 -24106 a18c0cd for $ 100000000 for Angela 2019 -10 -15. ..LoanChecker : DECLINED a15f13fe-fc9a -40 fb-b6f0 -24106 a18c0cd for $ 100000000 for Angela

Wrapping Up

Spring Cloud Stream provides an extremely powerful abstraction for potentially complicated messaging platforms, turning the act of producing messages into just a couple lines of code. Should your infrastructure needs change and you need to migrate to a new messaging platform, not a single line of code changes other than your pom file. No matter if you’re using Kafka, RabbitMQ, or a cloud provider’s solution such as GCP Pub/Sub or Azure Event Hub, Spring Cloud Stream means it’s simple and quick to get up and running.

About the Author

Brian is a Principal Product Marketing Manager on the Technical Marketing team at Pivotal, with a focus on technical educational content for Pivotal customers as well as the Cloud Foundry, BOSH, and Knative communities. Prior to Pivotal, Brian worked on both the development and operations of software, with a heavy focus on Cloud Foundry and BOSH at companies in many industries including finance, entertainment and technology. He loves learning and experimenting in many fields of technology, and more importantly sharing the lessons learned along the way.

