Source Code Analysis of Apache SeaTunnel Zeta Engine (Part 1): Server Initialization

by William Guo, September 12th, 2024

This series of articles is based on Apache SeaTunnel version 2.3.6 and introduces the full process of handling a task from submission to execution with the Zeta engine. This document aims to assist newcomers to SeaTunnel by providing some guidance.

The series is divided into three parts:

  1. Initialization of the SeaTunnel Server
  2. Task submission process on the Client side
  3. Task execution process upon receiving the task on the Server side

Because the source code involved is extensive, the series follows the overall flow of a task through the engine.

Cluster Topology

First, let’s get an overview of the SeaTunnel Zeta engine architecture. The Zeta engine relies on Hazelcast for distributed cluster communication.

Since version 2.3.6, nodes in the cluster can be assigned as Master or Worker nodes, separating scheduling from execution to prevent excessive load on the Master node and avoid potential issues.

Version 2.3.6 also added a feature to add tag attributes to each node. When submitting a task, tags can be used to select the nodes where the task will run, achieving resource isolation.
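
To make the tag-based selection more concrete, here is a minimal sketch of how a tag filter could be matched against node attributes. It is not SeaTunnel's actual resource manager code: the class TagFilterSketch, its methods, and the worker and tag names are all hypothetical, and the sketch assumes that a node qualifies only when it carries every requested tag with the same value.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: not SeaTunnel's actual resource manager code.
class TagFilterSketch {

    // A worker is eligible when it carries every requested tag with the same value.
    static boolean matches(Map<String, String> workerTags, Map<String, String> requested) {
        return requested.entrySet().stream()
                .allMatch(e -> e.getValue().equals(workerTags.get(e.getKey())));
    }

    // Returns the names of the workers whose tags satisfy the filter, sorted by name.
    static List<String> select(
            Map<String, Map<String, String>> workers, Map<String, String> requested) {
        return workers.entrySet().stream()
                .filter(w -> matches(w.getValue(), requested))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> workers = Map.of(
                "worker-1", Map.of("group", "platform", "team", "team1"),
                "worker-2", Map.of("group", "platform", "team", "team2"),
                "worker-3", Map.of("group", "analytics"));
        // Only the workers tagged group=platform remain candidates for the task.
        System.out.println(select(workers, Map.of("group", "platform")));
    }
}
```

In this sketch an empty filter matches every worker, which corresponds to submitting a task without any tags.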


The server side of the cluster is divided into Master and Worker nodes. The Master node is responsible for receiving requests, generating logical plans, allocating tasks, and so on. Compared to previous versions, the cluster can now also include additional Backup nodes, which is a significant improvement for cluster stability.

The Worker node, on the other hand, is responsible only for task execution, which includes data reading and writing.

When submitting a task, you can create a Hazelcast client connection to the cluster for communication or use the REST API for communication.

Server Startup

After getting a general understanding of the cluster architecture, let’s look at the specific process.

First, let’s examine the Server startup process.

The command to start the Server is:

sh bin/seatunnel-cluster.sh -d -r <node role type>


Looking into this script, you’ll find that it ultimately executes the following command:

java -cp seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer <other_java_jvm_config_and_args>


Let’s check the code for starter.seatunnel.SeaTunnelServer:


public class SeaTunnelServer {
    public static void main(String[] args) throws CommandException {
        ServerCommandArgs serverCommandArgs =
                CommandLineUtils.parse(
                        args,
                        new ServerCommandArgs(),
                        EngineType.SEATUNNEL.getStarterShellName(),
                        true);
        SeaTunnel.run(serverCommandArgs.buildCommand());
    }
}



This code uses JCommander to parse the user-provided arguments, then builds and runs a command. serverCommandArgs.buildCommand() returns an instance of the following class:

public class ServerExecuteCommand implements Command<ServerCommandArgs> {
    private final ServerCommandArgs serverCommandArgs;
    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
        this.serverCommandArgs = serverCommandArgs;
    }
    @Override
    public void execute() {
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        String clusterRole = this.serverCommandArgs.getClusterRole();
        if (StringUtils.isNotBlank(clusterRole)) {
            if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
            } else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
                // In Hazelcast, lite members will not store IMap data.
                seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
            } else {
                throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
            }
        } else {
            seaTunnelConfig
                    .getEngineConfig()
                    .setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
        }
        HazelcastInstanceFactory.newHazelcastInstance(
                seaTunnelConfig.getHazelcastConfig(),
                Thread.currentThread().getName(),
                new SeaTunnelNodeContext(seaTunnelConfig));
    }
}


Here, the configuration information is modified based on the role type.

When it is a Worker node, the Hazelcast node type is set to lite member. In Hazelcast, lite members do not store data.

Then, a Hazelcast instance is created and passed the SeaTunnelNodeContext instance and the modified configuration information.
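
The role handling described above can be condensed into a small standalone sketch. The names RoleResolutionSketch, NodeSettings, and resolve are hypothetical stand-ins for EngineConfig.ClusterRole and the Hazelcast lite-member flag, and the sketch assumes the lite-member flag stays false unless the role is Worker.

```java
import java.util.Locale;

// Simplified stand-ins for EngineConfig.ClusterRole and the Hazelcast lite-member flag.
class RoleResolutionSketch {

    enum ClusterRole { MASTER_AND_WORKER, MASTER, WORKER }

    // Holds the two settings that are derived from the -r argument.
    static final class NodeSettings {
        final ClusterRole role;
        final boolean liteMember;

        NodeSettings(ClusterRole role, boolean liteMember) {
            this.role = role;
            this.liteMember = liteMember;
        }
    }

    static NodeSettings resolve(String roleArg) {
        if (roleArg == null || roleArg.trim().isEmpty()) {
            // No role given: the node acts as both master and worker.
            return new NodeSettings(ClusterRole.MASTER_AND_WORKER, false);
        }
        switch (roleArg.toUpperCase(Locale.ROOT)) {
            case "MASTER":
                return new NodeSettings(ClusterRole.MASTER, false);
            case "WORKER":
                // Workers become lite members, so they do not store IMap data.
                return new NodeSettings(ClusterRole.WORKER, true);
            default:
                throw new IllegalArgumentException("Not supported cluster role: " + roleArg);
        }
    }

    public static void main(String[] args) {
        NodeSettings settings = resolve("worker");
        System.out.println(settings.role + " liteMember=" + settings.liteMember);
    }
}
```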


public class SeaTunnelNodeContext extends DefaultNodeContext {

    private final SeaTunnelConfig seaTunnelConfig;

    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.seaTunnelConfig = seaTunnelConfig;
    }

    @Override
    public NodeExtension createNodeExtension(@NonNull Node node) {
        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
    }

    @Override
    public Joiner createJoiner(Node node) {
        JoinConfig join =
                getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
        join.verify();

        if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) {
            super.createJoiner(node);
        } else if (join.getTcpIpConfig().isEnabled()) {
            log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
            return new LiteNodeDropOutTcpIpJoiner(node);
        } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
                || isAnyAliasedConfigEnabled(join)
                || join.isAutoDetectionEnabled()) {
            super.createJoiner(node);
        }
        return null;
    }

    private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
        return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
    }

    private boolean usePublicAddress(JoinConfig join, Node node) {
        return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
                || allUsePublicAddress(
                        AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
    }
}


In SeaTunnelNodeContext, the createNodeExtension method is overridden to use the engine.server.NodeExtension class.

The code for this class is:



public class NodeExtension extends DefaultNodeExtension {

    private final NodeExtensionCommon extCommon;

    public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
        super(node);
        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
    }

    @Override
    public void beforeStart() {
        // TODO Get Config from Node here
        super.beforeStart();
    }

    @Override
    public void afterStart() {
        super.afterStart();
        extCommon.afterStart();
    }

    @Override
    public void beforeClusterStateChange(
            ClusterState currState, ClusterState requestedState, boolean isTransient) {
        super.beforeClusterStateChange(currState, requestedState, isTransient);
        extCommon.beforeClusterStateChange(requestedState);
    }

    @Override
    public void onClusterStateChange(ClusterState newState, boolean isTransient) {
        super.onClusterStateChange(newState, isTransient);
        extCommon.onClusterStateChange(newState);
    }

    @Override
    public Map<String, Object> createExtensionServices() {
        return extCommon.createExtensionServices();
    }

    @Override
    public TextCommandService createTextCommandService() {
        return new TextCommandServiceImpl(node) {
            {
                register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));
                register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));
                register(HTTP_GET, new RestHttpGetCommandProcessor(this));
                register(HTTP_POST, new RestHttpPostCommandProcessor(this));
            }
        };
    }

    @Override
    public void printNodeInfo() {
        extCommon.printNodeInfo(systemLogger);
    }
}


Here we see that SeaTunnelServer is instantiated in the constructor. This is the core server-side class; its fully qualified name is org.apache.seatunnel.engine.server.SeaTunnelServer.

Let’s review the code for this class:



public class SeaTunnelServer
        implements ManagedService, MembershipAwareService, LiveOperationsTracker {

    private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);

    public static final String SERVICE_NAME = "st:impl:seaTunnelServer";

    private NodeEngineImpl nodeEngine;
    private final LiveOperationRegistry liveOperationRegistry;

    private volatile SlotService slotService;
    private TaskExecutionService taskExecutionService;
    private ClassLoaderService classLoaderService;
    private CoordinatorService coordinatorService;
    private ScheduledExecutorService monitorService;

    @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;

    private final SeaTunnelConfig seaTunnelConfig;

    private volatile boolean isRunning = true;

    public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.liveOperationRegistry = new LiveOperationRegistry();
        this.seaTunnelConfig = seaTunnelConfig;
        LOGGER.info("SeaTunnel server start...");
    }

    @Override
    public void init(NodeEngine engine, Properties hzProperties) {
        ...
        if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
                == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
            startWorker();
            startMaster();
        } else if (EngineConfig.ClusterRole.WORKER.ordinal()
                == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
            startWorker();
        } else {
            startMaster();
        }
        ...
    }

    ...
}


This class is the core code on the SeaTunnel Server side, and it starts relevant components based on the role of the node.
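
As a quick illustration of this role-based dispatch, the following sketch lists which components each role would start. It is only a simplified model: StartupDispatchSketch and componentsFor are hypothetical names, and the strings are labels for the services covered in the Worker Node and Master Node sections rather than the real service objects.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the component names are labels, not the real service objects.
class StartupDispatchSketch {

    enum ClusterRole { MASTER_AND_WORKER, MASTER, WORKER }

    static List<String> componentsFor(ClusterRole role) {
        List<String> started = new ArrayList<>();
        // Enum constants are singletons, so == is sufficient for the comparison.
        if (role == ClusterRole.MASTER_AND_WORKER || role == ClusterRole.WORKER) {
            // Corresponds to startWorker().
            started.add("TaskExecutionService");
            started.add("SlotService");
        }
        if (role == ClusterRole.MASTER_AND_WORKER || role == ClusterRole.MASTER) {
            // Corresponds to startMaster().
            started.add("CoordinatorService");
            started.add("monitorService");
        }
        return started;
    }

    public static void main(String[] args) {
        for (ClusterRole role : ClusterRole.values()) {
            System.out.println(role + " -> " + componentsFor(role));
        }
    }
}
```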

A summary of the SeaTunnel process:

SeaTunnel utilizes Hazelcast’s foundational capabilities to implement cluster networking and invoke core startup code.

For those interested in a deeper understanding of this area, it’s worth checking out Hazelcast’s related content. Here is a summary of the invocation path:

Classes loaded in sequence:

  • starter.SeaTunnelServer
  • ServerExecuteCommand
  • SeaTunnelNodeContext
  • NodeExtension
  • server.SeaTunnelServer


Next, let’s look in detail at the components created in the Master and Worker nodes.

Worker Node

private void startWorker() {
    taskExecutionService =
            new TaskExecutionService(
                    classLoaderService, nodeEngine, nodeEngine.getProperties());
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
}
public SlotService getSlotService() {
    if (slotService == null) {
        synchronized (this) {
            if (slotService == null) {
                SlotService service =
                        new DefaultSlotService(
                                nodeEngine,
                                taskExecutionService,
                                seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                service.init();
                slotService = service;
            }
        }
    }
    return slotService;
}



In the startWorker method, two components are initialized: taskExecutionService and slotService. Both are related to task execution.
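
The getSlotService method uses the double-checked locking idiom to create the service lazily and only once. Here is a minimal, self-contained sketch of that idiom; LazyServiceSketch, Service, and INIT_COUNT are hypothetical names used purely for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal illustration of double-checked locking; not SeaTunnel code.
class LazyServiceSketch {

    // Counts how often the (potentially expensive) construction ran, for demonstration.
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    static final class Service {
        Service() {
            INIT_COUNT.incrementAndGet();
        }
    }

    // volatile ensures other threads never observe a partially published instance.
    private volatile Service service;

    Service getService() {
        if (service == null) { // first check, without the lock
            synchronized (this) {
                if (service == null) { // second check, under the lock
                    Service created = new Service();
                    service = created; // publish only after construction completes
                }
            }
        }
        return service;
    }

    public static void main(String[] args) throws InterruptedException {
        LazyServiceSketch holder = new LazyServiceSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(holder::getService);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("initialisations: " + INIT_COUNT.get());
    }
}
```

As in getSlotService, the new object is fully set up in a local variable before it is assigned to the volatile field, so callers that skip the lock only ever see a ready-to-use instance.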

SlotService

First, let’s look at the initialization of SlotService.


@Override
public void init() {
    initStatus = true;
    slotServiceSequence = UUID.randomUUID().toString();
    contexts = new ConcurrentHashMap<>();
    assignedSlots = new ConcurrentHashMap<>();
    unassignedSlots = new ConcurrentHashMap<>();
    unassignedResource = new AtomicReference<>(new ResourceProfile());
    assignedResource = new AtomicReference<>(new ResourceProfile());
    scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(
                    r ->
                            new Thread(
                                    r,
                                    String.format(
                                            "hz.%s.seaTunnel.slotService.thread",
                                            nodeEngine.getHazelcastInstance().getName())));
    if (!config.isDynamicSlot()) {
        initFixedSlots();
    }
    unassignedResource.set(getNodeResource());
    scheduledExecutorService.scheduleAtFixedRate(
            () -> {
                try {
                    LOGGER.fine(
                            "start send heartbeat to resource manager, this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                    sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
                } catch (Exception e) {
                    LOGGER.warning(
                            "failed send heartbeat to resource manager, will retry later. this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                }
            },
            0,
            DEFAULT_HEARTBEAT_TIMEOUT,
            TimeUnit.MILLISECONDS);
}



SeaTunnel has a concept of dynamic Slots. When dynamic Slots are enabled, a node does not have a fixed number of Slots and can accept any number of tasks. When they are disabled, the node has a fixed number of Slots and can accept only as many tasks as it has Slots.

During initialization, the number of Slots is set based on whether dynamic Slots are enabled or not.


private void initFixedSlots() {
    long maxMemory = Runtime.getRuntime().maxMemory();
    for (int i = 0; i < config.getSlotNum(); i++) {
        unassignedSlots.put(
                i,
                new SlotProfile(
                        nodeEngine.getThisAddress(),
                        i,
                        new ResourceProfile(
                                CPU.of(0), Memory.of(maxMemory / config.getSlotNum())),
                        slotServiceSequence));
    }
}
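
As a worked example of the arithmetic in initFixedSlots, the sketch below splits the JVM's maximum heap evenly across the configured Slots. FixedSlotSketch and memoryPerSlot are hypothetical names, and the guard against a non-positive Slot count is an assumption added for the sketch.

```java
// Worked example of the per-slot memory split; not SeaTunnel code.
class FixedSlotSketch {

    // Every slot gets an equal share of the heap; integer division drops any remainder.
    static long memoryPerSlot(long maxMemoryBytes, int slotNum) {
        if (slotNum <= 0) {
            // Assumption for this sketch: reject invalid slot counts explicitly.
            throw new IllegalArgumentException("slotNum must be positive");
        }
        return maxMemoryBytes / slotNum;
    }

    public static void main(String[] args) {
        long maxMemory = Runtime.getRuntime().maxMemory();
        int slotNum = 2;
        System.out.println(
                "max heap = " + maxMemory + " bytes, per slot = "
                        + memoryPerSlot(maxMemory, slotNum) + " bytes");
    }
}
```

For instance, with a 4 GiB heap and 4 Slots, each SlotProfile would be created with 1 GiB of memory, while the CPU value stays at 0 as in the original code.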



It can also be seen that a thread is started to periodically send heartbeats to the Master node. The heartbeat information includes the current node’s information, such as the number of assigned and unassigned Slots. The Worker node updates this information to the Master node periodically through heartbeats.

@Override
public synchronized WorkerProfile getWorkerProfile() {
    WorkerProfile workerProfile = new WorkerProfile(nodeEngine.getThisAddress());
    workerProfile.setProfile(getNodeResource());
    workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0]));
    workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0]));
    workerProfile.setUnassignedResource(unassignedResource.get());
    workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
    workerProfile.setDynamicSlot(config.isDynamicSlot());
    return workerProfile;
}
private ResourceProfile getNodeResource() {
    return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory()));
}
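
One detail of the heartbeat loop is worth spelling out: a task scheduled with scheduleAtFixedRate is not run again once one of its executions throws, which is why the heartbeat body catches exceptions and only logs them. The following sketch shows the same pattern in isolation; HeartbeatSketch and start are hypothetical names, and the heartbeat itself is an arbitrary Runnable rather than a WorkerHeartbeatOperation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Periodic heartbeat that survives individual failures; not SeaTunnel code.
class HeartbeatSketch {

    static ScheduledExecutorService start(Runnable heartbeat, long periodMillis) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(
                        r -> {
                            Thread t = new Thread(r, "heartbeat-sketch");
                            t.setDaemon(true);
                            return t;
                        });
        scheduler.scheduleAtFixedRate(
                () -> {
                    try {
                        heartbeat.run();
                    } catch (Exception e) {
                        // Without this catch, later executions would be suppressed.
                        System.err.println("heartbeat failed, will retry later: " + e.getMessage());
                    }
                },
                0,
                periodMillis,
                TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();
        ScheduledExecutorService scheduler =
                start(
                        () -> {
                            // Simulate a master that is unreachable on the first attempt.
                            if (attempts.incrementAndGet() == 1) {
                                throw new IllegalStateException("master not reachable yet");
                            }
                        },
                        50);
        Thread.sleep(300);
        scheduler.shutdownNow();
        System.out.println("attempts: " + attempts.get());
    }
}
```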


TaskExecutionService

This component is responsible for running tasks on the Worker. Here, we briefly look at the related code and will delve into it further later.

When the Worker node initializes, it creates a TaskExecutionService object and calls its start method.

private final ExecutorService executorService =
        newCachedThreadPool(new BlockingTaskThreadFactory());
public TaskExecutionService(
        ClassLoaderService classLoaderService,
        NodeEngineImpl nodeEngine,
        HazelcastProperties properties) {
    // Load configuration
    seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
    this.nodeEngine = nodeEngine;
    this.classLoaderService = classLoaderService;
    this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
    // Metrics related
    MetricsRegistry registry = nodeEngine.getMetricsRegistry();
    MetricDescriptor descriptor =
            registry.newMetricDescriptor()
                    .withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
    registry.registerStaticMetrics(descriptor, this);
    // Scheduled task to update metrics in IMAP
    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(
            this::updateMetricsContextInImap,
            0,
            seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
            TimeUnit.SECONDS);
    serverConnectorPackageClient =
            new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);
    eventBuffer = new ArrayBlockingQueue<>(2048);
    // Event forwarding service
    eventForwardService =
            Executors.newSingleThreadExecutor(
                    new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
    eventForwardService.submit(
            () -> {
                List<Event> events = new ArrayList<>();
                RetryUtils.RetryMaterial retryMaterial =
                        new RetryUtils.RetryMaterial(2, true, e -> true);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        events.clear();
                        Event first = eventBuffer.take();
                        events.add(first);
                        eventBuffer.drainTo(events, 500);
                        JobEventReportOperation operation = new JobEventReportOperation(events);
                        RetryUtils.retryWithException(
                                () ->
                                        NodeEngineUtil.sendOperationToMasterNode(
                                                        nodeEngine, operation)
                                                .join(),
                                retryMaterial);
                        logger.fine("Event forward success, events " + events.size());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.info("Event forward thread interrupted");
                    } catch (Throwable t) {
                        logger.warning(
                                "Event forward failed, discard events " + events.size(), t);
                    }
                }
            });
}
public void start() {
    runBusWorkSupplier.runNewBusWork(false);
}


In this class, a thread pool is created as a member variable. A scheduled task periodically updates the job metrics in the IMap (updateMetricsContextInImap), and a background task forwards Event information to the Master node. The Master node then sends these Events to external services.
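
The event forwarder batches events with a take-then-drain pattern: it blocks until one event arrives, then collects whatever else is already queued, up to a limit. A minimal sketch of that pattern, using the hypothetical names EventBatchSketch and nextBatch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Take-then-drain batching over a bounded queue; not SeaTunnel code.
class EventBatchSketch {

    // Blocks until one element is available, then adds up to maxExtra more without blocking.
    static <T> List<T> nextBatch(BlockingQueue<T> buffer, int maxExtra)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        batch.add(buffer.take());
        buffer.drainTo(batch, maxExtra);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2048);
        for (int i = 0; i < 5; i++) {
            buffer.add("event-" + i);
        }
        List<String> batch = nextBatch(buffer, 3);
        System.out.println("batch size = " + batch.size() + ", left in buffer = " + buffer.size());
    }
}
```

With the limit of 500 used in TaskExecutionService, a single JobEventReportOperation therefore carries at most 501 events, and an idle forwarder simply waits in take() without spinning.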


Master Node

private void startMaster() {
    coordinatorService =
            new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);
}


We can see that two components are started in the Master node: the coordinator component and the monitoring component.

The monitoring component’s task is straightforward: it periodically prints cluster information.

CoordinatorService

public CoordinatorService(
        @NonNull NodeEngineImpl nodeEngine,
        @NonNull SeaTunnelServer seaTunnelServer,
        EngineConfig engineConfig) {
    this.nodeEngine = nodeEngine;
    this.logger = nodeEngine.getLogger(getClass());
    this.executorService =
            Executors.newCachedThreadPool(
                    new ThreadFactoryBuilder()
                            .setNameFormat("seatunnel-coordinator-service-%d")
                            .build());
    this.seaTunnelServer = seaTunnelServer;
    this.engineConfig = engineConfig;
    masterActiveListener = Executors.newSingleThreadScheduledExecutor();
    masterActiveListener.scheduleAtFixedRate(
            this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
}
private void checkNewActiveMaster() {
    try {
        if (!isActive && this.seaTunnelServer.isMasterNode()) {
            logger.info(
                    "This node become a new active master node, begin init coordinator service");
            if (this.executorService.isShutdown()) {
                this.executorService =
                        Executors.newCachedThreadPool(
                                new ThreadFactoryBuilder()
                                        .setNameFormat("seatunnel-coordinator-service-%d")
                                        .build());
            }
            initCoordinatorService();
            isActive = true;
        } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
            isActive = false;
            logger.info(
                    "This node become leave active master node, begin clear coordinator service");
            clearCoordinatorService();
        }
    } catch (Exception e) {
        isActive = false;
        logger.severe(ExceptionUtils.getMessage(e));
        throw new SeaTunnelEngineException("check new active master error, stop loop", e);
    }
}


During initialization, a thread is started that checks every 100 ms whether the current node is the active Master. If the node was not active but has become the Master in the cluster, it calls initCoordinatorService() to initialize its state and sets isActive to true.

If the node is marked as a Master but is no longer a Master in the cluster, it will perform a state cleanup.
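
The two transitions can be modelled as a tiny state machine. In the sketch below, ActiveMasterSketch is a hypothetical class, the BooleanSupplier stands in for seaTunnelServer.isMasterNode(), and the two counters take the place of initCoordinatorService() and clearCoordinatorService().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Simplified model of the active-master check; not SeaTunnel code.
class ActiveMasterSketch {

    private final BooleanSupplier isMasterNode; // stand-in for seaTunnelServer.isMasterNode()
    private boolean active;
    int initCount;  // how often the "init coordinator" step ran
    int clearCount; // how often the "clear coordinator" step ran

    ActiveMasterSketch(BooleanSupplier isMasterNode) {
        this.isMasterNode = isMasterNode;
    }

    boolean isActive() {
        return active;
    }

    // One iteration of the periodic check.
    void check() {
        boolean master = isMasterNode.getAsBoolean();
        if (!active && master) {
            initCount++; // became the active master: initialize once
            active = true;
        } else if (active && !master) {
            active = false;
            clearCount++; // lost mastership: clean up once
        }
    }

    public static void main(String[] args) {
        AtomicBoolean master = new AtomicBoolean(false);
        ActiveMasterSketch sketch = new ActiveMasterSketch(master::get);
        sketch.check();   // still a standby, nothing happens
        master.set(true);
        sketch.check();   // becomes active, init runs
        sketch.check();   // already active, no second init
        master.set(false);
        sketch.check();   // loses mastership, clean-up runs
        System.out.println("init=" + sketch.initCount + ", clear=" + sketch.clearCount);
    }
}
```

Because the flag is only flipped on a transition, repeated checks while the role is unchanged do no work, which keeps the 100 ms polling cheap.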


private void initCoordinatorService() {
    // Retrieve distributed IMAP from Hazelcast
    runningJobInfoIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
    runningJobStateIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
    runningJobStateTimestampsIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
    ownedSlotProfilesIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
    metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
    // Initialize JobHistoryService
    jobHistoryService =
            new JobHistoryService(
                    runningJobStateIMap,
                    logger,
                    runningJobMasterMap,
                    nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
                    nodeEngine
                            .getHazelcastInstance()
                            .getMap(Constant.IMAP_FINISHED_JOB_METRICS),
                    nodeEngine
                            .getHazelcastInstance()
                            .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
                    engineConfig.getHistoryJobExpireMinutes());
    // Initialize EventProcessor for sending events to other services
    eventProcessor =
            createJobEventProcessor(
                    engineConfig.getEventReportHttpApi(),
                    engineConfig.getEventReportHttpHeaders(),
                    nodeEngine);
    // If the user has configured the connector package service, create it on the master node.
    ConnectorJarStorageConfig connectorJarStorageConfig =
            engineConfig.getConnectorJarStorageConfig();
    if (connectorJarStorageConfig.getEnable()) {
        connectorPackageService = new ConnectorPackageService(seaTunnelServer);
    }
    // After cluster recovery, attempt to restore previous historical tasks
    restoreAllJobFromMasterNodeSwitchFuture =
            new PassiveCompletableFuture(
                    CompletableFuture.runAsync(
                            this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
}


In CoordinatorService, references to several distributed maps (IMaps) are obtained. IMap is a data structure provided by Hazelcast that ensures data consistency across the cluster; SeaTunnel uses it to store task information, slot information, and so on.

An EventProcessor is also created here. This class is used to send event notifications to other services. For example, if a task fails, it can send a message to a configured endpoint to achieve event-driven notifications.

Lastly, since the node startup could be due to a cluster crash or a node switch, historical running tasks need to be restored. It will attempt to restore these tasks by fetching the list of previously running tasks from the IMAP.

The IMAP data can be persisted to file systems like HDFS, allowing task states to be retrieved and restored even after a complete system reboot.

Components running within CoordinatorService include:

  • executorService (available on all nodes that can be elected as Master)
  • jobHistoryService (runs on the Master node)
  • eventProcessor (runs on the Master node)


On both Master and standby nodes:

  • Periodically check if the node is a Master; if it is, perform the corresponding state transition.

On the Master node:

  • Periodically print cluster state information.
  • Start the forwarding service to relay events to external services.

On Worker nodes, after startup:

  • Periodically report state information to the Master node.
  • Update task information in the IMAP.
  • Forward events generated by the Worker to the Master node to be pushed to external services.

At this point, all server-side service components have been successfully started. This concludes the article!