SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports massive real-time data synchronization. It offers stable and efficient processing, can synchronize hundreds of billions of records per day, and is used in production by more than 3,000 enterprises in China.

Databend is a cloud-native data platform with separated compute and storage, offering elasticity and high concurrency for modern data processing workloads.

This article analyzes the MySQL-CDC plugin in SeaTunnel and the data format its sinks produce, then explores the feasibility and implementation path of integrating SeaTunnel with Databend in practical scenarios.

<!--truncate-->

SeaTunnel as a whole is a standard data synchronization tool.

SeaTunnel and MySQL-CDC

SeaTunnel’s MySQL CDC connector reads both snapshot data and incremental data from MySQL databases. The tests below vary the sink to check whether the data output by MySQL-CDC can be used directly by Databend.

From testing, the MySQL synchronization component used by SeaTunnel appears to be debezium-mysql-connector (the same component used by Kafka Connect).

source: MySQL-CDC sink: console

The task is to synchronize the wubx.t01 table from MySQL using SeaTunnel.

Configuration file v2.mysql.streaming.conf:

    # v2.mysql.streaming.conf
    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 2000
    }

    source {
      MySQL-CDC {
        base-url = "jdbc:mysql://192.168.1.100:3306/wubx"
        username = "wubx"
        password = "wubxwubx"
        table-names = ["wubx.t01"]
        startup.mode = "initial"
      }
    }

    sink {
      Console {
      }
    }

Start SeaTunnel:

    ./bin/seatunnel.sh --config ./config/v2.mysql.streaming.conf -m local

Observe the logs on the terminal.

Observed full synchronization

    SELECT * FROM `wubx`.`t01`

The retrieved data is as follows:

    2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 1, databend
    2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 3, MySQL
    2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 4, Setunnel01

Full synchronization completed.

Insert on source side

    insert into t01 values(5,'SeaTunnel');

SeaTunnel can directly capture incremental data, with the corresponding action kind=INSERT.

    2025-05-07 14:35:48,520 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=4:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 5, SeaTunnel

Update on source side

    update t01 set c1='MySQL-CDC' where id=5;

    2025-05-07 14:36:47,455 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=5:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_BEFORE : 5, SeaTunnel
    2025-05-07 14:36:47,455 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=6:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_AFTER : 5, MySQL-CDC

Delete on source side

    delete from t01 where id=5;

    2025-05-07 14:37:33,082 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=7:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=DELETE : 5, MySQL-CDC

The log format output by the console is relatively clear, which is very helpful for troubleshooting and subsequent use.

source: MySQL-CDC sink: MySQL

The console test above confirms that insert, update, and delete operations are all correctly captured and processed. Next, we test MySQL-CDC -> MySQL. The corresponding configuration file v2.mysql.streaming.m.conf is as follows:

    # v2.mysql.streaming.m.conf
    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 2000
    }

    source {
      MySQL-CDC {
        base-url = "jdbc:mysql://192.168.1.100:3306/wubx"
        username = "wubx"
        password = "wubxwubx"
        table-names = ["wubx.t01"]
        startup.mode = "initial"
      }
    }

    sink {
      jdbc {
        url = "jdbc:mysql://192.168.1.100:3306/wubx?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "wubx"
        password = "wubxwubx"
        generate_sink_sql = true
        # You need to configure both database and table
        database = wubx
        table = s_t01
        primary_keys = ["id"]
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode = "APPEND_DATA"
      }
    }

Start SeaTunnel:

    ./bin/seatunnel.sh --config ./config/v2.mysql.streaming.m.conf -m local

Observe the logs on the terminal.

Sync process analysis

Full synchronization statements:

    2025-05-07 14:56:01,024 INFO  [e.IncrementalSourceScanFetcher] [debezium-snapshot-reader-0] - Start snapshot read task for snapshot split: SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) exactly-once: false
    2025-05-07 14:56:01,026 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
    2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 2 - Snapshotting data
    2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Exporting data from split 'wubx.t01:0' of table wubx.t01
    2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - For split 'wubx.t01:0' of table wubx.t01 using select statement: 'SELECT * FROM `wubx`.`t01`'
    2025-05-07 14:56:01,032 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Finished exporting 3 records for split 'wubx.t01:0', total duration '00:00:00.004'
    2025-05-07 14:56:01,033 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
    2025-05-07 14:56:01,519 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=972391330309210113, pipelineId=1, taskGroupId=2}] - Finished reading from splits [wubx.t01:0]

Sink side prepared SQL statements for writing data:

    2025-05-07 14:56:01,708 INFO  [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is: INSERT INTO `wubx`.`s_t01` (`id`, `c1`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `c1`=VALUES(`c1`)
    2025-05-07 14:56:01,709 INFO  [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is: DELETE FROM `wubx`.`s_t01` WHERE `id` = ?

From the above statements, it can be seen that the corresponding binlog events can be handled directly as follows.

insert and update can be handled directly by the statement:

    INSERT INTO wubx.s_t01 (id, c1) VALUES (?, ?) ON DUPLICATE KEY UPDATE id=VALUES(id), c1=VALUES(c1)

delete can be handled by the statement:

    DELETE FROM wubx.s_t01 WHERE id = ?

Summary

SeaTunnel MySQL-CDC is relatively stable. The underlying data reading uses Debezium, which is a very mature and reliable tool.

source: MySQL-CDC sink: s3 format json

This section focuses on the data synchronization foundation in cloud environments, especially how to complete data synchronization at the lowest cost.

In overseas projects, developers often prefer kafka-connect: data is usually sunk to S3 first, and the files in S3 are then processed in batches to obtain a complete dataset.

Use the configuration file v2.mysql.streaming.s3.conf directly:

    # v2.mysql.streaming.s3.conf
    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 2000
    }

    source {
      MySQL-CDC {
        base-url = "jdbc:mysql://192.168.1.100:3306/wubx"
        username = "wubx"
        password = "wubxwubx"
        table-names = ["wubx.t01"]
        startup.mode = "initial"
      }
    }

    sink {
      S3File {
        bucket = "s3a://mystage"
        tmp_path = "/tmp/SeaTunnel/${table_name}"
        path = "/mysql/${table_name}"
        fs.s3a.endpoint = "http://192.168.1.100:9900"
        fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        access_key = "minioadmin"
        secret_key = "minioadmin"
        file_format_type = "json"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode = "APPEND_DATA"
      }
    }

First, sink using json format.

Start SeaTunnel:

    ./bin/seatunnel.sh --config ./config/v2.mysql.streaming.s3.conf -m local

Observe the logs on the terminal.

Found full synchronization

    2025-05-07 15:14:41,430 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-42] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_1/NON_PARTITION/T_972396021571125249_c679929b12_0_1_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_1_0.json] finish

Content of /mysql/t01/T_972396021571125249_c679929b12_0_1_0.json:

    {"id":1,"c1":"databend"}
    {"id":3,"c1":"MySQL"}
    {"id":4,"c1":"Setunnel01"}
    {"id":5,"c1":"SeaTunnel"}

Seeing this is somewhat disappointing: the output lacks the kind and timestamp fields.

Insert on source side

Next, insert a row:

    insert into t01 values(6,'SeaTunnel01');

Log info:

    2025-05-07 15:18:59,380 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-16] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_130/NON_PARTITION/T_972396021571125249_c679929b12_0_130_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_130_0.json] finish

Content of T_972396021571125249_c679929b12_0_130_0.json:

    {"id":6,"c1":"SeaTunnel01"}

Update on source side

Execute statement:

    update t01 set c1='MySQL-CDC' where id=5;

Log info:

    2025-05-07 15:20:15,386 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-9] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_168/NON_PARTITION/T_972396021571125249_c679929b12_0_168_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_168_0.json] finish

The corresponding JSON file content:

    {"id":5,"c1":"SeaTunnel"}
    {"id":5,"c1":"MySQL-CDC"}

One update operation produced two lines in the JSON file (the before and after images), but because the operation type (kind) and timestamp fields are missing, it is difficult to accurately restore the data change process. If a timestamp field were included, the latest record could be kept.

Delete on source side

Execute statement:

    delete from t01 where id=5;

Log info:

    2025-05-07 15:22:53,392 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-6] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_247/NON_PARTITION/T_972396021571125249_c679929b12_0_247_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_247_0.json] finish

Corresponding JSON file content:

    {"id":5,"c1":"MySQL-CDC"}

The delete operation also lacks an operation type (kind) and records only the original row, so the file is indistinguishable from an insert. This makes subsequent data processing and tracing difficult.

Summary

Using SeaTunnel’s S3File sink with JSON format for data tracing is therefore currently not feasible. It is recommended that the S3File sink add support for the maxwell_json and debezium_json formats:

https://github.com/apache/SeaTunnel/issues/9278

Looking forward to this feature enhancement, so that SeaTunnel can sync all data to S3, allowing S3 to play the role of a message queue.

source: MySQL-CDC sink: Kafka

The open source world is very interesting; if a feature is not achievable, there is always an alternative. Since MySQL-CDC is based on Debezium underneath, it should support the Debezium format.

https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/debezium-json

SeaTunnel also supports:

https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/maxwell-json

This means that, to maintain compatibility with Debezium and Maxwell, SeaTunnel supports choosing either of these two formats when sinking to Kafka.

debezium-json

    {
      "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter ",
        "weight": 5.18
      },
      "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter ",
        "weight": 5.17
      },
      "source": {
        "version": "1.1.1.Final",
        "connector": "mysql",
        "name": "dbserver1",
        "ts_ms": 1589362330000,
        "snapshot": "false",
        "db": "inventory",
        "table": "products",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2090,
        "row": 0,
        "thread": 2,
        "query": null
      },
      "op": "u",
      "ts_ms": 1589362330904,
      "transaction": null
    }

Data in this format can be easily processed in Databend or Snowflake. You can use the fields "op": "u" and "ts_ms": 1589362330904 to merge the data into the target table via the merge into + stream method.
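
To illustrate how the op and ts_ms fields drive such a merge, here is a minimal Python sketch that works on an in-memory dictionary rather than a real table. The function names and the assumption that the primary key column is called id are ours and purely illustrative. It only mimics the idea of keeping the newest event per key and then applying an upsert or a delete.

```python
def latest_changes(events):
    """Keep only the newest event per primary key, ordered by ts_ms."""
    latest = {}
    for event in events:
        # A delete carries the row image in "before"; other ops use "after".
        row = event["before"] if event["op"] == "d" else event["after"]
        key = row["id"]  # assumption: the primary key column is named "id"
        if key not in latest or event["ts_ms"] >= latest[key]["ts_ms"]:
            latest[key] = event
    return latest


def apply_changes(table, events):
    """Apply the deduplicated events to a dict that maps id -> row."""
    for key, event in latest_changes(events).items():
        if event["op"] == "d":
            table.pop(key, None)  # delete: drop the row if it is present
        else:
            table[key] = event["after"]  # c / r / u: upsert the after image
    return table


events = [
    {"op": "c", "ts_ms": 1, "before": None,
     "after": {"id": 5, "c1": "SeaTunnel"}},
    {"op": "u", "ts_ms": 2, "before": {"id": 5, "c1": "SeaTunnel"},
     "after": {"id": 5, "c1": "MySQL-CDC"}},
    {"op": "c", "ts_ms": 3, "before": None,
     "after": {"id": 6, "c1": "SeaTunnel01"}},
    {"op": "d", "ts_ms": 4, "before": {"id": 5, "c1": "MySQL-CDC"},
     "after": None},
]

print(apply_changes({}, events))
```

Because only the newest event per key is applied, the row with id 5 ends up deleted even though it was inserted and updated earlier in the same batch.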

maxwell-json

    {
      "database": "test",
      "table": "product",
      "type": "insert",
      "ts": 1596684904,
      "xid": 7201,
      "commit": true,
      "data": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter ",
        "weight": 5.18
      },
      "primary_key_columns": [
        "id"
      ]
    }

This JSON body contains the type, ts, and primary key fields, making it very convenient to use SQL for ELT processing later.

Summary

In other words, if you want SeaTunnel to output logs in one of these standard CDC formats, you need to introduce a Kafka-like architecture.

After talking with community members, it turns out some people do exactly this, syncing messages from Kafka to OSS.

Example of integrating maxwell-json message body with Databend

Create an update table for recording binlog message details:

    create table t01_update(
      database varchar,
      table varchar,
      type varchar,
      ts bigint,
      xid bigint,
      commit boolean,
      data variant,
      primary_key_columns array(varchar)
    );

This table’s data can be sourced from S3 and loaded in near real time into t01_update using copy into.
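
As a rough illustration of how maxwell-json messages line up with the columns of t01_update, the following Python sketch turns raw messages into newline-delimited JSON rows that could be staged as files. The helper names and the split between required and optional fields are assumptions made for this example. They are not part of SeaTunnel or Databend.

```python
import json

# Column names of the t01_update table defined above.
COLUMNS = ("database", "table", "type", "ts", "xid", "commit",
           "data", "primary_key_columns")
# Assumption: these fields must be present for a row to be usable downstream.
REQUIRED = ("database", "table", "type", "ts", "data")


def to_update_row(message):
    """Parse one maxwell-json message and keep only the t01_update columns."""
    event = json.loads(message)
    missing = [name for name in REQUIRED if name not in event]
    if missing:
        raise ValueError(f"message is missing fields: {missing}")
    # Optional fields that are absent become None (NULL after loading).
    return {name: event.get(name) for name in COLUMNS}


def to_ndjson(messages):
    """Render messages as newline-delimited JSON, one row per line."""
    return "\n".join(json.dumps(to_update_row(m)) for m in messages)


message = json.dumps({
    "database": "test", "table": "product", "type": "insert",
    "ts": 1596684904, "xid": 7201, "commit": True,
    "data": {"id": 111, "name": "scooter", "weight": 5.18},
    "primary_key_columns": ["id"],
})
print(to_ndjson([message]))
```

Keeping data as a nested object matches the variant column, so individual fields can still be extracted later with expressions such as data:id.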

Create a target table:

    create table t01(
      id int,
      name varchar,
      description varchar,
      weight double
    );

Create a stream on the t01_update table to capture its increments:

    create stream stream_t01_update on table t01_update;

Merge the data into the target table in Databend:

    MERGE INTO t01 AS a
    USING (
      SELECT
        data:id AS id,
        data:name AS name,
        data:description AS description,
        data:weight AS weight,
        ts,
        type
      FROM stream_t01_update
      -- keep only the newest event per primary key
      QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) = 1
    ) AS b
    ON a.id = b.id
    WHEN MATCHED AND b.type = 'update' THEN
      UPDATE SET
        a.name = b.name,
        a.description = b.description,
        a.weight = b.weight
    WHEN MATCHED AND b.type = 'delete' THEN
      DELETE
    -- skip rows whose newest event is a delete, so they are not re-inserted
    WHEN NOT MATCHED AND b.type <> 'delete' THEN
      INSERT (id, name, description, weight)
      VALUES (b.id, b.name, b.description, b.weight);

This SQL uses window deduplication to merge raw binlog data into the target table: for each id, only the event with the largest ts is kept and then applied as an update, delete, or insert. The NOT MATCHED branch excludes delete events so that a row created and deleted within the same batch is not inserted.

SeaTunnel and Databend integration approaches

Based on the analysis of the MySQL-CDC output forms, there are three ways to integrate SeaTunnel with Databend:

First approach: directly develop a SeaTunnel connector for Databend supporting both sink and source. This is simpler to implement.

Second approach: add support for the debezium-json and maxwell-json formats in the S3File sink, which is more elegant. Incremental data can then be consumed through Databend Stream, making it easy to connect external data sources.

Third approach: introduce Kafka as the SeaTunnel sink and use debezium-json and maxwell-json messages directly, so that downstream systems can subscribe to incremental data under data governance.

By testing SeaTunnel’s multiple output formats and behaviors, we have gained a preliminary understanding of SeaTunnel’s MySQL-CDC capabilities in preparation for integration with Databend. Combined with the Spark and Flink ecosystems, SeaTunnel can already handle large CDC tasks. If you have related practices, feel free to share them in the comments!