Aggregating Data Incrementally

Purpose

This example demonstrates how to get running statistics using Siddhi. The sample Siddhi application aggregates the data relating to the raw material purchases of a sweet production factory.

Before you begin:

  1. Install MySQL.
  2. Add the MySQL JDBC driver to your Streaming Integrator library as follows:
    1. Download the JDBC driver from the MySQL site.
    2. Extract the MySQL JDBC Driver zip file you downloaded. Then use the jarbundle tool in the <SI_TOOLING_HOME>/bin directory to convert the jars in it into OSGi bundles. To do this, issue one of the following commands:
      • For Windows: <SI_TOOLING_HOME>/bin/jartobundle.bat <PATH_OF_DOWNLOADED_JAR> <PATH_OF_CONVERTED_JAR>
      • For Linux: <SI_TOOLING_HOME>/bin/jartobundle.sh <PATH_OF_DOWNLOADED_JAR> <PATH_OF_CONVERTED_JAR
    3. Copy the converted bundles to the <SI_TOOLING_HOME>/lib directory.
  3. Create a data store named sweetFactoryDB in MySQL with relevant access privileges.
  4. Replace the values for the jdbc.url, username, and password parameters in the sample.
    e.g.,
    • jdbc.url - jdbc:mysql://localhost:3306/sweetFactoryDB
    • username - root
    • password - root
  5. In the Streaming Integrator Tooling, save the sample Siddhi application.

Executing the Sample

To execute the sample Siddhi application, open it in Streaming Integrator Tooling and click the Start button (shown below) or click Run => Run.

Start button

If the Siddhi application starts successfully, the following message appears in the console.

AggregateDataIncrementally.siddhi - Started Successfully!.

Testing the Sample

To test the sample Siddhi application, simulate single events for it via the Streaming Integrator Tooling as follows:

  1. To open the Event Simulator, click the Event Simulator icon.

    Event Simulator Icon

    This opens the event simulation panel.

  2. To simulate events for the RawMaterialStream stream of the AggregateDataIncrementally Siddhi application, enter information in the Single Simulation tab of the event simulation panel as follows.

    Field Value
    Siddhi App Name AggregateDataIncrementally
    StreamName RawMaterialStream

    Select Siddhi Application and Stream

    As a result, the attributes of the RawMaterialStream stream appear as marked in the image above.

  3. Send four events by entering values as shown below. Click Send after each event.

    • Event 1

      • name: chocolate cake

      • amount: 100

    • Event 2

      • name: chocolate cake

      • amount: 200

    • Event 3

      • name: chocolate ice cream

      • amount: `50

    • Event 4

      • name: chocolate ice cream

      • amount: 150

  4. In the StreamName field, select TriggerStream*. In the triggerId field, enter 1 as the trigger ID, and then click Send.

Viewing the Results:

The input and the corresponding output is displayed in the console as follows.

Info

The timestamp displayed is different because it is derived based on the time at which you send the events.

    INFO {io.siddhi.core.stream.output.sink.LogSink} - AggregateDataIncrementally : RawMaterialStatStream : [Event{timestamp=1513612116450, data=[1537862400000, chocolate ice cream, 100.0], isExpired=false}, Event{timestamp=1513612116450, data=[chocolate cake, 150.0], isExpired=false}]
    [INFO {io.siddhi.core.stream.output.sink.LogSink} - AggregateDataIncrementally : RawMaterialStatStream : [Event{timestamp=1513612116450, data=[1537862400000, chocolate ice cream, 100.0], isExpired=false}, Event{timestamp=1513612116450, data=[chocolate cake, 150.0], isExpired=false}]
Click here to view the sample Siddhi application.

gi

@App:name("AggregateDataIncrementally")
@App:description('Aggregates values every second until year and gets statistics')

define stream RawMaterialStream (name string, amount double);

@sink(type ='log')
define stream RawMaterialStatStream (AGG_TIMESTAMP long, name string, avgAmount double);
@store( type="rdbms",
        jdbc.url="jdbc:mysql://localhost:3306/sweetFactoryDB",
        username="root",
        password="root",
        jdbc.driver.name="com.mysql.jdbc.Driver")

define aggregation stockAggregation

from RawMaterialStream
select name, avg(amount) as avgAmount, sum(amount) as totalAmount
group by name
aggregate every sec...year;
define stream TriggerStream (triggerId string);
@info(name = 'query1')
from TriggerStream as f join stockAggregation as s
within "2016-06-06 12:00:00 +05:30", "2020-06-06 12:00:00 +05:30"
per 'hours'
select AGG_TIMESTAMP, s.name, avgAmount
insert into RawMaterialStatStream;

Top