
Building on top of Reactive Streams

by Nicolas A Perez, March 25th, 2017

Reactive Streams

The Reactive Streams specification allows us to create application components that communicate in an asynchronous fashion while maintaining safety through back-pressure mechanisms.

Reactive Streams implementations exist for many modern programming languages, including JavaScript and Scala.

Let’s look at a simple example that connects a publisher to a subscriber to see how the flow goes.

val publisher = Subject[Int]
val subscriber = publisher.subscribe(message => println(message))

publisher.onNext(5)

Here we have created a Subject[Int] so we can call .onNext on it, which pushes a message (an Int) into the stream. On the other end of the pipe we have a subscriber that subscribes to the publisher and specifies that every new message will be processed with a function.

As you can see, we need two pieces, a publisher and a subscriber, and we need to connect them together.

Operations on the Stream

Sometimes we don’t need to process every single message, so we can filter the pipe.



val subscriber = publisher.filter(_ % 2 == 0).subscribe(message => println(message))

Other times, we need to apply transformations, so we map on the streams.

case class EvenNumber(n: Int)

val subscriber = publisher.filter(_ % 2 == 0).map(EvenNumber).subscribe(message => println(message))

By now, we might have realized that a stream has operations similar to the ones we can find on any collection. This is a powerful feature: we don’t have to learn a new API, and we can work with what we already know.
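For instance, a few more collection-style operators compose on the same stream (a sketch assuming the Subject-based API from the snippets above; the operator names mirror their collection counterparts):

// Sketch: composing collection-like operators on the publisher defined above.
val subscriber = publisher
  .filter(_ % 2 == 0)   // keep even numbers, like filter on a collection
  .map(_ * 10)          // transform each element, like map on a collection
  .take(3)              // only the first three matches, like take on a collection
  .subscribe(message => println(message))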

One Small Problem

Our problem is that, to create a complete pipeline, we need to connect all the pieces together. This implies that when creating a subscriber, we need an already created publisher, and we must have access to it.

val subscriber = publisher.subscribe(message => println(message))

The subscriber needs to know about the publisher, so we need to pass a publisher around to every place a subscriber is to be created.
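To make the coupling concrete, here is a small illustrative sketch (startLogger is a hypothetical helper, not part of the article’s code): any component that wants to listen must be handed the publisher explicitly.

// Illustrative only: the listening component has to receive the publisher.
def startLogger(publisher: Subject[Int]) =
  publisher.subscribe(message => println(s"logged: $message"))

startLogger(publisher)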

A Common Endpoint

We want to create a mechanism that avoids this problem. The idea behind the solution is not complicated at all; it actually comes from a state-of-the-art system, Apache Kafka.

Kafka uses a topic-based approach to partition the streams. We can put a message in topic A so that only those listening to topic A will receive and process it.

On the other hand, publishers and subscribers are completely decoupled. They don’t know about each other. They use a central place to publish messages and subscribe to them.
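The same idea can be sketched on top of a plain stream (a rough illustration only; bus, publish, and subscribe below are made-up names, not MiniKaf’s API): a single shared stream carries (topic, payload) pairs, and each consumer filters for the topic it cares about.

// Rough sketch of topic-based routing over one shared stream (illustrative names).
val bus = Subject[(String, Any)]

def publish(topic: String, payload: Any): Unit = bus.onNext((topic, payload))

def subscribe(topic: String)(handler: Any => Unit) =
  bus.filter { case (t, _) => t == topic }          // only events for this topic
     .subscribe { case (_, payload) => handler(payload) }

subscribe("A")(msg => println(s"topic A received: $msg"))
publish("A", 5)      // delivered to the subscriber above
publish("B", "hi")   // ignored by the topic-A subscriber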

Building MiniKaf

MiniKaf is a library we have started in order to overcome the problem in question. It exposes a very clean API supported by a minimal set of types.

Let’s look at some of the main components.

First, we have an interface for every event / message published to the system.



trait Event[A] {def value: A}
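For illustration, a concrete event can be as simple as a case class wrapping the value; presumably the E used in the default conversion below is along these lines (this is an assumption, not necessarily MiniKaf’s actual definition).

// Assumption: a minimal concrete Event, similar to the E case class used below.
case class E[A](value: A) extends Event[A]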

Then, we have a mechanism to convert our types to Event before they are sent to a pipe.



@typeclass trait ToEvent[A] {def event(a: A): Event[A]}

And of course, a default way to do the conversion.

object ToEvent {
  implicit def toEvent[A]: ToEvent[A] =
    new ToEvent[A] { override def event(a: A) = E(a) }
}
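With that implicit instance in scope, any value can be lifted into an Event through ordinary implicit resolution, for example (a usage sketch):

// Resolve the default ToEvent instance and lift a plain value into an Event.
val e: Event[Int] = implicitly[ToEvent[Int]].event(5)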

Or we can use a user-specific way to do the conversion through the event function.

object ToEvent {
  def event[A](a: A)(f: A => Event[A]): Event[A] = f(a)

  implicit def toEvent[A]: ToEvent[A] =
    new ToEvent[A] { override def event(a: A) = E(a) }
}
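For instance, a caller could supply its own Event wrapper (MyEvent below is a hypothetical type used only to illustrate the call):

// Hypothetical custom event type, only for illustration.
case class MyEvent[A](value: A) extends Event[A]

val custom: Event[Int] = ToEvent.event(42)(n => MyEvent(n))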

We are now ready to start sending messages.

val publisher = Publisher()
publisher.publish(5)
publisher.publish("hello")

But wait! How do we consume these messages?

Consuming a message is as simple as subscribing to the type of message you want to consume.

val subscriber = Subscriber()
subscriber.subscribe[Int](e => println((e.topic, e.value)))

Here, we subscribed to messages of type Int. The next time an Int is sent through the stream, the function e => println((e.topic, e.value)) will be executed. Notice that e is not an Int, but an EventRecord whose topic is "Int" and whose value is the actual message pushed by the publisher.

The most important part to notice here is that we did not connect the subscriber to a publisher. We just created the subscriber and specified the subscription. This is the same level of decoupling Kafka has, but we have relied on Reactive Streams for the underlying implementation.
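Putting the pieces together, here is a small end-to-end sketch based only on the calls shown above (the comments describe the expected routing, which MiniKaf handles per message type):

val publisher = Publisher()
val subscriber = Subscriber()

// Each subscription is keyed by the message type it handles.
subscriber.subscribe[Int](e => println(s"Int on topic ${e.topic}: ${e.value}"))
subscriber.subscribe[String](e => println(s"String on topic ${e.topic}: ${e.value}"))

publisher.publish(5)        // expected to reach the Int subscription only
publisher.publish("hello")  // expected to reach the String subscription only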

Conclusions

Reactive Streams are spreading out to a huge set of platforms, allowing them to communicate over asynchronous channels that are elastic, reliable, and message-driven with built-in back pressure. However, we think they are not meant to be used directly, but through our own APIs built on top of them.

Help Wanted

MiniKaf is at a very early stage. We are doing some unsafe work at runtime that might be replaced by a compile-time mechanism (probably using Shapeless). Also, the API is not complete and some pieces are missing. Any help will be appreciated.
