SeaTunnel’s RowKindExtractor Simplifies CDC Pipelines With Append-Only Transformation

Written by williamguo | Published 2025/11/28

TL;DR: RowKindExtractor converts every CDC event into an insert while keeping the original RowKind in a new field, which makes it ideal for systems that accept inserts only.

RowKindExtractor is a transformation plugin in Apache SeaTunnel that can convert CDC data streams to Append-Only mode and extract the original RowKind information as a new field. This article introduces the core functionalities of RowKindExtractor, its usage in CDC data synchronization scenarios, configuration options, precautions, and multiple application examples.

RowKindExtractor

The RowKindExtractor transformation plugin is used to convert CDC (Change Data Capture) data streams into Append-Only mode while extracting the original RowKind information into a new field.

Core Functions:

  • Convert all RowKind values of data rows to +I (INSERT), achieving Append-Only mode.
  • Preserve the original RowKind information (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) in a newly added field.
  • Support both SHORT and FULL output formats.

Why is this plugin needed?

In CDC data synchronization scenarios, each data row carries a RowKind flag (+I, -U, +U, or -D) that indicates the type of change. Some downstream systems (such as append-only data lake tables or certain analytical platforms) accept only inserts and do not support UPDATE or DELETE operations. In such cases, it is necessary to:

  1. Convert all rows to INSERT type (Append-Only).
  2. Store the original change type in a regular field for downstream analysis.

Transformation Example:

Input (CDC data):
 RowKind: -D (DELETE)
 Data: id=1, name="test1", age=20

Output (Append-Only data, with transform_type = FULL):
 RowKind: +I (INSERT)
 Data: id=1, name="test1", age=20, row_kind="DELETE"
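A minimal Python sketch of this per-row rule follows. It is a conceptual model only, not SeaTunnel's Java implementation: `RowKind`, `extract_row_kind`, and the dict-based row representation are hypothetical names chosen for illustration.

```python
from enum import Enum


class RowKind(Enum):
    """CDC change flags; each member's value is its SHORT label."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def extract_row_kind(kind, data, field_name="row_kind", transform_type="SHORT"):
    """Hypothetical per-row model of RowKindExtractor (not SeaTunnel's API).

    Returns (RowKind.INSERT, new_data), where new_data is a copy of `data`
    with the original kind stored under `field_name`.
    """
    if transform_type == "SHORT":
        label = kind.value  # e.g. "-D"
    elif transform_type == "FULL":
        label = kind.name   # e.g. "DELETE"
    else:
        raise ValueError(f"unsupported transform_type: {transform_type!r}")
    out = dict(data)  # copy so the input row is not mutated
    out[field_name] = label
    return RowKind.INSERT, out


kind, row = extract_row_kind(
    RowKind.DELETE, {"id": 1, "name": "test1", "age": 20}, transform_type="FULL"
)
print(kind.value, row)
# +I {'id': 1, 'name': 'test1', 'age': 20, 'row_kind': 'DELETE'}
```

The output kind is always INSERT regardless of the input, which is exactly what makes the stream Append-Only; the change information survives only as an ordinary column.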

Typical Use Cases:

  • Writing CDC data into data lakes that only support Append-Only operations.
  • Retaining a complete change history in data warehouses.
  • Analyzing and aggregating different types of data changes.

Configuration Options

| Parameter Name | Type | Required | Default Value | Description |
|---|---|---|---|---|
| custom_field_name | string | No | row_kind | Name of the new field used to store the original RowKind information |
| transform_type | enum | No | SHORT | Output format of RowKind; possible values: SHORT (short format) or FULL (full format) |
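The two options and their defaults can be modeled as a small Python dataclass. This is a hypothetical mirror of the table above, not a SeaTunnel class; in particular, the sketch accepts only the upper-case values SHORT and FULL, and whether the real plugin is case-insensitive is not covered here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RowKindExtractorOptions:
    """Hypothetical mirror of the RowKindExtractor options and their defaults."""
    custom_field_name: str = "row_kind"
    transform_type: str = "SHORT"

    def __post_init__(self):
        # transform_type is an enum with exactly two allowed values.
        if self.transform_type not in ("SHORT", "FULL"):
            raise ValueError(f"unsupported transform_type: {self.transform_type!r}")


print(RowKindExtractorOptions())
# RowKindExtractorOptions(custom_field_name='row_kind', transform_type='SHORT')
```

Both options are optional, so an empty RowKindExtractor block behaves like this default instance.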

custom_field_name [string]

Specifies the name of the new field used to store the original RowKind information.

Default: row_kind

Notes:

  • The field name must not duplicate an existing field name; otherwise, an error occurs.
  • It is recommended to use a meaningful name such as operation_type, change_type, or cdc_op.

Example:

custom_field_name = "operation_type"  # Use a custom field name
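The duplicate-name rule can be sketched as a check on the output schema. In this hypothetical helper a schema is just a list of column names; the new field is appended at the end, as in the examples in this article, and the name comparison is case-sensitive, which is an assumption of the sketch rather than documented plugin behavior.

```python
def append_row_kind_field(field_names, custom_field_name="row_kind"):
    """Hypothetical helper: return the output column list or fail on a clash.

    Mirrors the documented rule that custom_field_name must not duplicate
    an existing field; the comparison here is case-sensitive.
    """
    if custom_field_name in field_names:
        raise ValueError(
            f"field {custom_field_name!r} already exists in the input schema"
        )
    # The extracted RowKind becomes one extra trailing column.
    return [*field_names, custom_field_name]


print(append_row_kind_field(["id", "name", "age"], "operation_type"))
# ['id', 'name', 'age', 'operation_type']
```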

transform_type [enum]

Specifies the output format of the RowKind field value.

Options:

| Format | Description | Output Values |
|---|---|---|
| SHORT | Short format (symbolic) | +I, -U, +U, -D |
| FULL | Full format (English name) | INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE |

Default: SHORT

Meaning of each value:

| RowKind Type | SHORT Format | FULL Format | Description |
|---|---|---|---|
| INSERT | +I | INSERT | Insert operation |
| UPDATE_BEFORE | -U | UPDATE_BEFORE | Value before update |
| UPDATE_AFTER | +U | UPDATE_AFTER | Value after update |
| DELETE | -D | DELETE | Delete operation |

Usage Recommendation:

  • SHORT format: Saves storage, suitable for space-sensitive scenarios.
  • FULL format: More readable, suitable for human inspection or analysis.

Example:

transform_type = FULL  # Use full format
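To put the storage recommendation in perspective, the sketch below compares the character length of each label in both formats. It counts characters only; the actual on-disk cost depends on the sink's encoding and compression, which this illustration ignores. `SHORT_LABELS` and `label_lengths` are hypothetical names.

```python
# SHORT label for each RowKind; in FULL format the stored value is the name itself.
SHORT_LABELS = {
    "INSERT": "+I",
    "UPDATE_BEFORE": "-U",
    "UPDATE_AFTER": "+U",
    "DELETE": "-D",
}


def label_lengths():
    """Map each RowKind name to (SHORT length, FULL length) in characters."""
    return {name: (len(short), len(name)) for name, short in SHORT_LABELS.items()}


for name, (short_len, full_len) in label_lengths().items():
    print(f"{name:<14} SHORT={short_len}  FULL={full_len}")
```

SHORT labels are always 2 characters, while FULL labels range from 6 (INSERT, DELETE) to 13 (UPDATE_BEFORE) characters per row.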

Full Examples

Example 1: Using Default Configuration (SHORT Format)

With the default configuration, CDC data is converted to Append-Only mode and the original RowKind is saved in SHORT format.

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    base-url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # Default configuration:
    # custom_field_name = "row_kind"
    # transform_type = SHORT
  }
}

sink {
  Console {
    plugin_input = "append_only_data"
  }
}

Data Transformation Process:

Input Data (CDC Format):
 1. RowKind=+I, id=1, name="Zhang San", age=25
 2. RowKind=-U, id=1, name="Zhang San", age=25
 3. RowKind=+U, id=1, name="Zhang San", age=26
 4. RowKind=-D, id=1, name="Zhang San", age=26

Output Data (Append-Only Format):
 1. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="+I"
 2. RowKind=+I, id=1, name="Zhang San", age=25, row_kind="-U"
 3. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="+U"
 4. RowKind=+I, id=1, name="Zhang San", age=26, row_kind="-D"
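Keeping the -U rows is what lets downstream jobs reconstruct what an update actually changed. The hypothetical helper below pairs each -U row with the +U row that immediately follows it and reports the changed fields. It assumes the append-only log preserves CDC order, which holds for this single-parallelism example but should be verified for your own pipeline.

```python
def summarize_updates(rows, kind_field="row_kind"):
    """Pair each -U row with the next +U row and report the changed fields.

    Hypothetical helper; assumes an update appears as a -U row directly
    followed by its +U row in the append-only log.
    """
    changes = []
    for before, after in zip(rows, rows[1:]):
        if before[kind_field] == "-U" and after[kind_field] == "+U":
            diff = {
                key: (before[key], after[key])
                for key in before
                if key != kind_field and before[key] != after[key]
            }
            changes.append(diff)
    return changes


log = [
    {"id": 1, "name": "Zhang San", "age": 25, "row_kind": "+I"},
    {"id": 1, "name": "Zhang San", "age": 25, "row_kind": "-U"},
    {"id": 1, "name": "Zhang San", "age": 26, "row_kind": "+U"},
    {"id": 1, "name": "Zhang San", "age": 26, "row_kind": "-D"},
]
print(summarize_updates(log))
# [{'age': (25, 26)}]
```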

Example 2: Using FULL Format with Custom Field Name

Output RowKind in full format and use a custom field name.

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    base-url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # Custom field name
    transform_type = FULL                 # Use full format
  }
}

sink {
  Iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    namespace = "mydb"
    table = "orders_history"
    # Catalog connection settings (iceberg.catalog.config) are omitted for brevity
    # The Iceberg table will include the operation_type field to record the change type of each row
  }
}

Data Transformation Process:

Input Data (CDC Format):
 1. RowKind=+I, order_id=1001, amount=100.00
 2. RowKind=-U, order_id=1001, amount=100.00
 3. RowKind=+U, order_id=1001, amount=150.00
 4. RowKind=-D, order_id=1001, amount=150.00

Output Data (Append-Only Format, FULL):
 1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
 2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
 3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
 4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
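Because the history table keeps every change, the current state of each order can still be rebuilt by replaying rows in order. The in-memory Python sketch below illustrates the idea with FULL labels; `replay` is a hypothetical helper, the log is assumed to be in change order, and in practice this logic would typically run in a query engine over the Iceberg table.

```python
def replay(log, key="order_id", kind_field="operation_type"):
    """Rebuild the latest row per key from an append-only change log (FULL labels)."""
    state = {}
    for row in log:
        kind = row[kind_field]
        values = {k: v for k, v in row.items() if k != kind_field}
        if kind in ("INSERT", "UPDATE_AFTER"):
            state[row[key]] = values       # upsert the new row image
        elif kind == "DELETE":
            state.pop(row[key], None)      # remove the deleted key
        # UPDATE_BEFORE rows only describe the old image, so they are skipped.
    return state


log = [
    {"order_id": 1001, "amount": 100.00, "operation_type": "INSERT"},
    {"order_id": 1001, "amount": 100.00, "operation_type": "UPDATE_BEFORE"},
    {"order_id": 1001, "amount": 150.00, "operation_type": "UPDATE_AFTER"},
]
print(replay(log))
# {1001: {'order_id': 1001, 'amount': 150.0}}
print(replay(log + [{"order_id": 1001, "amount": 150.00, "operation_type": "DELETE"}]))
# {}
```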

Example 3: Full Test Using FakeSource

Generate test data using FakeSource to demonstrate how each RowKind value is transformed.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      { kind = INSERT, fields = [1, "A", 100] },
      { kind = INSERT, fields = [2, "B", 100] },
      { kind = UPDATE_BEFORE, fields = [1, "A", 100] },
      { kind = UPDATE_AFTER, fields = [1, "A_updated", 95] },
      { kind = UPDATE_BEFORE, fields = [2, "B", 100] },
      { kind = UPDATE_AFTER, fields = [2, "B_updated", 98] },
      { kind = DELETE, fields = [1, "A_updated", 95] }
    ]
  }
}

transform {
  RowKindExtractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = FULL
  }
}

sink {
  Console {
    plugin_input = "transformed_data"
  }
}

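Expected Output (simplified; the Console sink's actual log lines may include extra metadata and be formatted differently):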

+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
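With the change type stored as a regular column, aggregating changes becomes a simple group-by. The Python sketch below counts the rows from the expected output by change_type; `count_changes` is a hypothetical helper, and the rows are reduced to the fields the count needs.

```python
from collections import Counter


def count_changes(rows, kind_field="change_type"):
    """Count rows per original change type (hypothetical helper)."""
    return Counter(row[kind_field] for row in rows)


# Rows from the expected output, keeping only pk_id and change_type.
rows = [
    {"pk_id": 1, "change_type": "INSERT"},
    {"pk_id": 2, "change_type": "INSERT"},
    {"pk_id": 1, "change_type": "UPDATE_BEFORE"},
    {"pk_id": 1, "change_type": "UPDATE_AFTER"},
    {"pk_id": 2, "change_type": "UPDATE_BEFORE"},
    {"pk_id": 2, "change_type": "UPDATE_AFTER"},
    {"pk_id": 1, "change_type": "DELETE"},
]
print(dict(count_changes(rows)))
# {'INSERT': 2, 'UPDATE_BEFORE': 2, 'UPDATE_AFTER': 2, 'DELETE': 1}
```

When the source emits both halves of an update, each logical update contributes one UPDATE_BEFORE and one UPDATE_AFTER row, so counting only UPDATE_AFTER gives the number of updates.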



Written by williamguo | William Guo, WhaleOps CEO, Apache Software Foundation Member
Published by HackerNoon on 2025/11/28