This is a part of the digital transformation of a real estate giant. For the sake of confidentiality, I'm not going to reveal any business data, but you'll get a detailed view of our data warehouse and our optimization strategies.
Now let's get started.
Logically, our data architecture can be divided into four parts.
We create our dimension tables and fact tables centering each operating entity in business, including customers, houses, etc. If there are a series of activities involving the same operating entity, they should be recorded by one field. (This is a lesson learned from our previous chaotic data management system.)
Our data warehouse is divided into five conceptual layers. We use Apache Doris and Apache DolphinScheduler to schedule the DAG scripts between these layers.
Every day, the layers go through an overall update besides incremental updates in case of changes in historical status fields or incomplete data synchronization of ODS tables.
(1) Set where >= "activity time -1 day or -1 hour"
instead of where >= "activity time
The reason for doing so is to prevent data drift caused by the time gap of scheduling scripts. Let's say, with the execution interval set to 10 min, suppose that the script is executed at 23:58:00 and a new piece of data arrives at 23:59:00, if we set where >= "activity time
, that piece of data of the day will be missed.
(2) Fetch the ID of the largest primary key of the table before every script execution, store the ID in the auxiliary table, and set where >= "ID in auxiliary table"
This is to avoid data duplication. Data duplication might happen if you use the Unique Key model of Apache Doris and designate a set of primary keys, because if there are any changes in the primary keys in the source table, the changes will be recorded and the relevant data will be loaded. This method can fix that, but it is only applicable when the source tables have auto-increment primary keys.
(3) Partition the tables
As for time-based auto-increment data such as log tables, there might be less changes in historical data and status, but the data volume is large, so there could be huge computing pressure on overall updates and snapshot creation. Hence, it is better to partition such tables, so for each incremental update, we only need to replace one partition. (You might need to watch out for data drift, too.)
(1) Truncate Table
Clear out the table and then ingest all data from the source table into it. This is applicable for small tables and scenarios with no user activity in wee hours.
(2) ALTER TABLE tbl1 REPLACE WITH TABLE tbl2
This is an atomic operation and it is advisable for large tables. Every time before executing a script, we create a temporary table with the same schema, load all data into it, and replace the original table with it.
This is our configuration for TBs of legacy data and GBs of incremental data. You can use it as a reference and scale your cluster on this basis. Deployment of Apache Doris is simple. You don't need other components.
We use Flink CDC to synchronize data from source tables. Then we aggregate the real-time metrics utilizing the Materialized View or the Aggregate Model of Apache Doris. Since we only have to process part of the metrics in a real-time manner and we don't want to generate too many database connections, we use one Flink job to maintain multiple CDC source tables. This is realized by the multi-source merging and full database sync features of Dinky, or you can implement a Flink DataStream multi-source merging task yourself. It is noteworthy that Flink CDC and Apache Doris support Schema Change.
EXECUTE CDCSOURCE demo_doris WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'ods.ods_*,ods.ods_*',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.doris.batch.size' = '1000',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.sink.properties.format' ='json',
'sink.sink.properties.read_json_by_line' ='true',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.label-prefix' = '${schemaName}_${tableName}_1'
);
After ingesting data with ETL scripts, we create a page in our reporting tool. We assign different privileges to different accounts using SQL, including the privilege of modifying rows, fields, and global dictionary. Apache Doris supports privilege control over accounts, which works the same as that in MySQL.
We also use Apache Doris data backup for disaster recovery, Apache Doris audit logs to monitor SQL execution efficiency, Grafana+Loki for cluster metric alerts, and Supervisor to monitor the daemon processes of node components.
We use DataX to Stream Load offline data. It allows us to adjust the size of each batch. The Stream Load method returns results synchronously, which meets the needs of our architecture. If we execute asynchronous data import using DolphinScheduler, the system might assume that the script has been executed, and that can cause a messup. If you use a different method, we recommend that you execute show load
in the shell script, and check the regex filtering status to see if the ingestion succeeds.
We adopt the Unique Key model of Apache Doris for most of our tables. The Unique Key model ensures idempotence of data scripts and effectively avoids upstream data duplication.
We use the Multi-Catalog feature of Apache Doris to connect to external data sources. It allows us to create mappings of external data at the Catalog level.
We suggest that you put the most frequently used fields of non-character types (such as int and where clauses) in the first 36 bytes, so you can filter these fields within milliseconds in point queries.
For us, it is important to create a data dictionary because it largely reduces personnel communication costs, which can be a headache when you have a big team. We use the information_schema
in Apache Doris to generate a data dictionary. With it, we can quickly grasp the whole picture of the tables and fields and thus increase development efficiency.
Offline data ingestion time: Within minutes
Query latency: For tables containing over 100 million rows, Apache Doris responds to ad-hoc queries within one second, and complicated queries in five seconds.
Resource consumption: It only takes up a small number of servers to build this data warehouse. The 70% compression ratio of Apache Doris saves us lots of storage resources.
Actually, before we evolved into our current data architecture, we tried Hive, Spark and Hadoop to build an offline data warehouse. It turned out that Hadoop was overkill for a traditional company like us since we didn't have too much data to process. It is important to find the component that suits you most.
On the other hand, to smoothen our big data transition, we need to make our data platform as simple as possible in terms of usage and maintenance. That's why we landed on Apache Doris. It is compatible with MySQL protocol and provides a rich collection of functions so we don't have to develop our own UDFs. Also, it is composed of only two types of processes: frontends and backends, so it is easy to scale and track.