Correlating Data¶
The Streaming Integrator can correlate data in order to detect patterns and trends in streaming data. Correlation can be done via patterns as well as sequences.

The difference between patterns and sequences is that sequences require all the matching events to arrive consecutively to match the sequence condition, whereas patterns identify events that match the pattern condition irrespective of the order in which they arrive.
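This difference is reflected in the query syntax: pattern elements are chained with the followed-by operator (->), whereas sequence elements are separated by commas. The following is a minimal, illustrative sketch; StreamA, StreamB, and the output stream names are hypothetical and not part of the examples below.
@App:name('PatternVsSequenceSketch')
define stream StreamA (name string, amount double);
define stream StreamB (name string, amount double);
-- Pattern: e2 can arrive at any time after e1, even with other events in between.
from every e1 = StreamA -> e2 = StreamB[name == e1.name]
select e1.name as name, e2.amount as amount
insert into PatternMatchStream;
-- Sequence: the event arriving immediately after e1 must match e2; otherwise the match attempt is discarded.
from every e1 = StreamA, e2 = StreamB[name == e1.name]
select e1.name as name, e2.amount as amount
insert into SequenceMatchStream;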
Identifying patterns¶
A pattern identifies a correlation between two events that may or may not arrive in a specific sequence.
Combine several patterns logically and match events¶
Logical patterns involve combining several patterns logically and matching events.
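In addition to the followed-by (->) operator, pattern elements can be combined with the and and or keywords: and waits for both events to arrive in either order, and or matches whichever event arrives first. The following minimal sketch is illustrative only; the QualityCheckStream, PackingStream, and ReadyToShipStream names are hypothetical and not part of the example below.
@App:name('LogicalPatternSketch')
define stream ProductionStream (name string, amount double);
define stream QualityCheckStream (name string, passed bool);
define stream PackingStream (name string, amount double);
-- 'and' waits for both a quality-check event and a packing event for the batch
-- reported in e1; replacing 'and' with 'or' would match whichever arrives first.
from every e1 = ProductionStream -> e2 = QualityCheckStream[e1.name == name] and e3 = PackingStream[e1.name == name]
select e1.name as name, e2.passed as passed
insert into ReadyToShipStream;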
To understand this type of pattern, consider an example where you need to calculate the average shelf life of a production batch based on the time it takes to sell the total amount produced in that batch. For this purpose, let's assume that the products are sold on a FIFO (First In First Out) basis.
The above requirement can be addressed by the following Siddhi application.
@App:name('ShelfLifeApp')
define stream ProductionStream (timestamp long, name string, amount double);
define stream SalesStream (timestamp long, name string, amount double);
@sink(type = 'log', prefix = "Shelf Life",
    @map(type = 'json'))
define stream ShelfLifeStream (name string, days long);
@info(name = 'Calculate Sales Total')
from SalesStream 
select timestamp, name, sum(amount) as total 
insert into SalesTotalsStream;
@info(name = 'Calculate Shelf Life')
from every e1 = ProductionStream -> e2 = SalesTotalsStream[name == e1.name and total >= e1.amount] 
select e1.name as name, time:dateDiff(e1.timestamp, e2.timestamp) as days 
insert into ShelfLifeStream;
The Calculate Sales Total query calculates the running sales total from the SalesStream stream, which reports each sales transaction, and inserts the total into the SalesTotalsStream stream.
In the Calculate Shelf Life query, every e1 = ProductionStream -> e2 = SalesTotalsStream[name == e1.name and total >= e1.amount] means that every event in the ProductionStream stream is matched against the first subsequent event in the SalesTotalsStream stream that has the same product name and a total value greater than or equal to the amount of the ProductionStream event. When such an event arrives in the SalesTotalsStream stream, the time:dateDiff() function is applied to calculate the time difference between the two events. The result is inserted into the ShelfLifeStream stream and logged with the Shelf Life prefix via the log sink connected to that stream.
Try it out¶
To try out the Siddhi application given in the sample above, follow the steps below:
- Open a new file, add the following content to it, and save.

@App:name('ShelfLifeApp')
define stream ProductionStream (timestamp long, name string, amount double);
define stream SalesStream (timestamp long, name string, amount double);
@sink(type = 'log', prefix = "Shelf Life",
    @map(type = 'json'))
define stream ShelfLifeStream (name string, days long);
@info(name = 'Calculate Sales Total')
from SalesStream
select timestamp, name, sum(amount) as total
insert into SalesTotalsStream;
@info(name = 'Calculate Shelf Life')
from every e1 = ProductionStream -> e2 = SalesTotalsStream[name == e1.name and total >= e1.amount]
select e1.name as name, time:dateDiff(e1.timestamp, e2.timestamp) as days
insert into ShelfLifeStream;

- Simulate events for the ShelfLifeApp application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

For the ProductionStream stream:

| timestamp | name | amount |
|---|---|---|
| 1602323450000 | cake | 10 |

For the SalesStream stream:

| timestamp | name | amount |
|---|---|---|
| 1602327050000 | cake | 5 |
| 1602413450000 | cake | 6 |

As a result, the following is logged in the terminal.

INFO {io.siddhi.core.stream.output.sink.LogSink} - Shelf Life : {"event":{"name":"cake","days":-1}} (Encoded)
Count and match multiple events for a given pattern condition¶
Counting patterns involve counting and matching multiple events for a given pattern condition.
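In a pattern query, the number of matching events to be collected is specified with a count quantifier placed after the condition: <n> matches exactly n events, <n:m> matches n to m events, and <n:> matches n or more. The following is a minimal, illustrative sketch (the stream and output names are hypothetical, not part of the example below) that collects two to four sales events for the product reported in the first event.
@App:name('CountQuantifierSketch')
define stream LatestStockStream (name string, amount double);
define stream SalesStream (name string, amount double);
-- Matches two to four sales events for the product reported in e1;
-- the matched events are accessed by index, e.g., e2[0] and e2[last].
from e1 = LatestStockStream -> e2 = SalesStream[e1.name == name]<2:4>
select e1.name as name, e2[0].amount as firstSale, e2[last].amount as lastSale
insert into SalesSummaryStream;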
To understand this type of pattern, consider an example where the manager of a sweet factory needs to count the number of times the amount of items sold within a time period of one hour exceeds 90% of the latest stock amount reported during that same period.
The above requirement can be addressed by the following Siddhi application.
@App:name('DetectLowStockApp')
define stream LatestStockStream (timestamp long, name string, amount double);
define stream SalesStream (timestamp long, name string, amount double);
@sink(type = 'log', prefix = "Low Stock Alert!",
    @map(type = 'json'))
define stream LowStockLevelAlertStream (name string);
from e1=LatestStockStream -> e2=SalesStream[e1.name == e2.name and e2.amount > e1.amount]<3:> within 1 hour
select e1.name as name
insert into LowStockLevelAlertStream;
In this query, e2=SalesStream[e1.name == e2.name and e2.amount > e1.amount]<3:> matches three or more SalesStream events that report, for the product in the e1 event, a sales amount greater than the latest stock amount reported in e1. The within 1 hour constraint requires all the matched events to arrive within an hour. When the pattern is matched, the product name is inserted into the LowStockLevelAlertStream stream and logged with the Low Stock Alert! prefix via the connected log sink.
Try it out¶
To try out the Siddhi application given in the sample above, follow the steps below:
- Open a new file, add the following content to it, and save.

@App:name('DetectLowStockApp')
define stream LatestStockStream (timestamp long, name string, amount double);
define stream SalesStream (timestamp long, name string, amount double);
@sink(type = 'log', prefix = "Low Stock Alert!",
    @map(type = 'json'))
define stream LowStockLevelAlertStream (name string);
from e1=LatestStockStream -> e2=SalesStream[e1.name == e2.name and e2.amount > e1.amount]<3:> within 1 hour
select e1.name as name
insert into LowStockLevelAlertStream;

- Simulate events for the DetectLowStockApp application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

For the LatestStockStream stream:

| timestamp | name | amount |
|---|---|---|
| 1602410450000 | eclairs | 10 |
| 1602411290000 | eclairs | 18 |
| 1602412310000 | eclairs | 20 |

For the SalesStream stream:

| timestamp | name | amount |
|---|---|---|
| 1602410570000 | eclairs | 10 |
| 1602411350000 | eclairs | 17 |
| 1602412850000 | eclairs | 19 |

As a result, the following is logged in the terminal.

INFO {io.siddhi.core.stream.output.sink.LogSink} - Low Stock Alert! : {"event":{"name":"eclairs"}} (Encoded)
Find non-occurrence of events¶
To understand how to detect the non-occurrence of events, consider a scenario where the production manager of a sweet factory needs to count the number of times wastage occurs because some of the items in a production batch are not sold within three days of being produced and have to be scrapped as a result. For this purpose, let's assume that the products are sold on a FIFO (First In First Out) basis.
The above requirement can be addressed by the following Siddhi application.
@App:name('DetectWastageApp')
define stream ProductionStream (name string, amount double);
@sink(type = 'log', prefix = "WastageAlert!",
    @map(type = 'json'))
define stream WastageAlertStream (name string);
define stream SalesStream (name string, amount double);
@info(name = 'Calculate Sales Total')
from SalesStream 
select name, sum(amount) as total 
insert into SalesTotalsStream;
@info(name = 'Detect Wastage')
from e1 = ProductionStream -> not SalesTotalsStream[name == e1.name and total >= e1.amount] for 3 days 
select e1.name as name 
insert into WastageAlertStream;
The Calculate Sales Total query calculates the running sales total from the SalesStream stream, which reports each sales transaction, and inserts the total into the SalesTotalsStream stream.
In the Detect Wastage query, the from clause detects a pattern where the sales total for a specific product does not equal or exceed the amount of the last production batch reported for that product within a period of three days. When this pattern condition is met, the resulting output is inserted into the WastageAlertStream stream and logged in the terminal via the connected log sink.
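Non-occurrence is expressed with the not keyword on a pattern element, followed by a for clause that specifies how long to wait. As a minimal, illustrative sketch (the NoSalesAlertStream output stream is a hypothetical name, not part of the example above), the following query raises an alert if no sales event at all is reported for a product within three days of it being produced.
define stream ProductionStream (name string, amount double);
define stream SalesStream (name string, amount double);
-- Matches only if no SalesStream event for the same product arrives
-- within three days of the ProductionStream event.
from e1 = ProductionStream -> not SalesStream[name == e1.name] for 3 days
select e1.name as name
insert into NoSalesAlertStream;
The example above applies the same construct to the aggregated SalesTotalsStream stream instead, so that partial sales that do not reach the batch amount still trigger the alert.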
Try it out¶
To try out the Siddhi application given in the sample above, follow the steps below:
- Open a new file, add the following content to it, and save.

For testing purposes, the Siddhi application below detects the non-occurrence of a matching event within three seconds instead of three days.

@App:name('DetectWastageApp')
define stream ProductionStream (name string, amount double);
@sink(type = 'log', prefix = "WastageAlert!",
    @map(type = 'json'))
define stream WastageAlertStream (name string);
define stream SalesStream (name string, amount double);
@info(name = 'Calculate Sales Total')
from SalesStream
select name, sum(amount) as total
insert into SalesTotalsStream;
@info(name = 'Detect Wastage')
from e1 = ProductionStream -> not SalesTotalsStream[name == e1.name and total >= e1.amount] for 3 seconds
select e1.name as name
insert into WastageAlertStream;

- Simulate an event for the ProductionStream stream of the DetectWastageApp application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

| name | amount |
|---|---|
| eclairs | 100 |

The following is logged after three seconds.

INFO {io.siddhi.core.stream.output.sink.LogSink} - WastageAlert! : {"event":{"name":"eclairs"}} (Encoded)
Correlating events to find a trend (sequence)¶
Sequences are identified by observing trends in events that occur consecutively.
Combine several trends logically and match events¶
Logical sequences are trends observed when consecutively occurring events match a given condition.
To understand logical sequences, consider a production manager who identifies a decreasing trend in production when he/she observes a continuous decrease across three consecutive production batches.
The above requirement can be addressed by the following Siddhi application.
@App:name('DecreasingProductionAlertApp')
define stream ProductionStream (name string, amount double);
@sink(type = 'log', prefix = "Decreasing Production Alert",
    @map(type = 'json'))
define stream DecreasingProductionAlertStream (name string);
@info(name = 'query')
from every e1 = ProductionStream , e2 = ProductionStream[e1.name == name and e1.amount > amount] , e3 = ProductionStream[e2.name == name and e2.amount > amount] 
select e1.name as name 
insert into DecreasingProductionAlertStream;
The above Siddhi application compares three events (i.e., e1, e2, and e3) that occur consecutively in the ProductionStream stream for the same product. If the second event reports a lower production amount than the first event, and the third event reports a lower production amount than the second event, an output event is inserted into the DecreasingProductionAlertStream stream. This output event is then logged via the log sink connected to the DecreasingProductionAlertStream stream.
Try it out¶
To try out the Siddhi application given in the sample above, follow the steps below:
- Open a new file, add the following content to it, and save.

@App:name('DecreasingProductionAlertApp')
define stream ProductionStream (name string, amount double);
@sink(type = 'log', prefix = "Decreasing Production Alert",
    @map(type = 'json'))
define stream DecreasingProductionAlertStream (name string);
@info(name = 'query')
from every e1 = ProductionStream , e2 = ProductionStream[e1.name == name and e1.amount > amount] , e3 = ProductionStream[e2.name == name and e2.amount > amount]
select e1.name as name
insert into DecreasingProductionAlertStream;

- Simulate three events for the ProductionStream stream of the DecreasingProductionAlertApp application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

| name | amount |
|---|---|
| gingerbread | 100 |
| gingerbread | 90 |
| gingerbread | 80 |

As a result, the following is logged in the terminal.

INFO {io.siddhi.core.stream.output.sink.LogSink} - Decreasing Production Alert : {"event":{"name":"gingerbread"}} (Encoded)
Count and match multiple events for a given trend¶
Counting sequences involve counting and matching multiple consecutively occurring events that match a given condition.
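In a sequence query, a condition can be followed by a quantifier: + matches one or more consecutive events, * matches zero or more, and ? matches zero or one. The following is a minimal, illustrative sketch (the application and output stream names are hypothetical, not part of the example below) that matches one or more consecutive events whose amount stays above the initial amount, followed by a single event that drops below the latest of them.
@App:name('SequenceQuantifierSketch')
define stream ProductionStream (name string, amount double);
-- e2+ matches one or more consecutive events; e2[0] is the first and
-- e2[last] the latest of the matched events.
from every e1 = ProductionStream, e2 = ProductionStream[e1.amount < amount]+, e3 = ProductionStream[e2[last].amount > amount]
select e1.amount as startAmount, e2[last].amount as peakAmount, e3.amount as dropAmount
insert into ProductionPeakStream;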
To understand this, consider a scenario where the productivity of a production bot changes after it is started. To use the bots in an optimal way, the production manager wants to identify the peaks and slumps in production by observing every six production runs.
The above requirement can be addressed by the following Siddhi application.
@App:name('ObserveProductionTrendsApp')
define stream ProductionStream (name string, amount double);
@sink(type = 'log', prefix = "Production Peaks",
    @map(type = 'json'))
define stream DecreasingProductionAlertStream (initialAmount double, peakAmount double, firstReducedAmount double);
@info(name = 'query')
from every e1 = ProductionStream , e2 = ProductionStream[ifThenElse(e2[last].amount is null, e1.amount <= amount, e2[last].amount <= amount)] + , e3 = ProductionStream[e2[last].amount > amount] 
select e1.amount as initialAmount, e2[last].amount as peakAmount, e3.amount as firstReducedAmount 
insert into DecreasingProductionAlertStream;
The above Siddhi application observes consecutive events in the ProductionStream stream. It first checks whether the value for the amount attribute of the second event is greater than or equal to that of the first event. Then, for each subsequent event, it checks whether the value for the amount attribute is greater than or equal to that of the previous event (i.e., via e2[last].amount). When an event occurs with a value for the amount attribute that is less than that of the preceding event, an output event reporting the initial amount (e1.amount), the peak amount (e2[last].amount), and the first reduced amount (e3.amount) is generated in the DecreasingProductionAlertStream stream and logged via the connected log sink.
Try it out¶
To try out the Siddhi application given in the sample above, follow the steps below:
- Open a new file, add the following content to it, and save.

@App:name('ObserveProductionTrendsApp')
define stream ProductionStream (name string, amount double);
@sink(type = 'log', prefix = "Production Peaks",
    @map(type = 'json'))
define stream DecreasingProductionAlertStream (initialAmount double, peakAmount double, firstReducedAmount double);
@info(name = 'query')
from every e1 = ProductionStream , e2 = ProductionStream[ifThenElse(e2[last].amount is null, e1.amount <= amount, e2[last].amount <= amount)] + , e3 = ProductionStream[e2[last].amount > amount]
select e1.amount as initialAmount, e2[last].amount as peakAmount, e3.amount as firstReducedAmount
insert into DecreasingProductionAlertStream;

- Simulate events for the ProductionStream stream of the ObserveProductionTrendsApp application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

| name | amount |
|---|---|
| gingerbread | 100 |
| gingerbread | 105 |
| gingerbread | 117 |
| gingerbread | 121 |
| gingerbread | 115 |

As a result, the following is logged in the terminal.

INFO {io.siddhi.core.stream.output.sink.LogSink} - Production Peaks : {"event":{"initialAmount":100.0,"peakAmount":121.0,"firstReducedAmount":115.0}} (Encoded)