Reactive Streams

The Reactive Streams specification allows us to create application components that communicate in an asynchronous fashion while maintaining safety through back-pressure mechanisms. Reactive Streams is being implemented in every modern programming language, such as JavaScript and Scala.

Let's look at one simplistic example that connects a producer to a subscriber to see how the flow goes.

```scala
val publisher  = Subject[Int]
val subscriber = publisher.subscribe(message => println(message))

publisher.onNext(5)
```

Here we have created a Subject[Int], so we can call .onNext on it, which pushes a message (an Int) into the stream. On the other end of the pipe we have a subscriber that subscribes to the publisher and specifies that every new message will be processed with a println function.

As you can see, we need two pieces, a publisher and a subscriber, and we need to connect them together.

Operations on the Stream

Sometimes we don't need to process every single message, so we can filter the pipe.

```scala
val subscriber = publisher.filter(_ % 2 == 0).subscribe(message => println(message))
```

Other times we need to apply transformations, so we map over the stream.

```scala
case class EvenNumber(n: Int)

val subscriber = publisher.filter(_ % 2 == 0).map(EvenNumber).subscribe(message => println(message))
```

By now we might have realized that a stream has operations similar to the ones we can find on any collection. This is a powerful feature: we don't have to learn a new API, we can work with what we already know.

One Small Problem

To create a complete pipeline, we need to connect all the pieces together. This implies that when creating a subscriber, we need an already created publisher and we must have access to it.

```scala
val subscriber = publisher.subscribe(message => println(message))
```

The subscriber needs to know about the publisher, so we have to pass a publisher around to every place a subscriber is to be created.
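The snippets above assume an Rx-style Subject from a streaming library such as Monix or RxScala. As a self-contained sketch of the same wiring (all names here are hypothetical, and real libraries add back pressure and error handling on top), a tiny subject supporting subscribe, filter and map could look like this:

```scala
import scala.collection.mutable.ListBuffer

// A minimal, hand-rolled stand-in for an Rx-style Subject.
// Real implementations also handle back pressure, completion and errors.
final class TinySubject[A] {
  private val subscribers = ListBuffer.empty[A => Unit]

  // Register a callback to run for every future message.
  def subscribe(f: A => Unit): Unit = subscribers += f

  // Push a message to every registered subscriber.
  def onNext(a: A): Unit = subscribers.foreach(_(a))

  // Derived stream that only forwards messages passing the predicate.
  def filter(p: A => Boolean): TinySubject[A] = {
    val out = new TinySubject[A]
    subscribe(a => if (p(a)) out.onNext(a))
    out
  }

  // Derived stream that forwards transformed messages.
  def map[B](f: A => B): TinySubject[B] = {
    val out = new TinySubject[B]
    subscribe(a => out.onNext(f(a)))
    out
  }
}

val publisher = new TinySubject[Int]
publisher.filter(_ % 2 == 0).map(n => s"even: $n").subscribe(println)
publisher.onNext(5) // filtered out
publisher.onNext(6) // prints "even: 6"
```

Note how each operator simply creates a downstream subject and subscribes the upstream to it; this is the same chaining-by-connection that makes the coupling problem discussed above so pervasive.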
A Common Endpoint

We want a mechanism that avoids the problem just mentioned. The idea behind the solution is not complicated at all; it actually comes from a state-of-the-art system, Apache Kafka.

Kafka uses a topic-based approach to partition its streams. We can put a message in topic A so that only those listening to topic A will receive and process the message.

Moreover, publishers and subscribers are completely decoupled. They don't know about each other; they use a central place to publish messages and to subscribe to them.

Building MiniKaf

MiniKaf is a library we have started in order to overcome the problem in question. It exposes a very clean API supported by a minimal set of types.

Let's look at some of the main components. First, we have an interface for every event / message published to the system.

```scala
trait Event[A] { def value: A }
```

Then we have a mechanism to convert our types to Event before they are sent to a pipe.

```scala
@typeclass trait ToEvent[A] { def event(a: A): Event[A] }
```

And of course, a default way to do the conversion.

```scala
object ToEvent {
  implicit def toEvent[A]: ToEvent[A] = new ToEvent[A] {
    override def event(a: A) = E(a)
  }
}
```

Or we can use a user-specific way to do the conversion through the event function.

```scala
object ToEvent {
  def event[A](a: A)(f: A => Event[A]): Event[A] = f(a)

  implicit def toEvent[A]: ToEvent[A] = new ToEvent[A] {
    override def event(a: A) = E(a)
  }
}
```

We are now ready to start sending messages.

```scala
val publisher = Publisher()

publisher.publish(5)
publisher.publish("hello")
```

But wait! How do we consume these messages? Consuming a message is as simple as subscribing to the type of message you want to consume.

```scala
val subscriber = Subscriber()

subscriber.subscribe[Int](e => println((e.topic, e.value)))
```

Here we subscribed to messages of type Int. The next time an Int is sent through the stream, the function will be executed.
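MiniKaf's internals are not shown in this post, but the decoupled, topic-per-type behavior it describes can be sketched with a toy central bus. Everything below (the Bus object, the shape of EventRecord, using the runtime class name as the topic) is an assumption made for illustration, not MiniKaf's actual implementation:

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// The record a subscriber receives: a topic label plus the raw value.
case class EventRecord[A](topic: String, value: A)

// A toy central endpoint: the runtime class name of the message plays
// the role of a Kafka topic, so publishers and subscribers never have
// to know about each other. (Illustrative sketch only.)
object Bus {
  private val handlers =
    mutable.Map.empty[String, List[EventRecord[Any] => Unit]]

  def publish[A](a: A)(implicit ct: ClassTag[A]): Unit = {
    val topic = ct.runtimeClass.getSimpleName
    handlers.getOrElse(topic, Nil).foreach(h => h(EventRecord[Any](topic, a)))
  }

  def subscribe[A](f: EventRecord[A] => Unit)(implicit ct: ClassTag[A]): Unit = {
    val topic = ct.runtimeClass.getSimpleName
    // The cast is safe because handlers are keyed by the same topic
    // used when publishing; this mirrors the unsafe runtime work the
    // post mentions wanting to replace with a compile-time mechanism.
    val h: EventRecord[Any] => Unit =
      e => f(EventRecord(e.topic, e.value.asInstanceOf[A]))
    handlers.update(topic, h :: handlers.getOrElse(topic, Nil))
  }
}

Bus.subscribe[Int](e => println((e.topic, e.value)))
Bus.publish(5)       // reaches the Int subscriber
Bus.publish("hello") // no String subscriber registered, so it is dropped
```

The key property is that subscribe and publish only meet at the shared map keyed by topic, which is exactly the Kafka-style decoupling the sections above describe.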
Notice that e is not an Int but an EventRecord, where the topic is "Int" and the value is the actual message pushed by the publisher.

The most important thing to notice here is that we did not connect the subscriber to a publisher. We just created the subscriber and specified the subscription. This is the same level of decoupling Kafka has, but we have relied on Reactive Streams for the underlying implementation.

Conclusions

Reactive Streams are spreading to a huge set of platforms, allowing them to communicate over async channels that are elastic, reliable and message-driven, with built-in back pressure. However, we think they are not meant to be used directly, but rather through our own APIs built on top of them.

Help Wanted

MiniKaf is in its very beginning. We are doing some unsafe work at runtime that might be replaced by a compile-time mechanism (probably using Shapeless). Also, the API is not complete and some pieces are missing. Any help will be appreciated.