Aggregating Data Incrementally¶
Purpose¶
This example demonstrates how to get running statistics using Siddhi. The sample Siddhi application aggregates the data relating to the raw material purchases of a sweet production factory.
Before you begin:
- Install MySQL.
- Add the MySQL JDBC driver to your Streaming Integrator library as follows:- Download the JDBC driver from the MySQL site. 
- Extract the MySQL JDBC Driver zip file you downloaded. Then use the jarbundletool in the<SI_TOOLING_HOME>/bindirectory to convert the jars in it into OSGi bundles. To do this, issue one of the following commands:- For Windows: <SI_TOOLING_HOME>/bin/jartobundle.bat <PATH_OF_DOWNLOADED_JAR> <PATH_OF_CONVERTED_JAR>
- For Linux: <SI_TOOLING_HOME>/bin/jartobundle.sh <PATH_OF_DOWNLOADED_JAR> <PATH_OF_CONVERTED_JAR
 
- For Windows: 
- Copy the converted bundles to the <SI_TOOLING_HOME>/libdirectory.
 
- Download the JDBC driver from the MySQL site. 
- Create a data store named sweetFactoryDBin MySQL with relevant access privileges.
- Replace the values for the jdbc.url,username, andpasswordparameters in the sample.
 e.g.,- jdbc.url-- jdbc:mysql://localhost:3306/sweetFactoryDB
- username-- root
- password-- root
 
- In the Streaming Integrator Tooling, save the sample Siddhi application.
Executing the Sample¶
To execute the sample Siddhi application, open it in Streaming Integrator Tooling and click the Start button (shown below) or click Run => Run.

If the Siddhi application starts successfully, the following message appears in the console.
AggregateDataIncrementally.siddhi - Started Successfully!.
Testing the Sample¶
To test the sample Siddhi application, simulate single events for it via the Streaming Integrator Tooling as follows:
- 
To open the Event Simulator, click the Event Simulator icon.  This opens the event simulation panel. 
- 
To simulate events for the RawMaterialStreamstream of theAggregateDataIncrementallySiddhi application, enter information in the Single Simulation tab of the event simulation panel as follows.Field Value Siddhi App Name AggregateDataIncrementallyStreamName RawMaterialStream As a result, the attributes of the RawMaterialStreamstream appear as marked in the image above.
- 
Send four events by entering values as shown below. Click Send after each event. - 
Event 1 - 
name: chocolate cake
- 
amount: 100 
 
- 
- 
Event 2 - 
name: chocolate cake
- 
amount: 200 
 
- 
- 
Event 3 - 
name: chocolate ice cream
- 
amount: `50 
 
- 
- 
Event 4 - 
name: chocolate ice cream
- 
amount: 150
 
- 
 
- 
- 
In the StreamName field, select TriggerStream*. In the triggerId field, enter 1as the trigger ID, and then click Send.
Viewing the Results:¶
The input and the corresponding output is displayed in the console as follows.
Info
The timestamp displayed is different because it is derived based on the time at which you send the events.
    INFO {io.siddhi.core.stream.output.sink.LogSink} - AggregateDataIncrementally : RawMaterialStatStream : [Event{timestamp=1513612116450, data=[1537862400000, chocolate ice cream, 100.0], isExpired=false}, Event{timestamp=1513612116450, data=[chocolate cake, 150.0], isExpired=false}]
    [INFO {io.siddhi.core.stream.output.sink.LogSink} - AggregateDataIncrementally : RawMaterialStatStream : [Event{timestamp=1513612116450, data=[1537862400000, chocolate ice cream, 100.0], isExpired=false}, Event{timestamp=1513612116450, data=[chocolate cake, 150.0], isExpired=false}]Click here to view the sample Siddhi application.
gi
@App:name("AggregateDataIncrementally")
@App:description('Aggregates values every second until year and gets statistics')
define stream RawMaterialStream (name string, amount double);
@sink(type ='log')
define stream RawMaterialStatStream (AGG_TIMESTAMP long, name string, avgAmount double);
@store( type="rdbms",
        jdbc.url="jdbc:mysql://localhost:3306/sweetFactoryDB",
        username="root",
        password="root",
        jdbc.driver.name="com.mysql.jdbc.Driver")
define aggregation stockAggregation
from RawMaterialStream
select name, avg(amount) as avgAmount, sum(amount) as totalAmount
group by name
aggregate every sec...year;
define stream TriggerStream (triggerId string);
@info(name = 'query1')
from TriggerStream as f join stockAggregation as s
within "2016-06-06 12:00:00 +05:30", "2020-06-06 12:00:00 +05:30"
per 'hours'
select AGG_TIMESTAMP, s.name, avgAmount
insert into RawMaterialStatStream;