
How to Migrate from Airflow to Dolphinscheduler in Two Steps

by Debra Chen, March 9th, 2023




Air2phin, a scheduling system migration tool, was recently released as open source. With Air2phin, users can migrate their scheduling system from Airflow to Apache DolphinScheduler in two steps, which greatly simplifies the work for anyone who needs to make that move.

What is Air2phin?

Air2phin is a scheduling system migration tool that converts Apache Airflow DAG files into Apache DolphinScheduler Python SDK definition files, so that workflow orchestration can be migrated from Airflow to DolphinScheduler. It is a multi-rule-based AST converter that uses LibCST to parse and convert Airflow's DAG code. All of its rules are defined in YAML files, and it supports a degree of custom rule extension.


Air2phin recently released version 0.0.12, which offers a rich set of features to help users complete the migration from Airflow to Apache DolphinScheduler.


Note: AST stands for Abstract Syntax Tree, a data structure that represents the grammatical structure of code as a tree. In a compiler, the AST is produced by a lexer and a parser: the lexer converts the source code into a stream of tokens, and the parser converts that token stream into an abstract syntax tree. The tree consists of nodes, each representing a grammatical construct in the code (such as an expression, statement, function, or class), and the relationships between nodes represent how those constructs are nested.
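As a small illustration of this tree structure, the sketch below parses one line of DAG-style code with Python's built-in ast module. This is not LibCST, which Air2phin itself uses, and the sample statement is made up for the example.

```python
import ast

# One statement in the style of an Airflow task definition (illustrative only)
source = 't1 = BashOperator(task_id="print_date", bash_command="date")'

tree = ast.parse(source)

# Every grammatical construct becomes a node; collect the node type names
node_types = [type(node).__name__ for node in ast.walk(tree)]

# The Assign node contains a Call node, which in turn contains
# the callee name and the keyword arguments
call = tree.body[0].value
callee = call.func.id
keywords = [kw.arg for kw in call.keywords]

print(node_types)
print(callee, keywords)
```

The nesting (Module, then Assign, then Call, then keyword) is the relationship that a converter walks when it decides which parts of the code to rewrite.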

Why open-source Air2phin?

Some people may ask why a migration tool is needed. As a business grows, the workflow orchestration system that an enterprise or organization originally adopted may no longer meet its needs, and its workflows must be migrated to a new platform or upgraded to a new version. Our research found that many users need to migrate their scheduling system from the open-source workflow orchestration system Airflow to Apache DolphinScheduler.


During migration, data processing tasks may involve dependencies across multiple systems, so the migration needs to be completed without affecting business operations. This is where a scheduling system migration tool plays an important role: it reduces manual intervention, completes the migration between the two scheduling systems as automatically as possible, and can be compatible with multiple versions of each system.


To this end, WhaleOps has specially developed the open-source migration tool Air2phin, which allows users to migrate the scheduling system from Airflow to Apache DolphinScheduler in two steps, bringing great convenience to users.


To let everyone better understand the importance of Air2phin, let's start with the background knowledge of the scheduling system and understand the benefits of migrating the scheduling system from Airflow to Apache DolphinScheduler.

Why migrate from Airflow to DolphinScheduler?

What is a workflow orchestration system?

A workflow orchestration system manages data flow in a manner that respects orchestration rules and business logic. Workflow orchestration tools allow users to convert multiple related tasks into workflows that can be scheduled, run, and observed, helping companies better manage and control business processes, thereby improving business efficiency. Workflow orchestration is one of the indispensable components in data processing. It is responsible for executing data processing tasks according to predefined rules and logic to ensure that the data processing process is executed smoothly as expected. Popular workflow orchestration systems include Apache DolphinScheduler, Apache Airflow, Apache Oozie, Azkaban, etc.

What is Airflow?

Among them, Apache Airflow is an open-source workflow orchestration system that helps users create, schedule, and monitor complex workflows. Originally developed at Airbnb, Airflow entered the Apache Incubator in 2016 and is now maintained by the Apache Software Foundation. Written in Python, Airflow is highly scalable and flexible and supports a variety of task types, such as calculations, data processing, notifications, and interactions. Airflow workflows are defined by writing Python scripts, and its functionality can be extended with the operators and hooks that Airflow provides as well as with custom ones. However, it has drawbacks that cannot be ignored: it often requires in-depth secondary development, and upgrading from the community version is costly; maintaining the Python technology stack carries a high iteration cost; the scheduler's repeated scanning of the DAG folder introduces delays that reduce performance; and stability in production environments can be poor.


Apache DolphinScheduler, born under the business needs of the new data era, is an open-source distributed workflow scheduling system, which makes up for the weaknesses of previous scheduling systems, and aims to provide enterprise users with a reliable, efficient, and easy-to-use workflow scheduling platform. Multiple task types are supported, such as computing, data processing, ETL, etc.


Compared with Airflow, DolphinScheduler adopts a distributed architecture and provides a variety of task types. Users can define dependencies between tasks, set task priorities and schedule policies, etc. It uses a visual interface to create and manage workflows, which is in stark contrast to Airflow, making it easier to operate and more friendly to non-programmers.

After research and comparison, for many users, migrating the scheduling system to Apache DolphinScheduler is a better choice to reduce costs and increase efficiency.

How to install and use Air2phin

Air2phin is a Python package that can be installed with pip, Python's package installer. See the air2phin Getting Started guide for details.

python -m pip install --upgrade air2phin

  • A simple example

We use a simple example to illustrate how to use Air2phin. We have excerpted part of the code in Airflow's tutorial.py to show, step by step, how Air2phin converts it into a dolphinscheduler python sdk definition.

Figure 1: Part of the code in airflow tutorial.py

Figure 2: How Air2phin is converted into dolphinscheduler python sdk step by step


Suppose you save part of airflow tutorial.py to a file named tutorial_part.py and want to convert it into a dolphinscheduler python sdk definition. A single command is enough, and the result is shown in Figure 2. Because the command includes the --inplace parameter, Air2phin overwrites the original file directly. If you don't want to overwrite the original file, omit --inplace, and Air2phin will instead create a tutorial_part-air2phin.py file to hold the converted content.


air2phin migrate --inplace tutorial_part.py


We can see that this conversion triggers multiple conversion rules, including:

  • Convert airflow.DAG to pydolphinscheduler.core.process_definition.ProcessDefinition. This rule applies on the third line (the import statement) and the sixth line (the DAG context).
  • Convert airflow.operators.bash.BashOperator to pydolphinscheduler.tasks.shell.Shell. This rule applies to tasks t1 and t2.
  • Besides converting classes, Air2phin also converts class attributes, such as airflow.DAG.schedule_interval to ProcessDefinition.schedule, and rewrites some values, such as timedelta(days=1) to '0 0 0 * * ? *'.
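The value rewrite in the last rule turns a Python interval into a seven-field cron expression (second, minute, hour, day of month, month, day of week, year). The sketch below shows one way such a mapping could work. The helper timedelta_to_cron is hypothetical and is not Air2phin's implementation; it only handles intervals that divide evenly into a day or an hour.

```python
from datetime import timedelta


def timedelta_to_cron(interval: timedelta) -> str:
    """Map a few common intervals to a seven-field cron string.

    Hypothetical helper for illustration; not taken from Air2phin.
    """
    seconds = int(interval.total_seconds())

    # Exactly one day: fire at midnight every day
    if seconds == 86400:
        return "0 0 0 * * ? *"

    # Whole hours that divide a day evenly, e.g. every 6 hours
    if seconds % 3600 == 0 and 0 < seconds < 86400 and 24 % (seconds // 3600) == 0:
        hours = seconds // 3600
        hour_field = "*" if hours == 1 else f"0/{hours}"
        return f"0 0 {hour_field} * * ? *"

    # Whole minutes that divide an hour evenly, e.g. every 15 minutes
    if seconds % 60 == 0 and 0 < seconds < 3600 and 60 % (seconds // 60) == 0:
        minutes = seconds // 60
        minute_field = "*" if minutes == 1 else f"0/{minutes}"
        return f"0 {minute_field} * * * ? *"

    raise ValueError(f"no simple cron equivalent for {interval!r}")


print(timedelta_to_cron(timedelta(days=1)))
print(timedelta_to_cron(timedelta(hours=6)))
```

Intervals such as five hours have no exact cron equivalent because cron steps restart at each day boundary, which is why the sketch raises an error instead of guessing.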

Finally, we only need to install pydolphinscheduler and run the converted file with python to complete the workflow migration. For details, see pydolphinscheduler usage (https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler).



# Install apache-dolphinscheduler

python -m pip install apache-dolphinscheduler

# Submit workflow to dolphinscheduler

python tutorial_part.py


When running python tutorial_part.py, we must ensure that the dolphinscheduler API and python gateway service have been started, and the corresponding ports have been opened. For details, see Starting python gateway service.

So far, we have explained how Air2phin completes the migration through a simple example.

Working principle of Air2phin

How do Airflow and dolphinscheduler python sdk work?


Before understanding how Air2phin works, it is very important to understand how Airflow and dolphinscheduler python sdk work. It will help us better understand the migration steps of Air2phin and deal with problems more calmly.

  • How Airflow works: Workflow information is stored in DAG files, which are placed in a directory that Airflow is configured to watch. The Airflow Scheduler scans and parses those files at a regular interval, so DAG files are discovered and updated passively.

  • dolphinscheduler python sdk: As with Airflow, all workflow information is defined in Python files, but the workflow is submitted manually. The user runs python followed by the workflow file name, which actively submits the workflow to DolphinScheduler.

Air2phin workflow

Understanding this submit/discover mechanism makes it easier to understand how Air2phin works. Because both Airflow's DAG files and DolphinScheduler's Python SDK definition files are written in Python, most of Air2phin's code deals with the differences between the two, converting Airflow code into dolphinscheduler python sdk definitions.


Air2phin uses LibCST (https://libcst.readthedocs.io/en/latest/) to parse Airflow's Python DAG code into a syntax tree, then applies LibCST's Transformer (https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer) together with the transformation rules to produce the dolphinscheduler python sdk definition.

The overall workflow of Air2phin is as follows:

  • Get the original Airflow DAG content from stdin or a file
  • Load all transformation rules from YAML files
  • Parse Airflow DAG content into a CST tree via LibCST
  • Transform the tree into dolphinscheduler python sdk definitions via a LibCST Transformer
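The four steps above can be sketched in a few lines. To keep the example dependency-free it swaps in Python's built-in ast module for LibCST, so unlike Air2phin it does not preserve comments or formatting. The MODULE_RULES dictionary is a hypothetical in-memory stand-in for the YAML rule files, and ast.unparse requires Python 3.9 or later.

```python
import ast

# Step 2 stand-in: rules that Air2phin would load from YAML files
MODULE_RULES = {
    "airflow.operators.bash.BashOperator": "pydolphinscheduler.tasks.shell.Shell",
}


class ModuleRuleTransformer(ast.NodeTransformer):
    """Rewrite imports and class references according to the rules.

    Assumes each matching import statement imports a single name.
    """

    def __init__(self, rules):
        self.rules = rules
        self.renamed = {}  # old local name -> new local name

    def visit_ImportFrom(self, node):
        for alias in node.names:
            qualified = f"{node.module}.{alias.name}"
            if qualified in self.rules:
                new_module, new_name = self.rules[qualified].rsplit(".", 1)
                if alias.asname is None:
                    # Later references use the imported name directly
                    self.renamed[alias.name] = new_name
                node.module = new_module
                alias.name = new_name
        return node

    def visit_Name(self, node):
        if node.id in self.renamed:
            node.id = self.renamed[node.id]
        return node


def migrate(source: str, rules=MODULE_RULES) -> str:
    tree = ast.parse(source)                          # step 3: parse
    tree = ModuleRuleTransformer(rules).visit(tree)   # step 4: transform
    return ast.unparse(tree)


# Step 1: the original Airflow DAG content (here an inline string)
airflow_code = (
    "from airflow.operators.bash import BashOperator\n"
    "t1 = BashOperator(task_id='print_date', bash_command='date')\n"
)
converted = migrate(airflow_code)
print(converted)
```

This sketch only applies a module rule, so the keyword arguments are left untouched; parameter rules would be handled by a further pass over the call nodes.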

Air2phin Best Practices

Migrate entire folders instead of individual files

When users want to migrate Airflow to DolphinScheduler, most of them want to migrate the whole system instead of a single file. Air2phin provides the ability to migrate the entire folder by just changing the path from the file path to a folder.


# Migrate the entire ~/airflow/dags folder

air2phin migrate --inplace ~/airflow/dags

Add custom rules

Some Airflow users define their own Hooks or Operators. User-defined Operators cannot be converted by Air2phin's built-in conversion rules, so users need to add custom rules and tell Air2phin where to find them. For example, suppose we have an operator called MyCustomOperator that inherits most of its functionality from PostgresOperator and differs only in the name of its connection parameter. Its definition is as follows:


from __future__ import annotations

from typing import Iterable, Mapping

from airflow.providers.postgres.operators.postgres import PostgresOperator


class MyCustomOperator(PostgresOperator):
    # Same behavior as PostgresOperator; only the connection ID
    # parameter is renamed to my_custom_conn_id.
    def __init__(
        self,
        *,
        sql: str | Iterable[str],
        my_custom_conn_id: str = 'postgres_default',
        autocommit: bool = False,
        parameters: Iterable | Mapping | None = None,
        database: str | None = None,
        runtime_parameters: Mapping | None = None,
        **kwargs,
    ) -> None:
        super().__init__(
            sql=sql,
            postgres_conn_id=my_custom_conn_id,
            autocommit=autocommit,
            parameters=parameters,
            database=database,
            runtime_parameters=runtime_parameters,
            **kwargs,
        )



It is used in multiple DAGs in Airflow in the following way:


from airflow import DAG
from airflow.utils.dates import days_ago

from custom.my_custom_operator import MyCustomOperator

# default_args is assumed to be defined earlier in the DAG file
with DAG(
    dag_id='my_custom_dag',
    default_args=default_args,
    schedule_interval='@once',
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    t1 = MyCustomOperator(
        task_id='my_custom_task',
        sql='select * from table',
        my_custom_conn_id='my_custom_conn_id',
    )


Now we need to convert this Operator. We can customize a conversion rule and name it MyCustomOperator.yaml. The content is as follows. The main content is the definition of migration.module and migration.parameter, which determine the conversion rule:

name: MyCustomOperator
description: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.
migration:
  module:
    - action: replace
      src: custom.my_custom_operator.MyCustomOperator
      dest: pydolphinscheduler.tasks.sql.Sql
  parameter:
    - action: replace
      src: task_id
      dest: name
    - action: replace
      src: my_custom_conn_id
      dest: datasource_name

Then pass the rule file to Air2phin with the --custom-rules parameter so that the custom rule is applied during conversion:


# Specify the custom rule path as /path/to/MyCustomOperator.yaml

air2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags
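To make the effect of the rule concrete, the sketch below mirrors MyCustomOperator.yaml as a Python dictionary and applies it to the task call from the DAG shown earlier. It is an approximation under stated assumptions: it uses Python's built-in ast module rather than LibCST, the ApplyRule class is hypothetical, and it rewrites only the call itself, not the import statement.

```python
import ast

# Python mirror of MyCustomOperator.yaml (Air2phin reads the YAML file itself)
RULE = {
    "name": "MyCustomOperator",
    "migration": {
        "module": [
            {
                "action": "replace",
                "src": "custom.my_custom_operator.MyCustomOperator",
                "dest": "pydolphinscheduler.tasks.sql.Sql",
            },
        ],
        "parameter": [
            {"action": "replace", "src": "task_id", "dest": "name"},
            {"action": "replace", "src": "my_custom_conn_id", "dest": "datasource_name"},
        ],
    },
}

# Derive the class names and the keyword mapping from the rule
module_rule = RULE["migration"]["module"][0]
src_class = module_rule["src"].rsplit(".", 1)[1]
dest_class = module_rule["dest"].rsplit(".", 1)[1]
param_map = {
    p["src"]: p["dest"]
    for p in RULE["migration"]["parameter"]
    if p["action"] == "replace"
}


class ApplyRule(ast.NodeTransformer):
    """Rename the class and its keyword arguments on matching calls."""

    def visit_Call(self, node):
        self.generic_visit(node)
        if isinstance(node.func, ast.Name) and node.func.id == src_class:
            node.func.id = dest_class
            for keyword in node.keywords:
                if keyword.arg in param_map:
                    keyword.arg = param_map[keyword.arg]
        return node


airflow_task = (
    "t1 = MyCustomOperator(task_id='my_custom_task', sql='select * from table', "
    "my_custom_conn_id='my_custom_conn_id')"
)
converted_task = ast.unparse(ApplyRule().visit(ast.parse(airflow_task)))
print(converted_task)
```

Parameters that the rule does not mention, such as sql, pass through unchanged, which is why the rule file only needs to list the names that differ between the two classes.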

Make Air2phin run faster

By default, Air2phin converts DAG files in a single process, which can be very time-consuming when there are many DAG files. The --multiprocess parameter runs the conversion in multiple processes; setting it to the number of CPU cores on the machine reduces conversion time:


# Specify air2phin to start 12 processes to convert at the same time

air2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml --multiprocess 12 ~/airflow/dags


FAQ

Q: Why did you choose to parse Airflow DAG files instead of databases?

A: Because complete workflow information exists only in the Airflow DAG files. The Airflow database holds only basic workflow information, with no task definitions and no task relationships, so we perform the conversion by parsing the DAG files rather than reading the database.

Q: Why convert to the dolphinscheduler python sdk instead of submitting workflows to DolphinScheduler directly?

A: Because Airflow DAGs are defined in Python, they make use of many Python features. We don't want to convert those features into structured data, since the conversion may lose information. DolphinScheduler already has a Python SDK, so converting directly through LibCST is easier.

Q: Why use LibCST instead of python's built-in AST?

A: Because LibCST suits our needs better. Python's built-in ast library discards comments when it parses code into an AST, and we want to keep that information. LibCST also provides more visitors, which makes implementing replacements more convenient.
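The comment loss is easy to reproduce with the standard library alone. The snippet below round-trips a two-line sample through the built-in ast module (ast.unparse requires Python 3.9 or later); the sample source is made up for the example.

```python
import ast

source = (
    "# Nightly export job\n"
    "schedule = '@once'  # run a single time\n"
)

# Parse to an AST and turn it back into source code
round_tripped = ast.unparse(ast.parse(source))
print(round_tripped)
```

Both the standalone comment and the trailing comment disappear, because the built-in AST has no nodes for them. LibCST builds a concrete syntax tree that retains comments and whitespace, so a converted DAG file can keep the notes its authors wrote.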


Reference: air2phin (https://github.com/WhaleOps/air2phin)

