Apache Spark is one of the most versatile big data frameworks out there. The ability to mix different kinds of workloads, in-memory processing and a functional style make it attractive to anyone coming to write code in the data processing world.
One important aspect of Spark is that it has been built for extensibility. Writing new connectors for the RDD API or extending the DataFrame/Dataset API allows third parties to integrate with Spark with ease. Most people will use one of the built-in APIs, such as Kafka for stream processing or JSON / CSV for file processing. However, there are times when we need more specific implementations, closer to our own needs. For example, we might have a proprietary database in our company for which no connector exists. We can simply write one, as we explained in this previous post (Spark Data Source API. Extending Our Spark SQL Query Engine).
Starting with Spark 2.0, we can create sources from streams, which gave life to the Spark Structured Streaming API. As we might expect, there are some built-in streaming sources, Kafka being one of them, alongside FileStreamSource, TextSocketSource, etc.
Using the new Structured Streaming API should be preferred over the old DStreams. However, the same problem as before arises again: how can we extend this new API so we can use our own streaming sources? This post answers that question.
Let’s start by reviewing the main components that we need to touch in order to create our own streaming source.
First of all, StreamSourceProvider tells Spark which Source to use as the stream reader and how to create it.
Secondly, DataSourceRegister allows us to register our source within Spark so it becomes available for stream processing.
Thirdly, Source is the interface we need to implement so that our class behaves like a streaming source.
For the sake of this post, we will implement a rather simple streaming source, but the same concepts apply to any streaming source you need to implement.
Our streaming source is called InMemoryRandomStrings. It generates a sequence of random strings, together with their lengths, which are exposed as a DataFrame.
Since we want to keep it simple, we store the batches in memory and discard them once Spark is done processing them. InMemoryRandomStrings is not fault tolerant, since its data is generated at processing time, in contrast to the built-in Kafka source, where the data actually lives in a Kafka cluster.
We can start by defining our StreamSourceProvider which defines how our Source is created.
The class DefaultSource is our StreamSourceProvider and we need to implement the two required functions, sourceSchema and createSource.
InMemoryRandomStrings.schema is the fixed schema we are going to use for the example, but the schema can be dynamically passed in.
The createSource function then returns an instance of InMemoryRandomStrings that is our actual Source.
Now, let's go through the InMemoryRandomStrings code piece by piece so we can focus on the details.
schema returns the schema that our source uses. In our case, we know that the schema is fixed.
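As a sketch, the fixed schema could live in a companion object, as the InMemoryRandomStrings.schema reference in DefaultSource suggests. The column names value and length, and their types, are our assumption, since the original code is not reproduced here; the snippet only needs spark-sql on the classpath.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical companion object holding the fixed schema of our source:
// one column for the generated string and one for its length.
object InMemoryRandomStrings {
  val schema: StructType = StructType(Seq(
    StructField("value", StringType, nullable = false),
    StructField("length", IntegerType, nullable = false)
  ))
}
```

Because the schema never changes, both sourceSchema in the provider and schema in the source can simply return this value.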
getOffset should return the latest offset seen by our source.
Notice that we added a variable called offset that keeps track of the data seen so far. We return None if our source has never seen any data, and Some(offset) otherwise.
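The offset bookkeeping can be illustrated without any Spark types. In this minimal sketch, the hypothetical OffsetTracker class stands in for the relevant part of InMemoryRandomStrings: a plain Long replaces Spark's Offset, and -1 is an assumed sentinel meaning that no data has been seen yet.

```scala
// Hypothetical, Spark-free model of the offset tracking behind getOffset.
class OffsetTracker {
  // -1 is a sentinel meaning "no data has been generated yet"
  private var offset: Long = -1L

  // Called by the producer each time a new record is stored
  def advance(): Long = synchronized {
    offset += 1
    offset
  }

  // Mirrors getOffset: None before any data, Some(latest offset) afterwards
  def getOffset: Option[Long] = synchronized {
    if (offset == -1L) None else Some(offset)
  }
}

object OffsetTrackerDemo extends App {
  val tracker = new OffsetTracker
  println(tracker.getOffset) // None
  tracker.advance()
  tracker.advance()
  println(tracker.getOffset) // Some(1)
}
```

Both methods synchronize on the same object because, in the real source, the producer thread and Spark's query thread touch the offset concurrently.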
Now, let's see how our source produces data. We will use a running thread for it; notice the dataGeneratorStartingThread function.
Here we have added a thread that generates random values and increments the offset, storing each value and its offset in an internal buffer. The thread starts running as soon as our source is created. The stop function stops the running thread.
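The producer side can be sketched the same way. The hypothetical RandomStringGenerator below is not the post's dataGeneratorStartingThread; it is a Spark-free model that assumes a fixed sleep interval between records and strings of 1 to 10 alphanumeric characters. The generator and its readers take the same lock, so the offset and the buffer always change together.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer
import scala.util.Random

// Hypothetical, Spark-free model of the background producer thread.
class RandomStringGenerator(intervalMs: Long) {
  private val running = new AtomicBoolean(true)
  // -1 means "nothing generated yet"; the first record gets offset 0
  private var offset: Long = -1L
  // Internal buffer of (value, offset) pairs
  private val buffer = ListBuffer.empty[(String, Long)]

  private val worker = new Thread(new Runnable {
    override def run(): Unit =
      while (running.get()) {
        val value = Random.alphanumeric.take(Random.nextInt(10) + 1).mkString
        // Same lock as snapshot, so readers never see a half-updated state
        RandomStringGenerator.this.synchronized {
          offset += 1
          buffer += ((value, offset))
        }
        Thread.sleep(intervalMs)
      }
  })
  worker.setDaemon(true)
  worker.start() // the thread starts as soon as the object is created

  // Copy of everything generated so far
  def snapshot: List[(String, Long)] = synchronized(buffer.toList)

  // Counterpart of the source's stop(): ask the loop to exit and wait for it
  def stop(): Unit = {
    running.set(false)
    worker.join()
  }
}
```

Marking the thread as a daemon keeps it from holding the JVM open if stop is never called.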
At this point, we are only two functions away from our goal.
getBatch returns a DataFrame to Spark containing the data within the given offset range, where the start offset is exclusive and the end offset is inclusive.
We take from our internal buffer the data whose indexes fall within that range. From there, we build the DataFrame that we send back to Spark.
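The range selection can also be expressed as a small pure function. Spark's contract for getBatch is that start is exclusive (None for the very first batch) and end is inclusive. The BatchSlicer object below is a hypothetical helper that models this with plain Longs and returns each string alongside its length, which is what our rows would contain.

```scala
// Hypothetical, Spark-free model of the range logic inside getBatch.
object BatchSlicer {
  // Returns (value, length) for every buffered record with start < offset <= end.
  // A missing start means "from the beginning", modeled as -1.
  def slice(buffer: Seq[(String, Long)], start: Option[Long], end: Long): Seq[(String, Int)] = {
    val from = start.getOrElse(-1L)
    buffer.collect {
      case (value, offset) if offset > from && offset <= end => (value, value.length)
    }
  }
}
```

For example, with records at offsets 0 to 3, slice(buffer, None, 1L) returns the first two records, and a following call slice(buffer, Some(1L), 3L) returns the remaining two, so no record is delivered twice.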
Finally, commit is how Spark tells us that it will not request offsets less than or equal to the one being passed. In other words, we can remove from our internal buffer all data with an offset less than or equal to the one passed to commit. This way, we save some memory and avoid running out of it.
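Pruning on commit boils down to keeping only the records above the committed offset. This hypothetical CommitPruning helper sketches that step on a mutable buffer of (value, offset) pairs, again using plain Longs in place of Spark's Offset.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, Spark-free model of what commit does to the internal buffer.
object CommitPruning {
  // Drop every record whose offset is <= the committed offset; Spark promises
  // not to request those again, so keeping them would only waste memory.
  def commit(buffer: ListBuffer[(String, Long)], committed: Long): Unit = {
    val remaining = buffer.filter { case (_, offset) => offset > committed }
    buffer.clear()
    buffer ++= remaining
  }
}
```

In the real source this must run under the same lock as the producer thread, since new records may be appended while Spark commits.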
With these pieces, our source is complete. The full source code is available in the project linked at the end of this post.
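For reference, the following is one way to assemble these pieces into a single file. It is a sketch, not the project's actual code: it assumes Spark 2.2.x, where Source, Offset and LongOffset live in org.apache.spark.sql.execution.streaming, and the column names, string lengths and 100 ms generation interval are our own choices. Our understanding is that from Spark 2.3 on, Spark checks that the DataFrame returned by getBatch is a streaming one, which sqlContext.createDataFrame does not produce, so this version would need changes there. Also note that using the short name "InMemoryRandomStrings" in .format(...) only works if the class is listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file; otherwise, use the fully qualified class name.

```scala
package com.github.anicolaspp.spark.sql.streaming

import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.ListBuffer
import scala.util.Random

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Entry point Spark instantiates when .format(...) names this class.
class DefaultSource extends StreamSourceProvider with DataSourceRegister {
  override def shortName(): String = "InMemoryRandomStrings"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), InMemoryRandomStrings.schema)

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    new InMemoryRandomStrings(sqlContext)
}

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {
  // Latest offset produced; -1 means no data has been generated yet.
  private var offset: LongOffset = LongOffset(-1)
  // Records not yet committed, each paired with its offset.
  private val batches = ListBuffer.empty[(String, Long)]
  private val running = new AtomicBoolean(true)

  // Background producer, started as soon as the source is created.
  private val generator = new Thread("in-memory-random-strings") {
    override def run(): Unit =
      while (running.get()) {
        val value = Random.alphanumeric.take(Random.nextInt(20) + 1).mkString
        InMemoryRandomStrings.this.synchronized {
          offset = LongOffset(offset.offset + 1)
          batches += ((value, offset.offset))
        }
        Thread.sleep(100)
      }
  }
  generator.setDaemon(true)
  generator.start()

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = synchronized {
    if (offset.offset == -1) None else Some(offset)
  }

  // Returns the records in (start, end]; start is None for the first batch.
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    val from = start.flatMap(LongOffset.convert).map(_.offset).getOrElse(-1L)
    val to = LongOffset.convert(end).map(_.offset).getOrElse(-1L)
    val rows = batches.toList.collect {
      case (value, idx) if idx > from && idx <= to => Row(value, value.length)
    }
    sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(rows), schema)
  }

  // Spark will not ask for offsets <= end again, so drop them to free memory.
  override def commit(end: Offset): Unit = synchronized {
    val committed = LongOffset.convert(end).map(_.offset).getOrElse(-1L)
    val remaining = batches.filter { case (_, idx) => idx > committed }
    batches.clear()
    batches ++= remaining
  }

  override def stop(): Unit = running.set(false)
}

object InMemoryRandomStrings {
  val schema: StructType = StructType(Seq(
    StructField("value", StringType, nullable = false),
    StructField("length", IntegerType, nullable = false)))
}
```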
Now, we need to plug our source into the Spark Structured Streaming API by indicating the correct format to use.
Here we use the regular .readStream API and specify that the stream format is our implementation of StreamSourceProvider, that is, com.github.anicolaspp.spark.sql.streaming.DefaultSource.
Now we can query our streaming source as any other DataFrame.
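A minimal driver program could look like this. It assumes the source classes are on the classpath under the package named above and that the schema has a length column (our assumption). The aggregation itself is hypothetical: it counts how many strings of each length have been generated so far and prints the running result with the console sink in complete output mode.

```scala
import org.apache.spark.sql.SparkSession

object RandomStringsApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("InMemoryRandomStrings example")
      .master("local[*]")
      .getOrCreate()

    // Plug in the custom source by naming its StreamSourceProvider class
    val stream = spark.readStream
      .format("com.github.anicolaspp.spark.sql.streaming.DefaultSource")
      .load()

    // Continuous aggregation: number of generated strings per length
    val counts = stream.groupBy("length").count()

    // Complete mode re-emits the whole aggregated table after every batch
    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Because the result is an aggregation, complete (or update) output mode is needed here; append mode would be rejected for an aggregation without a watermark.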
The output is a continuous aggregation of the data generated by our source.
Apache Spark is the way to go when processing data at scale. Its features put it ahead of almost any other tool out there. Also, it can be extended in many different ways and, as we have seen, we can write our own data sources and streaming sources so they can be plugged into our Spark code with ease.
The entire project and source code can be found here: SparkStreamSources.
Happy Coding.