Problem Encountered

In our project, SeaTunnel extracts data from the business database into the data warehouse (StarRocks), and we had already been using MySQL-CDC successfully for large-scale real-time synchronization. One particular MySQL table, however, behaved abnormally: after the job started, the logs showed read and write counts of zero, and the job kept running without making progress. After 6 hours it terminated with a checkpoint timeout error.

The job structure is as follows (sensitive information removed):

Key logs during execution:

Background

- Scenario: Real-time data extraction from MySQL to StarRocks using MySQL-CDC
- SeaTunnel version: 2.3.9
- MySQL version: 8.x
- StarRocks version: 3.2
- Source table data volume: 60–70 million rows

Key Questions

- Why do the read and write counts remain at 0?
- Why does it take so long to throw a timeout error?

Analysis Process

We had used MySQL-CDC for many sync jobs before with nearly identical configurations, so the problem was unlikely to be in SeaTunnel itself. We compared this source table with the ones that had synced successfully to look for differences.

Sure enough, we found something suspicious: the previous tables all had auto-increment primary keys, and this one did not. It only had several unique indexes.

So the question arises: how exactly does SeaTunnel sync data?

As far as we know, SeaTunnel syncs CDC data in two steps: first a snapshot sync, then a binlog-based incremental sync. Since the read count stayed at zero, the job had to be stuck in the snapshot phase. So how does snapshot sync work?
We checked the official SeaTunnel docs:

MySQL CDC | Apache SeaTunnel: https://seatunnel.apache.org/zh-CN/docs/2.3.9/connector-v2/source/MySQL-CDC

There is no architectural explanation there, but we did find some configurable parameters.

Parameter Explanation

chunk-key.even-distribution.factor.upper-bound
Default value: 100
Description: The upper bound of the chunk key distribution factor. The factor is used to decide whether the table data is evenly distributed. If the distribution factor, that is (MAX(id) - MIN(id) + 1) / row count, is ≤ this value (and ≥ the lower bound), the table is considered evenly distributed and uniform chunking is used. If it exceeds this value and the estimated number of shards surpasses sample-sharding.threshold, the sampling-based sharding strategy is used.

chunk-key.even-distribution.factor.lower-bound
Default value: 0.5
Description: The lower bound of the distribution factor. If the distribution factor is ≥ this value (and ≤ the upper bound), the table is considered evenly distributed. Otherwise it is considered uneven and may trigger sampling-based sharding.

sample-sharding.threshold
Default value: 1000
Description: If the distribution factor is outside the [lower, upper] range and the estimated number of shards (approximately row count / chunk size) exceeds this threshold, the sampling-based sharding strategy is used. This is intended to improve efficiency for large datasets.

inverse-sampling.rate
Default value: 1000
Description: Used in sampling-based sharding. A value of 1000 means a 1/1000 sampling rate. It controls the granularity of sampling and affects the number of final shards.

snapshot.split.size
Default value: 8096
Description: The number of rows per chunk in snapshot sync.
Tables will be split into chunks based on this.

snapshot.fetch.size
Default value: 1024
Description: Maximum number of rows fetched per poll during snapshot reading.

From these parameters, we learned two things:

- During snapshot sync, SeaTunnel splits the table into multiple chunks (splits).
- The sharding strategy depends on whether the data is evenly distributed.

Our table has roughly 60 million rows (an estimate from business staff, since we couldn't count them directly).

Because the table has no primary key, we weren't sure which column SeaTunnel uses for chunking. We assumed it was the ID column, which has a unique index, and checked its range:

    SELECT MAX(ID), MIN(ID) FROM table;

- Max key value: 804306477418
- Min key value: 607312608210
- Distribution factor = (804306477418 - 607312608210 + 1) / 60,000,000 ≈ 3283.23

This is far outside the [0.5, 100] "even" range, so SeaTunnel treats the table as unevenly distributed.

- Default chunk size: 8096
- Shard count = 60,000,000 / 8096 ≈ 7411, which is greater than sample-sharding.threshold (1000)

So SeaTunnel most likely switched to sampling-based sharding.

- Sampling rate (inverse): 1000, so about 60,000 rows need to be sampled

At this point we were convinced that SeaTunnel was stuck sampling, and we became curious: how exactly does it sample? Why does it run for 6 hours? Even with 60M rows, sampling 60K of them shouldn't be that slow. Surely it only scans the ID column, which has a unique index?

We decided to dive into the source code.
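The arithmetic above can be replayed in a few lines of Java. This is only a sketch of the decision rule as the parameter descriptions state it, not SeaTunnel code: the class ChunkStrategyEstimate, its method names, and the Strategy enum are hypothetical, and plain double arithmetic stands in for whatever rounding the connector applies.

```java
public class ChunkStrategyEstimate {

    /** Hypothetical labels for the three chunking outcomes described in the docs. */
    public enum Strategy { EVEN, SAMPLING, UNEVEN }

    /** (MAX(id) - MIN(id) + 1) / row count, as described for the distribution factor. */
    public static double distributionFactor(long min, long max, long approxRowCount) {
        return (max - min + 1) / (double) approxRowCount;
    }

    /** Estimated number of shards: row count / chunk size, truncated. */
    public static int shardCount(long approxRowCount, int chunkSize) {
        return (int) (approxRowCount / chunkSize);
    }

    /** Even if the factor is inside [lower, upper]; otherwise sampling only above the threshold. */
    public static Strategy choose(double factor, double lower, double upper,
                                  int shardCount, int sampleShardingThreshold) {
        if (factor >= lower && factor <= upper) {
            return Strategy.EVEN;
        }
        return shardCount > sampleShardingThreshold ? Strategy.SAMPLING : Strategy.UNEVEN;
    }

    public static void main(String[] args) {
        // Values from the article: ID range, estimated row count, and documented defaults.
        long min = 607_312_608_210L;
        long max = 804_306_477_418L;
        long rows = 60_000_000L;

        double factor = distributionFactor(min, max, rows);
        int shards = shardCount(rows, 8096);
        Strategy strategy = choose(factor, 0.5, 100.0, shards, 1000);

        System.out.println("factor=" + factor + " shards=" + shards + " strategy=" + strategy);
    }
}
```

With the article's numbers the factor lands far above the default upper bound and the shard estimate is well above the threshold, so the sketch selects the sampling branch.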
GitHub: https://github.com/apache/seatunnel/

SeaTunnel's architecture is quite complex, and setting up the environment took us a full day (mostly dependency setup).

Finding the critical logic took another day. We traced it from log messages and keyword searches.

Partial Source Code Analysis

    private List<ChunkRange> splitTableIntoChunks(
            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
        final String splitColumnName = splitColumn.name();

        // Get min/max values
        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
        final Object min = minMax[0];
        final Object max = minMax[1];
        if (min == null || max == null || min.equals(max)) {
            // Empty table or only one row: full table scan as a single chunk
            return Collections.singletonList(ChunkRange.all());
        }

        // Get chunk size, distribution factor bounds, and sampling threshold from config
        final int chunkSize = sourceConfig.getSplitSize();
        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
        final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
        final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();

        log.info("Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
                        + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
                tableId, splitColumnName, min, max, chunkSize,
                distributionFactorUpper, distributionFactorLower, sampleShardingThreshold);

        if (isEvenlySplitColumn(splitColumn)) {
            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
            double distributionFactor =
                    calculateDistributionFactor(tableId, min, max, approximateRowCnt);

            boolean dataIsEvenlyDistributed =
                    doubleCompare(distributionFactor, distributionFactorLower) >= 0
                            && doubleCompare(distributionFactor, distributionFactorUpper) <= 0;

            if (dataIsEvenlyDistributed) {
                // Evenly distributed: fixed-width chunks over the key range
                final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
                return splitEvenlySizedChunks(
                        tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
            } else {
                int shardCount = (int) (approximateRowCnt / chunkSize);
                int inverseSamplingRate = sourceConfig.getInverseSamplingRate();

                if (sampleShardingThreshold < shardCount) {
                    // Uneven and large: sampling-based sharding
                    if (inverseSamplingRate > chunkSize) {
                        log.warn("inverseSamplingRate {} > chunkSize {}, adjusting...",
                                inverseSamplingRate, chunkSize);
                        inverseSamplingRate = chunkSize;
                    }
                    log.info("Using sampling sharding for table {}, rate = {}",
                            tableId, inverseSamplingRate);
                    Object[] sample =
                            sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate);
                    log.info("Sampled {} records from table {}", sample.length, tableId);
                    return efficientShardingThroughSampling(
                            tableId, sample, approximateRowCnt, shardCount);
                }
                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
            }
        } else {
            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
        }
    }

Let's focus on the sampling logic:

    public static Object[] skipReadAndSortSampleData(
            JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
            throws Exception {
        final String sampleQuery =
                String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

        Statement stmt = null;
        ResultSet rs = null;
        List<Object> results = new ArrayList<>();
        try {
            stmt = jdbc.connection()
                    .createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // Stream rows one at a time instead of buffering the whole result set
            stmt.setFetchSize(Integer.MIN_VALUE);
            rs = stmt.executeQuery(sampleQuery);

            int count = 0;
            while (rs.next()) {
                count++;
                if (count % 100000 == 0) {
                    log.info("Processing row index: {}", count);
                }
                // Keep every inverseSamplingRate-th value
                if (count % inverseSamplingRate == 0) {
                    results.add(rs.getObject(1));
                }
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("Thread interrupted");
                }
            }
        } finally {
            if (rs != null) rs.close();
            if (stmt != null) stmt.close();
        }

        Object[] resultsArray = results.toArray();
        Arrays.sort(resultsArray);
        return resultsArray;
    }

This is the core sampling logic: it scans the entire table row by row and keeps 1 out of every 1000 values. The query has no WHERE clause and no LIMIT, so all of the roughly 60 million rows have to be streamed to the client before a single split is produced. That explains why the job ran so slowly, and also the Processing row index messages we had seen in the logs and wondered about.

Roughly 60,000 IDs were sampled.
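To make the cost of this approach concrete, here is a minimal in-memory sketch of the same every-Nth sampling idea. It is not SeaTunnel code: the class SkipReadSampling, the method sampleEveryNth, and the Result holder are hypothetical names, and a LongStream stands in for the JDBC result set.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;

public class SkipReadSampling {

    /** Result of one sampling pass: the sorted sample plus the number of rows read. */
    public static final class Result {
        public final List<Long> sample;
        public final long rowsRead;

        Result(List<Long> sample, long rowsRead) {
            this.sample = sample;
            this.rowsRead = rowsRead;
        }
    }

    /** Reads every row from the iterator and keeps each inverseSamplingRate-th value. */
    public static Result sampleEveryNth(Iterator<Long> rows, int inverseSamplingRate) {
        List<Long> sample = new ArrayList<>();
        long rowsRead = 0;
        while (rows.hasNext()) {
            long value = rows.next();
            rowsRead++;
            if (rowsRead % inverseSamplingRate == 0) {
                sample.add(value);
            }
        }
        Collections.sort(sample);
        return new Result(sample, rowsRead);
    }

    public static void main(String[] args) {
        // 60,000 stand-in rows at a 1/1000 rate: 60 sampled values, but 60,000 rows read.
        Result r = sampleEveryNth(LongStream.rangeClosed(1, 60_000).iterator(), 1000);
        System.out.println(r.sample.size() + " sampled, " + r.rowsRead + " rows read");
    }
}
```

The point of the sketch is the ratio: the sample is a thousandth of the input, but the number of rows read equals the full input size, so the cost grows with the table rather than with the sample.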
Now for the sampling-based sharding strategy:

    protected List<ChunkRange> efficientShardingThroughSampling(
            TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) {
        log.info("Using sampling-based sharding on table {}, approx rows: {}, shards: {}",
                tableId, approximateRowCnt, shardCount);

        List<ChunkRange> splits = new ArrayList<>();
        if (shardCount == 0) {
            splits.add(ChunkRange.of(null, null));
            return splits;
        }

        double approxSamplePerShard = (double) sampleData.length / shardCount;
        Object lastEnd = null;

        if (approxSamplePerShard <= 1) {
            // Fewer samples than shards: every distinct sample becomes a chunk boundary
            splits.add(ChunkRange.of(null, sampleData[0]));
            lastEnd = sampleData[0];
            for (int i = 1; i < sampleData.length; i++) {
                if (!sampleData[i].equals(lastEnd)) {
                    splits.add(ChunkRange.of(lastEnd, sampleData[i]));
                    lastEnd = sampleData[i];
                }
            }
            splits.add(ChunkRange.of(lastEnd, null));
        } else {
            // More samples than shards: pick evenly spaced samples as boundaries
            for (int i = 0; i < shardCount; i++) {
                Object chunkStart = lastEnd;
                Object chunkEnd =
                        (i < shardCount - 1)
                                ? sampleData[(int) ((i + 1) * approxSamplePerShard)]
                                : null;
                if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, chunkStart)) {
                    splits.add(ChunkRange.of(chunkStart, chunkEnd));
                    lastEnd = chunkEnd;
                }
            }
        }
        return splits;
    }

Each chunk gets a distinct start and end taken from the sorted sampled IDs, so chunks do not overlap.

Let's look at the ChunkRange class that represents the result:

Snapshot sharding enables parallel data reads, which speeds up the historical sync.

Final Solution

Through the analysis above, we confirmed that the job was stuck in the snapshot phase performing sampling, which was triggered because SeaTunnel judged the source table to be unevenly distributed.

Since the sync job had been blocked for days, we went for a simple fix: adjust the distribution factor thresholds so that SeaTunnel treats the table as evenly distributed and skips sampling.

The default factor range is 0.5 ~ 100, but our table's factor was about 3283, so we increased the upper bound to 4000. The final configuration was:

- snapshot.split.size: Our table was highly sparse, so we increased this value drastically (arbitrarily multiplied by 1000, admittedly not very scientific).
- table-names-config: We manually specified the primary key and split key, since the table had no primary key and we weren't sure which column SeaTunnel used. Better to be explicit.

Final Result

It finally started syncing! 🎉
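As a postscript, the effect of raising the upper bound can be checked against the two expressions quoted from splitTableIntoChunks. This is a sketch under stated assumptions: EvenChunkingCheck and its methods are hypothetical names, plain comparisons replace the source's doubleCompare, and the split size of 8,096,000 is only an illustrative value, since the exact number used in the final configuration is not shown here.

```java
public class EvenChunkingCheck {

    /** Mirrors the dataIsEvenlyDistributed condition, using plain comparisons. */
    public static boolean isEvenlyDistributed(double factor, double lower, double upper) {
        return factor >= lower && factor <= upper;
    }

    /** Mirrors Math.max((int) (distributionFactor * chunkSize), 1) from the quoted source. */
    public static int dynamicChunkSize(double factor, int chunkSize) {
        return Math.max((int) (factor * chunkSize), 1);
    }

    public static void main(String[] args) {
        // Distribution factor computed from the article's MIN/MAX and row estimate.
        double factor = (804_306_477_418L - 607_312_608_210L + 1) / 60_000_000.0;

        System.out.println(isEvenlyDistributed(factor, 0.5, 100.0));   // default bounds
        System.out.println(isEvenlyDistributed(factor, 0.5, 4000.0));  // raised upper bound
        System.out.println(dynamicChunkSize(factor, 8096));            // default split size
        System.out.println(dynamicChunkSize(factor, 8_096_000));       // illustrative large split size
    }
}
```

With the raised bound the table falls into the evenly distributed branch, so the sampling scan is never started. Note that the product is cast to int, and Java's double-to-int conversion saturates at Integer.MAX_VALUE, so a very large split size on a very sparse key may yield a smaller key step than the multiplication suggests.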