Continuous streams of data are ubiquitous, and they're becoming even more so with the increasing number of IoT devices in use. Of course, this data is stored, processed and analyzed to provide predictive, actionable results. But petabytes take a long time to analyze, even with Hadoop (as good as MapReduce may be) or Spark (a remedy to the limitations of MapReduce).

Secondly, very often we don't need to deduce patterns over long periods of time. Of the petabytes of incoming data collected over months, we might not need to take all of it into account, just a real-time snapshot. Perhaps we don't need to know the longest trending hashtag over five years, but just the one trending right now, at any given moment.

This is what Storm is built for: to accept tons of data coming in extremely fast, possibly from various sources, analyze it, and publish real-time updates to a UI or some other place, without storing any of the data itself.

This article is not the be-all and end-all of Storm, nor is it meant to be. Storm's pretty huge, and just one long read probably can't do it justice anyway. Of course, any additions, feedback or constructive criticism will be greatly appreciated.

OK, now that that's out of the way, let's see what we'll be covering:

The necessity of Storm, the 'why' of it, what it is and what it isn't
A bird's eye view of how it works
What a Storm topology roughly looks like in code (Java)
Setting up and playing with a production-worthy Storm cluster on Docker
A few words on message processing reliability

I'm also assuming that you're at least somewhat familiar with Docker and containerization.

How It Works

The architecture of Storm can be compared to a network of roads connecting a set of checkpoints. Traffic begins at a certain checkpoint (called a spout) and passes through other checkpoints (called bolts).

The traffic is, of course, the stream of data that is retrieved by the spout (from a data source, a public API for example) and routed to various bolts where the data is filtered, sanitized, aggregated, analyzed and sent to a UI for people to view, or to any other target.

The network of spouts and bolts is called a topology, and the data flows in the form of tuples (lists of values that may have different types).

One important thing to talk about is the direction of the data traffic. Conventionally, we would have one or multiple spouts reading the data from an API, a Kafka topic or some other queuing system. The data would then flow one-way to one or multiple bolts, which may forward it to other bolts and so on. Bolts may publish the analyzed data to a UI or to another bolt.

But the traffic is almost always unidirectional, like a DAG. Although it is certainly possible to make cycles, we're unlikely to need such a convoluted topology.

Installing a Storm release involves a number of steps, which you're free to follow on your machine. But later on I'll be using Docker containers for a Storm cluster deployment, and the images will take care of setting up everything we need.

Some Code

While Storm does offer support for other languages, most topologies are written in Java, since it's the most efficient option we have.
A very basic spout that just emits random digits may look like this:

// Imports (org.apache.storm.* and java.util.concurrent.ThreadLocalRandom) omitted for brevity.
public class RandomDigitSpout extends BaseRichSpout
{
    // To output tuples from the spout to the next stage bolt.
    SpoutOutputCollector collector;

    // Called once when the spout is initialized on a worker; hands us the collector.
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
    }

    public void nextTuple()
    {
        int randomDigit = ThreadLocalRandom.current().nextInt(0, 10);

        // Emit the digit to the next stage bolt.
        collector.emit(new Values(randomDigit));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
    {
        // Tell Storm the schema of the output tuple for this spout.
        // It consists of a single column called 'random-digit'.
        outputFieldsDeclarer.declare(new Fields("random-digit"));
    }
}

And a simple bolt that takes in the stream of random digits and just emits the even ones:

public class EvenDigitBolt extends BaseRichBolt
{
    // To output tuples from this bolt to the next bolt.
    OutputCollector collector;

    // Called once when the bolt is initialized on a worker; hands us the collector.
    public void prepare(Map conf, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
    }

    public void execute(Tuple tuple)
    {
        // Get the 1st column, 'random-digit', from the tuple.
        int randomDigit = tuple.getInteger(0);

        if (randomDigit % 2 == 0) {
            collector.emit(new Values(randomDigit));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        // Tell Storm the schema of the output tuple for this bolt.
        // It consists of a single column called 'even-digit'.
        declarer.declare(new Fields("even-digit"));
    }
}

Another simple bolt that'll receive the filtered stream from EvenDigitBolt, multiply each even digit by 10 and emit it forward:

public class MultiplyByTenBolt extends BaseRichBolt
{
    OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
    }

    public void execute(Tuple tuple)
    {
        // Get 'even-digit' from the tuple.
        int evenDigit = tuple.getInteger(0);

        collector.emit(new Values(evenDigit * 10));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("even-digit-multiplied-by-ten"));
    }
}

Putting them together to form our topology:

package packagename;
// ...

public class OurSimpleTopology
{
    public static void main(String[] args) throws Exception
    {
        // Create the topology.
        TopologyBuilder builder = new TopologyBuilder();

        // Attach the random digit spout to the topology.
        // Use just 1 thread for the spout.
        builder.setSpout("random-digit-spout", new RandomDigitSpout());

        // Connect the even digit bolt to our spout.
        // The bolt will use 2 threads and the digits will be randomly
        // shuffled/distributed among the 2 threads.
        // The third parameter is formally called the parallelism hint.
        builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
               .shuffleGrouping("random-digit-spout");

        // Connect the multiply-by-10 bolt to our even digit bolt.
        // This bolt will use 4 threads, among which data from the
        // even digit bolt will be shuffled/distributed randomly.
        builder.setBolt("multiplied-by-ten-bolt", new MultiplyByTenBolt(), 4)
               .shuffleGrouping("even-digit-bolt");

        // Create a configuration object.
        Config conf = new Config();

        // The number of independent JVM processes this topology will use.
        conf.setNumWorkers(2);

        // Submit our topology with the configuration.
        StormSubmitter.submitTopology("our-simple-topology", conf, builder.createTopology());
    }
}
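If you'd like to poke at this topology before we have a real cluster, Storm also ships with an in-process LocalCluster that runs everything inside a single JVM. Here's a minimal sketch (not part of the original write-up), assuming the classes above are in the same package and a Storm dependency is on the classpath:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

// A quick local test-run of the same topology; a sketch, not production code.
public class OurSimpleTopologyLocalRun
{
    public static void main(String[] args) throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("random-digit-spout", new RandomDigitSpout());
        builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
               .shuffleGrouping("random-digit-spout");
        builder.setBolt("multiplied-by-ten-bolt", new MultiplyByTenBolt(), 4)
               .shuffleGrouping("even-digit-bolt");

        Config conf = new Config();
        conf.setDebug(true); // Log every emitted tuple so we can watch the flow.

        // Run the whole topology in this JVM instead of submitting it to a cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("our-simple-topology-local", conf, builder.createTopology());

        // Let it run for a few seconds, then tear everything down.
        Thread.sleep(10000);
        cluster.killTopology("our-simple-topology-local");
        cluster.shutdown();
    }
}

Once things behave locally, the StormSubmitter version above is what you'd submit to a real cluster, which is what the rest of this article does.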
Parallelism In Storm Topologies

Fully understanding parallelism in Storm can be daunting, at least in my experience. A topology requires at least one process to operate on (obviously). Within this process, we can parallelize the execution of our spouts and bolts using threads. In our example, RandomDigitSpout will launch just one thread, and the data spewed from that thread will be distributed among the 2 threads of EvenDigitBolt.

But the way this distribution happens, referred to as the stream grouping, can be important. For example, you may have a stream of temperature recordings from two cities, where the tuples emitted by the spout look like this:

// City name, temperature, time of recording
("Atlanta", 94, "2018-05-11 23:14")
("New York City", 75, "2018-05-11 23:15")
("New York City", 76, "2018-05-11 23:16")
("Atlanta", 96, "2018-05-11 23:15")
("New York City", 77, "2018-05-11 23:17")
("Atlanta", 95, "2018-05-11 23:16")
("New York City", 76, "2018-05-11 23:18")

Suppose we're attaching just one bolt whose job is to calculate the changing average temperature of each city. If we can reasonably expect to get roughly an equal number of tuples from both cities in any given time interval, it would make sense to dedicate 2 threads to our bolt and send the data for Atlanta to one of them and New York to the other. A fields grouping would serve our purpose: it partitions data among the threads by the value of the field specified in the grouping:

// The tuples with the same city name will go to the same thread.
builder.setBolt("avg-temp-bolt", new AvgTempBolt(), 2)
       .fieldsGrouping("temp-spout", new Fields("city_name"));

And of course there are other types of groupings as well. For most cases, though, the grouping probably won't matter much and you can just shuffle the data and throw it among the bolt threads randomly (shuffle grouping).
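We never looked inside AvgTempBolt, so here's a rough sketch of what it might look like. The exact tuple layout (city, temperature, timestamp) and the output field names are assumptions, but it illustrates why the fields grouping matters: each executor only ever sees its own cities, so it can keep its running averages in plain local memory.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// A sketch of the averaging bolt used in the fields grouping example above.
public class AvgTempBolt extends BaseRichBolt
{
    OutputCollector collector;

    // Per-city running sum and count. Safe to keep in memory because the
    // fields grouping guarantees a given city always lands on the same executor.
    private Map<String, Double> sums;
    private Map<String, Long> counts;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
        this.sums = new HashMap<>();
        this.counts = new HashMap<>();
    }

    public void execute(Tuple tuple)
    {
        // Assuming the spout emits (city_name, temperature, recorded_at) in that order.
        String city = tuple.getString(0);
        double temperature = ((Number) tuple.getValue(1)).doubleValue();

        sums.merge(city, temperature, Double::sum);
        counts.merge(city, 1L, Long::sum);

        double average = sums.get(city) / counts.get(city);
        collector.emit(new Values(city, average));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("city_name", "avg_temp"));
    }
}

Had we used a shuffle grouping instead, recordings for the same city could land on different executors, and each would compute its own partial, incorrect average.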
Now there's another important component to this: the number of worker processes that our topology will run on. The total number of threads that we specified will then be equally divided among the worker processes. So in our example random digit topology, we had 1 spout thread, 2 even-digit bolt threads and 4 multiply-by-ten bolt threads (7 in total). Each of the 2 worker processes would be responsible for running 2 multiply-by-ten bolt threads and 1 even-digit bolt thread, and one of the processes will run the 1 spout thread.

Of course, the 2 worker processes will have their main threads, which in turn will launch the spout and bolt threads. So all in all we'll have 9 threads. The spout and bolt threads are collectively called executors.

It's important to realize that if you set a spout's parallelism hint > 1 (i.e. multiple executors), you can end up emitting the same data several times. Say the spout reads from the public Twitter stream API and uses two executors. That means that the bolts receiving the data from the spout will get the same tweet twice. It is only after the spout emits the tuples that data parallelism comes into play, i.e. after the tuples get divided among the bolts according to the specified stream grouping.

Running multiple workers on a single node would be fairly pointless. Later, however, we'll use a proper, distributed, multi-node cluster and see how workers are divided on different nodes.

Building Our Topology

Here's the directory structure I suggest:

yourproject/
    pom.xml
    src/
        jvm/
            packagename/
                RandomDigitSpout.java
                EvenDigitBolt.java
                MultiplyByTenBolt.java
                OurSimpleTopology.java

Maven is commonly used for building Storm topologies, and it requires a pom.xml file (the POM) that defines various configuration details, project dependencies and so on. Getting into the nitty-gritty of the POM will probably be overkill here.

First, we'll run mvn clean inside yourproject to clear any compiled files we may have, making sure to compile each module from scratch.

Then mvn package to compile our code and package it in an executable JAR file, inside a newly created target folder. This might take quite a few minutes the first time, especially if your topology has many dependencies.

To submit our topology:

storm jar target/packagename-{version number}.jar packagename.OurSimpleTopology

Hopefully, by now the gap between concept and code in Storm has been somewhat bridged. However, no serious Storm deployment will be a single topology instance running on one server.

What A Storm Cluster Looks Like

To take full advantage of Storm's scalability and fault-tolerance, any production-grade topology would be submitted to a cluster of machines.

Storm distributions are installed on the master node (Nimbus) and all the slave nodes (Supervisors). The master node runs the Storm Nimbus daemon and the Storm UI. The slave nodes run the Storm Supervisor daemons. A Zookeeper daemon on a separate node is used for coordination among the master node and the slave nodes.

Zookeeper, by the way, is only used for cluster management and never for any kind of message passing. It's not like spouts and bolts are sending data to each other through it or anything like that. The Nimbus daemon finds available Supervisors via Zookeeper, to which the Supervisor daemons register themselves. It also handles other managerial tasks, some of which will become clear shortly.

The Storm UI is a web interface used to manage the state of our cluster. We'll get to this soon.

Our topology is submitted to the Nimbus daemon on the master node and then distributed among the worker processes running on the slave/supervisor nodes. Because of Zookeeper, it doesn't matter how many slave/supervisor nodes you run initially, as you can always seamlessly add more, and Storm will automatically integrate them into the cluster.

Whenever we start a Supervisor, it allocates a certain number of worker processes (that we can configure) which can then be used by the submitted topology. So in the image above, with our two Supervisor nodes, there are a total of 5 allocated workers. And remember this line?

conf.setNumWorkers(5)

This means that the topology will try to use a total of 5 workers. And since our two Supervisor nodes have a total of 5 allocated workers, each of the 5 allocated worker processes will run one instance of the topology. If we had done:

conf.setNumWorkers(4)

then one allocated worker process would have remained idle/unused. If the number of specified workers was 6 and the total allocated workers were 5, then because of that limitation only 5 actual topology workers would have been functional.

Before we set this all up using Docker, a few important things to keep in mind regarding fault-tolerance:

If any worker on any slave node dies, the Supervisor daemon will have it restarted. If restarting repeatedly fails, the worker will be reassigned to another machine.

If an entire slave node dies, its share of the work will be given to another supervisor/slave node.

If the Nimbus goes down, the workers will remain unaffected. However, until the Nimbus is restored, workers won't be reassigned to other slave nodes if, say, their node crashes.

The Nimbus and Supervisors are themselves stateless, but with Zookeeper some state information is stored, so that things can begin where they were left off if a node crashes or a daemon dies unexpectedly.

Nimbus, Supervisor and Zookeeper daemons are all fail-fast. This means that they themselves are not very tolerant to unexpected errors and will shut down if they encounter one. For this reason, they have to be run under supervision using a watchdog program that monitors them constantly and restarts them automatically if they ever crash.
Supervisord is probably the most popular option for this (not to be confused with the Storm Supervisor daemon).

Note: In most Storm clusters, the Nimbus itself is never deployed as a single instance but as a cluster. If this fault-tolerance is not incorporated and our sole Nimbus goes down, we'll lose the ability to submit new topologies, gracefully kill running topologies, reassign work to other Supervisor nodes if one crashes, and so on. For simplicity, our illustrative cluster will use a single instance. Similarly, the Zookeeper is very often deployed as a cluster, but we'll use just one.

Dockerizing The Cluster

Launching individual containers, and all that goes along with them, can be cumbersome, so I prefer to use Docker Compose. We'll be going with one Zookeeper node, one Nimbus node and one Supervisor node initially. They'll be defined as Compose services, all corresponding to one container each at the beginning. Later on, I'll use Compose scaling to add another Supervisor node (container).

Here's our code and the project structure:

zookeeper/
    Dockerfile
storm-nimbus/
    Dockerfile
    storm.yaml
    code/
        pom.xml
        src/
            jvm/
                coincident_hashtags/
                    ExclamationTopology.java
storm-supervisor/
    Dockerfile
    storm.yaml
docker-compose.yml

And our docker-compose.yml:

version: '3.2'

services:
  zookeeper:
    build: ./zookeeper
    # Keep it running.
    tty: true

  storm-nimbus:
    build: ./storm-nimbus
    # Run this service after 'zookeeper' and make 'zookeeper' a resolvable reference.
    links:
      - zookeeper
    tty: true
    # Map port 8080 of the host machine to 8080 of the container,
    # to access the Storm UI from our host machine.
    ports:
      - '8080:8080'
    # Host volume used to store our code on the master node (Nimbus).
    volumes:
      - './storm-nimbus:/theproject'

  storm-supervisor:
    build: ./storm-supervisor
    links:
      - zookeeper
      - storm-nimbus
    tty: true

Feel free to explore the Dockerfiles. They basically just install the dependencies (Java 8, Storm, Maven, Zookeeper etc.) on the relevant containers.

The storm.yaml files override certain default configurations for the Storm installations. The line ADD storm.yaml /conf inside the Nimbus and Supervisor Dockerfiles puts them inside the containers, where Storm can read them.

storm-nimbus/storm.yaml:

# The Nimbus needs to know where the Zookeeper is. This specifies the list of the
# hosts in the Zookeeper cluster. We're using just one node, of course.
# 'zookeeper' is the Docker Compose network reference.
storm.zookeeper.servers:
  - "zookeeper"

storm-supervisor/storm.yaml:

# Telling the Supervisor where the Zookeeper is.
storm.zookeeper.servers:
  - "zookeeper"

# The worker nodes need to know which machine(s) are the candidate of master
# in order to download the topology jars.
nimbus.seeds: ["storm-nimbus"]

# For each Supervisor, we configure how many workers run on that machine.
# Each worker uses a single port for receiving messages, and this setting
# defines which ports are open for use. We define four ports here, so Storm will
# allocate up to four workers to run on this node.
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

These options are adequate for our cluster. The more curious can check out all the default configurations here.

Run docker-compose up at the project root.
After all the images have been built and all the services have started, open a new terminal, type docker ps, and you'll see the running containers (one each for Zookeeper, the Nimbus and the Supervisor).

Starting The Nimbus

Let's SSH into the Nimbus container using its name:

docker exec -it coincidenthashtagswithapachestorm_storm-nimbus_1 bash

and then start the Nimbus daemon:

storm nimbus

Starting The Storm UI

Similarly, open another terminal, SSH into the Nimbus again and launch the UI using:

storm ui

Go to localhost:8080 in your browser and you'll see a nice overview of our cluster.

The Free slots in the Cluster Summary indicate how many total workers (on all Supervisor nodes) are available and waiting for a topology to consume them. Used slots indicate how many of the total are currently busy with a topology. Since we haven't launched any Supervisors yet, they're both zero. We'll get to Executors and Tasks later. Also, as we can see, no topologies have been submitted yet.

Starting A Supervisor Node

SSH into the one Supervisor container and launch the Supervisor daemon:

docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_1 bash
storm supervisor

Now let's go refresh our UI. Note: Any changes in our cluster may take a few seconds to reflect on the UI.

We have a new running Supervisor, which comes with four allocated workers. These four workers are the result of specifying four ports in our storm.yaml for the Supervisor node. Of course, they're all free (four Free slots). Let's submit a topology to the Nimbus and put them to work.

Submitting A Topology To The Nimbus

SSH into the Nimbus in a new terminal. I've written the Dockerfile so that we land on our working (landing) directory, /theproject. Inside this is code, where our topology resides.

Our topology is pretty simple. It uses a spout that generates random words and a bolt that just appends three exclamation marks to the words. Two of these bolts are added back-to-back, so at the end of the stream we'll get words with six exclamation marks. It also specifies that it needs three workers (conf.setNumWorkers(3)).

public static void main(String[] args) throws Exception
{
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    // Turn on debugging mode.
    conf.setDebug(true);
    conf.setNumWorkers(3);

    StormSubmitter.submitTopology("exclamation-topology", conf, builder.createTopology());
}

cd code
mvn clean
mvn package
storm jar target/coincident-hashtags-1.2.1.jar coincident_hashtags.ExclamationTopology

After the topology has been submitted successfully, refresh the UI.

As soon as we submitted the topology, the Zookeeper was notified. The Zookeeper in turn notified the Supervisor to download the code from the Nimbus. We now see our topology along with its three occupied workers, leaving just one free. And 10 word spout threads + 3 exclaim1 bolt threads + 2 exclaim2 bolt threads + the 3 main threads from the workers = a total of 18 executors. And you might've noticed something new: tasks.

What Are Tasks

Tasks are another concept in Storm's parallelism. But don't sweat it: a task is just an instance of a spout or bolt that an executor uses; it's what actually does the processing. By default, the number of tasks is equal to the number of executors. In rare cases you might need each executor to instantiate more tasks:
// Each of the two executors (threads) of this bolt will instantiate
// two objects of this bolt (4 bolt objects/tasks in total).
builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
       .setNumTasks(4)
       .shuffleGrouping("random-digit-spout");

This is a shortcoming on my part, but I can't think of a good use case where we'd need multiple tasks per executor. Maybe if we were adding some parallelism ourselves, like spawning a new thread within the bolt to handle a long-running task, then the main executor thread wouldn't block and would be able to continue processing with the other task. However, this can make our topology hard to understand. If anyone knows of scenarios where the performance gain from multiple tasks outweighs the added complexity, please post a comment.

Anyway, returning from that slight detour, let's see an overview of our topology. Click on the name under Topology Summary and scroll down to Worker Resources. We can clearly see the division of our executors (threads) among the 3 workers. And of course all 3 workers are on the same, single Supervisor node we're running.

Now, let's scale out!

Add Another Supervisor

From the project root, let's add another Supervisor node/container:

docker-compose scale storm-supervisor=2

SSH into the new container:

docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_2 bash

And fire up the daemon:

storm supervisor

If you refresh the UI, you'll see that we've successfully added another Supervisor and four more workers (a total of 8 workers/slots). To really take advantage of the new Supervisor, let's increase the topology's workers.

First kill the running topology:

storm kill exclamation-topology

Change this line to:

conf.setNumWorkers(6)

Change the project version number in your pom.xml. Try using a proper scheme, like semantic versioning. I'll just stick with 1.2.1.

Rebuild the topology:

mvn package

Resubmit it:

storm jar target/coincident-hashtags-1.2.1.jar coincident_hashtags.ExclamationTopology

Reload the UI. You can now see the new Supervisor and the 6 busy workers out of a total of 8 available ones. Also important to note: the 6 busy ones have been equally divided among the two Supervisors. Again, click the topology name and scroll down. We see two unique Supervisor IDs, both running on different nodes, with all our executors pretty evenly divided among them. This is great.

But Storm comes with another nifty way of doing this while the topology is running: something called rebalancing. On the Nimbus we'd run:

storm rebalance exclamation-topology -n 6

to go from 3 to 6 workers. Or, to change the number of executors for a particular component:

storm rebalance exclamation-topology -e even-digit-bolt=3

Reliable Message Processing

One question we haven't tackled yet is what happens if a bolt fails to process a tuple. Well, Storm provides us a mechanism by which the originating spout (specifically, the task) can replay the failed tuple. This processing guarantee doesn't just happen by itself; it's a conscious design choice, and it does add latency.

Spouts send out tuples to bolts, which emit tuples derived from the input tuples to other bolts, and so on. That one original tuple spurs an entire tree of tuples. If any child tuple, so to speak, of the original one fails, then any remedial steps (rollbacks etc.) may well have to be taken at multiple bolts. That could get pretty hairy, so what Storm does instead is allow the original tuple to be emitted again right from the source (the spout). Consequently, any operations performed by bolts that are a function of the incoming tuples should be idempotent.

A tuple is considered "fully processed" when every tuple in its tree has been processed, and every tuple has to be explicitly acknowledged by the bolts. However, that's not all. There's another thing to be done explicitly: maintain a link between the original tuple and its child tuples. Storm will then be able to trace the origin of the child tuples and thus be able to replay the original tuple. This is called anchoring, and it has been done in our exclamation bolt:

// ExclamationBolt
// 'tuple' is the original one received from the test word spout.
// It's been anchored to/with the tuple going out.
_collector.emit(tuple, new Values(exclamatedWord.toString()));

// Explicitly acknowledge that the tuple has been processed.
_collector.ack(tuple);

The ack call will result in the ack method on the spout being called, if it has been implemented. So, say you're reading the tuple data from some queue and you can only take it off the queue once the tuple has been fully processed. Well, the ack method is where you'd do that.

You can also emit tuples without anchoring:

_collector.emit(new Values(exclamatedWord.toString()))

and forgo reliability.

A tuple can fail in two ways:

A bolt dies and the tuple times out, or it times out for some other reason. The timeout is 30 seconds by default and can be changed using config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60).

The fail method is explicitly called on the tuple in a bolt: _collector.fail(tuple). You may do this in case of an exception.

In both these cases, the fail method on the spout will be called, if it is implemented. And if we want the tuple to be replayed, it would have to be done explicitly in the fail method by calling emit, just like in nextTuple(). When tracking tuples, every one has to be acked or failed; otherwise, the topology will eventually run out of memory.
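To make that a bit more concrete, here's a rough sketch of a spout that does its own ack/fail bookkeeping. This is not from the exclamation topology; the in-memory queue and all the names are made up purely for illustration, and a real spout would read from Kafka, an API or some other replayable source:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// A sketch of a spout with explicit reliability handling.
public class ReliableWordSpout extends BaseRichSpout
{
    private SpoutOutputCollector collector;
    private Queue<String> source;          // stand-in for a real queue or API
    private Map<Object, String> inFlight;  // messages emitted but not yet acked

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
        this.source = new ArrayDeque<>(Arrays.asList("nathan", "mike", "jackson"));
        this.inFlight = new HashMap<>();
    }

    public void nextTuple()
    {
        String word = source.poll();
        if (word == null) {
            return; // nothing to emit right now
        }

        // Passing a message ID as the second argument turns on tuple tracking
        // for this tuple and its whole tree.
        Object msgId = UUID.randomUUID();
        inFlight.put(msgId, word);
        collector.emit(new Values(word), msgId);
    }

    public void ack(Object msgId)
    {
        // The entire tuple tree was processed; it's now safe to forget the
        // message (or to actually take it off the real queue).
        inFlight.remove(msgId);
    }

    public void fail(Object msgId)
    {
        // A tuple in the tree failed or timed out; put the message back so a
        // later nextTuple() call replays it.
        String word = inFlight.remove(msgId);
        if (word != null) {
            source.offer(word);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("word"));
    }
}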
It's also important to know that you have to do all of this yourself when writing custom spouts and bolts, but the Storm core can help. For example, a bolt implementing BaseBasicBolt does the acking automatically, and built-in spouts for popular data sources like Kafka take care of the queuing and replay logic after acknowledgment and failure.

Parting Shots

Designing a Storm topology or cluster is always about tweaking the various knobs we have and settling where the result seems optimal. There are a few things that'll help in this process, like using a configuration file to read parallelism hints, the number of workers and so on, so that you don't have to edit and recompile your code repeatedly.

Define your bolts logically, one per indivisible task, and keep them light and efficient. Similarly, your spouts' nextTuple() methods should be optimized.

Use the Storm UI effectively. By default it doesn't show us the complete picture, only 5% of the total tuples emitted. To monitor all of them, use config.setStatsSampleRate(1.0d). Keep an eye on the Acks and Latency values for individual bolts and topologies via the UI; that's what you want to look at when turning the knobs.

Previously published at https://medium.com/free-code-camp/apache-storm-is-awesome-this-is-why-you-should-be-using-it-d7c37519a427