Performing Real-time Change Data Capture with MySQL¶
Introduction¶
The Streaming Integrator (SI) allows you to capture changes to a database table, in a streaming manner, enabling you to perform ETL operations.
This tutorial takes you through the different modes and options you can use to perform Change Data Capture (CDC) using the SI. In this tutorial, you are using a MySQL datasource.
Info
To use a database other than MySQL, see dependencies for CDC and add the corresponding driver jar. In addition, modify the JDBC URL accordingly in the url parameter of all the Siddhi applications given in this tutorial.
Listening mode and Polling mode
There are two modes in which you can perform CDC using the SI: Listening mode and Polling mode.
- Polling mode: In polling mode, the data source is polled periodically to capture changes. The polling period is configurable.
- Listening mode: In listening mode, the SI keeps listening to the change log of the database and notifies you when a change takes place. Unlike polling mode, you are notified about the change immediately.
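The practical difference between the two modes is latency. The following is a minimal sketch of that difference and not SI code: the function names and the change times are made up for illustration, and it assumes polls run at exact multiples of the polling interval.

```python
import math


def listening_delivery_time(change_time: float) -> float:
    """Listening mode: a change is reported as soon as it is logged."""
    return change_time


def polling_delivery_time(change_time: float, interval: float) -> float:
    """Polling mode: a change is picked up by the first poll at or after it.

    Assumes polls happen at t = interval, 2 * interval, ... (illustrative only).
    """
    return math.ceil(change_time / interval) * interval


change_times = [1, 4, 12]  # seconds at which rows change (hypothetical values)
interval = 10              # polling period in seconds

listening = [listening_delivery_time(t) for t in change_times]
polling = [polling_delivery_time(t, interval) for t in change_times]
# How long each change waits before the poller sees it.
delays = [p - t for p, t in zip(polling, change_times)]
```

Under these assumptions a change never waits longer than one polling interval, whereas listening mode has no such wait.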
Type of events captured
You can capture the following types of changes made to a database table:
- Insert operations
- Update operations
- Delete operations (available for Listening mode only)
Listening mode¶
Before you begin:
- You need to have access to a MySQL instance.
- Enable binary logging in the MySQL server. For detailed instructions, see Debezium documentation - Enabling the binlog.
  Info
  If you are using MySQL 8.0, use the following query to check the binlog status.
      SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';
- Add the MySQL JDBC driver into the <SI_HOME>/lib directory as follows:
  - Download the MySQL JDBC driver from the MySQL site.
  - Unzip the archive.
  - Copy the mysql-connector-java-5.1.45-bin.jar to the <SI_HOME>/lib directory.
  - Start the SI server.
- Once you install MySQL and start the MySQL server, create the database and the database table you require as follows:
  - Let's create a new database in the MySQL server which you are to use throughout this tutorial. To do this, execute the following query.
      CREATE SCHEMA production;
  - Create a new user by executing the following SQL query.
      GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'wso2si' IDENTIFIED BY 'wso2';
  - Switch to the production database and create a new table by executing the following queries:
      use production;
      CREATE TABLE SweetProductionTable (name VARCHAR(20),amount double(10,2));
- Download and install the siddhi-io-cdc extension. For instructions, see Downloading and Installing Siddhi Connectors.
Capturing inserts¶
Now you can write a simple Siddhi application to monitor the SweetProductionTable for insert operations.
- Open a text file and copy-paste the following application into it.
      @App:name('CDCListenForInserts')
      @App:description('Capture MySQL Inserts using CDC listening mode.')

      @source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/production',
          username = 'wso2si', password = 'wso2', table.name = 'SweetProductionTable',
          operation = 'insert', @map(type = 'keyvalue'))
      define stream InsertSweetProductionStream (name string, amount double);

      @sink(type = 'log')
      define stream LogStream (name string, amount double);

      @info(name = 'query')
      from InsertSweetProductionStream
      select *
      insert into LogStream;
  Here the url parameter has the value jdbc:mysql://localhost:3306/production. Change it to point to your MySQL server.
- Save this file as CDCListenForInserts.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
  Info
  This Siddhi application captures all the inserts made to the SweetProductionTable database table and logs them.
- To install the extensions required for the CDCListenForInserts Siddhi application you deployed, open a new terminal window, navigate to the <SI_HOME>/bin directory, and issue one of the following commands based on your operating system:
  - For Windows:
      extension-installer.bat
  - For Linux:
      sh extension-installer.sh
- Now let's perform an insert operation on the MySQL table by executing the following MySQL query on the database:
      insert into SweetProductionTable values('chocolate',100.0);
  A log similar to the following appears in the SI console:
      INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CDCWithListeningMode : logStream : Event{timestamp=1563200225948, data=[chocolate, 100.0], isExpired=false}
Capturing updates¶
Now you can write a Siddhi application to monitor the SweetProductionTable for update operations.
- Open a text file and copy-paste the following application into it.
      @App:name('CDCListenForUpdates')
      @App:description('Capture MySQL Updates using CDC listening mode.')

      @source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/production',
          username = 'wso2si', password = 'wso2', table.name = 'SweetProductionTable',
          operation = 'update', @map(type = 'keyvalue'))
      define stream UpdateSweetProductionStream (before_name string, name string, before_amount double, amount double);

      @sink(type = 'log')
      define stream LogStream (before_name string, name string, before_amount double, amount double);

      @info(name = 'query')
      from UpdateSweetProductionStream
      select *
      insert into LogStream;
- Save this file as CDCListenForUpdates.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
  Info
  This Siddhi application captures all the updates to the SweetProductionTable database table and logs them.
- Now let's perform an update operation on the MySQL table. To do this, execute the following MySQL query on the database:
      update SweetProductionTable SET name = 'Almond cookie' where name = 'chocolate';
  As a result, a log similar to the following appears in the SI console:
      INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CDCWithListeningMode : updateSweetProductionStream : Event{timestamp=1563201040953, data=[chocolate, Almond cookie, 100.0, 100.0], isExpired=false}
  Info
  Here, the before_name attribute indicates the value of the name attribute before the update was made (chocolate in this case), and the name attribute has the current name after the update (i.e., Almond cookie).
Capturing deletes¶
Now you can write a Siddhi application to monitor the SweetProductionTable for delete operations.
- Open a text file and copy-paste the following application into it.
      @App:name('CDCListenForDeletes')
      @App:description('Capture MySQL Deletes using CDC listening mode.')

      @source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/production',
          username = 'wso2si', password = 'wso2', table.name = 'SweetProductionTable',
          operation = 'delete', @map(type = 'keyvalue'))
      define stream DeleteSweetProductionStream (before_name string, before_amount double);

      @sink(type = 'log')
      define stream LogStream (before_name string, before_amount double);

      @info(name = 'query')
      from DeleteSweetProductionStream
      select *
      insert into LogStream;
- Save this file as CDCListenForDeletes.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
  Info
  This Siddhi application captures all the delete operations carried out on the SweetProductionTable database table and logs them.
- Now let's perform a delete operation on the MySQL table. To do this, execute the following MySQL query on the database:
      delete from SweetProductionTable where name = 'Almond cookie';
  A log similar to the following appears in the SI console:
      INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CDCWithListeningMode : DeleteSweetProductionStream : Event{timestamp=1563367367098, data=[Almond cookie, 100.0], isExpired=false}
  Info
  Here, the before_name attribute indicates the name of the sweet in the deleted record (i.e., Almond cookie in this case). Similarly, the before_amount attribute indicates the amount in the deleted record.
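The attribute naming used across the three listening-mode applications can be summarised in a small sketch. This is only an illustration of the key-value shape implied by the stream definitions in this tutorial: `cdc_event` is a hypothetical helper, not part of the SI or the siddhi-io-cdc extension.

```python
from typing import Optional


def cdc_event(operation: str, before: Optional[dict], after: Optional[dict]) -> dict:
    """Build the key-value payload for one captured change (illustrative helper)."""
    event = {}
    if operation in ("update", "delete"):
        # Values held before the change are exposed with a "before_" prefix.
        event.update({f"before_{col}": val for col, val in before.items()})
    if operation in ("insert", "update"):
        # Current values keep the plain column name.
        event.update(after)
    return event


inserted = cdc_event("insert", None, {"name": "chocolate", "amount": 100.0})
updated = cdc_event(
    "update",
    {"name": "chocolate", "amount": 100.0},
    {"name": "Almond cookie", "amount": 100.0},
)
deleted = cdc_event("delete", {"name": "Almond cookie", "amount": 100.0}, None)
```

The three results carry the same attribute names as InsertSweetProductionStream, UpdateSweetProductionStream, and DeleteSweetProductionStream respectively.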
Preserving State of the application through a system failure¶
Let's try out a scenario in which you are going to deploy a Siddhi application to count the total number of productions.
Info
In this scenario, the SI server is required to remember the current count through system failures so that when the system is restored, the count is not reset to zero. To achieve this, you can use the state persistence capability in the Streaming Integrator.
- Enable the state persistence feature in the SI server as follows. Open the <SI_HOME>/conf/server/deployment.yaml file in a text editor and locate the state.persistence section.
      # Periodic Persistence Configuration
      state.persistence:
        enabled: true
        intervalInMin: 1
        revisionsToKeep: 2
        persistenceStore: org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore
        config:
          location: siddhi-app-persistence
  Set the enabled parameter to true and save the file.
- Enable state persistence debug logs as follows. Open the <SI_HOME>/conf/server/log4j2.xml file in a text editor and locate the following line in it.
      <Logger name="com.zaxxer.hikari" level="error"/>
  Add the following <Logger> element below it.
      <Logger name="org.wso2.carbon.streaming.integrator.core.persistence" level="debug"/>
  Save the file.
- Restart the Streaming Integrator server for the above changes to take effect.
- Open a text file and copy-paste the following Siddhi application into it.
      @App:name('CountProductions')
      @App:description('Siddhi application to count the total number of orders.')

      @source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/production',
          username = 'wso2si', password = 'wso2', table.name = 'SweetProductionTable',
          operation = 'insert', @map(type = 'keyvalue'))
      define stream InsertSweetProductionStream (name string, amount double);

      @sink(type = 'log')
      define stream LogStream (totalProductions double);

      @info(name = 'query')
      from InsertSweetProductionStream
      select sum(amount) as totalProductions
      insert into LogStream;
- Save this file as CountProductions.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory. When the Siddhi application is successfully deployed, the following INFO log appears in the Streaming Integrator console.
      INFO {org.wso2.carbon.stream.processor.core.internal.StreamProcessorService} - Siddhi App CountProductions deployed successfully
- Now let's perform a few insert operations on the MySQL table. Execute the following MySQL queries on the database:
      insert into SweetProductionTable values('Almond cookie',100.0);
      insert into SweetProductionTable values('Baked alaska',20.0);
  Now you can see the following logs in the SI console.
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1564151034866, data=[100.0], isExpired=false}
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1564151037870, data=[120.0], isExpired=false}
  These logs print the sweet production count. Note that the current count of sweet productions is printed as 120 in the second log. This is because the factory has so far produced 120 sweets: 100 Almond cookies and 20 Baked alaskas.
- Now wait for the following log to appear in the SI console.
      DEBUG {org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore} - Periodic persistence of CountProductions persisted successfully
  This log indicates that the current state of the Siddhi application is successfully persisted. The Siddhi application state is persisted every minute. Therefore, you can see this log appearing every minute.
  Next, let's insert two sweet productions into the SweetProductionTable and shut down the SI server before the next state persistence happens (in other words, before the above log appears again).
  Tip
  It is better to start inserting records immediately after the state persistence log appears, so that you have plenty of time to push messages and shut down the server before the next log appears.
- Now insert the following sweets into the SweetProductionTable by executing the following queries on the database:
      insert into SweetProductionTable values('Croissant',100.0);
      insert into SweetProductionTable values('Croutons',100.0);
- Shut down the SI server. Here you are deliberately creating a scenario where the server crashes before the SI server could persist the latest production count.
  Info
  Here, the SI server crashes before the state is persisted. Therefore, the SI server cannot persist the latest count (which should include the last two productions: 100 Croissants and 100 Croutons). The good news is that the CDC source replays the last two messages, allowing the Streaming Integrator to recover successfully from the server crash.
- Restart the SI server and wait for about one minute.
- The following logs appear in the SI console:
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1564151078607, data=[220.0], isExpired=false}
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1564151078612, data=[320.0], isExpired=false}
Note that the CDC source has replayed the last two messages. As a result, the sweet production count has been correctly restored.
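The recovery in this scenario can be pictured with a toy model. The sketch below is not how the SI is implemented: `CountingApp` is a hypothetical class, and it assumes that a persisted snapshot holds both the running sum and the position of the last change the application processed, so that everything after that position can be replayed after a restart.

```python
class CountingApp:
    """Toy stateful app: a running sum plus how many changes it has consumed."""

    def __init__(self):
        self.total = 0.0
        self.offset = 0

    def process(self, change_log):
        """Consume all changes after the current offset and emit the running sum."""
        outputs = []
        for amount in change_log[self.offset:]:
            self.total += amount
            self.offset += 1
            outputs.append(self.total)
        return outputs

    def snapshot(self):
        return {"total": self.total, "offset": self.offset}

    @classmethod
    def restore(cls, snap):
        app = cls()
        app.total, app.offset = snap["total"], snap["offset"]
        return app


change_log = [100.0, 20.0]               # Almond cookie, Baked alaska
app = CountingApp()
first_outputs = app.process(change_log)
persisted = app.snapshot()               # periodic persistence happens here

change_log += [100.0, 100.0]             # Croissant, Croutons
app.process(change_log)                  # counted in memory only
del app                                  # crash before the next snapshot

recovered = CountingApp.restore(persisted)
replayed_outputs = recovered.process(change_log)
```

In this model the restored application starts from the persisted total of 120 and reaches 220 and then 320 once the two missed inserts are replayed, which mirrors the logs shown in the tutorial.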
Polling mode¶
Before you begin:
You are required to have access to a MySQL instance. Create the required database and the database table in the MySQL instance as follows:
1. Let's create a new database in the MySQL server which you are to use throughout this tutorial. To do this, issue the following command.
CREATE SCHEMA production_pol;
2. Switch to the production_pol database and create a new table by executing the following queries.
use production_pol;
CREATE TABLE SweetProductionTable (last_update TIMESTAMP, name VARCHAR(20),amount double(10,2));
3. If you have not already created a user under Listening Mode, create a new user by executing the following SQL query.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'wso2si' IDENTIFIED BY 'wso2';
4. If you have not already added the MySQL JDBC driver into <SI_HOME>/lib under Listening Mode, add it as follows:
a. Download the MySQL JDBC driver from the MySQL site.
b. Unzip the archive.
c. Copy the mysql-connector-java-5.1.45-bin.jar to the <SI_HOME>/lib directory.
Capturing inserts¶
Now you can write a simple Siddhi application to monitor the SweetProductionTable table for insert operations.
- Open a text file and copy-paste the following application into it.
      @App:name("CDCPolling")
      @App:description("Capture MySQL changes, using CDC source - polling mode.")

      @source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/production_pol?useSSL=false',
          mode = 'polling', jdbc.driver.name = 'com.mysql.jdbc.Driver',
          polling.column = 'last_update', polling.interval = '10',
          username = 'wso2si', password = 'wso2', table.name = 'SweetProductionTable',
          @map(type = 'keyvalue' ))
      define stream SweetProductionStream (name string, amount double);

      @sink(type = 'log')
      define stream LogStream (name string, amount double);

      @info(name = 'query')
      from SweetProductionStream
      select *
      insert into LogStream;
  Here the url parameter currently points to the production_pol database at jdbc:mysql://localhost:3306. Change it to point to your MySQL server.
- Save this file as CDCPolling.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
  Info
  This Siddhi application polls the database periodically, captures the changes made to the SweetProductionTable database table during the polled interval, and logs them. The polling interval is specified via the polling.interval parameter in the Siddhi application when defining the CDC source. In this example, the polling interval is 10 seconds.
- Now let's perform an insert operation on the MySQL table. To do this, execute the following MySQL query on the database.
      insert into SweetProductionTable(name,amount) values('chocolate',100.0);
  A log similar to the following appears in the SI console:
      INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CDCWithPollingMode : LogStream : Event{timestamp=1563378804914, data=[chocolate, 100.0], isExpired=false}
Capturing updates¶
For capturing updates, you can use the same CDCPolling.siddhi Siddhi application that you deployed in the Capturing inserts section.
Let's perform an update operation on the MySQL table. To do this, execute the following MySQL query on the database:
update SweetProductionTable SET name = 'Almond cookie' where name = 'chocolate';
The following log appears in the SI console:
INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CDCWithPollingMode : logStream : Event{timestamp=1563436388530, data=[Almond cookie, 100.0], isExpired=false}
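One way to picture why the same application reports both inserts and updates, but not deletes, is to model polling as a query on the polling column. The sketch below is an assumption about the general technique rather than the extension's actual query: it uses an in-memory SQLite table, integer timestamps set by hand, and a hypothetical `poll` helper. In the tutorial's MySQL table the last_update value is expected to advance on each write; whether MySQL does that automatically for a TIMESTAMP column depends on the server version and settings.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE SweetProductionTable (last_update INTEGER, name TEXT, amount REAL)"
)


def poll(conn, last_seen):
    """Return rows whose polling column moved past last_seen, plus the new mark."""
    rows = conn.execute(
        "SELECT last_update, name, amount FROM SweetProductionTable "
        "WHERE last_update > ? ORDER BY last_update",
        (last_seen,),
    ).fetchall()
    if rows:
        last_seen = rows[-1][0]
    return [(name, amount) for _, name, amount in rows], last_seen


last_seen = 0

conn.execute("INSERT INTO SweetProductionTable VALUES (1, 'chocolate', 100.0)")
after_insert, last_seen = poll(conn, last_seen)

# The update also advances last_update, so the next poll picks the row up again.
conn.execute(
    "UPDATE SweetProductionTable SET name = 'Almond cookie', last_update = 2 "
    "WHERE name = 'chocolate'"
)
after_update, last_seen = poll(conn, last_seen)

# A deleted row leaves nothing behind for the poll query to find.
conn.execute("DELETE FROM SweetProductionTable WHERE name = 'Almond cookie'")
after_delete, last_seen = poll(conn, last_seen)
```

Under this model a poll returns only the current values of a changed row, which is consistent with the polling-mode logs carrying no before_ attributes and with delete capture being limited to listening mode.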
Preserving State of the application through a system failure¶
Let's try out a scenario in which you deploy a Siddhi application to count the total number of production runs.
Info
In this scenario, the SI server is required to remember the current count through system failures so that when the system is restored, the count is not reset to zero. To achieve this, you can use the state persistence capability in the Streaming Integrator.
- Enable the state persistence feature in the SI server as follows. Open the <SI_HOME>/conf/server/deployment.yaml file in a text editor and locate the state.persistence section.
      # Periodic Persistence Configuration
      state.persistence:
        enabled: true
        intervalInMin: 1
        revisionsToKeep: 2
        persistenceStore: org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore
        config:
          location: siddhi-app-persistence
  Set the enabled parameter to true and save the file.
- Enable state persistence debug logs as follows. Open the <SI_HOME>/conf/server/log4j2.xml file in a text editor and locate the following line in it.
      <Logger name="com.zaxxer.hikari" level="error"/>
  Add the following <Logger> element below it.
      <Logger name="org.wso2.carbon.streaming.integrator.core.persistence" level="debug"/>
  Save the file.
- Restart the Streaming Integrator server for the above changes to take effect.
- Open a text file and copy-paste the following Siddhi application into it.
      @App:name("CountProductions_pol")
      @App:description("Siddhi application to count the total number of orders.")

      @source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/production_pol?useSSL=false',
          mode = 'polling', jdbc.driver.name = 'com.mysql.jdbc.Driver',
          polling.column = 'last_update', polling.interval = '10',
          username = 'wso2si', password = 'wso2', table.name = 'SweetProductionTable',
          @map(type = 'keyvalue' ))
      define stream SweetProductionStream (name string, amount double);

      @sink(type = 'log')
      define stream LogStream (totalProductions double);

      @info(name = 'query')
      from SweetProductionStream
      select sum(amount) as totalProductions
      insert into LogStream;
- Save this file as CountProductions_pol.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory. When the Siddhi application is successfully deployed, the following INFO log appears in the Streaming Integrator console.
      INFO {org.wso2.carbon.stream.processor.core.internal.StreamProcessorService} - Siddhi App CountProductions_pol deployed successfully
- Now let's perform a few insert operations on the MySQL table. Execute the following MySQL queries on the database:
      insert into SweetProductionTable(name,amount) values('Almond cookie',100.0);
      insert into SweetProductionTable(name,amount) values('Baked alaska',20.0);
  Now you can see the following logs in the SI console.
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions_pol : LogStream : Event{timestamp=1564385971323, data=[100.0], isExpired=false}
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions_pol : LogStream : Event{timestamp=1564386011344, data=[120.0], isExpired=false}
  These logs print the sweet production count. Note that the current count of sweet production runs is printed as 120 in the second log. This is because we have so far produced 120 sweets: 100 Almond cookies and 20 Baked alaskas.
- Now wait for the following log to appear in the SI console.
      DEBUG {org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore} - Periodic persistence of CountProductions_pol persisted successfully
  This log indicates that the current state of the Siddhi application is successfully persisted. The Siddhi application state is persisted every minute. Therefore, you can see this log appearing every minute.
  Next, you are going to insert two sweet production runs into the SweetProductionTable and shut down the SI server before the next state persistence happens (in other words, before the above log appears again).
  Tip
  It is better to start pushing messages immediately after the state persistence log appears, so that you have plenty of time to push messages and shut down the server before the next log appears.
- Now insert the following sweets into the SweetProductionTable by executing the following queries on the database:
      insert into SweetProductionTable(name,amount) values('Croissant',100.0);
      insert into SweetProductionTable(name,amount) values('Croutons',100.0);
- Shut down the SI server. Here you are deliberately creating a scenario where the server crashes before the SI server could persist the latest production count.
  Info
  Here, the SI server crashes before the state is persisted. Therefore, the SI server cannot persist the latest count (which should include the last two production runs: 100 Croissants and 100 Croutons). The good news is that the CDC source replays the last two messages, allowing the Streaming Integrator to successfully recover from the server crash.
- Restart the SI server and wait for about one minute.
- The following logs appear in the SI console:
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions_pol : LogStream : Event{timestamp=1564386179998, data=[220.0], isExpired=false}
      INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions_pol : LogStream : Event{timestamp=1564386180004, data=[320.0], isExpired=false}
Note that the CDC source has replayed the last two messages. This indicates that the sweet production run count is correctly restored.