In this guide, we will walk you through a lightweight data migration and synchronization solution from MySQL to OceanBase using Apache SeaTunnel (SeaTunnel for short). We will leverage its built-in Zeta engine, which supports full data synchronization, offline incremental synchronization, and Change Data Capture (CDC).

# Preparing the Runtime Environment

Before we begin, ensure that your environment is ready.

## Install Java

SeaTunnel requires Java 8 or higher. While Java 8 is recommended, later versions should work as well.

After installation, verify that Java is correctly configured by running:

```bash
root:~# java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Debian-2deb11u1)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Debian-2deb11u1, mixed mode, sharing)
```

Make sure that `JAVA_HOME` is properly set.

## Download and Install Apache SeaTunnel

Visit the official SeaTunnel website to download the latest version. For this guide, we will use version 2.3.9:

```bash
# Download
wget https://dlcdn.apache.org/seatunnel/2.3.9/apache-seatunnel-2.3.9-bin.tar.gz

# Extract
tar -zxvf apache-seatunnel-2.3.9-bin.tar.gz
```

# Installing Connector Plugins

SeaTunnel's installation package only contains the core framework and the Zeta engine. To connect with various data sources, you need to manually download and configure the required plugins.
## Automatic Plugin Installation

To download the necessary connectors automatically, edit the `config/plugin_config` file and specify the required connectors. By default, the file lists all connectors; for this guide, we keep only the essential ones:

```
connector-cdc-mysql
connector-jdbc
connector-fake
connector-console
```

Run the following command to install the plugins:

```bash
sh bin/install-plugin.sh 2.3.9
```

## Manual Plugin Installation

Alternatively, you can download the required plugins manually from the Apache Maven Repository. Download the necessary `.jar` files, for example:

```
connector-cdc-mysql-2.3.9.jar
connector-console-2.3.9.jar
connector-fake-2.3.9.jar
connector-jdbc-2.3.9.jar
seatunnel-transforms-v2-2.3.9.jar
```

After downloading, move the files into the `connectors` directory.

## Verifying Connector Installation

To check whether the connectors are installed correctly, run:

```bash
./bin/seatunnel-connector.sh -l
```

```
Source
FakeSource
MySQL-CDC
Jdbc

Sink
Jdbc
Console

Transform
Copy
DynamicCompile
FieldMapper
Filter
FilterRowKind
JsonPath
LLM
Replace
Split
Sql
```

Since we will use a JDBC MySQL connection to interact with OceanBase, you also need to download the MySQL JDBC driver from the official MySQL website.
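The manual download step can also be scripted. Below is a minimal sketch that only builds the Maven Central URLs for the connector jars; the `org.apache.seatunnel` group path and the `connector_url` helper are assumptions based on the artifact names, so verify the paths against the repository before passing them to `wget`:

```shell
# Build Maven Central download URLs for the SeaTunnel connector jars.
# NOTE: the group path is an assumption; double-check it in the Maven Repository.
ST_VERSION=2.3.9

connector_url() {
  echo "https://repo1.maven.org/maven2/org/apache/seatunnel/$1/${ST_VERSION}/$1-${ST_VERSION}.jar"
}

# Print one URL per required plugin; feed these to wget/curl to download.
for c in connector-cdc-mysql connector-console connector-fake connector-jdbc seatunnel-transforms-v2; do
  connector_url "$c"
done
```

For example, `connector_url connector-jdbc` prints `https://repo1.maven.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.9/connector-jdbc-2.3.9.jar`.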
Once downloaded, place the `mysql-connector-j-9.0.0.jar` file into the `seatunnel/lib` directory.

# Verifying SeaTunnel Installation

To confirm that SeaTunnel is installed correctly, execute a batch processing test using the default configuration template:

```bash
./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local
```

Command explanation:

- `seatunnel.sh`: the standard SeaTunnel startup script
- `--config`: specifies the configuration file
- `-m local`: runs the job in local mode

If everything is working correctly, you should see output similar to this:

```
2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438
```

At the end of the job execution, you will see a summary log:

```
*********************************************
         Job Statistic Information
*********************************************
Start Time         : 2024-08-29 22:45:29
End Time           : 2024-08-29 22:45:33
Total Time(s)      : 4
Total Read Count   : 32
Total Write Count  : 32
Total Failed Count : 0
*********************************************
```

This confirms that SeaTunnel is working correctly.

# Full Data Synchronization

## Creating the Test Tables

To verify full data synchronization, we create test tables in both MySQL and OceanBase.
### Step 1: Creating the MySQL Tables

```sql
CREATE TABLE `table1` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `table2` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

We used Navicat to create 100,000 records in each table.
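If you don't have a GUI tool like Navicat handy, the bundled FakeSource connector can seed test rows instead. Below is a minimal sketch, assuming FakeSource's `row.num` and `schema` options; note that randomly generated values may collide on the unique `idx_value1` index, so you may need to relax that index while seeding:

```
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    # Generate 100,000 synthetic rows for table1's string columns
    row.num = 100000
    schema = {
      fields {
        value1 = string
        value2 = string
        value3 = string
        value4 = string
        value5 = string
      }
    }
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}
```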
## Configuring SeaTunnel for Full Synchronization

_Note: We recommend manually migrating the table schema, since automatic migration may encounter issues and does not create indexes._

### Single-Table Full Sync

```
env {
  parallelism = 5
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "select * from seatunnel.table1"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    # Automatically generate SQL statements
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}
```

Result:

```
*********************************************
         Job Statistic Information
*********************************************
Start Time         : 2024-08-30 15:05:39
End Time           : 2024-08-30 15:05:47
Total Time(s)      : 8
Total Read Count   : 100000
Total Write Count  : 100000
Total Failed Count : 0
*********************************************
```

### Multi-Table Full Extraction

```
env {
  parallelism = 5
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    table_list = [
      {
        table_path = "seatunnel.table1"
      },
      {
        table_path = "seatunnel.table2"
        query = "select * from seatunnel.table2 where id > 100"
      }
    ]
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    # Automatically generate SQL statements
    generate_sink_sql = true
    database = seatunnel
    table_list = ["seatunnel.table1", "seatunnel.table2"]
  }
}
```

Result:

```
*********************************************
         Job Statistic Information
*********************************************
Start Time         : 2024-08-30 15:10:09
End Time           : 2024-08-30 15:10:20
Total Time(s)      : 10
Total Read Count   : 200000
Total Write Count  : 200000
Total Failed Count : 0
*********************************************
```

# Incremental Synchronization Configuration File

For incremental sync, a simple approach is to use a query that filters on an `id` or `updatetime` column.
```
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "SELECT * FROM seatunnel.table1 WHERE updatetime > '2024-01-01'"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}
```

_Note:_ The sink performs insert and update operations based on the primary key. However, manually updating the configuration file for each incremental run is cumbersome. We recommend using Apache DolphinScheduler in conjunction with SeaTunnel to create a workflow: with DolphinScheduler, you can obtain the maximum timestamp or `id` from the sink and pass it to the job as a workflow variable.
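As a sketch of that workflow step, a DolphinScheduler SQL task could capture the watermark from the sink before each run. The column name below follows the queries in this section; adapt it to your schema:

```sql
-- Hypothetical watermark query: the single-value result is passed to the
-- SeaTunnel job as the ${max_id} workflow variable.
SELECT MAX(updatetime) FROM seatunnel.table1;
```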
Example configuration with a variable:

```
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "SELECT * FROM seatunnel.table1 WHERE updatetime > ${max_id}"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}
```

The multi-table configuration is similar.

# CDC Synchronization Configuration File

## Manual Table Schema Migration

Due to issues with SeaTunnel's OceanBase component, automatic schema migration can be error-prone, so we recommend migrating the table schema manually.
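One simple way to migrate a schema by hand, as a sketch: dump the DDL from MySQL and replay it on the OceanBase MySQL tenant (the table names follow the examples in this guide):

```sql
-- On MySQL: print the DDL for each table to migrate
SHOW CREATE TABLE seatunnel.table1\G

-- Then run the printed CREATE TABLE statement (indexes included)
-- against the OceanBase MySQL tenant before starting the CDC job.
```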
## Check MySQL Binlog Status

Grant the necessary privileges to the replication user:

```sql
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;
```

Verify that the binlog is enabled:

```sql
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
| log_bin                  | ON    |
+--------------------------+-------+
```

If the settings differ from the above, adjust your `mysql.cnf` file accordingly. Note that creating a consistent snapshot of a large database can trigger read timeouts; configure `interactive_timeout` and `wait_timeout` as needed.

After preparing the environment, write the configuration file.
```
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    startup.mode = "initial"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    # Target database
    database = "seatunnel"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    # Automatically generate SQL
    generate_sink_sql = true
  }
}
```

Once started, the job first performs a historical data migration and then processes CDC changes.

**Important note:** On startup, SeaTunnel executes different operations depending on the configured tables and the `startup.mode` setting. The `startup.mode` options are:

- **initial**: Synchronizes historical data first, then incremental data.
- **earliest**: Starts from the earliest offset.
- **latest**: Starts from the latest offset.
- **specific**: Starts from a user-provided offset.

If you use `specific`, you must provide the binlog offset file (`startup.specific-offset.file`) and the offset position (`startup.specific-offset.pos`).

# Conclusion

This article has detailed how to configure full, incremental, and CDC synchronization using Apache SeaTunnel. We covered:

- Full sync configuration for single- and multi-table extraction.
- Incremental sync configuration using query filters (with an option to integrate with Apache DolphinScheduler).
- CDC sync configuration, including prerequisites such as binlog verification.

By following these steps, you can achieve a complete, end-to-end data migration and synchronization solution. Thank you for reading, and please share your feedback! 🚀