There may be no Kafka connector for your system, or the available ones may not meet your requirements. In both cases you have to write your own Kafka connector, and there are not many online resources about it. I'll write up my adventure to help others suffering the same pain.

Here is my case: I want to read data from a Kafka topic and write records to an Elastic index if a record carries the "insert" flag in its status field, or delete them from the index if the status is "delete". I knew I couldn't use the official or any other open source Elastic sink connector, as they offer one generic behavior driven by the connector configuration, not by the data itself.

For local development and testing, I used Landoop's fast-data-dev project, as it includes Zookeeper, Kafka, Connect and sufficient UI tools in just one docker container.

Getting Ready

If you want to write your own source or sink connector, you have to use Java, because the main idea is to build jars from our project that act as a plug-in for a local Kafka Connect cluster or standalone worker. So make sure you have a JDK installed locally. I used IntelliJ IDEA, as I am a JetBrains fan.

Create Your Project Using the Custom Connect Maven Archetype

There is a Maven archetype for your project, with a bunch of skeleton classes included. To use it via IntelliJ, you can add it when creating a new project; here is its Github repository. Don't forget to use the latest version; check it out from the repo.

What are we going to build?

As I said, we are going to create a custom Elastic sink connector that behaves according to the topic data. I am going to explain the most important parts of the project; please clone my repository to follow the snippets below.

ElasticSinkConnectorConfig.java

You will use this class to read configuration properties; it is a pretty straightforward class. First you should define your config elements, such as:

Link to source

ELASTIC_PORT is the config property that will be used by our Elastic transport client, or you could say the Elastic driver. ELASTIC_PORT_DOC is the explanation string used when the property is defined on the ConfigDef.

ConfigDef is a class in the Kafka common config package, and we bind all of our properties to it here:

Link to source

The rest of the file contains getters and setters for the properties; nothing complex there.

ElasticSinkConnector.java

If you have something to do when the connector starts consuming from a topic, or when it stops, this is the class to implement. Going over the methods:

- version() returns the version of the connector,
- start() takes the configuration from connector.properties and passes it to the ElasticSinkConnectorConfig class we discussed above,
- taskClass() returns the class that does the actual work, i.e. gets data from the topic and processes it (we are going to talk about this in a minute),
- stop() is a kind of teardown function for your connector,
- config() returns the config class (for us, ElasticSinkConnectorConfig),
- taskConfigs() is for your tasks; we distribute the configs to all the tasks here (by the way, you should set the tasks.max property in the worker configuration, and here is what its value should be).

By the way, I still have not figured out how many tasks should be set for an optimal configuration.

Link to source

ElasticSinkTask.java

This is the class where you receive the data from the configured topic. In the start() method, I create an instance of ElasticServiceImpl. I did not use any dependency injection strategy, since testing the plug-in by overriding its methods is really difficult and some kind of anti-corruption layer is needed. For that reason, when I get the data in the put() method, I directly send it to ElasticServiceImpl after a little modification, and I test my service instead of the ElasticSinkTask class. In the constructor of the elastic service, I passed null as the ElasticClient, because I am going to pass a client in from the test class. So ElasticServiceImpl became my main processing class. The irony here is that I have not written any test classes yet; I'll implement them.

The most important thing about this class is that you should be careful with the put() method, as Kafka Connect commits the data offsets for the records it accepts, so proper error handling should be done here.
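To make that shape concrete, here is a minimal sketch of such a task, assuming a small ElasticService interface standing in for the ElasticServiceImpl wiring described above. The names and the stub in start() are illustrative, not the repository's exact code.

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical service boundary; in the real project ElasticServiceImpl
// implements the processing and talks to Elastic through a client.
interface ElasticService {
    void process(Collection<SinkRecord> records);
}

public class ElasticSinkTask extends SinkTask {

    private ElasticService elasticService;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // The real task would build ElasticServiceImpl from the parsed
        // config here; a no-op stub keeps this sketch self-contained.
        elasticService = records -> { /* index or delete each record */ };
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            // Hand the batch to the service layer, which decides per
            // record whether to index into or delete from Elastic.
            elasticService.process(records);
        } catch (Exception e) {
            // Records accepted by put() are considered delivered and their
            // offsets get committed, so failures must not be swallowed;
            // throwing RetriableException asks Connect to redeliver.
            throw new RetriableException("Could not write batch to Elastic", e);
        }
    }

    @Override
    public void stop() {
        // Tear down the Elastic client and any other resources here.
    }
}
```

The point of this shape is that put() stays a thin, fail-loud shell: all the data-dependent insert/delete logic lives behind the service boundary, which is what makes it testable without a running Connect cluster.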
ElasticServiceImpl.java

In the constructor, we set up the JsonConverter and the Gson serializer just before creating an instance of ElasticClient.

In the process() method, I convert the record strings to the Record data type, since it is more readable and easier to process. Record is the topic message representation:

Link to source

After playing with some Gson methods to convert strings to Records, the data is ready to be sent to Elastic.

ElasticClientImpl.java

As an Elastic client, I tried the Transport Client at first, as it connects like another node to your Elastic cluster and bulk indexing over it is a little bit faster than over the http protocol. However, I faced an error and could not find any solution, even though I asked about it on StackOverflow; I think it may be related to the fast-data-dev docker container, the networking configuration, etc. So I switched to the Jest Client, as http performance is also great according to the same article I mentioned above.

In the bulkSend() method, we simply add each item of dataList to the Bulk: if the behavior of the Record is insert, an Index request is added; otherwise a Delete request is attached to the Bulk. We then send it to Elastic and log any error that occurs (a rough sketch of this method is included at the end of this post):

Link to source

Conclusion

To conclude, we have to write our own sink or source connector when there is no community implementation for our system. And even when there are ready-to-use connectors, you may need to change their behaviour. The most challenging part of this process is testing your connector: what you implement is a kind of plug-in for your Connect cluster, not a standalone application. Writing unit tests is tricky, as you override methods of the Kafka Connect package and some kind of anti-corruption layer is needed. Debugging is also quite difficult. You can test the connector using the docker-compose file I have written, following the steps in my repository.

Thanks. Sometimes I tweet useful pieces of information: @_skynyrd

Helpful resources I have used:

- https://docs.confluent.io/current/connect/devguide.html
- Stephane Maarek's Udemy courses
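Appendix: a bulkSend() sketch

As mentioned in the ElasticClientImpl.java section above, here is a minimal sketch of what a bulkSend() along these lines can look like with the Jest client. Treat it as an illustration under assumptions: the Record shape (its behavior and id fields), the index and type names, and the plain stderr logging are placeholders, not the repository's actual code.

```java
import java.io.IOException;
import java.util.List;

import com.google.gson.Gson;

import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Delete;
import io.searchbox.core.Index;

public class ElasticClientImpl {

    private final JestClient client;
    private final Gson gson = new Gson();

    public ElasticClientImpl(JestClient client) {
        this.client = client;
    }

    // Hypothetical message representation: "behavior" carries the
    // insert/delete flag from the topic data, "id" the document id.
    public static class Record {
        String id;
        String behavior;
        // ... payload fields
    }

    public void bulkSend(List<Record> dataList) throws IOException {
        Bulk.Builder bulkBuilder = new Bulk.Builder()
                .defaultIndex("my-index")  // assumed index name
                .defaultType("my-type");   // assumed type name

        for (Record record : dataList) {
            if ("insert".equals(record.behavior)) {
                // Index request for records flagged as inserts.
                bulkBuilder.addAction(new Index.Builder(gson.toJson(record))
                        .id(record.id)
                        .build());
            } else {
                // Delete request for everything else (e.g. "delete").
                bulkBuilder.addAction(new Delete.Builder(record.id).build());
            }
        }

        BulkResult result = client.execute(bulkBuilder.build());
        if (!result.isSucceeded()) {
            // Log and let the caller decide whether to retry the batch.
            System.err.println("Bulk request failed: " + result.getErrorMessage());
        }
    }
}
```

Setting the default index and type once on the Bulk.Builder keeps each per-record action down to the one thing that actually varies, the insert-versus-delete decision, which is exactly the data-driven behavior this connector exists for.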