Integrating Data Stores in Streaming Integration¶
Introduction¶
ESB Streaming Integrator allows you to incorporate data stores when performing various streaming integration activities. The methods in which this is done include:
- Change data capture
- Storing received data/processed data in data stores
- Performing CRUD operations in data stores
[Performing Real-time Change Data Capture with MySQL] tutorial covers how to perform change data capture in details. Therefore, in this tutorial, let's learn how ESB Streaming Integrator can incorporate data stores in streaming operations by performing CRUD operations.
Scenario¶
Let's consider the example of a Sweet Factory that stores the following information in three different databases:
- Records of material purchases to be used in production
- Records of material dispatches for production
- The current stock of materials
In order to manage the material stock and to maintain the required records, the Factory Manager needs to do the following activities:
- Record each material purchase in the store for purchases.
- Record each dispatch of material for production in the store for dispatches.
- Update the store with current stock for each material after each purchase and dispatch to keep it up to date.
Recording purchases and dispatches involve inserting new records into data stores. To maintain the current stock records, the Factory Manager needs to retrieve information about both purchases and dispatches, calculate the impact of both on the current stock and then perform an insert/update operation to the store with the stock records.
To understand how the ESB Streaming Integrator performs these operations, follow the steps below.
Before you begin¶
You need to complete the following prerequisites before you begin:
-
You need to have access to a MySQL instance.
-
Install
rdbms-mysqlextension in ESB Streaming Integrator as follows:-
Start ESB Streaming Integrator by navigating to the
<SI_HOME>/bindirectory and issuing the appropriate command based on your operating system.- For Linux :
./server.sh - For Windows:
server.bat --run
- For Linux :
-
To install the
rdbms-mysqlextension, navigate to the to the<SI_HOME>/bindirectory and issue the appropriate command based on your operating system:- For Linux :
./extension-installer.sh - For Windows:
extension-installer.bat --run
- For Linux :
-
Restart the ESB Streaming Integrator server.
-
-
Install the
rdbms-mysqlextension in ESB Streaming Integrator as follows:-
Start ESB Streaming Integrator Tooling by navigating to the
<SI_TOOLING_HOME>/bindirectory and issuing the appropriate command based on your operating system.- For Linux :
./tooling.sh - For Windows:
tooling.bat --run
- For Linux :
-
Access Streaming Integrator Tooling. Then click Tools -> Extension Installer to open the Extension Installer dialog box.
-
In the Extension Installer dialog box, click Install for the RDBMS-MYSQL extension. Then click Install in the message that appears to confirm whether you want to proceed.
-
Restart ESB Streaming Integrator Tooling.
-
-
Start the MySQL server.
-
Create three MySQL databases by issuing the following commands.
CREATE SCHEMA purchases;CREATE SCHEMA dispatches;CREATE SCHEMA closingstock;
Step 1: Connect a Siddhi application to data stores¶
In this section, let's learn the different ways in which you can connect a Siddhi application to a data store.
In Streaming Integrator Tooling, open a new file and start creating a new Siddhi Application named StockManagementApp.
```
@App:name("StockManagementApp")
@App:description("Managing Raw Materials")
```
Now let's connect to the data stores (i.e., databases) you previously created to the Siddhi application. There are three methods in which this can be done. To learn them, let's connect each of the three databases in a different method.
Connect to a store via a data source¶
To connect to the closingstock database via a data source, follow the steps below:
-
Define a data source in the
<SI_TOOLING_HOME>/conf/server/deployment.yamlfile as follows:
The above data source connects to the- name: Stock_DB description: The datasource used for stock records jndiConfig: name: jdbc/closingstock definition: type: RDBMS configuration: jdbcUrl: 'jdbc:mysql://localhost:3306/closingstock?useSSL=false' username: root password: root driverClassName: com.mysql.jdbc.Driver minIdle: 5 maxPoolSize: 50 idleTimeout: 60000 connectionTestQuery: SELECT 1 validationTimeout: 30000 isAutoCommit: falseclosingstockdatabase you previously created via thejdbcUrlspecified. -
Now include the following table definition in the
StockManagementAppSiddhi application that you started creating.@store(type = 'rdbms', datasource = "Stock_DB") @primaryKey('name' ) define table StockTable (name string, amount double);In the above table definition:
-
The table has the two attributes
nameandamountto match theclosingstockdatabase you previously created. -
The
@storeannotation specifies the database type asrdbmsand connects the table to theStock_DBdata source you configured. Thus, you are connected to theclosingstockdatabase via the data source. -
The
@primaryKeyannotation specifiesnameas the primary key of the table, requiring each record in the table to have a unique value forname.
-
Refer to an externally defined store¶
To connect to the purchases database via a reference, follow the steps below:
-
In the
<SI_TOOLING_HOME>/conf/server/deployment.yamlfile, add a subsection for refs, and then add a ref as shown below:
The above reference connects to thesiddhi: refs: - ref: name: 'purchases' type: 'rdbms' properties: jdbc.url: "jdbc:mysql://localhost:3306/purchases?useSSL=false" username: 'root' password: 'root' jdbc.driver.name: 'com.mysql.jdbc.Driver'purchasesdatabase that you previously created. -
Now include the following table definition in the
StockManagementAppSiddhi application.@store(type = 'rdbms', ref = "purchases") define table PurchasesTable (timestamp long, name string, amount double);
Configure the data store inline¶
You can define the data store configuration for the dispatches database by adding a table definition in the StockManagementApp Siddhi application as follows:
@store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/dispatches?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver")
define table DispatchesTable (timestamp long, name string, amount double);
Here, you are configuring the data store configuration in the Siddhi application itself. The Siddhi application connects to the dispatches database via the specified JDBC URL.
Step 2: Perform CRUD operations¶
Perform CRUD operations via Siddhi queries¶
In this section, let's complete the StockManagementApp Siddhi application by adding the streams and queries to perform CRUD operations.
-
First, let's define the streams that receive information about material purchases and dispatches as follows.
-
For purchases:
define stream MaterialPurchasesStream (timestamp long, name string, amount double); -
For dispatches:
Now let's write Siddhi queries to perform different CRUD operations as follows:define stream MaterialDispatchesStream (timestamp long, name string, amount double);
-
Insert records¶
To insert values into purchases and dispatches databases, let's write two queries as follows:
-
For purchases:
@info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable; -
For dispatches:
To try out these queries, simulate events for the streams via the Event Simulator as follows:@info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable; -
Save the Siddhi application.
The complete Siddhi application looks as follows:
@App:name('StockManagementApp') @App:description('Managing Raw Materials') define stream MaterialDispatchesStream (timestamp long, name string, amount double); define stream MaterialPurchasesStream (timestamp long, name string, amount double); @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/dispatches?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver") define table DispatchesTable (timestamp long, name string, amount double); @store(type = 'rdbms', ref = "purchases") define table PurchasesTable (timestamp long, name string, amount double); @store(type = 'rdbms', datasource = "Stock_DB") @primaryKey("name") define table StockTable (name string, amount double); @info(name = 'Save material dispatch records') from MaterialDispatchesStream select * insert into DispatchesTable; @info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable;Then start it by clicking the play icon for it in the top panel.
-
Click the Event Simulator icon to open the event simulator.

It opens the left panel for event simulation as follows.

-
To simulate purchase events, select
StockManagementAppfor the Siddhi App Name field, andMaterialPurchasesStreamfor the Stream Name field.Then enter values for the attribute fields as follows and click Send.
timestamp name amount 1608023646000honey150 -
To simulate an event for material dispatches, select
StockManagementAppfor the Siddhi App Name field, andMaterialDispatchesStreamfor the Stream Name field.Then enter values for the attribute fields as follows and click Send.
timestamp name amount 1608023646000honey100 -
To check whether the above insertions were successful, issue the following MySQL commands in the terminal in which you are running the MySQL server.
-
For the
purchasesdatabase:use purchases;select * from PurchasesTable;
The following table is displayed.

-
For the
dispatchesdatabase:use dispatches;select * from DispatchesTable;
The following table is displayed.

-
Retrieve Records¶
Assume that the Factory Manager needs to view all the purchase records for honey. This can be done by following the steps below:
-
To receive the record retrieval requests as input events, define an input stream as follows:
define stream PurchaseRecordRetrievalStream (name string);
This stream only has the name attribute because only the name is needed to filter the search results
-
To present the retrieved records, define an output stream as follows:
@sink(type = 'log', prefix = "Search Results", @map(type = 'passThrough')) define stream SearchResultsStream (timestamp long, name string, amount double);The
SearchResultsStreamoutput stream has all the attributes of thePurchasesTabletable to retrieve the complete record. Also, the@sinkannotation connects this stream to a log sink so that the search results can be logged. -
Now lets add a join query to join the
PurchaseRecordRetrievalStreamand thePurchasesTabletable.@info(name = 'Retrieve purchase records') from PurchaseRecordRetrievalStream as s join PurchasesTable as p on s.name == p.name select p.timestamp as timestamp, s.name as name, p.amount as amount group by p.name insert into SearchResultsStream;
Note the following about the above join query.
-
The stream is assigned the short name
sand the table is assigned the short namep. -
Based on the previous point,
on s.name == p.namecondition specifies that a matching event is identified when thePurchasesTablehas a record where the value for thenameattribute is the same as that of the stream. -
The
selectclause the query specifies that when such a matching event is identified, attribute values for the output event should be selected as follows:- The timestamp from the table - The name from the stream - The amount from the table -
The
insert intoclause specifies that the output events derived as stated above should be inserted into theSearchResultsStream. -
Save the Siddhi application. The complete Siddhi application after the above changes looks as follows:
5. Open the Event Simulator and simulate an event for the@App:name('StockManagementApp') @App:description('Managing Raw Materials') define stream MaterialDispatchesStream (timestamp long, name string, amount double); @sink(type = 'log', prefix = "Search Results", @map(type = 'passThrough')) define stream SearchResultsStream (timestamp long, name string, amount double); define stream MaterialPurchasesStream (timestamp long, name string, amount double); define stream PurchaseRecordRetrievalStream (name string); @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/dispatches?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver") define table DispatchesTable (timestamp long, name string, amount double); @store(type = 'rdbms', ref = "purchases") define table PurchasesTable (timestamp long, name string, amount double); @store(type = 'rdbms', datasource = "Stock_DB") @primaryKey("name") define table StockTable (name string, amount double); @info(name = 'Save material dispatch records') from MaterialDispatchesStream select * insert into DispatchesTable; @info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable; @info(name = 'Retrieve purchase records') from PurchaseRecordRetrievalStream as s join PurchasesTable as p on s.name == p.name select p.timestamp as timestamp, s.name as name, p.amount as amount group by p.name insert into SearchResultsStream;PurchaseRecordRetrievalStreamstream of theStockManagementAppSiddhi application withhoneyas the value for the name attribute.The following is logged in the terminal.

Update or insert records¶
The Stock Table table at any given time contains a single record per product, showing the current closing stock for the relevant product. When you send a new event reporting a stock value to the table, the outcome is one of the following:
- If a record with the same value for
namealready exists, the event updates the value for theamountattribute in that record. - If a record with the same value for
namedoes not exist, the new event is inserted into the table as a new record.
To try this, follow the steps below:
-
Add a new stream as follows:
define stream LatestStockStream (name string, amount double); -
Now add a query to update or insert values into the
StockTablestream as follows:@info(name = 'Update or Record Stock') from LatestStockStream select name, amount update or insert into StockTable set LatestStockStream.amount = amount on StockTable.name == nameHere, the Streaming Integrator checks whether an event in the
LatestStockStreamhas a matching record in theStockTabletable where the value for thenameattribute is the same. If such a record exists, the value for theamountattribute in that record is set to the amount reported via the stream event. If no matching event exists, the stream event is inserted as a new event -
Save the Siddhi application. The complete Siddhi application is as follows:
@App:name('StockManagementApp') @App:description('Managing Raw Materials') define stream MaterialDispatchesStream (timestamp long, name string, amount double); @sink(type = 'log', prefix = "Search Results", @map(type = 'passThrough')) define stream SearchResultsStream (timestamp long, name string, amount double); define stream MaterialPurchasesStream (timestamp long, name string, amount double); define stream PurchaseRecordRetrievalStream (name string); define stream LatestStockStream (name string, amount double); @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/dispatches?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver") define table DispatchesTable (timestamp long, name string, amount double); @store(type = 'rdbms', ref = "purchases") define table PurchasesTable (timestamp long, name string, amount double); @store(type = 'rdbms', datasource = "Stock_DB") @primaryKey("name") define table StockTable (name string, amount double); @info(name = 'Save material dispatch records') from MaterialDispatchesStream select * insert into DispatchesTable; @info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable; @info(name = 'Retrieve purchase records') from PurchaseRecordRetrievalStream as s join PurchasesTable as p on s.name == p.name select p.timestamp as timestamp, s.name as name, p.amount as amount group by p.name insert into SearchResultsStream; @info(name = ''Update or Record Stock'') from LatestStockStream select name, amount update or insert into StockTable set LatestStockStream.amount = amount on StockTable.name == name -
Simulate events as follows:
-
In the event simulator, select StockManagementApp for the Siddhi App Name field, and select LatestStockStream for the Stream Name field.
-
Enter the following values for the attribute fields and send the event.
name amount flour150 -
Execute the following MySQL queries:
use closing stockselect * from StockTableThe following is displayed.

Here, the single record displayed is the event you sent. This event is inserted as a new record because the
StockTabletable did not have any records. -
Now simulate another event for the same stream with the following attribute values:
name amount flour200 -
Execute the following MySQL queries:
use closing stockselect * from StockTableThe following is displayed.

Again, a single record is displayed. Although value for the
nameattribute is the same, the value for theamountattribute has been updated from150to200. This is becausenameis the primary key of theStockTabletable and at any given time, there can be only one record with a specific name for thenameattribute. Therefore, because you simulated two events with the same value for thenameattribute, the second event updated the first one.
-
Update records¶
To update the StockTable table via streams, follow the steps below:
-
Add a new stream as follows:
define stream UpdateStockStream (name string, amount double); -
Now add a query to update the values in the
StockTablestream as follows:
Here, the Streaming Integrator checks whether an event in the@info(name = 'Update Stock') from UpdateStockStream select name, amount update StockTable set UpdateStockStream.amount = amount on StockTable.name == name;UpdateStockStreamhas a matching record in theStockTabletable where the value for thenameattribute is the same. If such a record exists, the value for theamountattribute in that record is set to the amount reported via the stream event. -
Save the Siddhi application. The complete Siddhi application is as follows:
@App:name('StockManagementApp') @App:description('Managing Raw Materials') define stream MaterialDispatchesStream (timestamp long, name string, amount double); @sink(type = 'log', prefix = "Search Results", @map(type = 'passThrough')) define stream SearchResultsStream (timestamp long, name string, amount double); define stream MaterialPurchasesStream (timestamp long, name string, amount double); define stream PurchaseRecordRetrievalStream (name string); define stream LatestStockStream (name string, amount double); @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/dispatches?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver") define table DispatchesTable (timestamp long, name string, amount double); @store(type = 'rdbms', ref = "purchases") define table PurchasesTable (timestamp long, name string, amount double); @store(type = 'rdbms', datasource = "Stock_DB") @primaryKey("name") define table StockTable (name string, amount double); @info(name = 'Save material dispatch records') from MaterialDispatchesStream select * insert into DispatchesTable; @info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable; @info(name = 'Retrieve purchase records') from PurchaseRecordRetrievalStream as s join PurchasesTable as p on s.name == p.name select p.timestamp as timestamp, s.name as name, p.amount as amount group by p.name insert into SearchResultsStream; @info(name = ''Update or Record Stock'') from LatestStockStream select name, amount update or insert into StockTable set LatestStockStream.amount = amount on StockTable.name == name @info(name = 'Update Stock') from UpdateStockStream select name, amount update StockTable set UpdateStockStream.amount = amount on StockTable.name == name; -
Simulate events as follows:
-
In the event simulator, select StockManagementApp for the Siddhi App Name field, and select UpdateStockStream for the Stream Name field.
-
Enter the following values for the attribute fields and send the event.
name amount flour129 -
Execute the following MySQL queries:
use closing stockselect * from StockTable
The following is displayed.

Here, the single record displayed is the event you sent. This event is inserted as a new record because the
StockTabletable did not have any records. -
Delete records¶
To delete records in the StockTable table via streams, follow the steps below:
-
Add a new stream as follows:
define stream DeleteStream (name string, amount double); -
Now add a query to update the values in the
StockTablestream as follows:
Here, the Streaming Integrator checks whether an event in the@info(name = 'Delete Stock') from DeleteStream select name, amount delete StockTable on StockTable.name == name;DeleteStreamhas a matching record in theStockTabletable where the value for thenameattribute is the same. If such a record exists, it is deleted. -
Save the Siddhi application. The complete Siddhi application is as follows:
@App:name('StockManagementApp') @App:description('Managing Raw Materials') define stream MaterialDispatchesStream (timestamp long, name string, amount double); @sink(type = 'log', prefix = "Search Results", @map(type = 'passThrough')) define stream SearchResultsStream (timestamp long, name string, amount double); define stream MaterialPurchasesStream (timestamp long, name string, amount double); define stream PurchaseRecordRetrievalStream (name string); define stream LatestStockStream (name string, amount double); @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/dispatches?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver") define table DispatchesTable (timestamp long, name string, amount double); @store(type = 'rdbms', ref = "purchases") define table PurchasesTable (timestamp long, name string, amount double); @store(type = 'rdbms', datasource = "Stock_DB") @primaryKey("name") define table StockTable (name string, amount double); @info(name = 'Save material dispatch records') from MaterialDispatchesStream select * insert into DispatchesTable; @info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable; @info(name = 'Retrieve purchase records') from PurchaseRecordRetrievalStream as s join PurchasesTable as p on s.name == p.name select p.timestamp as timestamp, s.name as name, p.amount as amount group by p.name insert into SearchResultsStream; @info(name = ''Update or Record Stock'') from LatestStockStream select name, amount update or insert into StockTable set LatestStockStream.amount = amount on StockTable.name == name @info(name = 'Update Stock') from UpdateStockStream select name, amount update StockTable set UpdateStockStream.amount = amount on StockTable.name == name; @info(name = 'Delete Stock') from DeleteStream select name, amount delete StockTable on StockTable.name == name; -
Simulate events as follows:
-
In the event simulator, select StockManagementApp for the Siddhi App Name field, and select DeleteStream for the Stream Name field.
-
Enter the following values for the attribute fields and send the event.
name amount flour129 -
Execute the following MySQL queries:
use closing stockselect * from StockTable
The
StockTableis displayed as an empty set. This is because the event you sent to theDeleteStreamstream matched the record in the table, and as a result, the record was deleted by theDelete Stockquery. -
Perform CRUD operations via REST API¶
In this section, let's perform CRUD operations via the Store API
Insert records¶
To insert a record into the StockTable table, issue the following CURL command:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "StockManagementApp", "query" : "select \"sugar\" as name, 200.0 as amount insert into StockTable;" }' -k
Then issue the following commands in the terminal where you are running the MySQL server.
use closingstock;
select * from StockTable;
The following is displayed:

Retrieve records¶
To retrieve a record from the StockTable table, issue the following CURL command:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "StockManagementApp", "query" : "from StockTable on name == \"sugar\" select name, amount; " }' -k
This returns the following response:
{"records":[["sugar",200.0]]
Update or inserts records¶
First, let's send an event that has the same value for the name attribute as the existing record in the StockTable table. To do this, issue the following command:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "StockManagementApp", "query" : "select \"sugar\" as name, 260.0 as amount update or insert into StockTable set amount = amount on StockTable.name == name;" }' -k
Then issue the following commands in the terminal where you are running the MySQL server.
use closingstock;
select * from StockTable;
The following is displayed:

Now let's send an event where the value for the name attribute is different to that of the existing value in the StockTable table as follows:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "StockManagementApp", "query" : "select \"vanilla\" as name, 100.0 as amount update or insert into StockTable set amount = amount on StockTable.name == name;" }' -k
Then issue the following commands in the terminal where you are running the MySQL server.
use closingstock;
select * from StockTable;
The following is displayed:

Update records¶
To update an existing record in the StockTable table, issue the following CURL command:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "StockManagementApp", "query" : "select \"vanilla\" as name, 127.0 as amount update StockTable set amount = amount on StockTable.name == name;" }' -k
Then issue the following commands in the terminal where you are running the MySQL server.
use closingstock;
select * from StockTable;
The following is displayed:

Delete records¶
To delete an existing record in the StockTable table, issue the following CURL command:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "StockManagementApp", "query" : "select \"vanilla\" as name delete StockTable on StockTable.name == name;" }' -k
Then issue the following commands in the terminal where you are running the MySQL server.
use closingstock;
select * from StockTable;
The following is displayed:
Manipulate data in stores via SQL queries¶
You can execute SQL queries via ESB Streaming Integrator to manipulate data in data stores. This is supported via the siddhi-store-rdbms extension.
Before you begin:
To allow Streaming Integrator Tooling to perform CRUD operations, open <SI_TOOLING_HOME>/conf/server/deployment.yaml file, and add an extract as shown below with the perform.CRUD.operations parameter set to true as shown below:
siddhi:
extensions:
-
extension:
name: cud
namespace: rdbms
properties:
perform.CUD.operations: true
To perform CRUD operations in multiple tables via ESB Streaming Integrator, follow the steps below:
To start creating the Siddhi application with the required tables, follow the steps below:
-
In ESB Streaming Integrator Tooling, open the
StockManagementAppthat you previously created. -
Define a new stream in it named
StockStreamas follows.define stream TriggerStream (name string, amount double); -
Add a query as follows:
This query updates the record in thefrom TriggerStream#rdbms:cud("Stock_DB", "UPDATE StockTable SET name='sugarsyrup' where name='sugar'") select name, amount insert into OutputStreamStockTabletable where the value for thenameattribute issugarby changing that same value tosugarsyrup. -
Save the Siddhi application. The complete Siddhi application is now as follows:
@App:name('StockManagementApp') define stream MaterialDispatchesStream (timestamp long, name string, amount double); @sink(type = 'log', prefix = "Search Results", @map(type = 'passThrough')) define stream SearchResultsStream (timestamp long, name string, amount double); define stream MaterialPurchasesStream (timestamp long, name string, amount double); define stream PurchaseRecordRetrievalStream (name string); define stream LatestStockStream (name string, amount double); define stream UpdateStockStream (name string, amount double); define stream DeleteStream (name string, amount double); define stream TriggerStream (name string, amount double); @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/dispatches?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver") define table DispatchesTable (timestamp long, name string, amount double); @store(type = 'rdbms', ref = "purchases") define table PurchasesTable (timestamp long, name string, amount double); @store(type = 'rdbms', datasource = "Stock_DB") @primaryKey("name") define table StockTable (name string, amount double); @info(name = 'Save material dispatch records') from MaterialDispatchesStream select * insert into DispatchesTable; @info(name = 'Save purchase records') from MaterialPurchasesStream select * insert into PurchasesTable; @info(name = 'Retrieve purchase records') from PurchaseRecordRetrievalStream as s join PurchasesTable as p on s.name == p.name select p.timestamp as timestamp, s.name as name, p.amount as amount group by p.name insert into SearchResultsStream; @info(name = 'Update or Record Stock') from LatestStockStream select name, amount update or insert into StockTable set LatestStockStream.amount = amount on StockTable.name == name; @info(name = 'Update Stock') from UpdateStockStream select name, amount update StockTable set UpdateStockStream.amount = amount on StockTable.name == name; @info(name = 'Delete Stock') from DeleteStream select name, amount delete StockTable on StockTable.name == name; from TriggerStream#rdbms:cud("Stock_DB", "UPDATE StockTable SET name='sugarsyrup' where name='sugar'") select name, amount insert into OutputStream -
Simulate an event for the
TriggerStreamstream of theStockManagementAppSiddhi application. You can enter any values of your choice as the attribute values. -
To check the database table, issue the following MySQL commands.
use closingstock;select * from StockTable;
The following is displayed:
