In this guide, we walk through a lightweight data migration and synchronization solution from MySQL to OceanBase using Apache SeaTunnel (SeaTunnel for short). We will use its built-in Zeta engine, which supports full data synchronization, offline incremental synchronization, and Change Data Capture (CDC).
Before we begin, ensure that your environment is ready.
SeaTunnel requires Java 8 or higher. While Java 8 is recommended, later versions should work as well.
After installation, verify that Java is correctly configured by running:
root:~# java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Debian-2deb11u1)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Debian-2deb11u1, mixed mode, sharing)
Make sure that JAVA_HOME is properly set.
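As a minimal sketch of this check, the shell helpers below confirm that JAVA_HOME points at a usable Java binary and that its major version is at least 8. The function names are illustrative and not part of SeaTunnel.

```shell
# Extract the major Java version from `java -version` output.
# Handles the legacy "1.8.0_292" scheme as well as the modern "17.0.12" scheme.
java_major_version() {
  ver=$(printf '%s\n' "$1" | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  case "$ver" in
    1.*) printf '%s\n' "$ver" | cut -d. -f2 ;;
    *)   printf '%s\n' "$ver" | cut -d. -f1 ;;
  esac
}

# Fail with a message when JAVA_HOME is unusable or the version is below 8.
check_java() {
  if [ -z "${JAVA_HOME:-}" ] || [ ! -x "$JAVA_HOME/bin/java" ]; then
    echo "JAVA_HOME is not set or does not contain bin/java" >&2
    return 1
  fi
  major=$(java_major_version "$("$JAVA_HOME/bin/java" -version 2>&1)")
  if [ "${major:-0}" -lt 8 ]; then
    echo "Java 8 or higher is required, found: ${major:-unknown}" >&2
    return 1
  fi
  echo "Java $major found at $JAVA_HOME"
}
```

Calling check_java before starting SeaTunnel gives an early, readable failure instead of a startup error later on.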
Visit the official SeaTunnel website to download the latest version.
For this guide, we will use version 2.3.9:
# Download
wget https://dlcdn.apache.org/seatunnel/2.3.9/apache-seatunnel-2.3.9-bin.tar.gz
# Extract
tar -zxvf apache-seatunnel-2.3.9-bin.tar.gz
SeaTunnel’s installation package contains only the core framework and the Zeta engine. To connect to data sources, you need to install the required connector plugins separately.
To download the necessary connectors automatically, edit the config/plugin_config file and list the required connectors. By default, the file includes all connectors; for this guide, we keep only the essential ones:
connector-cdc-mysql
connector-jdbc
connector-fake
connector-console
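Trimming the default file by hand is error-prone, so here is a hedged sketch of a filter that keeps only the named connectors. It assumes every connector entry starts with connector- and passes all other lines (comments, markers, blank lines) through unchanged; filter_plugins is a hypothetical helper name.

```shell
# Keep only the wanted connector entries of a plugin list.
# $1: path to plugin_config; remaining arguments: connector names to keep.
filter_plugins() {
  file=$1; shift
  keep=" $* "
  while IFS= read -r line || [ -n "$line" ]; do
    case "$line" in
      connector-*)
        # Print a connector line only when it is in the keep list.
        case "$keep" in
          *" $line "*) printf '%s\n' "$line" ;;
        esac ;;
      *) printf '%s\n' "$line" ;;
    esac
  done < "$file"
}
```

A possible use is `filter_plugins config/plugin_config connector-cdc-mysql connector-jdbc connector-fake connector-console > plugin_config.new`, followed by reviewing the result and moving it over the original file.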
Run the following command to install the plugins:
sh bin/install-plugin.sh 2.3.9
Alternatively, you can manually download the required plugins from the Apache Maven Repository.
Download the necessary .jar files, for example:
connector-cdc-mysql-2.3.9.jar
connector-console-2.3.9.jar
connector-fake-2.3.9.jar
connector-jdbc-2.3.9.jar
seatunnel-transforms-v2-2.3.9.jar
After downloading, move the files into the **connectors** directory.
To check if the connectors are installed correctly, run:
./bin/seatunnel-connector.sh -l
Source
FakeSource MySQL-CDC Jdbc
Sink
Jdbc Console
Transform
Copy DynamicCompile FieldMapper Filter FilterRowKind JsonPath LLM Replace Split Sql
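The listing can also be checked in a script. The sketch below takes the listing text as its first argument and reports any required connector that does not appear in it; require_connectors is an illustrative name, and the check is a plain whole-word text match rather than anything SeaTunnel-specific.

```shell
# Verify that every named connector appears in a connector listing.
# $1: listing text; remaining arguments: connector names that must be present.
require_connectors() {
  listing=$1; shift
  missing=0
  for name in "$@"; do
    if ! printf '%s\n' "$listing" | grep -qwF -- "$name"; then
      echo "connector not listed: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For this guide the call would be `require_connectors "$(./bin/seatunnel-connector.sh -l)" MySQL-CDC Jdbc Console`.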
Since we will use the MySQL JDBC connection to interact with OceanBase, you also need to download the MySQL JDBC driver from the official MySQL website.
Once downloaded, place the mysql-connector-j-9.0.0.jar file into {seatunnel}/lib.
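A missing driver usually only shows up when the first JDBC job starts, so a quick check can save a round trip. This sketch assumes the driver follows the mysql-connector-j-*.jar naming used above (older releases were named differently); find_mysql_driver is a hypothetical helper.

```shell
# Print the MySQL JDBC driver jars found in a lib directory; fail if none exist.
# $1: the lib directory of the SeaTunnel installation.
find_mysql_driver() {
  found=1
  for jar in "$1"/mysql-connector-j-*.jar; do
    # An unmatched glob stays literal, so test for a real file.
    if [ -f "$jar" ]; then
      printf '%s\n' "$jar"
      found=0
    fi
  done
  return "$found"
}
```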
To confirm that SeaTunnel is installed correctly, run a batch processing test using the default configuration template:
./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local
seatunnel.sh → Standard SeaTunnel startup script
--config → Specifies the configuration file
-m local → Runs in local mode
If everything is working correctly, you should see output similar to this:
2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438
At the end of the job execution, you will see a summary log:
***********************************************
           Job Statistic Information
***********************************************
Start Time : 2024-08-29 22:45:29
End Time : 2024-08-29 22:45:33
Total Time(s) : 4
Total Read Count : 32
Total Write Count : 32
Total Failed Count : 0
***********************************************
This confirms that SeaTunnel is working correctly.
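When jobs run unattended, the same summary can be checked automatically. The following sketch reads a job log on standard input and succeeds only if the read and write counts match and nothing failed. It relies solely on the three "Total ... Count" lines of the summary; check_job_stats is an illustrative name.

```shell
# Succeed only when the job summary reports read == write and zero failures.
# Reads the job log from standard input.
check_job_stats() {
  awk -F: '
    /Total Read Count/   { r = $2 + 0; seen++ }
    /Total Write Count/  { w = $2 + 0; seen++ }
    /Total Failed Count/ { f = $2 + 0; seen++ }
    END {
      if (seen < 3) { print "statistics block not found"; exit 2 }
      printf "read=%d write=%d failed=%d\n", r, w, f
      exit !(r == w && f == 0)
    }'
}
```

One way to use it is to pipe the job output through tee, for example `./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local | tee job.log`, and then run `check_job_stats < job.log`.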
To verify full data synchronization, we create test tables in both MySQL and OceanBase.
CREATE TABLE `table1` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `table2` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
We used Navicat to generate 100,000 records in each table.
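If Navicat is not available, comparable test data can be produced from the command line. This is only a sketch of an alternative, not the method used for the results in this guide: gen_rows is a hypothetical helper that prints batched INSERT statements with synthetic, unique value1 entries so that the unique index is satisfied.

```shell
# Print INSERT statements that create synthetic rows for the test tables.
# $1: table name, $2: number of rows, $3: rows per INSERT (default 1000).
gen_rows() {
  awk -v t="$1" -v n="$2" -v b="${3:-1000}" -v q="'" '
    # Build a quoted SQL string literal such as the text a-42 in single quotes.
    function lit(prefix, i) { return q prefix "-" i q }
    BEGIN {
      for (i = 1; i <= n; i++) {
        if ((i - 1) % b == 0)
          print "INSERT INTO " t " (value1, value2, value3, value4, value5) VALUES"
        # Close the statement at the end of a batch or at the last row.
        term = (i % b == 0 || i == n) ? ";" : ","
        print "  (" lit("a", i) ", " lit("b", i) ", " lit("c", i) ", " lit("d", i) ", " lit("e", i) ")" term
      }
    }'
}
```

The output can be piped into the mysql client, for example `gen_rows seatunnel.table1 100000 | mysql -h 127.0.0.1 -P 3306 -u xxx -p`.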
# Configuring SeaTunnel for Full Synchronization
# Full Data Synchronization Configuration File
_Note: We recommend manually migrating the table schema since automatic migration may encounter issues and does not create indexes._
# Single-Table Full Sync
env {
parallelism = 5
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "xxx"
password = "xxx"
query = "select * from seatunnel.table1"
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "xxx@xxx"
password = "xxx"
# Automatically generate SQL statements
generate_sink_sql = true
database = seatunnel
table = table1
}
}
Result:
***********************************************
           Job Statistic Information
***********************************************
Start Time : 2024-08-30 15:05:39
End Time : 2024-08-30 15:05:47
Total Time(s) : 8
Total Read Count : 100000
Total Write Count : 100000
Total Failed Count : 0
***********************************************
# Multi-Table Full Sync
env {
parallelism = 5
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "xxx"
password = "xxx"
table_list = [
{
table_path = "seatunnel.table1"
},
{
table_path = "seatunnel.table2"
query = "select * from seatunnel.table2 where id > 100"
}
]
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "xxx@xxx"
password = "xxx"
# Automatically generate SQL statements
generate_sink_sql = true
database = seatunnel
table_list = ["seatunnel.table1", "seatunnel.table2"]
}
}
Result:
***********************************************
           Job Statistic Information
***********************************************
Start Time : 2024-08-30 15:10:09
End Time : 2024-08-30 15:10:20
Total Time(s) : 10
Total Read Count : 200000
Total Write Count : 200000
Total Failed Count : 0
***********************************************
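The job statistics only report what SeaTunnel read and wrote, so it can be worth comparing row counts on both sides as well. The sketch below uses the mysql command-line client for both MySQL and OceanBase (through its MySQL-compatible port). The option files passed as arguments are an assumption of this example: each is a small file with a [client] section holding host, port, user and password.

```shell
# Count the rows of a table using the connection settings of an option file.
# $1: client option file, $2: table name such as seatunnel.table1.
count_rows() {
  mysql --defaults-extra-file="$1" -N -B -e "SELECT COUNT(*) FROM $2"
}

# Compare the row count of one table on the source and on the sink.
# $1: source option file, $2: sink option file, $3: table name.
compare_table() {
  src=$(count_rows "$1" "$3") || return 2
  dst=$(count_rows "$2" "$3") || return 2
  echo "$3: source=$src sink=$dst"
  [ "$src" = "$dst" ]
}
```

For example, `compare_table mysql.cnf oceanbase.cnf seatunnel.table1` prints both counts and returns a non-zero status when they differ. Matching counts do not prove that the contents are identical, but they catch dropped or duplicated rows cheaply.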
For incremental sync, a simple approach is to use a query that filters on an id column or on a timestamp column such as updated_at.
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "xxx"
password = "xxx"
query = "SELECT * FROM seatunnel.table1 WHERE updated_at > '2024-01-01'"
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "xxx@xxx"
password = "xxx"
generate_sink_sql = true
database = seatunnel
table = table1
}
}
_Note:_ The sink performs insert and update operations based on the primary key. However, manually updating the configuration file for each incremental run is cumbersome. We recommend using Apache DolphinScheduler together with SeaTunnel to build a workflow: DolphinScheduler can obtain the maximum timestamp or id from the sink and pass it to the job as a workflow variable.
Example configuration with a variable:
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://127.0.0.1:3306/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "xxx"
password = "xxx"
query = "SELECT * FROM seatunnel.table1 WHERE id > ${max_id}"
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "xxx@xxx"
password = "xxx"
generate_sink_sql = true
database = seatunnel
table = table1
}
}
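Without a scheduler, the same idea can be sketched in a few lines of shell: read the current high-water mark from the sink, then hand it to SeaTunnel as the max_id variable through the -i (variable substitution) option of seatunnel.sh. This sketch assumes the job filters on id, and the option file, the SEATUNNEL_SH variable and the function names are illustrative.

```shell
# Path of the SeaTunnel launcher; override it when running from another directory.
SEATUNNEL_SH=${SEATUNNEL_SH:-./bin/seatunnel.sh}

# Largest id already present in the sink table (0 when the table is empty).
# $1: client option file for the sink, $2: sink table name.
sink_max_id() {
  mysql --defaults-extra-file="$1" -N -B -e "SELECT COALESCE(MAX(id), 0) FROM $2"
}

# Run one incremental batch that starts after the current sink high-water mark.
# $1: client option file for the sink, $2: sink table, $3: job configuration file.
run_incremental() {
  max_id=$(sink_max_id "$1" "$2") || return 1
  "$SEATUNNEL_SH" --config "$3" -m local -i "max_id=$max_id"
}
```

A call such as `run_incremental oceanbase.cnf seatunnel.table1 ./config/incremental.conf` could then be scheduled with cron. Filtering on id only picks up new rows; capturing updates requires a timestamp watermark instead.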
The multi-table configuration is similar.
Due to issues with SeaTunnel’s OceanBase component, schema migration can be error-prone. It is recommended to migrate the table schema manually.
Grant the necessary privileges to the user:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;
Verify that the binlog is enabled:
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
If your settings differ from the above, adjust your mysql.cnf file accordingly. Note that creating a consistent snapshot on large databases can cause read timeouts; configure interactive_timeout and wait_timeout as needed.
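The binlog check above can be automated before each CDC deployment. The sketch below runs the same SHOW VARIABLES query through the mysql client and compares each value with the expected values listed in the table. The option file argument and the name check_binlog are assumptions of this example.

```shell
# Verify that the source MySQL has the binlog settings expected for CDC.
# $1: client option file ([client] host, port, user, password) for the source.
check_binlog() {
  mysql --defaults-extra-file="$1" -N -B -e "SHOW VARIABLES WHERE Variable_name IN
    ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency')" |
  awk -F'\t' '
    BEGIN {
      want["log_bin"] = "ON"
      want["binlog_format"] = "ROW"
      want["binlog_row_image"] = "FULL"
      want["gtid_mode"] = "ON"
      want["enforce_gtid_consistency"] = "ON"
    }
    ($1 in want) {
      seen[$1] = 1
      if ($2 != want[$1]) { printf "%s is %s, expected %s\n", $1, $2, want[$1]; bad = 1 }
    }
    END {
      # Report variables the server did not return at all.
      for (k in want) if (!(k in seen)) { printf "%s was not reported\n", k; bad = 1 }
      exit bad + 0
    }'
}
```

The function prints one line per mismatch and returns a non-zero status if any setting differs, so it can gate the start of the CDC job.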
After preparing the environment, write the configuration file.
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 10000
}
source {
MySQL-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
username = "xxx"
password = "xxx@xxx"
table-names = ["seatunnel.table1", "seatunnel.table2"]
startup.mode = "initial"
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:2883/mysql?&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "xxx@xxx"
password = "xxx"
database = "seatunnel" # Target database
table-names = ["seatunnel.table1", "seatunnel.table2"]
generate_sink_sql = true # Automatically generate SQL
}
}
Once started, the job will first perform a historical data migration, then process CDC changes.
Upon startup, SeaTunnel executes different operations depending on the configured tables and the startup.mode setting. The startup.mode options are as follows:
**initial**: Synchronizes historical data first, then incremental data.
**earliest**: Starts from the earliest offset.
**latest**: Starts from the latest offset.
**specific**: Starts from a user-provided specific offset.
If you use specific, you must provide the binlog file name (startup.specific-offset.file) and the binlog position (startup.specific-offset.pos).
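To illustrate the specific mode, this sketch reads the current binlog coordinates from the source and prints them as the three source options. It assumes a MySQL version that still supports SHOW MASTER STATUS (newer releases rename it to SHOW BINARY LOG STATUS) and a client option file as in the earlier checks; specific_offset_options is a hypothetical helper.

```shell
# Print MySQL-CDC startup options for the current binlog position of the source.
# $1: client option file for the source MySQL.
specific_offset_options() {
  mysql --defaults-extra-file="$1" -N -B -e "SHOW MASTER STATUS" |
  awk -F'\t' '
    NR == 1 && $1 != "" {
      print "startup.mode = \"specific\""
      printf "startup.specific-offset.file = \"%s\"\n", $1
      printf "startup.specific-offset.pos = %d\n", $2
      ok = 1
    }
    END { exit !ok }'
}
```

The printed lines can be pasted into the MySQL-CDC source block in place of startup.mode = "initial" when a job should resume from a known position.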
This article has detailed how to configure full, incremental, and CDC synchronization from MySQL to OceanBase using Apache SeaTunnel.
By following these steps, you can achieve a complete, end-to-end data migration and synchronization solution. Thank you for reading, and please provide your feedback! 🚀