Stop Wasting Time on Broken Data Syncs—Here’s How to Do It Right with Apache SeaTunnel

by SeaTunnel, March 20th, 2025

In this guide, we will walk you through a lightweight data migration and synchronization solution from MySQL to OceanBase using Apache SeaTunnel (SeaTunnel for short). We will leverage its built-in Zeta engine, which supports full data synchronization, offline incremental synchronization, and Change Data Capture (CDC) solutions.

Preparing the Runtime Environment

Before we begin, ensure that your environment is ready.

Install Java

SeaTunnel requires Java 8 or higher. While Java 8 is recommended, later versions should work as well.

After installation, verify that Java is correctly configured by running:

root:~# java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Debian-2deb11u1)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Debian-2deb11u1, mixed mode, sharing)

Make sure that JAVA_HOME is properly set.
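
If it is not, you can export it from your shell profile. For example (a sketch assuming an OpenJDK 17 install under /usr/lib/jvm; adjust the path to your own JDK):

# Hypothetical JDK path -- point JAVA_HOME at your actual installation
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH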

Download and Install Apache SeaTunnel

Visit the official SeaTunnel website to download the latest version.

For this guide, we will use version 2.3.9:

# Download
wget https://dlcdn.apache.org/seatunnel/2.3.9/apache-seatunnel-2.3.9-bin.tar.gz
# Extract
tar -zxvf apache-seatunnel-2.3.9-bin.tar.gz

Installing Connector Plugins

SeaTunnel’s installation package only contains the core framework and the Zeta engine. To connect with various data sources, you need to manually download and configure the required plugins.

Automatic Plugin Installation

To automatically download the necessary connectors, modify the config/plugin_config file and specify the required connectors. By default, the file includes all connectors, but for this guide, we only include the essential ones:

connector-cdc-mysql
connector-jdbc
connector-fake
connector-console

Run the following command to install the plugins:

sh bin/install-plugin.sh 2.3.9

Manual Plugin Installation

Alternatively, you can manually download the required plugins from the Apache Maven Repository.

Download the necessary .jar files, for example:

connector-cdc-mysql-2.3.9.jar
connector-console-2.3.9.jar
connector-fake-2.3.9.jar
connector-jdbc-2.3.9.jar
seatunnel-transforms-v2-2.3.9.jar

After downloading, move the files into the **connectors** directory.
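
For example, a connector jar can be fetched directly with wget (a sketch; the Maven Central path below is assumed, so verify the exact artifact location for your version before downloading):

# Assumed Maven Central layout for SeaTunnel connectors -- double-check the path for your version
wget https://repo1.maven.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.9/connector-jdbc-2.3.9.jar
mv connector-jdbc-2.3.9.jar connectors/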

Verifying Connector Installation

To check if the connectors are installed correctly, run:

./bin/seatunnel-connector.sh -l
 
Source
FakeSource MySQL-CDC Jdbc
 
 
Sink
Jdbc Console
 
 
Transform
Copy DynamicCompile FieldMapper Filter FilterRowKind JsonPath LLM Replace Split Sql

Since we will be using JDBC with the MySQL driver to interact with OceanBase, you also need to download the MySQL JDBC driver from the official MySQL website.

Once downloaded, place the mysql-connector-j-9.0.0.jar file into {seatunnel}/lib.
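
For example (a sketch; the Maven Central path below is assumed, and any recent Connector/J release should also work):

# Assumed download location -- the driver is also available from dev.mysql.com
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/9.0.0/mysql-connector-j-9.0.0.jar
mv mysql-connector-j-9.0.0.jar lib/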

Verifying SeaTunnel Installation

To confirm that SeaTunnel is installed correctly, execute a batch processing test using the default configuration template:

./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local

Command Explanation

  • seatunnel.sh → Standard SeaTunnel startup script
  • --config → Specifies the configuration file
  • -m local → Runs the job in local mode

If everything is working correctly, you should see output similar to this:

2022-12-19 11:01:45,417 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1:  SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438

At the end of the job execution, you will see a summary log:

*******************************************************
           Job Statistic Information
*******************************************************
Start Time                : 2024-08-29 22:45:29
End Time                  : 2024-08-29 22:45:33
Total Time(s)             :                   4
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
*******************************************************

This confirms that SeaTunnel is working correctly.

Full Data Synchronization

Creating the Test Table

To verify full data synchronization, we create test tables in both MySQL and OceanBase.

Step 1: Creating the MySQL Table

CREATE TABLE `table1` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `value1` VARCHAR(255) NOT NULL,
    `value2` VARCHAR(255),
    `value3` VARCHAR(255),
    `value4` VARCHAR(255),
    `value5` VARCHAR(255),
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE INDEX `idx_value1` (`value1`),
    INDEX `idx_value2_value3` (`value2`, `value3`),
    INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `table2` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `value1` VARCHAR(255) NOT NULL,
    `value2` VARCHAR(255),
    `value3` VARCHAR(255),
    `value4` VARCHAR(255),
    `value5` VARCHAR(255),
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE INDEX `idx_value1` (`value1`),
    INDEX `idx_value2_value3` (`value2`, `value3`),
    INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

We used Navicat to generate 100,000 test records in each table.
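
If you do not have Navicat handy, plain SQL works too. The sketch below assumes MySQL 8+ (for recursive CTEs) and simply fills each table with random-looking values; adjust the row count and columns as needed:

-- A sketch for seeding ~100,000 rows; UUID() keeps value1 unique for the unique index.
SET SESSION cte_max_recursion_depth = 100000;

INSERT INTO `table1` (`value1`, `value2`, `value3`, `value4`, `value5`)
SELECT UUID(), UUID(), UUID(), UUID(), UUID()
FROM (
    WITH RECURSIVE seq (n) AS (
        SELECT 1
        UNION ALL
        SELECT n + 1 FROM seq WHERE n < 100000
    )
    SELECT n FROM seq
) AS numbers;

-- Repeat the INSERT for `table2`.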

Configuring SeaTunnel for Full Synchronization

Full Data Synchronization Configuration File

_Note: We recommend manually migrating the table schema since automatic migration may encounter issues and does not create indexes._

Single-Table Full Sync

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        query = "select * from seatunnel.table1"
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        # Automatically generate SQL statements
        generate_sink_sql = true
        database = seatunnel
        table = table1
    }
}
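
Save the configuration to a file (the name below, mysql2ob_full.conf, is just a placeholder) and submit it with the same local-mode command used for the verification test:

./bin/seatunnel.sh --config ./config/mysql2ob_full.conf -m local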

Result:

*******************************************************
           Job Statistic Information
*******************************************************
Start Time                : 2024-08-30 15:05:39
End Time                  : 2024-08-30 15:05:47
Total Time(s)             :                   8
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
*******************************************************

Multi-Table Full Extraction

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        table_list = [
          {
            table_path = "seatunnel.table1"
          },
          {
            table_path = "seatunnel.table2"
            query = "select * from seatunnel.table2 where id > 100"
          }
        ]
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        # Automatically generate SQL statements
        generate_sink_sql = true
        database = seatunnel
        table_list = ["seatunnel.table1", "seatunnel.table2"]
    }
}

Result:

*******************************************************
           Job Statistic Information
*******************************************************
Start Time                : 2024-08-30 15:10:09
End Time                  : 2024-08-30 15:10:20
Total Time(s)             :                  10
Total Read Count          :              200000
Total Write Count         :              200000
Total Failed Count        :                   0
*******************************************************

Incremental Synchronization Configuration File

For incremental sync, a simple approach is to use a query that filters on an id or update-time column (updated_at in our test tables).

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        query = "SELECT * FROM seatunnel.table1 WHERE updated_at > '2024-01-01'"
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        generate_sink_sql = true
        database = seatunnel
        table = table1
    }
}

_Note:_ The sink will perform insert and update operations based on the primary key. However, manually updating the configuration file for each incremental run can be cumbersome. We recommend using Apache DolphinScheduler in conjunction with SeaTunnel to create a workflow. With DolphinScheduler, you can obtain the maximum timestamp or id from the sink and pass it as a workflow variable, as sketched below.
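
For instance, a scheduler task could run a query like the following against the sink before each run and pass the result to SeaTunnel as the ${max_id} variable (a sketch using the tables from this guide):

-- Fetch the current high-water mark from the sink table.
SELECT COALESCE(MAX(`id`), 0) AS max_id FROM seatunnel.table1;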

Example configuration with a variable:

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "xxx"
        password = "xxx"
        query = "SELECT * FROM seatunnel.table1 WHERE id > ${max_id}"
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        generate_sink_sql = true
        database = seatunnel
        table = table1
    }
}

The multi-table configuration is similar; a sketch is shown below.
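
A minimal sketch combining the table_list syntax from the full-sync example with per-table incremental filters. The ${max_id_table1} and ${max_id_table2} variables are hypothetical and would come from your scheduler; the env block is the same as in the single-table case and is omitted here.

source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx"
        password = "xxx"
        table_list = [
          {
            table_path = "seatunnel.table1"
            query = "SELECT * FROM seatunnel.table1 WHERE id > ${max_id_table1}"
          },
          {
            table_path = "seatunnel.table2"
            query = "SELECT * FROM seatunnel.table2 WHERE id > ${max_id_table2}"
          }
        ]
    }
}
sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        generate_sink_sql = true
        database = seatunnel
        table_list = ["seatunnel.table1", "seatunnel.table2"]
    }
}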

CDC Synchronization Configuration File

Manual Table Schema Migration

Due to issues with SeaTunnel’s OceanBase component, schema migration can be error-prone. It is recommended to migrate the table schema manually.

Check MySQL Binlog Status

Grant the necessary privileges to the user:

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;

Verify that the binlog is enabled:

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+

If the settings are not as above, please adjust your mysql.cnf file accordingly. Note that when creating a consistent snapshot of large databases, read timeouts may occur; configure interactive_timeout and wait_timeout as needed.
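
For reference, here is a sketch of a [mysqld] section that satisfies the variables above; the server-id, log file name, and timeout values are placeholders, so merge them into your existing configuration and restart MySQL:

[mysqld]
server-id                = 1            # any unique, non-zero value
log_bin                  = mysql-bin    # placeholder base name for the binlog files
binlog_format            = ROW
binlog_row_image         = FULL
gtid_mode                = ON
enforce_gtid_consistency = ON
# Optional: raise these if the initial consistent snapshot times out on large databases
interactive_timeout      = 28800
wait_timeout             = 28800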

After preparing the environment, write the CDC configuration file:

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    startup.mode = "initial"
  }
}

sink {
    jdbc {
        url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "xxx@xxx"
        password = "xxx"
        database = "seatunnel"                                   # Target database
        table-names = ["seatunnel.table1", "seatunnel.table2"]
        generate_sink_sql = true                                 # Automatically generate SQL
    }
}

Once started, the job will first perform a historical data migration, then process CDC changes.

Important Note:

Upon startup, SeaTunnel will execute different operations based on the configured tables and the startup.mode setting. The startup.mode options are as follows:

  • **initial**: Synchronizes historical data first, then incremental data.
  • **earliest**: Starts from the earliest offset.
  • **latest**: Starts from the latest offset.
  • **specific**: Starts from a user-provided specific offset.

If you use specific, you must provide the binlog offset file (startup.specific-offset.file) and the offset position (startup.specific-offset.pos); an example is shown below.
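
For example, a specific-offset source section might look like this (a sketch; the file name and position are placeholders, so read the real values from your MySQL server, e.g. with SHOW MASTER STATUS):

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    startup.mode = "specific"
    # Placeholder offset -- substitute your actual binlog file and position
    startup.specific-offset.file = "mysql-bin.000003"
    startup.specific-offset.pos = 4
  }
}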

Conclusion

This article has detailed how to configure full, incremental, and CDC synchronization using Apache SeaTunnel. We covered:

  • Full sync configuration for single and multi-table extraction.
  • Incremental sync configuration using query filters (with an option to integrate with Apache DolphinScheduler).
  • CDC sync configuration, including prerequisites like binlog verification.

By following these steps, you can achieve a complete, end-to-end data migration and synchronization solution. Thank you for reading, and please provide your feedback!🚀