There may be no Kafka connector for your system, or the available ones may not meet your requirements. In both cases, you have to write your own Kafka connector, and there are not many online resources about it. I'll write up my adventure to help others suffering from the same pain.
Here is my case: I want to read data from a Kafka topic and write records to an Elastic index if a record has the "insert" flag in its status field, or delete them if the status is "delete". I knew I couldn't use the official or any other open source Elastic sink connector, as they have one generic behavior driven by the connector configuration, not by the data.
For local development and testing, I've used Landoop's fast-data-dev project, as it includes Zookeeper, Kafka, Connect and sufficient UI tools in just one Docker container.
If you want to write your own source or sink connector, you have to use Java, because the main idea is to produce jars from the project that will act as a plug-in for your local Kafka Connect cluster or standalone server. So, make sure that you have a JDK on your local machine. I've used IntelliJ IDEA as I am a JetBrains fan.
There is a Maven archetype for your project, with a bunch of skeleton classes included. In order to use it via IntelliJ, you can add it when creating a new project. Here is its GitHub repository.
Don't forget to use the latest version. Check it out in its repo.
As I said, we are going to create a custom Elastic sink connector that behaves according to the topic data. I am going to explain the most important parts of the project; please clone my repository to follow the snippets below.
You will use this class to read configuration properties; it's a pretty straightforward class. First you should define your config elements, such as ELASTIC_PORT, which is the config property that will be used for our Elastic transport client, or you could say the Elastic driver. ELASTIC_PORT_DOC is the documentation string used when the property is defined. ConfigDef is a class in the Kafka common package, and we bind all of our properties into it here:
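Roughly, that binding might look like the sketch below; the property key elastic.port, the default port and the getter are my own assumptions, so check the repository for the real definitions:

```java
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ElasticSinkConnectorConfig extends AbstractConfig {

    // Assumed property key and doc string; the real project may use different names.
    public static final String ELASTIC_PORT = "elastic.port";
    private static final String ELASTIC_PORT_DOC = "Port the Elastic client connects to.";

    public ElasticSinkConnectorConfig(Map<String, String> props) {
        super(config(), props);
    }

    // All properties are bound into a single ConfigDef instance.
    public static ConfigDef config() {
        return new ConfigDef()
                .define(ELASTIC_PORT, Type.INT, 9300, Importance.HIGH, ELASTIC_PORT_DOC);
    }

    public int getElasticPort() {
        return getInt(ELASTIC_PORT);
    }
}
```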
And the rest of the file includes getters and setters for the properties, nothing complex there.
If you have something to do when starting to consume from a topic, or stopping the connector, this is the class you should implement.
Just going over the methods:
- version() returns the version of the connector.
- start() takes the configuration from connector.properties and passes it to the ElasticSinkConnectorConfig class discussed above.
- taskClass() returns the class that does the actual work: getting data from the topic and processing it (we are going to talk about this in a minute).
- stop() is a kind of teardown function for your connector.
- config() returns the config class, ElasticSinkConnectorConfig in our case.
- taskConfigs() is for your tasks; we distribute the configs to all the tasks here (note that you should set the tasks.max property in the worker configuration, and here is what value it should be).
By the way, I still have not figured out how many tasks should be set for an optimal configuration. Link to source
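For reference, a bare-bones version of such a connector class could look like the sketch below; this is my own reconstruction against the standard SinkConnector API, not the exact code from the repository:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class ElasticSinkConnector extends SinkConnector {

    private Map<String, String> configProps;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // Validate and keep the properties coming from connector.properties.
        new ElasticSinkConnectorConfig(props);
        this.configProps = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        // The class that does the actual work.
        return ElasticSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets the same configuration; maxTasks comes from tasks.max.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(configProps);
        }
        return configs;
    }

    @Override
    public void stop() {
        // Nothing to tear down in this sketch.
    }

    @Override
    public ConfigDef config() {
        return ElasticSinkConnectorConfig.config();
    }
}
```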
Here is the class where you get the data from the configured topic. In the start() method, I create an instance of ElasticServiceImpl. I didn't use any dependency injection strategy, as testing the plug-in's overridden methods is really difficult and some kind of anti-corruption layer is needed. For that reason, when I get the data in the put() method, I directly send it to ElasticServiceImpl after a little modification, and I test my service instead of the ElasticSinkTask class. In the constructor of the Elastic service, I pass null as the ElasticClient, because I am going to pass it in the test class. So, ElasticServiceImpl became my main processing class.
The irony here is that I have not written any test classes yet; I will implement them.
The most important thing for this class is to be careful with the put() method, as Kafka Connect commits the data offsets based on it, so proper error handling should be done here.
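A skeletal version of that task might look like the following; the ElasticServiceImpl constructor and process() signature are assumptions based on the description above:

```java
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ElasticSinkTask extends SinkTask {

    private ElasticServiceImpl elasticService;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        ElasticSinkConnectorConfig config = new ElasticSinkConnectorConfig(props);
        // The real ElasticClient is created inside the service; null is only passed in tests.
        elasticService = new ElasticServiceImpl(config);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Offsets are committed for records accepted here, so handle errors carefully.
        try {
            elasticService.process(records);
        } catch (Exception e) {
            throw new RetriableException(e);
        }
    }

    @Override
    public void stop() {
        // Close the Elastic client and release resources if needed.
    }
}
```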
In the constructor, we set up the JsonConverter and Gson serializer just before creating an instance of the ElasticClient.
In the process() method, I convert record strings to the Record data type, since it is more readable and easier to process.
Topic message representation. Link to source
After playing with some Gson methods to convert strings to Records, the data is ready to be sent to Elastic.
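Purely as an illustration, the Record type and the Gson conversion could look roughly like this; the field names (id, behavior, payload) are placeholders, and the actual message schema is the one shown in the linked gist:

```java
import com.google.gson.Gson;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical topic message representation; the real fields live in the repository.
class Record {
    String id;        // document id in Elastic
    String behavior;  // "insert" or "delete", driven by the message's status field
    String payload;   // JSON body to index
}

class RecordParser {
    private final Gson gson = new Gson();

    // Convert the raw string value of a SinkRecord into the Record type.
    Record parse(SinkRecord sinkRecord) {
        return gson.fromJson(String.valueOf(sinkRecord.value()), Record.class);
    }
}
```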
As an Elastic client, I tried the Transport Client at first, as it connects to your Elastic cluster like another node and bulk indexing is a little bit faster than over the http protocol. However, I faced an error and could not find any solution, even after asking on StackOverflow. I think it may be related to the fast-data-dev container, Docker networking configuration, etc. So I switched to the Jest Client, as http performance is also great according to the same article I mentioned above.
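Creating a Jest client is straightforward; a minimal sketch looks like the following, where the host and port are placeholders that would come from ElasticSinkConnectorConfig in the real project:

```java
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

public class ElasticClientFactory {

    // Builds a Jest client that talks to Elastic over HTTP.
    public static JestClient create(String host, int port) {
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder("http://" + host + ":" + port)
                .multiThreaded(true)
                .build());
        return factory.getObject();
    }
}
```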
In the bulkSend() method, we simply add each item in dataList to the Bulk: if the behavior of the Record is insert, an Index request is added; otherwise a Delete request is attached to the Bulk.
We send it to Elastic and log if any error occurs:
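A rough sketch of that bulk logic with the Jest API follows; the index and type names and the Record accessors are assumptions, not the repository's actual values:

```java
import java.util.List;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Delete;
import io.searchbox.core.Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticBulkSender {

    private static final Logger log = LoggerFactory.getLogger(ElasticBulkSender.class);

    private final JestClient client;

    public ElasticBulkSender(JestClient client) {
        this.client = client;
    }

    public void bulkSend(List<Record> dataList) {
        // Assumed index/type names; the real ones come from the connector configuration.
        Bulk.Builder bulkBuilder = new Bulk.Builder()
                .defaultIndex("my-index")
                .defaultType("my-type");

        for (Record record : dataList) {
            if ("insert".equals(record.behavior)) {
                // Index request for records flagged as insert.
                bulkBuilder.addAction(new Index.Builder(record.payload).id(record.id).build());
            } else {
                // Delete request for everything else (i.e. "delete").
                bulkBuilder.addAction(new Delete.Builder(record.id).build());
            }
        }

        try {
            BulkResult result = client.execute(bulkBuilder.build());
            if (!result.isSucceeded()) {
                log.error("Bulk request failed: {}", result.getErrorMessage());
            }
        } catch (Exception e) {
            log.error("Could not send bulk request to Elastic", e);
        }
    }
}
```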
To conclude, we have to write our own sink or source connector if there is no community implementation for our system. There may also be ready-to-use connectors whose behavior you need to change.
The most challenging part of this process is testing your connector. What you implement is some kind of plug-in for your Connect cluster, not a standalone application. Writing unit tests is tricky, as you override methods of the Kafka Connect package and some kind of anti-corruption layer is needed. Debugging is also quite difficult.
You can test the connector using the docker-compose file I have written, following the steps in my repository.
Thanks,
Sometimes I tweet useful pieces of information: @_skynyrd
Helpful resources I have used: