Joining Streaming Data with Stored Data in RDBMS¶
Purpose:¶
This application demonstrates how to perform join on streaming data with the data stored in RDBMS. The sample depicts a scenario, where a transaction by a credit card with which fraudulent activity has been previously done. The credit card numbers, which were noted for fraudulent activities are stored in an RDBMS table.
Prerequisites:¶
- Ensure that MySQL is installed on your machine
- Create a database named 'fraudDB' in MySQL. This database is referred to with 'jdbc:mysql://localhost:3306/fraudDB' url.
- Create a table named 'FraudTable': CREATE TABLE FraudTable (creditCardNo VARCHAR(20));
- Insert some values to the table : INSERT INTO FraudTable VALUES ("143-90099-23433");
- In the store configuration of this application, replace 'username' and 'password' values with your MySQL credentials
- Save this sample
Executing the Sample:¶
- Start the Siddhi application by clicking on 'Run'
- If the Siddhi application starts successfully, the following messages would be shown on the console
- JoinWithStoredData.siddhi - Started Successfully!
Testing the Sample:¶
- Simulate single events. For this, click on 'Event Simulator' (double arrows on left tab) -> 'Single Simulation' -> Select 'JoinWithStoredData' as 'Siddhi App Name' -> Select 'TradeStream' as 'Stream Name' -> Provide attribute values -> Send
- Send at-least one event with the single event simulator, where the creditCardNo matches a creditCardNo value in the data we previously inserted to the FraudTable. This would satisfy the 'on' condition of our join query
Viewing the Results:¶
See the output for suspicious trades on the console.
@App:name("JoinWithStoredData")
@App:description('Join streaming data with data stored in an RDBMS table')
@Store(type="rdbms",
jdbc.url="jdbc:mysql://localhost:3306/fraudDB",
username="root",
password="root" ,
jdbc.driver.name="com.mysql.jdbc.Driver")
@PrimaryKey("creditCardNo")
define table FraudTable (creditCardNo string);
define stream TradeStream(creditCardNo string, trader string, tradeInfo string);
@sink(type='log')
define stream SuspiciousTradeStream(creditCardNo string, suspiciousTrader string, suspiciousInfoTrade string);
--Perform a join on credit card number, to capture transactions with credit cards that have previously been used for fraudulent activity
@info(name='query1')
from TradeStream as t join FraudTable as f
on t.creditCardNo == f.creditCardNo
select t.creditCardNo, t.trader as suspiciousTrader, t.tradeInfo as suspiciousInfoTrade
insert into SuspiciousTradeStream;
Top