Streaming data from a MySQL database to a PostgreSQL database is a useful way to move data between systems or to maintain a real-time replica of a database for reporting and analysis. One way to accomplish this is with __[Change Data Capture (CDC)](https://dbconvert.com/blog/change-data-capture-cdc-what-it-is-and-how-it-works/)__ tools.

CDC is a method of tracking changes made to a database and capturing them in a separate stream. This stream can then be used to replicate the changes to another database.

DBConvert Streams replicates your MySQL data to PostgreSQL in real time. It captures data changes from a source MySQL database and applies them to a target PostgreSQL database: a source reader reads the binary log of the MySQL database and transforms the changes into a format that the target PostgreSQL database can consume.

[Database Streaming Examples and configurations GitHub repository](https://github.com/slotix/dbconvert-streams-public)

To begin, let's clone the GitHub repository containing the MySQL to PostgreSQL streaming example.

```bash
git clone git@github.com:slotix/dbconvert-streams-public.git && cd dbconvert-streams-public/examples/mysql2postgres/sales-db/
```

## Docker Compose configuration.

Since DBConvert Streams relies on multiple services, the most efficient way to start the containers is by using [Docker Compose](https://docs.docker.com/compose/).

The *docker-compose.yml* file from the repository is provided below.
```yaml
version: '3.9'
services:
  dbs-api:
    container_name: api
    image: slotix/dbs-api
    entrypoint:
      - ./dbs-api
      - --nats=nats:4222
      - --source=source-reader:8021
      - --target=target-writer:8022
    ports:
      - 8020:8020
    depends_on:
      - nats
    volumes:
      - ./mysql2pg.json:/mysql2pg.json:ro

  dbs-source-reader:
    container_name: source-reader
    image: slotix/dbs-source-reader
    entrypoint:
      - ./dbs-source-reader
      - --nats=nats:4222
    ports:
      - 8021:8021
    depends_on:
      - dbs-api

  dbs-target-writer:
    container_name: target-writer
    image: slotix/dbs-target-writer
    entrypoint:
      - ./dbs-target-writer
      - --nats=nats:4222
      - --prometheus=http://prometheus:9090
    ports:
      - 8022:8022
    depends_on:
      - dbs-source-reader

  nats:
    container_name: nats
    image: nats
    entrypoint: /nats-server
    command: "--jetstream -m 8222 --store_dir /data/nats-server"
    ports:
      - 4222:4222
      - 8222:8222

  prometheus:
    image: slotix/dbs-prometheus:latest
    container_name: prom
    user: root
    ports:
      - 9090:9090

  mysql-source:
    container_name: mysql-source
    build: ./source
    environment:
      - MYSQL_ROOT_PASSWORD=123456
    ports:
      - '3306:3306'

  postgres-target:
    container_name: postgres-target
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command: postgres
    ports:
      - '5432:5432'
```

Docker-compose file for DBConvert Streams

This docker-compose file brings up multiple services and links them together so they can communicate with each other. With this setup, replication takes place between the `mysql-source` and `postgres-target` databases, and it is controlled by the `dbs-api`, `dbs-source-reader`, `dbs-target-writer` and `nats` services.

## DBConvert Streams services.

* `dbs-api` service is the entry point of DBConvert Streams. It is where requests are sent with configuration settings for the source and target databases. It also specifies the connection details for other components of the system, such as `source-reader` and `target-writer`.
* `dbs-source-reader` service is responsible for monitoring and capturing changes in the source database, then sending batches of records to the Event Hub.
* `dbs-target-writer` service receives changes from the Event Hub and applies them to the target database.
* `nats` service is the core of the Event Hub. It provides communication between the other DBS services.
* `prometheus` service is used for monitoring the metrics of DBS services.

### Database services.

The structure of the tables used in our example is depicted in the [sales-db ERD diagram](https://github.com/slotix/dbconvert-streams-public/blob/main/examples/img/sales-db-erd.png).

The structure of the MySQL source tables is adapted from the [jdaarevalo/docker_postgres_with_data](https://github.com/jdaarevalo/docker_postgres_with_data) GitHub repository.

### Source and Target Databases.

The `mysql-source` database image is based on `slotix/dbs-mysql:8`, which includes all the necessary [settings to enable MySQL CDC replication](https://github.com/slotix/dbconvert-streams-public/tree/main/docker-images/mysql/base). This image also contains the [`initdb.sql` script](https://github.com/slotix/dbconvert-streams-public/blob/main/examples/mysql2postgres/sales-db/source/docker-entrypoint-initdb.d/initdb.sql), which creates tables with the structures shown above.

The `postgres-target` database, on the other hand, is based on the official lightweight `postgres:15-alpine` image. It will receive all changes made to the `mysql-source` database.

> Both of these databases, `mysql-source` and `postgres-target`, are typically located on separate physical servers in a production environment. However, in this example, we will run them on the same machine within distinct containers for demonstration purposes.

## Execution.
To start all services described above, execute the following command:

```bash
docker-compose up --build -d
```

This command uses the `docker-compose.yml` file to build and start the necessary containers in detached mode (`-d` option). The `--build` flag forces the images to be rebuilt before the containers start.

***

To check whether the initialization script created all tables in the MySQL database inside the running container, run the command:

```bash
docker exec -it mysql-source mysql -uroot -p123456 source -e 'SHOW TABLES;'
```

The command lists the tables in the `source` database inside the container, which confirms whether the script created all of them on start.

```bash
+------------------+
| Tables_in_source |
+------------------+
| city             |
| country          |
| order_status     |
| product          |
| sale             |
| status_name      |
| store            |
| users            |
+------------------+
```

List of tables created on source

## Stream configuration

This is the stream configuration file `mysql2pg.json`, which is used to set up the database replication process.
```json
{
  "source": {
    "type": "mysql",
    "connection": "root:123456@tcp(mysql-source:3306)/source",
    "filter": {
      "tables": [
        { "name": "product", "operations": ["insert"]},
        { "name": "country", "operations": ["insert"]},
        { "name": "city", "operations": ["insert"]},
        { "name": "store", "operations": ["insert"]},
        { "name": "users", "operations": ["insert"]},
        { "name": "status_name", "operations": ["insert"]},
        { "name": "sale", "operations": ["insert"]},
        { "name": "order_status", "operations": ["insert"]}
      ]
    }
  },
  "target": {
    "type": "postgresql",
    "connection": "postgres://postgres:postgres@postgres-target:5432/postgres"
  },
  "limits": {
    "numberOfEvents": 0,
    "elapsedTime": 0
  }
}
```

* The `source` field specifies the type of the source database as "*mysql*" and the connection details, including the username and password as well as the host and port.
* The `filter` field within the `source` field specifies that only certain tables and their corresponding operations (in this case "*insert*") will be replicated.
* The `target` field specifies the type of the target database as "*postgresql*" and the connection details for the target database, including the username and password as well as the host and port.
* The `limits` field sets the number of events and the elapsed time to zero (0), which means that there are no limits in place for this replication process.

Note that with this configuration only insert operations are captured for the tables listed in the `filter` field; updates and deletes are not replicated.

### Send configuration to DBConvert Streams API.

```bash
docker run -t --rm \
  --network sales-db_default \
  curlimages/curl \
  --request POST \
  --url http://dbs-api:8020/api/v1/streams\?file=./mysql2pg.json
```

This `docker run` command starts a new container from the `curlimages/curl` image. The `--network` option makes the container join the network named `sales-db_default`.

This container then runs `curl` to make an HTTP POST request to the URL `http://dbs-api:8020/api/v1/streams?file=./mysql2pg.json`. The request targets the `dbs-api` service, which listens on port 8020 and expects the name of a JSON configuration file in the `file` query parameter. The command creates a new stream on the `dbs-api` service with the configuration specified in the `mysql2pg.json` file.

It's important to note that this command assumes that the `sales-db_default` network and the `dbs-api` service are already created and running. `curl` does not upload the file: the Compose configuration above mounts `./mysql2pg.json` into the `dbs-api` container, so the file must be present in the directory from which `docker-compose up` was run.

This is a JSON response indicating that the stream creation was successful.

```json
{
  "status": "success",
  "data": {
    "id": "2KGlt8BCHLT0lXklrs5wqM6n7BQ",
    "source": {...},
    "target": {...},
    "limits": {}
  }
}
```

It contains the following fields:

* `status`: indicates the status of the request, in this case "*success*".
* `data`: contains the details of the stream that was created.
  * `id`: a unique identifier for the stream, in this case "2KGlt8BCHLT0lXklrs5wqM6n7BQ".
  * `source`: the details of the source database, including the type, connection details, and filter settings.
  * `target`: the details of the target database, including the type and connection details.
  * `limits`: the limits for the replication process, such as number of events and elapsed time.

Note that the contents of the `source` and `target` fields are omitted here for brevity and shown as `...`.

### Check if tables on the target are created successfully.

> 💡 DBConvert Streams creates tables with the same structure as the source on the target if they are missing. At this point, all tables specified in the filter should exist on the Postgres target database.
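
The note above implies a simple check: every table named in the stream's filter should show up on the target. As a minimal sketch (not part of DBConvert Streams), the Python snippet below derives the expected table names from a configuration shaped like `mysql2pg.json` and reports which ones are absent from a given list. The helper names `expected_tables` and `missing_tables` are hypothetical, the embedded configuration is a trimmed copy with only two of the eight tables, and the list of tables present on the target is assumed to be collected separately.

```python
import json


def expected_tables(config: dict) -> list[str]:
    """Return the table names listed under source.filter.tables, sorted."""
    tables = config.get("source", {}).get("filter", {}).get("tables", [])
    return sorted(table["name"] for table in tables)


def missing_tables(config: dict, present: list[str]) -> list[str]:
    """Return filter tables that are absent from the list of target tables."""
    return sorted(set(expected_tables(config)) - set(present))


# Trimmed, illustrative copy of mysql2pg.json (assumption: same shape as the real file).
config = json.loads("""
{
  "source": {
    "type": "mysql",
    "filter": {
      "tables": [
        {"name": "product", "operations": ["insert"]},
        {"name": "country", "operations": ["insert"]}
      ]
    }
  }
}
""")

# Pretend the target currently contains only the product table.
print(missing_tables(config, ["product"]))  # ['country']
```

An empty result means every table from the filter is present, which is the state expected once the stream has been created.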

To connect to the `postgres-target` Docker container and check if tables exist, you can run the following command:

```bash
docker exec -it postgres-target psql -U postgres -d postgres -c '\dt'
```

The command lists the tables in the `postgres-target` database, which confirms whether DBConvert Streams has created all of them successfully.

```bash
            List of relations
 Schema |     Name     | Type  |  Owner
--------+--------------+-------+----------
 public | city         | table | postgres
 public | country      | table | postgres
 public | order_status | table | postgres
 public | product      | table | postgres
 public | sale         | table | postgres
 public | status_name  | table | postgres
 public | store        | table | postgres
 public | users        | table | postgres
(8 rows)
```

### Populate the source with sample data.

Now that the `mysql-source` and `postgres-target` databases have identical table sets with the same structure, it is time to find out if the streaming of data works properly. This can be done by inserting data into the `mysql-source` database and observing if the same data is replicated to the `postgres-target` database.
```sql
-- Set params
SET @number_of_sales = '100';
SET @number_of_users = '100';
SET @number_of_products = '100';
SET @number_of_stores = '100';
SET @number_of_countries = '100';
SET @number_of_cities = '30';
SET @status_names = '5';
SET @start_date = '2023-01-01 00:00:00';
SET @end_date = '2023-02-01 00:00:00';

USE source;

TRUNCATE TABLE city;
TRUNCATE TABLE product;
TRUNCATE TABLE country;
TRUNCATE TABLE status_name;
TRUNCATE TABLE users;
TRUNCATE TABLE order_status;
TRUNCATE TABLE sale;
TRUNCATE TABLE store;

-- Filling of products
INSERT INTO product
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1 FROM t WHERE id + 1 <= @number_of_products
)
SELECT id, CONCAT_WS(' ','Product', id)
FROM t;

-- Filling of countries
INSERT INTO country
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1 FROM t WHERE id + 1 <= @number_of_countries
)
SELECT id, CONCAT('Country ', id)
FROM t;

-- Filling of cities
INSERT INTO city
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1 FROM t WHERE id + 1 <= @number_of_cities
)
SELECT id
    , CONCAT('City ', id)
    , FLOOR(RAND() * (@number_of_countries + 1))
FROM t;

-- Filling of stores
INSERT INTO store
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1 FROM t WHERE id + 1 <= @number_of_stores
)
SELECT id
    , CONCAT('Store ', id)
    , FLOOR(RAND() * (@number_of_cities + 1))
FROM t;

-- Filling of users
INSERT INTO users
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1 FROM t WHERE id + 1 <= @number_of_users
)
SELECT id
    , CONCAT('User ', id)
FROM t;

-- Filling of status_names
INSERT INTO status_name
WITH RECURSIVE t(status_name_id) AS (
    SELECT 1
    UNION ALL
    SELECT status_name_id + 1 FROM t WHERE status_name_id + 1 <= @status_names
)
SELECT status_name_id
    , CONCAT('Status Name ', status_name_id)
FROM t;

-- Filling of sales
INSERT INTO sale
WITH RECURSIVE t(sale_id) AS (
    SELECT 1
    UNION ALL
    SELECT sale_id + 1 FROM t WHERE sale_id + 1 <= @number_of_sales
)
SELECT UUID() AS sale_id
    , ROUND(RAND() * 10, 3) AS amount
    , DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS date_sale
    , FLOOR(RAND() * (@number_of_products + 1)) AS product_id
    , FLOOR(RAND() * (@number_of_users + 1)) AS user_id
    , FLOOR(RAND() * (@number_of_stores + 1)) AS store_id
FROM t;

-- Filling of order_status
INSERT INTO order_status
WITH RECURSIVE t(order_status_id) AS (
    SELECT 1
    UNION ALL
    SELECT order_status_id + 1 FROM t WHERE order_status_id + 1 <= @number_of_sales
)
SELECT UUID() AS order_status_id
    , DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS update_at
    , FLOOR(RAND() * (@number_of_sales + 1)) AS sale_id
    , FLOOR(RAND() * (@status_names + 1)) AS status_name_id
FROM t;
```

**fill_tables.sql script**

The script above starts by truncating all the tables in the `source` database to clear any previous data, and then inserts sample data into them. The number of rows generated for each table is controlled by the parameters set at the top of the script.

To execute this SQL script, you can run the following command:

```bash
docker exec -i \
  mysql-source \
  mysql -uroot -p123456 -D source < $PWD/fill_tables.sql
```

### Comparing number of records in source and target databases.

Let's compare the number of rows in the tables of the source and target databases.

```bash
docker exec -it mysql-source mysql -uroot -p123456 -D source -e "SELECT (SELECT COUNT(*) FROM product) as 'product_count',(SELECT COUNT(*) FROM country) as 'country_count',(SELECT COUNT(*) FROM city) as 'city_count',(SELECT COUNT(*) FROM store) as 'store_count',(SELECT COUNT(*) FROM users) as 'users_count',(SELECT COUNT(*) FROM status_name) as 'status_name_count',(SELECT COUNT(*) FROM sale) as 'sale_count',(SELECT COUNT(*) FROM order_status) as 'order_status_count';"
```

How many records are in each table of mysql-source db?
\  \ ```bash docker exec -it postgres-target psql -U postgres -d postgres -c "SELECT (SELECT COUNT(*) FROM product) as product_count,(SELECT COUNT(*) FROM country) as country_count,(SELECT COUNT(*) FROM city) as city_count,(SELECT COUNT(*) FROM store) as store_count,(SELECT COUNT(*) FROM users) as users_count,(SELECT COUNT(*) FROM status_name) as status_name_count,(SELECT COUNT(*) FROM sale) as sale_count,(SELECT COUNT(*) FROM order_status) as order_status_count;" ``` \ how many records are in each table of postgres-target db? \  \ > As you can see from the resulting output, all tables in both the source and target databases have an identical number of records in each table. > > \ ### Check statistics. The following command sends a GET request to the DBConvert API endpoint `/api/v1/streams/stat` which retrieves the statistics of the current data stream. The `jq` command is used to format the JSON output for better readability. \ ```bash docker run -t --rm \ --network sales-db_default \ curlimages/curl \ --request GET \ --url http://dbs-api:8020/api/v1/streams/stat | jq ``` \ ```json { "streamID": "2KHTjpsZAUCb8y1BZny3YKoX5qO", "source": { "counter": 635, "elapsed": "0s", "started": "2023-01-13T17:24:50.089257721Z", "status": "RUNNING" }, "target": { "counter": 635, "elapsed": "0s", "started": "2023-01-13T17:24:50.089649312Z", "status": "RUNNING" } } ``` \ The above output shows the statistics of the current data stream. The "source" field shows the statistics of the source database, and the "target" field shows the statistics of the target database. \ The "counter" field shows the number of events that have been processed by the stream, the "elapsed" field shows the time elapsed since the stream started, the "started" field shows the date and time when the stream was started, and "status" field shows the status of the current stream. ### Prometheus metrics. 
Prometheus is a monitoring system that scrapes metrics data from various sources and stores them in a time-series database. DBConvert Streams exposes its internal metrics in Prometheus format, allowing you to explore and visualize live data in dashboards. To view the collected metrics, visit [http://127.0.0.1:9090](http://127.0.0.1:9090/) in a web browser to access the Prometheus UI.

![](https://cdn.hackernoon.com/images/1AQQNK8dvYR1CIr64Ac7BDHgqtj2-2023-01-27T13:49:31.647Z-cldekt9q7008i0as64xje7nwl)

## Conclusion

Change Data Capture (CDC) systems like **DBConvert Streams** can be used to stream data from a MySQL database to a PostgreSQL database in real time, allowing you to keep the two systems in sync and take advantage of the unique features and capabilities of each database.

This guide covered streaming data in one direction, from the MySQL binlog to PostgreSQL. [The DBConvert Streams GitHub repository](https://github.com/slotix/dbconvert-streams-public) contains more examples of configuring data streams, including from the PostgreSQL WAL to MySQL and other configurations. These examples can serve as a starting point for setting up your own data stream tailored to your specific needs and use case.

Your feedback about DBConvert Streams helps the development team improve the software and make it more useful for the community. By sharing your ideas, reporting bugs, and requesting new features, you can actively participate in its development. Your participation is valued and appreciated by the development team and the community.

---

Also Published [Here](https://dbconvert.com/blog/streaming-data-mysql-postgres/)