Overview This article mainly introduces how to use DolphinScheduler in combination with SeaTunnel to complete data synchronization between heterogeneous data sources. This is a good solution for constructing a unified big data warehouse for batch and streaming data. It is stable and efficient—once you use it, you’ll love it. Environment Preparation DolphinScheduler cluster >= 3.1.5 DolphinScheduler 3.1.5 source code SeaTunnel cluster >= 2.3.3 For those who have not yet installed the above environments, please refer to the official website to set up the basic environment. Configuration File Modifications Here is an explanation: The SeaTunnel data synchronization tasks configured through DolphinScheduler are eventually assigned to a certain Worker group or Worker node of the DS cluster for execution. Therefore, you must ensure that the target Worker nodes of your DS cluster also have the SeaTunnel service installed. This is important because the SeaTunnel task instances defined in DolphinScheduler ultimately need to call the SeaTunnel service on the Worker node to execute the local task startup command and complete task submission and execution. Modifications to DolphinScheduler Configuration Files Since we need to use SeaTunnel for data integration, we need to configure the installation directory of SeaTunnel in DolphinScheduler's system environment variables. Locate the dolphinscheduler_env.sh file under the installation directory of your DolphinScheduler master node: $DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh Set the access directory for SEATUNNEL_HOME, assigning it to your SeaTunnel installation directory: export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5} Save the file, and restart the DolphinScheduler cluster to synchronize the configuration changes to all api-server, master-server, and worker-server nodes. Modifications to DolphinScheduler Source Code Why modify DolphinScheduler's source code? Because I am using SeaTunnel version 2.3.5 and the engine is not SeaTunnel's default engine, I am using the Spark engine, specifically version 2.4.5. The command I use to execute tasks is as follows: $SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template If I were using Spark 3.X, the command would be: $SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template However, in DolphinScheduler 3.1.5, the SeaTunnel task plugin has some issues that make it incompatible. Firstly, on the front end, the engine only supports Spark and Flink without compatibility for specific versions, so there is no option to freely choose between Spark 2, Spark 3, Flink 1.3, or Flink 1.5. Secondly, the back-end code also has issues. Locate the EngineEnum class, and modify the code as follows: public enum EngineEnum { // FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"), // SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh"); FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"), FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"), SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"), SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh"); private String command; EngineEnum(String command) { this.command = command; } public String getCommand() { return command; } } After completing the modifications, compile and package the DolphinScheduler source code. Updating the SeaTunnel Task Plugin in the DolphinScheduler Cluster After compiling and packaging the project, locate the dolphinscheduler-task-seatunnel-3.1.5.jar file in the target directory of the dolphinscheduler-task-seatunnel project. Upload it to the master node of your DolphinScheduler cluster. Rename the existing dolphinscheduler-task-seatunnel-3.1.5.jar file in the following directories of the DS installation directory on the master node to dolphinscheduler-task-seatunnel-3.1.5.jar.20240606 (including the date for easier tracking): api-server/libs master-server/libs worker-server/libs alert-server/libs Copy the newly compiled dolphinscheduler-task-seatunnel-3.1.5.jar file to these directories. Verify that each directory contains the updated .jar file (skip directories that do not originally have this file). Use the distribution script on the master node to synchronize the modifications in api-server/libs, master-server/libs, worker-server/libs, and alert-server/libs to other DS nodes. After the distribution is complete, check whether the distribution was successful. Finally, restart the DS cluster. With the above steps, we have completed the upgrade and adaptation of the SeaTunnel plugin in DolphinScheduler. Test Verification We define a Seatunnel data synchronization task on the workflow definition page of DolphinScheduler to complete the task of collecting Oracle database tables into a MySQL database. Let’s proceed with the operation. Regarding the Seatunnel task configuration script file, the official documentation introduces it as follows: Source: https://seatunnel.incubator.apache.org/zh-CN/docs/category/source-v2 Transform: https://seatunnel.incubator.apache.org/zh-CN/docs/category/transform-v2 Sink: https://seatunnel.incubator.apache.org/zh-CN/docs/category/sink-v2 Source Input Configuration Definition Explanation Here, our input source is Oracle, so we directly look for the relevant Oracle configuration definitions in the Source section. The official documentation provides many task examples: Simple Task Example # Defining the runtime environment env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" user = "root" password = "123456" query = "SELECT * FROM TEST_TABLE" } } transform {} sink { Console {} } Partition Column Parallel Task Example Parallel reading is configured for the partition column and data. If you want to read the entire table, you can do this: env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" # Define query logic as required query = "SELECT * FROM TEST_TABLE" # Set partition column for parallel slicing partition_column = "ID" # Number of partition slices partition_num = 10 properties { database.oracle.jdbc.timezoneAsRegion = "false" } } } sink { Console {} } Primary Key or Unique Index Parallel Task Example Configuring table_path enables automatic splitting, and you can configure split.* to adjust the splitting strategy. env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" table_path = "DA.SCHEMA1.TABLE1" query = "select * from SCHEMA1.TABLE1" split.size = 10000 } } sink { Console {} } Parallel Upper and Lower Bound Task Example Specifying the upper and lower bounds of the query allows for more efficient data retrieval within the defined range. source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" # Define query logic as required query = "SELECT * FROM TEST_TABLE" partition_column = "ID" # Read start boundary partition_lower_bound = 1 # Read end boundary partition_upper_bound = 500 partition_num = 10 } } Multi-Table Reading Task Example Configuring table_list enables automatic splitting, and you can adjust the splitting strategy via split. env { job.mode = "BATCH" parallelism = 4 } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" "table_list" = [ { "table_path" = "XE.TEST.USER_INFO" }, { "table_path" = "XE.TEST.YOURTABLENAME" } ] split.size = 10000 } } sink { Console {} } Sink Output Configuration Definition Explanation Simple Task Example This example defines a SeaTunnel synchronization task. FakeSource automatically generates data and sends it to JDBC Sink. FakeSource generates 16 rows of data (row.num=16), with two fields per row: name (string type) and age (int type). The target table is test_table, which also contains 16 rows. Before running this job, you need to create the test database and test_table table in MySQL. If you haven't installed and deployed SeaTunnel, follow the instructions in the installation guide. Then run the job as described in the quick start guide. env { parallelism = 1 job.mode = "BATCH" } source { FakeSource { parallelism = 1 result_table_name = "fake" row.num = 16 schema = { fields { name = "string" age = "int" } } } } transform {} sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" } } Generated Output SQL Task Example This example does not require writing complex SQL statements. You can configure the output database name and table name to generate insert statements for you automatically. sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" generate_sink_sql = true database = test table = test_table } } Accurate Task Example For scenarios requiring accurate writes, we ensure exactly once semantics. sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" max_retries = 0 user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" is_exactly_once = "true" xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" } } CDC (Change Data Capture) Events We also support CDC for change data capture. In this case, you need to configure the database, table, and primary_keys. sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" generate_sink_sql = true database = test table = sink_table primary_keys = ["id", "name"] field_ide = UPPERCASE schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" } } Complete Test Script Configuration File Below is the complete configuration file for this example: env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP" driver = "oracle.jdbc.OracleDriver" user = "appuser001" password = "appuser001" query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS" } } transform {} sink { jdbc { url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "appuser001" password = "appuser001" generate_sink_sql = "true" database = "hive" table = "met_com_icdoperation_ls" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" } } (Attention: The current node configuration depends on your needs. If you have not installed Hadoop, choose either the 'local' or 'client' mode here. If you have installed a Hadoop cluster, you can select to run using the 'yarn' and 'cluster' modes.) Replace the database configuration information in the above script with your data connection configuration, then overwrite the script into the script input box shown in the figure above. Save the workflow, go online, and then start the workflow. Verify in the corresponding database. Original Oracle database table: Synchronized MySQL database table: The task ran, and the data was successfully synchronized. OK, the test passed! Next, you can expand and explore more based on this demo. The more practical experience you have, the deeper your understanding of the architecture and principles of DolphinScheduler and Seatunnel will become. Gradually, you can upgrade and extend the functionalities of these excellent open-source frameworks by extending the source code. Overview This article mainly introduces how to use DolphinScheduler in combination with SeaTunnel to complete data synchronization between heterogeneous data sources. This is a good solution for constructing a unified big data warehouse for batch and streaming data. It is stable and efficient—once you use it, you’ll love it. Environment Preparation DolphinScheduler cluster >= 3.1.5 DolphinScheduler 3.1.5 source code SeaTunnel cluster >= 2.3.3 DolphinScheduler cluster >= 3.1.5 DolphinScheduler 3.1.5 source code SeaTunnel cluster >= 2.3.3 For those who have not yet installed the above environments, please refer to the official website to set up the basic environment. Configuration File Modifications Here is an explanation: The SeaTunnel data synchronization tasks configured through DolphinScheduler are eventually assigned to a certain Worker group or Worker node of the DS cluster for execution. Therefore, you must ensure that the target Worker nodes of your DS cluster also have the SeaTunnel service installed. This is important because the SeaTunnel task instances defined in DolphinScheduler ultimately need to call the SeaTunnel service on the Worker node to execute the local task startup command and complete task submission and execution. Modifications to DolphinScheduler Configuration Files Since we need to use SeaTunnel for data integration, we need to configure the installation directory of SeaTunnel in DolphinScheduler's system environment variables. Locate the dolphinscheduler_env.sh file under the installation directory of your DolphinScheduler master node: dolphinscheduler_env.sh $DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh $DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh Set the access directory for SEATUNNEL_HOME , assigning it to your SeaTunnel installation directory: SEATUNNEL_HOME export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5} export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5} Save the file, and restart the DolphinScheduler cluster to synchronize the configuration changes to all api-server , master-server , and worker-server nodes. api-server master-server worker-server Modifications to DolphinScheduler Source Code Why modify DolphinScheduler's source code? Why modify DolphinScheduler's source code? Because I am using SeaTunnel version 2.3.5 and the engine is not SeaTunnel's default engine, I am using the Spark engine, specifically version 2.4.5. The command I use to execute tasks is as follows: $SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template $SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template If I were using Spark 3.X, the command would be: $SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template $SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template However, in DolphinScheduler 3.1.5, the SeaTunnel task plugin has some issues that make it incompatible. Firstly, on the front end, the engine only supports Spark and Flink without compatibility for specific versions, so there is no option to freely choose between Spark 2, Spark 3, Flink 1.3, or Flink 1.5. Secondly, the back-end code also has issues. Locate the EngineEnum class, and modify the code as follows: EngineEnum public enum EngineEnum { // FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"), // SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh"); FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"), FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"), SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"), SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh"); private String command; EngineEnum(String command) { this.command = command; } public String getCommand() { return command; } } public enum EngineEnum { // FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"), // SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh"); FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"), FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"), SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"), SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh"); private String command; EngineEnum(String command) { this.command = command; } public String getCommand() { return command; } } After completing the modifications, compile and package the DolphinScheduler source code. Updating the SeaTunnel Task Plugin in the DolphinScheduler Cluster After compiling and packaging the project, locate the dolphinscheduler-task-seatunnel-3.1.5.jar file in the target directory of the dolphinscheduler-task-seatunnel project. Upload it to the master node of your DolphinScheduler cluster. dolphinscheduler-task-seatunnel-3.1.5.jar target dolphinscheduler-task-seatunnel Rename the existing dolphinscheduler-task-seatunnel-3.1.5.jar file in the following directories of the DS installation directory on the master node to dolphinscheduler-task-seatunnel-3.1.5.jar.20240606 (including the date for easier tracking): dolphinscheduler-task-seatunnel-3.1.5.jar dolphinscheduler-task-seatunnel-3.1.5.jar.20240606 api-server/libs master-server/libs worker-server/libs alert-server/libs api-server/libs api-server/libs master-server/libs master-server/libs worker-server/libs worker-server/libs alert-server/libs alert-server/libs Copy the newly compiled dolphinscheduler-task-seatunnel-3.1.5.jar file to these directories. Verify that each directory contains the updated .jar file (skip directories that do not originally have this file). dolphinscheduler-task-seatunnel-3.1.5.jar .jar Use the distribution script on the master node to synchronize the modifications in api-server/libs , master-server/libs , worker-server/libs , and alert-server/libs to other DS nodes. After the distribution is complete, check whether the distribution was successful. api-server/libs master-server/libs worker-server/libs alert-server/libs Finally, restart the DS cluster. With the above steps, we have completed the upgrade and adaptation of the SeaTunnel plugin in DolphinScheduler. Test Verification We define a Seatunnel data synchronization task on the workflow definition page of DolphinScheduler to complete the task of collecting Oracle database tables into a MySQL database. Let’s proceed with the operation. Regarding the Seatunnel task configuration script file, the official documentation introduces it as follows: Source: https://seatunnel.incubator.apache.org/zh-CN/docs/category/source-v2 Transform: https://seatunnel.incubator.apache.org/zh-CN/docs/category/transform-v2 Sink: https://seatunnel.incubator.apache.org/zh-CN/docs/category/sink-v2 Source: https://seatunnel.incubator.apache.org/zh-CN/docs/category/source-v2 https://seatunnel.incubator.apache.org/zh-CN/docs/category/source-v2 Transform: https://seatunnel.incubator.apache.org/zh-CN/docs/category/transform-v2 https://seatunnel.incubator.apache.org/zh-CN/docs/category/transform-v2 Sink: https://seatunnel.incubator.apache.org/zh-CN/docs/category/sink-v2 https://seatunnel.incubator.apache.org/zh-CN/docs/category/sink-v2 Source Input Configuration Definition Explanation Here, our input source is Oracle, so we directly look for the relevant Oracle configuration definitions in the Source section. The official documentation provides many task examples: Simple Task Example # Defining the runtime environment env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" user = "root" password = "123456" query = "SELECT * FROM TEST_TABLE" } } transform {} sink { Console {} } # Defining the runtime environment env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" user = "root" password = "123456" query = "SELECT * FROM TEST_TABLE" } } transform {} sink { Console {} } Partition Column Parallel Task Example Parallel reading is configured for the partition column and data. If you want to read the entire table, you can do this: env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" # Define query logic as required query = "SELECT * FROM TEST_TABLE" # Set partition column for parallel slicing partition_column = "ID" # Number of partition slices partition_num = 10 properties { database.oracle.jdbc.timezoneAsRegion = "false" } } } sink { Console {} } env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" # Define query logic as required query = "SELECT * FROM TEST_TABLE" # Set partition column for parallel slicing partition_column = "ID" # Number of partition slices partition_num = 10 properties { database.oracle.jdbc.timezoneAsRegion = "false" } } } sink { Console {} } Primary Key or Unique Index Parallel Task Example Configuring table_path enables automatic splitting, and you can configure split.* to adjust the splitting strategy. table_path split.* env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" table_path = "DA.SCHEMA1.TABLE1" query = "select * from SCHEMA1.TABLE1" split.size = 10000 } } sink { Console {} } env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" table_path = "DA.SCHEMA1.TABLE1" query = "select * from SCHEMA1.TABLE1" split.size = 10000 } } sink { Console {} } Parallel Upper and Lower Bound Task Example Specifying the upper and lower bounds of the query allows for more efficient data retrieval within the defined range. source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" # Define query logic as required query = "SELECT * FROM TEST_TABLE" partition_column = "ID" # Read start boundary partition_lower_bound = 1 # Read end boundary partition_upper_bound = 500 partition_num = 10 } } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" # Define query logic as required query = "SELECT * FROM TEST_TABLE" partition_column = "ID" # Read start boundary partition_lower_bound = 1 # Read end boundary partition_upper_bound = 500 partition_num = 10 } } Multi-Table Reading Task Example Configuring table_list enables automatic splitting, and you can adjust the splitting strategy via split. table_list split. env { job.mode = "BATCH" parallelism = 4 } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" "table_list" = [ { "table_path" = "XE.TEST.USER_INFO" }, { "table_path" = "XE.TEST.YOURTABLENAME" } ] split.size = 10000 } } sink { Console {} } env { job.mode = "BATCH" parallelism = 4 } source { Jdbc { url = "jdbc:oracle:thin:@datasource01:1523:xe" driver = "oracle.jdbc.OracleDriver" connection_check_timeout_sec = 100 user = "root" password = "123456" "table_list" = [ { "table_path" = "XE.TEST.USER_INFO" }, { "table_path" = "XE.TEST.YOURTABLENAME" } ] split.size = 10000 } } sink { Console {} } Sink Output Configuration Definition Explanation Simple Task Example This example defines a SeaTunnel synchronization task. FakeSource automatically generates data and sends it to JDBC Sink. FakeSource generates 16 rows of data ( row.num=16 ), with two fields per row: name (string type) and age (int type). The target table is test_table , which also contains 16 rows. row.num=16 name age test_table Before running this job, you need to create the test database and test_table table in MySQL. If you haven't installed and deployed SeaTunnel, follow the instructions in the installation guide. Then run the job as described in the quick start guide. test test_table env { parallelism = 1 job.mode = "BATCH" } source { FakeSource { parallelism = 1 result_table_name = "fake" row.num = 16 schema = { fields { name = "string" age = "int" } } } } transform {} sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" } } env { parallelism = 1 job.mode = "BATCH" } source { FakeSource { parallelism = 1 result_table_name = "fake" row.num = 16 schema = { fields { name = "string" age = "int" } } } } transform {} sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" } } Generated Output SQL Task Example This example does not require writing complex SQL statements. You can configure the output database name and table name to generate insert statements for you automatically. sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" generate_sink_sql = true database = test table = test_table } } sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" generate_sink_sql = true database = test table = test_table } } Accurate Task Example For scenarios requiring accurate writes, we ensure exactly once semantics. sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" max_retries = 0 user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" is_exactly_once = "true" xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" } } sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" max_retries = 0 user = "root" password = "123456" query = "insert into test_table(name,age) values(?,?)" is_exactly_once = "true" xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" } } CDC (Change Data Capture) Events We also support CDC for change data capture. In this case, you need to configure the database, table, and primary_keys . primary_keys sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" generate_sink_sql = true database = test table = sink_table primary_keys = ["id", "name"] field_ide = UPPERCASE schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" } } sink { jdbc { url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "123456" generate_sink_sql = true database = test table = sink_table primary_keys = ["id", "name"] field_ide = UPPERCASE schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" } } Complete Test Script Configuration File Below is the complete configuration file for this example: env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP" driver = "oracle.jdbc.OracleDriver" user = "appuser001" password = "appuser001" query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS" } } transform {} sink { jdbc { url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "appuser001" password = "appuser001" generate_sink_sql = "true" database = "hive" table = "met_com_icdoperation_ls" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" } } env { parallelism = 4 job.mode = "BATCH" } source { Jdbc { url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP" driver = "oracle.jdbc.OracleDriver" user = "appuser001" password = "appuser001" query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS" } } transform {} sink { jdbc { url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "appuser001" password = "appuser001" generate_sink_sql = "true" database = "hive" table = "met_com_icdoperation_ls" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" } } (Attention: The current node configuration depends on your needs. If you have not installed Hadoop, choose either the 'local' or 'client' mode here. If you have installed a Hadoop cluster, you can select to run using the 'yarn' and 'cluster' modes.) Replace the database configuration information in the above script with your data connection configuration, then overwrite the script into the script input box shown in the figure above. Save the workflow, go online, and then start the workflow. Verify in the corresponding database. Original Oracle database table: Synchronized MySQL database table: The task ran, and the data was successfully synchronized. OK, the test passed! Next, you can expand and explore more based on this demo. The more practical experience you have, the deeper your understanding of the architecture and principles of DolphinScheduler and Seatunnel will become. Gradually, you can upgrade and extend the functionalities of these excellent open-source frameworks by extending the source code.