Let’s imagine we have XML data on a queue in IBM MQ, and we want to ingest it into Kafka to then use downstream, perhaps in an application or maybe to stream to a NoSQL store like MongoDB.

Note: This same pattern for ingesting XML will work with other connectors such as JMS and ActiveMQ.

I’ve got a Docker Compose stack running that includes:

- IBM MQ
- Apache Kafka (deployed as Confluent Platform to include the all-important Schema Registry)
- MongoDB

Loading some test data onto IBM MQ

Let’s load some messages onto the queue from an XML file:

```bash
docker exec --interactive ibmmq \
  /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1 < data/note.xml
```

Streaming from IBM MQ to Kafka and translating the XML messages

Now we can ingest this into Kafka using Kafka Connect with the IbmMQSourceConnector plugin and the XML Transformation:

```bash
curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/source-ibmmq-note-01/config \
  -d '{
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"ibmmq-note-01",
    "mq.hostname":"ibmmq",
    "mq.port":"1414",
    "mq.queue.manager":"QM1",
    "mq.transport.type":"client",
    "mq.channel":"DEV.APP.SVRCONN",
    "mq.username":"app",
    "mq.password":"password123",
    "jms.destination.name":"DEV.QUEUE.1",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"broker:29092",
    "confluent.topic.replication.factor":"1",
    "transforms": "extractPayload,xml",
    "transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractPayload.field": "text",
    "transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
    "transforms.xml.schema.path": "file:///data/note.xsd",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
  }'
```

NOTE: ExtractField is needed, otherwise the XML transform will fail with java.lang.UnsupportedOperationException: STRUCT is not a supported type, since it will be trying to operate on the entire payload from IBM MQ, which includes fields other than the XML that we’re interested in.

The resulting Kafka topic holds the value of the text field in the messages, serialised in Avro:

```bash
docker exec kafkacat \
  kafkacat \
    -b broker:29092 \
    -r http://schema-registry:8081 \
    -s key=s -s value=avro \
    -t ibmmq-note-01 \
    -C -o beginning -u -q -J | \
  jq -c '.payload'
```

```json
{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}
{"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}
…
```

To understand more about the concepts around getting XML data into Kafka see here, and I’ve written about the specifics of Kafka Connect and the XML transformation here.

Streaming the data from Kafka to MongoDB

We can then add another Kafka Connect connector to the pipeline, using the official plugin for Kafka Connect from MongoDB, which will stream data straight from a Kafka topic into MongoDB:

```bash
curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/sink-mongodb-note-01/config \
  -d '{
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics":"ibmmq-note-01",
    "connection.uri":"mongodb://mongodb:27017",
    "database":"rmoff",
    "collection":"notes",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
  }'
```

Check out the data in MongoDB:

```bash
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
```

```text
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
bye
```

Let’s check that this is actually streaming, by sending another record to the MQ:

```bash
echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | \
  docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
```

```text
Sample AMQSPUT0 start
target queue is DEV.QUEUE.1
Sample AMQSPUT0 end
```

And, behold, the new record in MongoDB:

```bash
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
```

```text
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
{ "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
bye
```

What if my data’s not in XML? What if we want other fields from the payload?
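Before looking at the alternatives, it helps to be clear about what the two transforms in the first connector do to each message. Below is a minimal Python sketch of that behaviour, not how ExtractField$Value or FromXml$Value are actually implemented (both are Java Single Message Transforms). It assumes the IBM MQ message can be modelled as a plain dict with a text key, and the hypothetical NOTE_FIELDS tuple stands in for the schema that note.xsd supplies. It also shows, in spirit, why ExtractField has to come first: the XML step only accepts a string, not the whole message struct.

```python
import xml.etree.ElementTree as ET

# Hypothetical stand-in for the schema in note.xsd: a <note> element
# with four string children.
NOTE_FIELDS = ("to", "from", "heading", "body")


def extract_field(value: dict, field: str):
    """Rough analogue of ExtractField$Value: replace the record with one of its fields."""
    return value[field]


def from_xml(value) -> dict:
    """Rough analogue of FromXml$Value: turn an XML string into a structured record."""
    if not isinstance(value, str):
        # Mirrors the UnsupportedOperationException described earlier:
        # the XML step cannot operate on the whole message struct.
        raise TypeError(f"{type(value).__name__} is not a supported type")
    root = ET.fromstring(value)
    if root.tag != "note":
        raise ValueError(f"unexpected root element <{root.tag}>")
    return {name: root.findtext(name) for name in NOTE_FIELDS}


def apply_transforms(mq_message: dict) -> dict:
    """Apply both steps in the same order as "transforms": "extractPayload,xml"."""
    return from_xml(extract_field(mq_message, "text"))


if __name__ == "__main__":
    message = {
        "messageID": "ID:414d5120514d3120202020202020202060e67a5f06352924",
        "messageType": "text",
        "text": "<note> <to>Jani</to> <from>Tove</from> "
                "<heading>Reminder 02</heading> <body>Of course I won't!</body> </note>",
    }
    print(apply_transforms(message))
    try:
        from_xml(message)  # skipping the ExtractField step
    except TypeError as err:
        print("without ExtractField:", err)
```

Swapping the order of the two steps has the same effect as leaving ExtractField out: the XML step receives the whole message and fails.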
In the example above we’re taking data from the source system (IBM MQ), and Kafka Connect is applying a schema to the field called text within it (the XML transformation does this, based on the supplied XSD). When it’s written to Kafka it’s serialised using the selected converter, which, since it’s Avro, stores the schema in the Schema Registry. This is a Good Way of doing things, since we retain the schema for use by any consumer. We could use Protobuf or JSON Schema here too if we wanted. If this doesn’t all make sense to you then check out Schemas, Schmeeeemas / Why not just JSON?

But the full payload that comes through from IBM MQ looks like this:

```text
messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
messageType=text
timestamp=1601893142430
deliveryMode=1
redelivered=false
expiration=0
priority=0
properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR },
            JMS_IBM_PutDate=Struct{propertyType=string,string=20201005},
            JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1},
            JMSXDeliveryCount=Struct{propertyType=integer,integer=1},
            JMS_IBM_MsgType=Struct{propertyType=integer,integer=8},
            JMSXUserID=Struct{propertyType=string,string=mqm },
            JMS_IBM_Encoding=Struct{propertyType=integer,integer=546},
            JMS_IBM_PutTime=Struct{propertyType=string,string=10190243},
            JMSXAppID=Struct{propertyType=string,string=amqsput },
            JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>
```

If we want to retain some or all of these fields, we’re going to have to approach things a different way. As things stand, there is no Single Message Transform that I’m aware of that can take both the non-XML fields and the XML field and wrangle them into a single structured schema (which is the ideal outcome; another option would be putting the non-XML fields into the Kafka message header).

By default, the IBM MQ Source Connector will map the full payload to a schema. This means that you can still use a schema-supporting serialisation method, but the text payload field remains unparsed. Here’s an example:

```bash
curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/source-ibmmq-note-03/config \
  -d '{
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"ibmmq-note-03",
    "mq.hostname":"ibmmq",
    "mq.port":"1414",
    "mq.queue.manager":"QM1",
    "mq.transport.type":"client",
    "mq.channel":"DEV.APP.SVRCONN",
    "mq.username":"app",
    "mq.password":"password123",
    "jms.destination.name":"DEV.QUEUE.1",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"broker:29092",
    "confluent.topic.replication.factor":"1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
  }'
```

Now the full IBM MQ message is written to a Kafka topic, serialised with a schema. We can deserialise it with something like kafkacat:

```bash
kafkacat \
  -b broker:29092 \
  -r http://schema-registry:8081 \
  -s key=s -s value=avro \
  -t ibmmq-note-03 \
  -C -c1 -o beginning -u -q -J | \
  jq '.'
```
```json
{
  "topic": "ibmmq-note-03",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1601894073400,
  "broker": 1,
  "key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
  "payload": {
    "messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
    "messageType": "text",
    "timestamp": 1601894073400,
    "deliveryMode": 1,
    "properties": {
      "JMS_IBM_Format": {
        "propertyType": "string",
        "boolean": null,
        "byte": null,
        "short": null,
        "integer": null,
        "long": null,
        "float": null,
        "double": null,
        "string": {
          "string": "MQSTR "
        }
      },
      …
    "map": null,
    "text": {
      "string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
    }
  }
}
```

Observe that the text field is just a string, holding [what happens to be] XML.

You can use ksqlDB to work with the data to an extent, although there’s currently no support for handling the XML:

```sql
SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
       "PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
       "PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
       "PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
       text
  FROM IBMMQ_SOURCE
  EMIT CHANGES;
```

```text
+-----------+-----------------+-------------------+------------+------------------------------------+
|JMSXAPPID  |JMS_IBM_PUTTIME  |JMSXDELIVERYCOUNT  |JMSXUSERID  |TEXT                                |
+-----------+-----------------+-------------------+------------+------------------------------------+
|amqsput    |10302905         |1                  |mqm         |<note> <to>Jani</to> <from>Tove</fro|
|           |                 |                   |            |m> <heading>Reminder 02</heading> <b|
|           |                 |                   |            |ody>Of course I won't!</body> </note|
|           |                 |                   |            |>                                   |
|amqsput    |10302905         |1                  |mqm         |<note> <to>Tove</to> <from>Jani</fro|
|           |                 |                   |            |m> <heading>Reminder 03</heading> <b|
|           |                 |                   |            |ody>Where are you?</body> </note>   |
```

👾 Try it out! You can find the code to run this for yourself using Docker Compose on GitHub.

Also published on: https://rmoff.net/2020/10/05/streaming-xml-messages-from-ibm-mq-into-kafka-into-mongodb/
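Since ksqlDB can’t currently parse the XML held in the text field, one workaround is to do that parsing in the consuming application instead, which also gets you the "single structured" record mentioned above. The following is a minimal Python sketch, not part of the original pipeline, that takes one record in the JSON shape kafkacat -J prints for ibmmq-note-03, pulls out a few JMS properties, and merges them with the parsed XML into one flat dict. It assumes each union-typed value is encoded as a one-key object such as {"string": "MQSTR "}, as in the kafkacat output for ibmmq-note-03; the branch name used for the integer property in the trimmed sample ("int") is an assumption, which is why unwrap() ignores the key. The function names are hypothetical.

```python
import json
import xml.etree.ElementTree as ET


def unwrap(value):
    """Unwrap Avro's JSON union encoding: {"string": "x"} -> "x". Other values pass through."""
    if isinstance(value, dict) and len(value) == 1:
        return next(iter(value.values()))
    return value


def property_value(prop: dict):
    """Return the populated slot of a JMS property struct, picked by its propertyType."""
    value = unwrap(prop.get(prop["propertyType"]))
    # Some values in the sample output (e.g. "MQSTR ") carry trailing padding.
    return value.strip() if isinstance(value, str) else value


def flatten(record: dict, wanted=("JMSXAppID", "JMSXUserID")) -> dict:
    """Merge selected JMS properties and the parsed <note> XML into one flat dict."""
    payload = record["payload"]
    properties = payload.get("properties") or {}
    result = {name: property_value(properties[name]) for name in wanted if name in properties}
    result["messageID"] = payload["messageID"]
    root = ET.fromstring(unwrap(payload["text"]))
    # One key per child element of <note>: to, from, heading, body.
    result.update({child.tag: child.text for child in root})
    return result


if __name__ == "__main__":
    # Trimmed-down sample record; real records carry every property slot.
    sample = json.loads("""{
      "payload": {
        "messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
        "properties": {
          "JMSXAppID": {"propertyType": "string", "integer": null, "string": {"string": "amqsput "}},
          "JMSXDeliveryCount": {"propertyType": "integer", "integer": {"int": 1}, "string": null}
        },
        "text": {"string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"}
      }
    }""")
    print(flatten(sample, wanted=("JMSXAppID", "JMSXDeliveryCount")))
```

To feed it real data you could drop the | jq '.' from the kafkacat command, so that each message is printed as one JSON object per line, and call flatten() on each line.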