Receiving Data via TCP and Preprocessing

Purpose

This application demonstrates how to receive events via TCP transport and carry out data pre-processing with Siddhi extensions (e.g., the string extension and the time extension). For more information on Siddhi extensions, refer to "https://wso2.github.io/siddhi/extensions/". In this sample, a composite ID is obtained by concatenating the lowercased name of the incoming event with the current timestamp, and the current date is stored alongside the amount.
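The pre-processing step can be illustrated outside Siddhi with a minimal Python sketch. It mirrors what the query in this sample does to a single event. The function name preprocess and its now parameter are hypothetical, and the timestamp and date formats (yyyy-MM-dd HH:mm:ss and yyyy-MM-dd) are assumptions about the output of time:currentTimestamp() and time:currentDate(); check the time extension documentation before relying on them.

```python
from datetime import datetime


def preprocess(name, amount, now=None):
    """Mimic the sample query for one event; `now` can be injected for repeatable runs."""
    now = now or datetime.now()
    # Assumed output format of time:currentTimestamp(): yyyy-MM-dd HH:mm:ss
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
    # Assumed output format of time:currentDate(): yyyy-MM-dd
    date = now.strftime("%Y-%m-%d")
    # Equivalent of str:concat(str:lower(name), "::", time:currentTimestamp())
    composite_id = name.lower() + "::" + timestamp
    return {"compositeID": composite_id, "amount": amount, "date": date}


row = preprocess("Toffee", 25.5, now=datetime(2024, 1, 15, 10, 30, 0))
print(row)
# {'compositeID': 'toffee::2024-01-15 10:30:00', 'amount': 25.5, 'date': '2024-01-15'}
```

Because the composite ID is the table's primary key and the timestamp in this sketch has one-second resolution, two events with the same name arriving within the same second would produce the same key.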

Prerequisites

  1. Ensure that MySQL is installed on your machine.
  2. Add the MySQL JDBC driver into {WSO2_SI_HOME}/lib as follows:
    1. Download the JDBC driver from: https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz.
    2. Extract the archive.
    3. Copy mysql-connector-java-5.1.45-bin.jar to the {WSO2_SI_HOME}/lib directory.
  3. Create a database named sampleDB in MySQL. The application refers to this database with the URL jdbc:mysql://localhost:3306/sampleDB.
  4. In the store configuration of this application, replace 'username' and 'password' values with your MySQL credentials.
  5. Save this sample.

Executing the Sample

  1. Start the Siddhi application by clicking on 'Run'.
  2. If the Siddhi application starts successfully, the following messages are shown on the console:
    * Tcp Server started in 0.0.0.0:9892
    * SweetProductionDataPreprocessing.siddhi - Started Successfully!

Testing the Sample

Navigate to {WSO2_SI_HOME}/samples/sample-clients/tcp-client and run the ant command as follows.

ant -Dtype=binary

To publish a custom number of events, run the ant command as follows.

ant -Dtype=binary -DnoOfEventsToSend=5

Viewing the Results

Check the ProcessedSweetProductionTable created in sampleDB. The pre-processed data is written to this table.

@App:name("SweetProductionDataPreprocessing")
@App:description('Collect data via TCP transport and pre-process')


@source(type='tcp',
        context='SweetProductionStream',
        port='9892',
        @map(type='binary'))
define stream SweetProductionStream (name string, amount double);

@Store(type="rdbms",
       jdbc.url="jdbc:mysql://localhost:3306/sampleDB",
       username="root",
       password="mysql",
       jdbc.driver.name="com.mysql.jdbc.Driver")
@PrimaryKey("compositeID")
define table ProcessedSweetProductionTable (compositeID string, amount double, date string);

--Pre-process sweet production data: build a composite ID from the lowercased name and the current timestamp, and record the current date
@info(name='query1')
from SweetProductionStream
select str:concat(str:lower(name), "::", time:currentTimestamp()) as compositeID, amount, time:currentDate() as date
insert into ProcessedSweetProductionTable;