Previously, we evaluated using SeaTunnel for CDC (Change Data Capture) data lake ingestion (see *SeaTunnel-CDC Data Lake Ingestion Practice*). Those scenarios were all based on direct database connections. In many business cases, however, we can't access the databases directly to perform CDC. Instead, we need to integrate via APIs and schedule the data synchronization with Apache DolphinScheduler.

Here's a real-world example: synchronizing inventory data from ERP (SAP) into a data lake for inventory analysis.

At the same time, our goal is to enable colleagues to replicate the process and complete future API-to-lake integrations on their own, rather than writing custom code for each use case.

## Prerequisites

- SeaTunnel 2.3.10

First, add the connector names to the `${SEATUNNEL_HOME}/config/plugin_config` file, then install the connectors by running the install script. After installation, the connector jars should appear under `${SEATUNNEL_HOME}/connectors/`.

In this example we'll use `connector-jdbc` and `connector-paimon`. To write to StarRocks you can also use `connector-starrocks`, but for this particular case `connector-jdbc` is more appropriate, so we'll use that.

```text
# Configure connector names
--connectors-v2--
connector-jdbc
connector-starrocks
connector-paimon
--end--
```

```bash
# Install connectors
sh bin/install-plugin.sh 2.3.10
```
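To confirm the installation, you can check that the connector jars are actually in place. A quick sanity check, assuming the default 2.3.10 directory layout:

```bash
# Verify that the connector jars landed under ${SEATUNNEL_HOME}/connectors/
ls ${SEATUNNEL_HOME}/connectors/ | grep -E 'jdbc|paimon|starrocks'
```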
## SeaTunnel Job

Let's first make sure the SeaTunnel job runs successfully locally before integrating it with Apache DolphinScheduler.

### HTTP to StarRocks

Example path: `example/http2starrocks`

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
      Authorization = "Basic XXX"
      Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
      fields {
        MATNR = "string"
        MAKTX = "string"
        WERKS = "string"
        NAME1 = "string"
        LGORT = "string"
        LGOBE = "string"
        CHARG = "string"
        MEINS = "string"
        LABST = "double"
        UMLME = "double"
        INSME = "double"
        EINME = "double"
        SPEME = "double"
        RETME = "double"
      }
    }
  }
}

# This transform is mainly for field renaming and readability
transform {
  Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS, NAME1, LGORT, LGOBE, CHARG, MEINS, LABST, UMLME, INSME, EINME, SPEME, RETME from stock"
  }
}

# Write to StarRocks using JDBC with partition overwrite
sink {
  jdbc {
    plugin_input = "stock-tf-out"
    url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "lab"
    password = "XXX"
    compatible_mode = "starrocks"
    query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS, NAME1, LGORT, LGOBE, CHARG, MEINS, LABST, UMLME, INSME, EINME, SPEME, RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
  }
}
```

### HTTP to Paimon

Example path: `example/http2paimon`

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
      Authorization = "Basic XXX"
      Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
      fields {
        MATNR = "string"
        MAKTX = "string"
        WERKS = "string"
        NAME1 = "string"
        LGORT = "string"
        LGOBE = "string"
        CHARG = "string"
        MEINS = "string"
        LABST = "double"
        UMLME = "double"
        INSME = "double"
        EINME = "double"
        SPEME = "double"
        RETME = "double"
      }
    }
  }
}

# This transform is mainly for field renaming and readability
transform {
  Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS, NAME1, LGORT, LGOBE, CHARG, MEINS, LABST, UMLME, INSME, EINME, SPEME, RETME from stock"
  }
}

# Sync to Paimon (currently Paimon does not support insert overwrite on partitions; this is for reference only)
sink {
  Paimon {
    warehouse = "s3a://test/"
    database = "sap"
    table = "ods_sap_stock"
    paimon.hadoop.conf = {
      fs.s3a.access-key=XXX
      fs.s3a.secret-key=XXX
      fs.s3a.endpoint="http://minio:9000"
      fs.s3a.path.style.access=true
      fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
  }
}
```
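Either config can then be submitted to SeaTunnel's Zeta engine in local mode to verify it end to end. A minimal sketch, assuming the configs are saved as `example/http2starrocks/http2starrocks.conf` and `example/http2paimon/http2paimon.conf` (the file names are illustrative):

```bash
cd ${SEATUNNEL_HOME}

# Run the HTTP-to-StarRocks job locally on the Zeta engine
sh bin/seatunnel.sh --config example/http2starrocks/http2starrocks.conf -m local

# Run the HTTP-to-Paimon job locally on the Zeta engine
sh bin/seatunnel.sh --config example/http2paimon/http2paimon.conf -m local
```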
"select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock" } } # Sync to Paimon (currently Paimon does not support insert overwrite on partitions; this is for reference only) sink { Paimon { warehouse = "s3a://test/" database = "sap" table = "ods_sap_stock" paimon.hadoop.conf = { fs.s3a.access-key=XXX fs.s3a.secret-key=XXX fs.s3a.endpoint="http://minio:9000" fs.s3a.path.style.access=true fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider } } } env { parallelism = 1 job.mode = "BATCH" } source { Http { plugin_output = "stock" url = "https://ip/http/prd/query_sap_stock" method = "POST" headers { Authorization = "Basic XXX" Content-Type = "application/json" } body = """{"IT_WERKS": [{"VALUE": "1080"}]}""" format = "json" content_field = "$.ET_RETURN.*" schema { fields { MATNR = "string" MAKTX = "string" WERKS = "string" NAME1 = "string" LGORT = "string" LGOBE = "string" CHARG = "string" MEINS = "string" LABST = "double" UMLME = "double" INSME = "double" EINME = "double" SPEME = "double" RETME = "double" } } } } # This transform is mainly for field renaming and readability transform { Sql { plugin_input = "stock" plugin_output = "stock-tf-out" query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock" } } # Sync to Paimon (currently Paimon does not support insert overwrite on partitions; this is for reference only) sink { Paimon { warehouse = "s3a://test/" database = "sap" table = "ods_sap_stock" paimon.hadoop.conf = { fs.s3a.access-key=XXX fs.s3a.secret-key=XXX fs.s3a.endpoint="http://minio:9000" fs.s3a.path.style.access=true fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider } } } Integrating SeaTunnel with DolphinScheduler Build a DolphinScheduler Worker image with SeaTunnel pre-installed Build a DolphinScheduler Worker image with SeaTunnel pre-installed FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2 RUN mkdir /opt/seatunnel RUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10 COPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/ FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2 RUN mkdir /opt/seatunnel RUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10 COPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/ Build and push the Docker image: docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel . docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel . Deploy a new DolphinScheduler Worker using this image. Modify docker-compose.yaml to add a new dolphinscheduler-worker-seatunnel node: Deploy a new DolphinScheduler Worker using this image. 
Deploy a new DolphinScheduler Worker using this image by adding a `dolphinscheduler-worker-seatunnel` service to `docker-compose.yaml`:

```yaml
dolphinscheduler-worker-seatunnel:
  image: xxx/dolphinscheduler-worker:3.2.2-seatunnel
  profiles: ["all"]
  env_file: .env
  healthcheck:
    test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]
    interval: 30s
    timeout: 5s
    retries: 3
  depends_on:
    dolphinscheduler-zookeeper:
      condition: service_healthy
  volumes:
    - ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler
    - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs
    - ./dolphinscheduler-shared-local:/opt/soft
    - ./dolphinscheduler-resource-local:/dolphinscheduler
  networks:
    dolphinscheduler:
      ipv4_address: 172.15.0.18
```

### Configure Worker Group and Environment in DolphinScheduler

1. **Worker Group Management**: In Security Center > Worker Group, create a worker group containing the node IP of the SeaTunnel worker.
2. **Environment Configuration**: In Environment Management, create a new environment for SeaTunnel execution and bind it to the worker group created above.
3. **Create Workflow Definition**: Fill in the SeaTunnel job configuration and define the workflow (a sketch of the environment and task command follows this list).
4. **Run the Task**: During execution, select the SeaTunnel worker group and environment created above so that the job runs in the integrated SeaTunnel environment.
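The environment created in step 2 typically just exports the SeaTunnel path baked into the worker image, and the workflow task then invokes the job script. A sketch of what this might look like, assuming the image layout above; the config file name is illustrative and would normally come from the resource center or a path mounted into the worker:

```bash
# Environment configuration bound to the SeaTunnel worker group (sketch)
export SEATUNNEL_HOME=/opt/seatunnel/apache-seatunnel-2.3.10
export PATH=$SEATUNNEL_HOME/bin:$PATH

# Task command in the workflow definition (sketch; config path is illustrative)
${SEATUNNEL_HOME}/bin/seatunnel.sh --config http2starrocks.conf -m local
```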