Enriching Data
Enriching data involves integrating the data received into a streaming integration flow with data from other sources such as a data store, another data stream, or an external service to derive an expected result.
Integrating data streams and static data
This involves enriching a data stream by joining it with a data store.
For example, consider a scenario where a sweet factory reports its production data in a streaming manner after each run. To update the stock with the latest amount produced, you need to add the latest production amounts to the stock amounts saved in a database.
To address this, you can write a query as follows.
from ProductionStream as p
join StockTable as s
on p.name == s.name
select p.name as name, sum(p.amount) + s.amount as amount
group by p.name
insert into UpdateStockwithProductionStream;
Here, the ProductionStream stream, which carries the production amounts for sweets after each production run, is assigned the short name p. The StockTable table, which holds the current stock for each product before the latest production runs, is given the short name s. These short names let you refer unambiguously to the attributes of each. The matching condition is p.name == s.name, which means that a match is identified when an event in the ProductionStream stream has the same value for the name attribute as a record in the StockTable table.
sum(p.amount) calculates the total production per product. This total production amount for a product is then added to the stock amount of the product (i.e., s.amount). The resulting output is inserted into the UpdateStockwithProductionStream stream.
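The following plain-Python sketch models what the query above computes. It is an illustration of the logic only, not how Siddhi evaluates the query. The function name and the in-memory dictionary standing in for the table are hypothetical. The two stock records are the ones inserted in the Try it out section, and the third production event is an invented value, added only to show the running total.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for the table records (name -> stock amount).
stock_table = {"gingerbread": 8.0, "coffee cake": 6.0}


def update_stock_with_production(production_events, table):
    """For each production event whose name matches a table record,
    emit (name, running production total + stored stock amount)."""
    # Plays the role of sum(p.amount) with "group by p.name".
    running_total = defaultdict(float)
    output = []
    for name, amount in production_events:
        if name not in table:
            # No matching record: the join yields no output for this event.
            continue
        running_total[name] += amount
        output.append((name, running_total[name] + table[name]))
    return output


# The third event is an invented value that shows the total accumulating.
events = [("gingerbread", 10.0), ("coffee cake", 10.0), ("gingerbread", 5.0)]
result = update_stock_with_production(events, stock_table)
print(result)
```

Note that the sketch assumes the total keeps accumulating across production runs while the stored stock amount stays unchanged, because the query reads from the table but does not write back to it.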
Integrating multiple data streams
This involves enriching a data stream by joining it with another data stream.
To understand this, consider the example you used in the previous Integrating data streams and static data section where you directed the stock amounts updated with the latest production amounts into a stream. Assume another stream reports the sale of stock in a streaming manner. To further update the stock by deducting the amounts sold, you need to join the data stream that has the latest stock amounts with the data stream that has the sales amounts.
To address the above requirement, you can write a query as follows.
from UpdateStockwithProductionStream#window.time(5 min) as u
join SalesStream as s
on u.name == s.name
select u.name as name, sum(u.amount) - sum(s.amount) as amount
insert into LatestStockStream;
Here, u is the short name for the UpdateStockwithProductionStream stream that has the total stock amounts for each product updated with the latest production amounts. The #window.time(5 min) window, which the complete application in the Try it out section also uses, retains the events of this stream for five minutes so that incoming sales events can be matched against them. s is the short name for the SalesStream stream that reports the sales for all the products in a streaming manner. Matching events are the events with the same value for the name attribute.
To calculate the latest stock amount for each product, the total sales amount is deducted from the total stock amount updated with production amounts. The resulting output is inserted into the LatestStockStream.
Integrating data streams with external services
This involves enriching a data stream by incorporating information received from an external service into it.
To understand this, consider that in order to value the stock, a sweet factory obtains the value of one unit of a product from an external application named StockValuingApp. When you submit the name of the product, it returns the unit value. To value the stock based on this information, you can create a Siddhi application as follows:
@App:name("StockValuingApp")

@sink(type='http-request', publisher.url='http://localhost:5005/CheckProductValueEP', method='POST',
    headers="'Content-Type:application/x-www-form-urlencoded'",
    sink.id="unitvalueSink",
    @map(type='keyvalue', @payload(Sweet='')))
define stream CheckUnitValueStream (name string);

@source(type='http-response', sink.id='unitvalueSink',
    @map(type='xml', namespaces = "xmlns=http://localhost:5005/CheckProductValueEP/",
        @attributes(name = 'trp:name', unitValue = ".")))
define stream StoreUnitValueStream (name string, unitValue double);

from StoreUnitValueStream
select *
update or insert into ProductValueTable;
In the above application, events in the CheckUnitValueStream stream are published to the http://localhost:5005/CheckProductValueEP URL via the connected http-request sink to invoke a service that returns the unit value for the name of the product sent. ESB Streaming Integrator captures this response (i.e., unit value) in the StoreUnitValueStream stream via the http-response source connected to the stream. In order to allow ESB Streaming Integrator to identify the response as the result of the request it previously sent, the same value is specified for the sink.id parameter in both the source configuration and the sink configuration.
To store the unit values obtained for further processing, all the events in the StoreUnitValueStream stream are inserted into a table named ProductValueTable.
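As a rough illustration of the update-or-insert step, the following plain-Python sketch keeps one record per product and overwrites it when a newer unit value arrives. It assumes that records are matched on the name attribute, which the snippet above does not state. The dictionary, the function name, and the sample values are all hypothetical.

```python
def update_or_insert(table, event):
    """Overwrite the record that has the same name, or add one if none exists.
    Assumption: records are matched on the 'name' attribute."""
    table[event["name"]] = event["unitValue"]
    return table


# Hypothetical responses captured from the external service.
responses = [
    {"name": "gingerbread", "unitValue": 10.0},
    {"name": "coffee cake", "unitValue": 20.0},
    {"name": "gingerbread", "unitValue": 12.0},  # a later value replaces the earlier one
]

product_value_table = {}
for response in responses:
    update_or_insert(product_value_table, response)

print(product_value_table)
```

Under this assumption, the table always holds the most recent unit value for each product rather than a history of all the responses.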
Enriching data with built-in extensions
The following is a list of Siddhi extensions with which you can enrich data.
- Siddhi-execution-streamingml
- Siddhi-execution-env
- Siddhi-execution-math
- Siddhi-execution-string
- Siddhi-execution-time
- Siddhi-execution-json
Try it out
To try out the examples given above, follow the steps below.
- Set up your database as follows:
    - Download and install MySQL. Then start the MySQL server and create a new database in it by issuing the following command:

            CREATE SCHEMA stock;

    - Create a table in the stock database you created by issuing the following two commands.

            use stock;
            CREATE TABLE StockTable (name VARCHAR(20),amount double(10,2));

    - Insert two records into the StockTable table you created by issuing the following commands.

            insert into StockTable values('gingerbread',8.0);
            insert into StockTable values('coffee cake',6.0);

    - Then open the <SI_TOOLING_HOME>/conf/server/deployment.yaml file and add the following data source configuration under datasources.

            - name: Stock_DB
              description: The datasource used for Stock Valuation
              jndiConfig:
                name: jdbc/stock
              definition:
                type: RDBMS
                configuration:
                  jdbcUrl: 'jdbc:mysql://localhost:3306/stock?useSSL=false'
                  username: root
                  password: root
                  driverClassName: com.mysql.jdbc.Driver
                  minIdle: 5
                  maxPoolSize: 50
                  idleTimeout: 60000
                  connectionTestQuery: SELECT 1
                  validationTimeout: 30000
                  isAutoCommit: false
- Open a new file in Streaming Integrator Tooling. Then add and save the following Siddhi application.

        @App:name('StockValuingApp')
        @App:description('Description of the plan')

        define stream ProductionStream (name string, amount double);

        @source(type = 'http-response', sink.id = "unitvalueSink",
            @map(type = 'xml', namespaces = "xmlns=http://localhost:5005/CheckProductValueEP/",
                @attributes(name = "trp:name", unitValue = ".")))
        define stream GetUnitValueStream (name string, unitValue double);

        @sink(type = 'log', prefix = "Stock Update After Production",
            @map(type = 'text'))
        define stream UpdateStockwithProductionStream (name string, amount double);

        define stream SalesStream (name string, amount double);

        @sink(type = 'http-request', publisher.url = "http://localhost:5005/CheckProductValueEP", method = "POST", headers = "'Content-Type:application/x-www-form-urlencoded'", sink.id = "unitvalueSink",
            @map(type = 'keyvalue'))
        define stream CheckUnitValueStream (name string);

        @sink(type = 'log', prefix = "Latest Stock",
            @map(type = 'text'))
        define stream LatestStockStream (name string, amount double);

        @sink(type = 'log', prefix = "Stock Value",
            @map(type = 'text'))
        define stream StockValueStream (name string, value double);

        @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/stock?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver")
        @primaryKey("name")
        define table StockTable (name string, amount double);

        @info(name = 'UpdateStockwithProduction')
        from ProductionStream as p
        join StockTable as s
            on p.name == s.name
        select p.name as name, sum(p.amount) + s.amount as amount
        group by p.name
        insert into UpdateStockwithProductionStream;

        @info(name='UpdateStockwithSales')
        from UpdateStockwithProductionStream#window.time(5 min) as u
        join SalesStream as s
            on u.name == s.name
        select u.name as name, sum(u.amount) - sum(s.amount) as amount
        insert into LatestStockStream;

        @info(name='CalculateStockValue')
        from LatestStockStream as l
        join GetUnitValueStream as g
            on l.name == g.name
        select l.name as name, g.unitValue * l.amount as value
        insert into StockValueStream;

    The above Siddhi application does the following:

    - Updates the stock amount for each product by adding the total production amount to the stock amount. This is done by joining the StockTable table with the ProductionStream stream. Then it logs the result with the Stock Update After Production prefix.
    - Calculates the latest stock by deducting the total sales for a product from the updated stock, which is derived by adding the total production amount to the current stock amount. This is done by joining the UpdateStockwithProductionStream stream with the SalesStream stream. The result is logged with the Latest Stock prefix.
    - Sends requests with the product name to an external service with the http://localhost:5005/CheckProductValueEP endpoint and receives the unit value of the submitted product name as a response. This response is captured in the GetUnitValueStream stream.
    - Calculates the stock value by multiplying the latest stock with the unit value obtained from the external service. This is done by joining the GetUnitValueStream stream with the LatestStockStream stream. The result is then logged with the Stock Value prefix.
- In Streaming Integrator Tooling, create a new Siddhi application as follows, save it, and then start it. This application functions as the external service for testing purposes.

        @App:name('ReturnUnitValueApp')

        @source(type='http-service', source.id='unitValue', receiver.url='http://localhost:5005/CheckProductValueEP/',
            @map(type = 'json'))
        define stream RequestStream (name string);

        @sink(type='http-service-response', source.id='unitValue', message.id='',
            @map(type = 'json'))
        define stream ResultStream (name string, unitValue double);

        from RequestStream
        select name, ifThenElse(name == 'gingerbread', 10.0, 20.0) as unitValue
        insert into ResultStream;
- Simulate events for the StockValuingApp application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.
    - First, simulate two events for the ProductionStream stream with the following values.

            | Name        | Amount |
            |-------------|--------|
            | gingerbread | 10     |
            | coffee cake | 10     |

        As a result, the following logs appear in the terminal.

            INFO {io.siddhi.core.stream.output.sink.LogSink} - Stock Update After Production : name:"gingerbread", amount:18.0 (Encoded)
            INFO {io.siddhi.core.stream.output.sink.LogSink} - Stock Update After Production : name:"coffee cake", amount:16.0 (Encoded)

        Here, the Stock Update After Production value is calculated by adding the production value to the stock value saved in the database.

    - Now simulate the following two events to the SalesStream stream.

            | Name        | Amount |
            |-------------|--------|
            | gingerbread | 12     |
            | coffee cake | 13     |

        As a result, the following logs appear in the terminal.

            INFO {io.siddhi.core.stream.output.sink.LogSink} - Latest Stock : name:"gingerbread", amount:6.0 (Encoded)
            INFO {io.siddhi.core.stream.output.sink.LogSink} - Latest Stock : name:"coffee cake", amount:3.0 (Encoded)

    - Now simulate two events to the CheckUnitValueStream stream. Enter gingerbread and coffee cake as the name for the first and the second event respectively.

        As a result, the following logs appear in the terminal.

            INFO {io.siddhi.core.stream.output.sink.LogSink} - Stock Value : name:"gingerbread", value:60.0 (Encoded)
            INFO {io.siddhi.core.stream.output.sink.LogSink} - Stock Value : name:"coffee cake", value:60.0 (Encoded)
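The figures in the logs above can be cross-checked with plain arithmetic. The following Python sketch is only a sanity check of the sample values used in this walkthrough and does not involve Siddhi. The function names are hypothetical, and unit_value mirrors the ifThenElse expression in ReturnUnitValueApp.

```python
# Sample values taken from the walkthrough above.
initial_stock = {"gingerbread": 8.0, "coffee cake": 6.0}   # records in StockTable
production = {"gingerbread": 10.0, "coffee cake": 10.0}    # ProductionStream events
sales = {"gingerbread": 12.0, "coffee cake": 13.0}         # SalesStream events


def unit_value(name):
    # Mirrors ifThenElse(name == 'gingerbread', 10.0, 20.0) in ReturnUnitValueApp.
    return 10.0 if name == "gingerbread" else 20.0


def walk_through(name):
    """Return (stock after production, latest stock, stock value) for a product."""
    after_production = initial_stock[name] + production[name]
    latest_stock = after_production - sales[name]
    stock_value = latest_stock * unit_value(name)
    return after_production, latest_stock, stock_value


results = {name: walk_through(name) for name in initial_stock}
for name, (after_production, latest_stock, stock_value) in results.items():
    print(f"{name}: after production={after_production}, "
          f"latest stock={latest_stock}, stock value={stock_value}")
```

For gingerbread this gives 18.0, 6.0, and 60.0, and for coffee cake 16.0, 3.0, and 60.0, which match the amounts shown in the three sets of logs.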