Siddhi Query API¶
Introduction¶
Siddhi query API is the REST API exposed by the Streaming Integrator (SI). It gives you a set of APIs to perform all of the essential operations relating to Siddhi applications, including developing, testing, and querying them.
Siddhi query API provides APIs related to: - Siddhi application management (such as creating, updating, deleting a Siddhi application; listing all running Siddhi applications etc.) - Event simulation - Authentication and permission management - Health check - Siddhi store operations
For a comprehensive reference on the Siddhi query API, see Streaming Integration REST API Guide.
This tutorial demonstrates how you can use the Siddhi query API to perform essential operations in ESB Streaming Integrator using simple examples.
Before you begin¶
-
You need to have access to a MySQL instance.
-
Save the MySQL JDBC driver in the
<SI_HOME>/lib
directory as follows: -
Download the MySQL JDBC driver from the MySQL site.
-
Unzip the archive.
-
Copy the
mysql-connector-java-5.1.45-bin.jar
to the<SI_HOME>/lib
directory. -
Start the SI server.
Step 1: Prepare the server¶
-
Let's create a new database in the MySQL server that you will be using throughout this tutorial. To do this, execute the following query.
CREATE SCHEMA production;
-
Create a new user by executing the following SQL query.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'wso2si' IDENTIFIED BY 'wso2';
-
Switch to the
production
database and create a new table, by executing the following queries:use production;
CREATE TABLE SweetProductionTable (name VARCHAR(20),amount double(10,2));
Step 2: Create a Siddhi application¶
-
Open a text file and copy-paste following application to it.
@App:name("SweetProduction-Store") @App:description('Receive events via HTTP and persist the received data in the store.') @Source(type = 'http', receiver.url='http://localhost:8006/productionStream', basic.auth.enabled='false', @map(type='json')) define stream insertSweetProductionStream (name string, amount double); @Store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/production?useSSL=false", username="wso2si", password="wso2" , jdbc.driver.name="com.mysql.jdbc.Driver") define table SweetProductionTable (name string, amount double); from insertSweetProductionStream update or insert into SweetProductionTable on SweetProductionTable.name == name;
Here the
jdbc.url
parameter has the valuejdbc:mysql://localhost:3306/production?useSSL=false
. Change it to point to your MySQL server. Similarly changeusername
andpassword
parameters as well. -
Save this file as
SweetProduction-Store.siddhi
in a location of your choice in the local file system. -
Now you need to execute a
CURL
command and deploy this Siddhi application. In the command line, navigate to the location where you saved the Siddhi application in the previous step, and execute following command:curl -X POST "https://localhost:9443/siddhi-apps" -H "accept: application/json" -H "Content-Type: text/plain" -d @SweetProduction-Store.siddhi -u admin:admin -k
Upon successful deployment, the following response is logged for the CURL
command you just executed.
```
{"type":"success","message":"Siddhi App saved succesfully and will be deployed in next deployment cycle"}
```
In addition to that, the following is logged in the SI console.
```
INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App SweetProduction-Store deployed successfully
```
-
To trigger an insert/update event, execute following
CURL
command in the console.curl -X POST -d "{\"event\": {\"name\":\"Almond cookie\",\"amount\": 100.0}}" http://localhost:8006/productionStream --header "Content-Type:application/json"
You have written the Siddhi application to insert a new record from the
insertSweetProductionStream
stream into theSweetProductionTable
table, or to update the record if it already exists in the table. As a result, above event is now inserted into theSweetProductionTable
. -
To verify whether above event is inserted into
SweetProductionTable
, execute followingSQL
query in the SQL console.SELECT * FROM SweetProductionTable;
The following table appears to indicate that the record has been inserted into the table.
+---------------+--------+ | name | amount | +---------------+--------+ | Almond cookie | 100.00 | +---------------+--------+
Step 3: Run a Siddhi Store API query¶
You can use Siddhi Store Query API
to execute queries on Siddhi Store tables.
In this section shows you how to execute a simple store query via the REST API in order to fetch all records from the SweetProductionTable
Siddhi Store table. To learn about other types of queries, see Streaming Integrator REST API Guide.
Execute following CURL
command on the console:
curl -k -X POST http://localhost:7070/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "SweetProduction-Store", "query" : "from SweetProductionTable select *" }'
The following output is logged in the console:
{"records":[["Almond cookie",100.0]]}
Step 4: Fetch the status of a Siddhi Application¶
Now let's fetch the status of the Siddhi application you just deployed.
Execute following CURL
command, in the console:
curl -X GET "http://localhost:9090/siddhi-apps/SweetProduction-Store/status" -H "accept: application/json" -u admin:admin -k
The following output appears in the command line:
{"status":"active"}
Preserving the state with snapshots¶
This section of the tutorial shows you how to preserve the state by taking snapshots of Siddhi applications.
Step 1: Take a snapshot of a Siddhi Application¶
In this section, deploy a stateful Siddhi application and use the REST API to take a snapshot of it. To do this, follow the procedure below:
-
To enable the state persistence feature in SI server, open the
<SI_HOME>/conf/server/deployment.yaml
file in a text editor and locate thestate.persistence
section, and then update it as follows.# Periodic Persistence Configuration state.persistence: enabled: true intervalInMin: 5 revisionsToKeep: 2 persistenceStore: org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore config: location: siddhi-app-persistence
As shown above, set
enabled
parameter totrue
and set theintervalInMin
to5
. Then save the file. -
Restart the Streaming Integrator server for the above change to be effective.
-
Open a text file and copy-paste following application into it.
@App:name("CountProductions") @App:description("Siddhi application to count the total number of orders.") @Source(type = 'http', receiver.url='http://localhost:8007/productionStream', basic.auth.enabled='false', @map(type='json')) define stream SweetProductionStream (name string, amount double); @sink(type = 'log') define stream LogStream (totalProductions double); @info(name = 'query') from SweetProductionStream select sum(amount) as totalProductions insert into LogStream;
-
Save this file as
CountProductions.siddhi
in a location of your choice in the local file system. -
Now execute a
CURL
command and deploy this Siddhi application. To do this, use the command line to navigate to the location where you saved the Siddhi application in above step, and then execute following command.curl -X POST "https://localhost:9443/siddhi-apps" -H "accept: application/json" -H "Content-Type: text/plain" -d @CountProductions.siddhi -u admin:admin -k
Upon successful deployment, the following response is logged for the CURL
command you just executed.
{"type":"success","message":"Siddhi App saved succesfully and will be deployed in next deployment cycle"}
In addition to that, the following log appears in the SI console.
INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App CountProductions deployed successfully.
-
Now let's send two sweet production events using
CURL
by issuing the following twoCURL
commands via the command line:curl -X POST -d "{\"event\": {\"name\":\"Almond cookie\",\"amount\": 100.0}}" http://localhost:8007/productionStream --header "Content-Type:application/json"
curl -X POST -d "{\"event\": {\"name\":\"Baked alaska\",\"amount\": 20.0}}" http://localhost:8007/productionStream --header "Content-Type:application/json"
-
As a result, the following logs appears on the SI console:
Note that the current productions count isINFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1566288572024, data=[100.0], isExpired=false} INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1566288596336, data=[120.0], isExpired=false}
120
. -
Now you can invoke the Siddhi Query API to take a snapshot of the Siddhi application. To do this, execute following
CURL
command on the command line:curl -X POST "https://localhost:9443/siddhi-apps/CountProductions/backup" -H "accept: application/json" -u admin:admin -k
An output similar to the following is logged.
{"revision":"1566293390654__CountProductions"}
Info
1566293390654__CountProductions
is the revision number of the Siddhi application snapshot that you requested via the REST API. You can store this revision number and later use it in order to restore the Siddhi application to the state at which you took the snapshot.
Step 2: Restore the Siddhi application via the snapshot¶
In the previous section, you took a snapshot of the CountProductions
Siddhi application when the productions count was 120
. In this section, you can increase the count further by sending a few more production events, and then restore the Siddhi application to the state you backed up. To do this, follow the procedure below
- Send following two sweet production events:
curl -X POST -d "{\"event\": {\"name\":\"Cup cake\",\"amount\": 300.0}}" http://localhost:8007/productionStream --header "Content-Type:application/json"
curl -X POST -d "{\"event\": {\"name\":\"Doughnut\",\"amount\": 500.0}}" http://localhost:8007/productionStream --header "Content-Type:application/json"
As a result, the following two lines of log appear in the SI console:
INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1566288572024, data=[420.0], isExpired=false}
INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1566288596336, data=[920.0], isExpired=false}
Note that the current productions count is 920
.
-
Now you can invoke the Siddhi Query API to restore the snapshot that you obtained in step 10 of the Taking a snapshot of a Siddhi Application section of this tutorial.
In this example, the revision number obtained is
1566293390654__CountProductions
(see step 10 in Taking a snapshot of a Siddhi Application section.). When restoring the state, use the exact revision number that you obtained.Note
Replace
1566293390654__CountProductions
with the revision number that you obtained when taking the Siddhi application snapshot.The response you receive is as follows:
{"type":"success","message":"State restored to revision 1566293390654__CountProductions for Siddhi App :CountProductions"}
In addition to that, the following log is printed in the SI console:
INFO {org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore} - State loaded for CountProductions revision 1566293390654__CountProductions from the file system.
-
Now send another sweet production event by executing the following
CURL
command:curl -X POST -d "{\"event\": {\"name\":\"Danish pastry\",\"amount\": 100.0}}" http://localhost:8007/productionStream --header "Content-Type:application/json"
As a result, the following log appears in the SI console:
INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1566293520176, data=[220.0], isExpired=false}
Note that the productions count is 220
. This is because the count was reset to 120
when you restored the snapshot.
Top