Performing Real-time ETL with Files¶
Introduction¶
The Streaming Integrator (SI) allows you to perform real-time ETL with data that is stored in files.
This tutorial takes you through the different modes and options you can use to perform real-time ETL with files using the SI.
Before you begin:
- Start the SI server by navigating to the <SI_HOME>/bin directory and issuing one of the following commands, based on your operating system:
  - For Windows: server.bat --run
  - For Linux: sh server.sh
  The following log appears in the Streaming Integrator console once you have successfully started the server.
  INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - ESB Streaming Integrator started in 4.240 sec
Extracting data from a file¶
In this section of the tutorial, you explore the different ways in which you can extract data from a specific file.
Tailing a text file line by line¶
In this scenario, you are tailing a text file, line by line, in order to extract data from it. Each line is extracted as an event that undergoes a simple transformation thereafter. Let's write a simple Siddhi application to do this.
1. Download the productions.csv file from here and save it in a location of your choice.
2. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('TailFileLineByLine')
       @App:description('Tails a file line by line and does a simple transformation.')

       @source(type='file', mode='LINE',
           file.uri='file:/Users/foo/productions.csv',
           tailing='true',
           @map(type='csv'))
       define stream SweetProductionStream (name string, amount double);

       @sink(type = 'log')
       define stream LogStream (name string, amount double);

       from SweetProductionStream
       select str:upper(name) as name, amount
       insert into LogStream;

   Change the value of the file.uri parameter in the above Siddhi application to the path to which you downloaded the productions.csv file in step 1.
3. Save this file as TailFileLineByLine.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application tails the productions.csv file line by line. Each line is converted to an event in the SweetProductionStream stream. After that, a simple transformation is carried out on each event: the value of the name attribute is converted to upper case. Finally, the output is logged in the Streaming Integrator console.
   Upon successful deployment, the following log appears in the SI console:

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App TailFileLineByLine deployed successfully

4. To install the extensions required for the TailFileLineByLine Siddhi application you deployed, open a new terminal window, navigate to the <SI_HOME>/bin directory, and issue one of the following commands, based on your operating system:
   - For Windows: extension-installer.bat
   - For Linux: sh extension-installer.sh
5. Now the Siddhi application starts to process the productions.csv file. The file contains the following entries.

       Almond cookie,100.0
       Baked alaska,20.0

   As a result, the following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - TailFileLineByLine : LogStream : Event{timestamp=1564490830652, data=[ALMOND COOKIE, 100.0], isExpired=false}
       INFO {io.siddhi.core.stream.output.sink.LogSink} - TailFileLineByLine : LogStream : Event{timestamp=1564490830657, data=[BAKED ALASKA, 20.0], isExpired=false}

6. Now append the following line to the productions.csv file and save the file.

       Cup cake,300.0

7. The following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - TailFileLineByLine : LogStream : Event{timestamp=1564490869579, data=[CUP CAKE, 300.0], isExpired=false}
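To make the tail-and-transform behaviour above concrete, here is a minimal Python sketch that runs independently of the Streaming Integrator. It is not how the file source is implemented. The function names `to_event` and `read_new_lines`, and the in-memory `io.StringIO` buffer standing in for productions.csv, are assumptions made purely for illustration.

```python
import io


def to_event(line):
    """Map one CSV line to a (name, amount) event and upper-case the name."""
    name, amount = line.split(",")
    return name.upper(), float(amount)


def read_new_lines(stream, offset):
    """Return the events found after `offset`, plus the new offset to resume from."""
    stream.seek(offset)
    chunk = stream.read()
    events = [to_event(line) for line in chunk.splitlines() if line.strip()]
    return events, stream.tell()


# Hypothetical stand-in for productions.csv with its two initial entries.
source = io.StringIO("Almond cookie,100.0\nBaked alaska,20.0\n")
first_batch, offset = read_new_lines(source, 0)

# Appending a line later produces exactly one more event, as in steps 6 and 7.
source.seek(0, io.SEEK_END)
source.write("Cup cake,300.0\n")
second_batch, offset = read_new_lines(source, offset)

print(first_batch)
print(second_batch)
```

The point of the sketch is the remembered offset: only lines added after the previous read are turned into events, which is the behaviour the tutorial shows when tailing is enabled.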
Tailing a text file using a regular expression¶
In this scenario, you are using a regular expression to extract data from the file. After the data is extracted, a simple transformation is performed on it. Finally, the transformed event is logged in the SI console. Let's write a simple Siddhi application to do this.
1. Download the noisy_data.txt file from here and save it in a location of your choice.
2. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('TailFileRegex')
       @App:description('Tails a file using a regex and does a simple transformation.')

       @source(type='file', mode='REGEX',
           file.uri='file:/Users/foo/noisy_data.txt',
           begin.regex='\<',
           end.regex='\>',
           tailing='true',
           @map(type='text', fail.on.missing.attribute = 'false',
               regex.A='(\w+)\s([-0-9]+)', regex.B='volume\s([-0-9]+)',
               @attributes(symbol = 'A[1]', price = 'A[2]', volume = 'B')))
       define stream StockStream (symbol string, price float, volume long);

       @sink(type = 'log')
       define stream LogStream (symbol string, price float, volume long);

       from StockStream[NOT(symbol is null)]
       select str:upper(symbol) as symbol, price, volume
       insert into LogStream;

   Change the value of the file.uri parameter in the above Siddhi application to the path to which you downloaded the noisy_data.txt file in step 1.
3. Save this file as TailFileRegex.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application tails the noisy_data.txt file to find matches according to the regular expressions given: begin.regex and end.regex. Each match is converted to an event in the StockStream stream. After that, a simple transformation is carried out on the StockStream stream, where the value of the symbol attribute is converted to upper case. Finally, the output is logged in the SI console.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console.

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App TailFileRegex deployed successfully

4. Now the Siddhi application starts to process the noisy_data.txt file. As a result, the following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - TailFileRegex : LogStream : Event{timestamp=1564583307974, data=[WSO2, 75.0, 100], isExpired=false}
       INFO {io.siddhi.core.stream.output.sink.LogSink} - TailFileRegex : LogStream : Event{timestamp=1564583307975, data=[ORCL, 95.0, 200], isExpired=false}

5. Now append the following text to the noisy_data.txt file and save the file.

       IBM <ibm 88 volume 150> 1 New Orchard Rd Armonk, NY 10504 Phone Number: (914) 499-1900 Fax Number: (914) 765-6021

6. The following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - TailFileRegex : LogStream : Event{timestamp=1564588585214, data=[IBM, 88.0, 150], isExpired=false}
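The way the regular expressions in this scenario pick values out of noisy text can be tried outside the SI with a short Python sketch. It reuses the patterns from the Siddhi application (the begin and end markers, regex.A, and regex.B), but the function `extract_events` and the way the patterns are combined are assumptions for illustration, not the text mapper's actual implementation.

```python
import re

# Patterns copied from the Siddhi application above.
BEGIN, END = r"\<", r"\>"
REGEX_A = re.compile(r"(\w+)\s([-0-9]+)")   # symbol and price
REGEX_B = re.compile(r"volume\s([-0-9]+)")  # volume


def extract_events(text):
    """Build one (symbol, price, volume) event per begin/end delimited chunk."""
    events = []
    for chunk in re.findall(BEGIN + r".*?" + END, text, flags=re.DOTALL):
        a = REGEX_A.search(chunk)
        b = REGEX_B.search(chunk)
        if a is None:
            continue  # no symbol found; plays the role of the NOT(symbol is null) filter
        volume = int(b.group(1)) if b else None
        events.append((a.group(1).upper(), float(a.group(2)), volume))
    return events


appended_text = (
    "IBM <ibm 88 volume 150> 1 New Orchard Rd Armonk, NY 10504 "
    "Phone Number: (914) 499-1900 Fax Number: (914) 765-6021"
)
events = extract_events(appended_text)
print(events)
```

Everything outside the angle brackets (the address and phone numbers) is ignored, which is why only the symbol, price, and volume reach the log.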
Reading a remote text file and moving it after processing¶
In the previous scenarios, you tailed a file and each file generated multiple events. In this scenario, you are reading the complete file to build a single event.
Furthermore, to try out the capability of processing remote files, you are processing a remote file instead of a file located in the local file system.
1. Download the portfolio.txt file from here and upload it to an FTP server.
2. Create a directory on the FTP server. The portfolio.txt file is moved to this directory after processing is complete.
3. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('TextFullFileProcessing')
       @App:description('Reads a text file and moves it after processing.')

       @source(type='file', mode='TEXT.FULL',
           file.uri="ftp://<username>:<password>@<ftp_hostname>:<ftp_port>/Users/foo/portfolio.txt",
           action.after.process='MOVE',
           move.after.process="ftp://<username>:<password>@<ftp_hostname>:<ftp_port>/Users/foo/move.after.process",
           @map(type='json', enclosing.element="$.portfolio",
               @attributes(symbol = "stock.company.symbol", price = "stock.price", volume = "stock.volume")))
       define stream StockStream (symbol string, price float, volume long);

       @sink(type = 'log')
       define stream LogStream (symbol string, price float, volume long);

       from StockStream
       select str:upper(symbol) as symbol, price, volume
       insert into LogStream;

   Change the value of the file.uri parameter in the above Siddhi application to the remote path to which you uploaded the portfolio.txt file in step 1. In addition, change move.after.process so that it points to the remote directory you created in step 2. When configuring both parameters, replace the <username>, <password>, <ftp_hostname>, and <ftp_port> placeholders with the values for your FTP server.
4. Save this file as TextFullFileProcessing.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application reads the complete portfolio.txt remote file to create a StockStream event. After that, a simple transformation is carried out on the StockStream stream, where the value of the symbol attribute is converted to upper case. Finally, the output is logged in the SI console.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console:

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App TextFullFileProcessing deployed successfully

5. Now the Siddhi application starts to process the portfolio.txt file. As a result, the following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - TextFullFileProcessing : LogStream : Event{timestamp=1564660443519, data=[WSO2, 55.6, 100], isExpired=false}
Info
In this scenario, you moved the file after processing. To delete a file after processing, remove the action.after.process and move.after.process parameters from the Siddhi application. For other configuration options, see Siddhi File Source documentation.
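The JSON mapping used in this scenario (an enclosing element of `$.portfolio` and attribute paths such as `stock.company.symbol`) can be illustrated with a small Python sketch. The contents of portfolio.txt are not shown in this tutorial, so the payload below is an assumed shape that is merely consistent with the attribute paths and with the logged event. The function name `map_portfolio` is also hypothetical.

```python
import json

# ASSUMPTION: a payload shaped to match the attribute paths in the mapping.
# The real portfolio.txt may contain different or additional fields.
raw = '{"portfolio": {"stock": {"volume": 100, "company": {"symbol": "wso2"}, "price": 55.6}}}'


def map_portfolio(text):
    """Turn the whole file content into a single (symbol, price, volume) event."""
    stock = json.loads(text)["portfolio"]["stock"]   # enclosing element, then stock.*
    symbol = stock["company"]["symbol"]              # stock.company.symbol
    return symbol.upper(), float(stock["price"]), int(stock["volume"])


event = map_portfolio(raw)
print(event)
```

Because the complete file is read before mapping, one file yields one event, unlike the tailing scenarios where every line or match yields its own event.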
Reading a binary file and moving it after processing¶
In the previous scenarios, you processed text files in order to extract data. In this scenario, you are reading a binary file. The content of the file generates a single event.
1. Download the wso2.bin file from here and save it in a location of your choice.
2. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('BinaryFullFileProcessing')
       @App:description('Reads a binary file and moves it after processing.')

       @source(type='file', mode='TEXT.FULL',
           file.uri='file:/Users/foo/wso2.bin',
           action.after.process='MOVE',
           move.after.process='file:/Users/foo/move.after.process',
           @map(type='json', enclosing.element="$.portfolio",
               @attributes(symbol = "stock.company.symbol", price = "stock.price", volume = "stock.volume")))
       define stream StockStream (symbol string, price float, volume long);

       @sink(type = 'log')
       define stream LogStream (symbol string, price float, volume long);

       from StockStream
       select str:upper(symbol) as symbol, price, volume
       insert into LogStream;

   In the above Siddhi application, change the value of the file.uri parameter to the path to which you downloaded the wso2.bin file in step 1.
3. Save this file as BinaryFullFileProcessing.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application reads the wso2.bin file fully to create a StockStream event. After that, a simple transformation is carried out on the StockStream stream, where the value of the symbol attribute is converted to upper case. Finally, the output is logged in the SI console.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console.

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App BinaryFullFileProcessing deployed successfully

4. Now the Siddhi application starts to process the wso2.bin file. As a result, the following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - BinaryFullFileProcessing : LogStream : Event{timestamp=1564660553623, data=[WSO2, 55.6, 100], isExpired=false}
Reading a file line by line and deleting it after processing¶
In this scenario, you are reading a text file completely, and then deleting it after processing. In other words, the file is not tailed. You read the file line by line where each line generates an event.
1. Download the productions.csv file from here and save it in a location of your choice.
2. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('ReadFileLineByLine')
       @App:description('Reads a file line by line and does a simple transformation.')

       @source(type='file', mode='LINE',
           file.uri='file:/Users/foo/productions.csv',
           tailing='false',
           @map(type='csv'))
       define stream SweetProductionStream (name string, amount double);

       @sink(type = 'log')
       define stream LogStream (name string, amount double);

       from SweetProductionStream
       select str:upper(name) as name, amount
       insert into LogStream;

   In the above Siddhi application, change the value of the file.uri parameter to the path to which you downloaded the productions.csv file in step 1.
3. Save this file as ReadFileLineByLine.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application reads the productions.csv file line by line, without tailing it. Each line is converted to an event in the SweetProductionStream stream. After that, a simple transformation is carried out on each event, where the value of the name attribute is converted to upper case. Finally, the output is logged in the SI console.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console:

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App ReadFileLineByLine deployed successfully

4. Now the Siddhi application starts to process the productions.csv file. The file has the following two entries:

       Almond cookie,100.0
       Baked alaska,20.0

   As a result, the following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - ReadFileLineByLine : LogStream : Event{timestamp=1564490867341, data=[ALMOND COOKIE, 100.0], isExpired=false}
       INFO {io.siddhi.core.stream.output.sink.LogSink} - ReadFileLineByLine : LogStream : Event{timestamp=1564490867341, data=[BAKED ALASKA, 20.0], isExpired=false}

5. Note that the productions.csv file is no longer present in the file.uri location, because it was deleted after processing.
6. Next, create a new productions.csv file in the file.uri location that includes the latest set of productions. To do this, download the productions.csv file from here and save it in the file.uri location.
7. Now the Siddhi application starts to process the new set of production runs in the productions.csv file. The file has the following two entries.

       Cup cake,300.0
       Doughnut,500.0

   As a result, the following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - ReadFileLineByLine : LogStream : Event{timestamp=1564902130543, data=[CUP CAKE, 300.0], isExpired=false}
       INFO {io.siddhi.core.stream.output.sink.LogSink} - ReadFileLineByLine : LogStream : Event{timestamp=1564902130543, data=[DOUGHNUT, 500.0], isExpired=false}
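The read, delete, and re-create cycle described in the steps above can be sketched in a few lines of Python. This is only an illustration of the observable behaviour under stated assumptions: the helper name `drain` is hypothetical, and a temporary directory stands in for the file.uri location.

```python
import tempfile
from pathlib import Path


def drain(path):
    """Emit one event per line, then delete the file (no tailing)."""
    if not path.exists():
        return []  # nothing to process until the file is created again
    events = []
    for line in path.read_text().splitlines():
        if line.strip():
            name, amount = line.split(",")
            events.append((name.upper(), float(amount)))
    path.unlink()  # the file is removed once it has been processed
    return events


with tempfile.TemporaryDirectory() as tmp:
    csv_file = Path(tmp) / "productions.csv"

    csv_file.write_text("Almond cookie,100.0\nBaked alaska,20.0\n")
    first = drain(csv_file)
    gone = not csv_file.exists()   # the file has been deleted
    idle = drain(csv_file)         # no file, so no events

    # Re-creating the file with new data produces a fresh batch of events.
    csv_file.write_text("Cup cake,300.0\nDoughnut,500.0\n")
    second = drain(csv_file)

print(first, gone, idle, second)
```

Compared with tailing, no offset has to be remembered here: each file is consumed once in full and then removed.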
Reading a file using a regular expression and deleting it after processing¶
In this scenario, you are using a regular expression to extract data from the content of the file. Here, you do not tail the file. Instead, you read the full content of the file and generate a single event. After this is done, the file is deleted. To generate an event stream, you can keep re-creating the file with new data.
1. Download the noisy_data.txt file from here and save it in a location of your choice.
2. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('ReadFileRegex')
       @App:description('Reads a file using a regex and does a simple transformation.')

       @source(type='file', mode='REGEX',
           file.uri='file:/Users/foo/noisy_data.txt',
           begin.regex='\<',
           end.regex='\>',
           tailing='false',
           @map(type='text', fail.on.missing.attribute = 'false',
               regex.A='(\w+)\s([-0-9]+)', regex.B='volume\s([-0-9]+)',
               @attributes(symbol = 'A[1]', price = 'A[2]', volume = 'B')))
       define stream StockStream (symbol string, price float, volume long);

       @sink(type = 'log')
       define stream LogStream (symbol string, price float, volume long);

       from StockStream[NOT(symbol is null)]
       select str:upper(symbol) as symbol, price, volume
       insert into LogStream;

   In the above Siddhi application, change the value of the file.uri parameter to the path to which you downloaded the noisy_data.txt file in step 1.
3. Save this file as ReadFileRegex.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application reads the noisy_data.txt file, without tailing it, to find matches based on the begin.regex and end.regex regular expressions. Each match is converted to an event in the StockStream stream. After that, a simple transformation is carried out on the StockStream stream, where the value of the symbol attribute is converted to upper case. Finally, the output is logged in the SI console.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console:

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App ReadFileRegex deployed successfully

4. Now the Siddhi application starts to process the noisy_data.txt file. As a result, the following log appears in the SI console.

       INFO {io.siddhi.core.stream.output.sink.LogSink} - ReadFileRegex : LogStream : Event{timestamp=1564906475623, data=[WSO2, 75.0, 100], isExpired=false}

   Note that the noisy_data.txt file is no longer present in the file.uri location, because it was deleted after processing.
5. Next, let's create a new noisy_data.txt file in the file.uri location that includes new data. To do this, download the noisy_data.txt file from here and save it in the file.uri location.
   Now the Siddhi application starts to process the new content in the noisy_data.txt file. The file has the following content.

       Oracle Corporation <orcl 95 volume 200> 500 Oracle Parkway. Redwood Shores CA, 94065. Corporate Phone: 650.506.7000. HQ-Security: 650.506.5555

   As a result, the following log appears in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - ReadFileRegex : LogStream : Event{timestamp=1564906713176, data=[ORCL, 95.0, 200], isExpired=false}
Extracting data from a folder¶
Processing all files in the folder¶
In this scenario, you extract data from a specific folder. All of the files are processed sequentially, where each file generates a single event.
1. Download the productions.zip file from here and extract it. Now you have a folder named productions. Place it in a location of your choice.
2. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('ProcessFolder')
       @App:description('Process all files in the folder and delete files after processing.')

       @source(type='file', mode='text.full',
           dir.uri='file:/Users/foo/productions',
           @map(type='json', enclosing.element="$.portfolio",
               @attributes(symbol = "stock.company.symbol", price = "stock.price", volume = "stock.volume")))
       define stream StockStream (symbol string, price float, volume long);

       @sink(type = 'log')
       define stream LogStream (symbol string, price float, volume long);

       from StockStream
       select str:upper(symbol) as symbol, price, volume
       insert into LogStream;

   In the above Siddhi application, change the value of the dir.uri parameter so that it points to the productions folder you created in step 1.
3. Save this file as ProcessFolder.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application processes each file in the productions folder. Each file generates an event in the StockStream stream. After that, a simple transformation is carried out on the StockStream stream, where the value of the symbol attribute is converted to upper case. Finally, the output is logged in the SI console.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console:

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App ProcessFolder deployed successfully

4. Now the Siddhi application starts to process each file in the productions directory. As a result, the following logs appear in the SI console:

       INFO {io.siddhi.core.stream.output.sink.LogSink} - ProcessFolder : LogStream : Event{timestamp=1564932255417, data=[WSO2, 75.0, 100], isExpired=false}
       INFO {io.siddhi.core.stream.output.sink.LogSink} - ProcessFolder : LogStream : Event{timestamp=1564932255417, data=[ORCL, 95.0, 200], isExpired=false}
       INFO {io.siddhi.core.stream.output.sink.LogSink} - ProcessFolder : LogStream : Event{timestamp=1564932255417, data=[IBM, 88.0, 150], isExpired=false}
Info
In this scenario, you deleted each file in the folder after processing. You can choose to move the files instead of deleting them. To do this, set the action.after.process parameter to MOVE and specify the directory to which the files should be moved via the move.after.process parameter. For more information about these parameters, see Siddhi File Source documentation.
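The difference between deleting and moving processed files in a folder can be sketched in Python as follows. This is an illustration only, not the file source's implementation. The function `process_folder`, its `move_to` argument, the file names, the JSON payload shape, and the processing order (sorted by file name) are all assumptions.

```python
import json
import shutil
import tempfile
from pathlib import Path


def payload(symbol, price, volume):
    """ASSUMED file content, shaped to match the attribute paths in the mapping."""
    return json.dumps({"portfolio": {"stock": {
        "company": {"symbol": symbol}, "price": price, "volume": volume}}})


def process_folder(folder, move_to=None):
    """Emit one event per file, then delete the file or move it to `move_to`."""
    events = []
    for path in sorted(p for p in folder.iterdir() if p.is_file()):
        stock = json.loads(path.read_text())["portfolio"]["stock"]
        events.append((stock["company"]["symbol"].upper(), stock["price"], stock["volume"]))
        if move_to is None:
            path.unlink()                                   # default: delete
        else:
            move_to.mkdir(parents=True, exist_ok=True)
            shutil.move(str(path), str(move_to / path.name))  # MOVE variant
    return events


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp) / "productions"
    archive = Path(tmp) / "processed"
    folder.mkdir()
    for symbol, price, volume in [("wso2", 75.0, 100), ("orcl", 95.0, 200), ("ibm", 88.0, 150)]:
        (folder / f"{symbol}.json").write_text(payload(symbol, price, volume))

    events = process_folder(folder, move_to=archive)
    remaining = sorted(p.name for p in folder.iterdir())
    archived = sorted(p.name for p in archive.iterdir())

print(events, remaining, archived)
```

Calling `process_folder(folder)` without `move_to` corresponds to the delete behaviour used in this scenario, while passing a target directory corresponds to the MOVE option described in the note above.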
Loading data into a file¶
In this section of the tutorial, you explore the different ways in which you can load data into a file.
Appending or over-writing events to a file¶
In this scenario, you are appending a stream of events to the end of a file.
1. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('AppendToFile')
       @App:description('Append incoming events into a file.')

       @Source(type = 'http', receiver.url='http://localhost:8006/SweetProductionStream', basic.auth.enabled='false',
           @map(type='json'))
       define stream SweetProductionStream (name string, amount double);

       @sink(type='file', @map(type='json'), file.uri='/Users/foo/low_productions.txt')
       define stream LowProductionStream (name string, amount double);

       -- Query to filter productions which have amount < 500.0
       @info(name='query1')
       from SweetProductionStream[amount < 500.0]
       select *
       insert into LowProductionStream;

   Create an empty file and specify its location as the value of the file.uri parameter. If this file does not exist, it is created at runtime.
2. Save this file as AppendToFile.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application filters incoming SweetProductionStream events, selects the production runs for which the value of the amount attribute is less than 500.0, and inserts the results into the LowProductionStream stream. Finally, all the events in the LowProductionStream stream are appended to the file specified via the file.uri parameter in the Siddhi application.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console:

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App AppendToFile deployed successfully

3. To insert a few events into SweetProductionStream, let's issue the following curl commands:

       curl -X POST -d "{\"event\": {\"name\":\"Almond cookie\",\"amount\": 100.0}}" http://localhost:8006/SweetProductionStream --header "Content-Type:application/json"
       curl -X POST -d "{\"event\": {\"name\":\"Baked alaska\",\"amount\": 20.0}}" http://localhost:8006/SweetProductionStream --header "Content-Type:application/json"
       curl -X POST -d "{\"event\": {\"name\":\"Cup cake\",\"amount\": 300.0}}" http://localhost:8006/SweetProductionStream --header "Content-Type:application/json"

4. Now open the file that you specified via the file.uri parameter. Note that the file has the following content.

       {"event":{"name":"Almond cookie","amount":100.0}}
       {"event":{"name":"Baked alaska","amount":20.0}}
       {"event":{"name":"Cup cake","amount":300.0}}
Info
Instead of appending each event to the end of the file, you can configure your Siddhi application to overwrite the file. To do this, set append='false' in the file sink configuration, as shown in the sample below.
@sink(type='file', append='false', @map(type='json'), file.uri='/Users/foo/low_productions.txt')
define stream LowProductionStream (name string, amount double);
For other configuration options, see Siddhi File Sink documentation.
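The filter-then-write flow of this scenario, including the difference between appending and overwriting, can be tried with the Python sketch below. It is an approximation under stated assumptions: the helper names `write_event` and `publish` are hypothetical, the extra Doughnut event is added only to show the filter at work, and overwriting is modelled as replacing the file on every event, which may not match the file sink exactly.

```python
import json
import tempfile
from pathlib import Path


def write_event(path, name, amount, append=True):
    """Write one event as a JSON line, appending to or overwriting the file."""
    line = json.dumps({"event": {"name": name, "amount": amount}}, separators=(",", ":"))
    with open(path, "a" if append else "w") as out:
        out.write(line + "\n")


def publish(path, events, append=True):
    """Keep only events with amount < 500.0, as query1 does, and write them."""
    for name, amount in events:
        if amount < 500.0:
            write_event(path, name, amount, append)


incoming = [
    ("Almond cookie", 100.0),
    ("Baked alaska", 20.0),
    ("Doughnut", 500.0),   # filtered out: not less than 500.0
    ("Cup cake", 300.0),
]

with tempfile.TemporaryDirectory() as tmp:
    appended_file = Path(tmp) / "appended.txt"
    publish(appended_file, incoming, append=True)
    appended = appended_file.read_text().splitlines()

    overwritten_file = Path(tmp) / "overwritten.txt"
    publish(overwritten_file, incoming, append=False)
    overwritten = overwritten_file.read_text().splitlines()

print(appended)
print(overwritten)
```

With appending, the file accumulates every event that passes the filter. With overwriting, only the most recent event remains in the file.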
Preserving the state of the application through a system failure¶
Let's try out a scenario where you deploy a Siddhi application to keep a running total of the sweets produced by a sweet factory.
The production data is appended to a file, so you have to keep tailing this file in order to receive updates on the productions.
Info
In this scenario, the Streaming Integrator server needs to remember the current count through system failures so that when the system is restored, the count is not reset to zero. To achieve this, you can use the state persistence capability in the Streaming Integrator.
1. Enable the state persistence feature in the SI server as follows. Open the <SI_HOME>/conf/server/deployment.yaml file in a text editor and locate the state.persistence section.

       # Periodic Persistence Configuration
       state.persistence:
         enabled: true
         intervalInMin: 1
         revisionsToKeep: 2
         persistenceStore: org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore
         config:
           location: siddhi-app-persistence

   Set the enabled parameter to true and save the file.
2. To enable the state persistence debug logs, open the <SI_HOME>/conf/server/log4j2.xml file in a text editor and locate the following line in it.

       <Logger name="com.zaxxer.hikari" level="error"/>

   Then add the following <Logger> element below that line.

       <Logger name="org.wso2.carbon.streaming.integrator.core.persistence" level="debug"/>

   Save the file.
3. Restart the Streaming Integrator server for the above changes to take effect.
4. Download the productions.csv file from here and save it in a location of your choice.
5. Open a text file and copy-paste the following Siddhi application into it.

       @App:name('CountProductions')
       @App:description('Siddhi application to count the total number of orders.')

       @source(type='file', mode='LINE',
           file.uri='file:/Users/foo/productions.csv',
           tailing='true',
           @map(type='csv'))
       define stream SweetProductionStream (name string, amount double);

       @sink(type = 'log')
       define stream LogStream (totalProductions double);

       -- Following query counts the number of sweet productions.
       @info(name = 'query')
       from SweetProductionStream
       select sum(amount) as totalProductions
       insert into LogStream;

   Change the file.uri parameter in the above Siddhi application to the path to which you downloaded the productions.csv file in step 4.
6. Save this file as CountProductions.siddhi in the <SI_HOME>/wso2/server/deployment/siddhi-files directory.
   Info
   This Siddhi application tails the productions.csv file line by line. Each line is converted to an event in the SweetProductionStream stream. After that, the values of the amount attribute are summed to give the running total of sweets produced. Finally, the output is logged in the SI console.
   Once the Siddhi application is successfully deployed, the following log appears in the SI console.

       INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App CountProductions deployed successfully

7. Now the Siddhi application starts to process the productions.csv file. The file has two entries as follows.

       Almond cookie,100.0
       Baked alaska,20.0

   As a result, the following log appears in the SI console.

       INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1565097506866, data=[100.0], isExpired=false}
       INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1565097506866, data=[120.0], isExpired=false}

   These logs print the sweet production count. Note that the current count of sweet productions is printed as 120 in the second log. This is because the factory has so far produced 120 sweets: 100 Almond cookies and 20 Baked alaskas.
8. Now wait for the following log to appear in the SI console.

       DEBUG {org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore} - Periodic persistence of CountProductions persisted successfully

   This log indicates that the current state of the Siddhi application is successfully persisted. The Siddhi application state is persisted every minute. Therefore, you can see this log appearing every minute.
   Next, let's append two sweet production entries to the productions.csv file and shut down the SI server before the next state persistence happens (i.e., before the above log appears again).
   Tip
   It is better to start appending the records immediately after the state persistence log appears, so that you have plenty of time to append the records and shut down the server before the next log appears.
9. Now append the following content to the productions.csv file:

       Croissant,100.0
       Croutons,100.0

10. Shut down the SI server. Here you are deliberately creating a scenario where the server crashes before the SI server could persist the latest production count.
    Info
    Here, the SI server crashes before the state is persisted. Therefore, the Streaming Integrator server cannot persist the latest count (which includes the last two production runs that produced 100 Croissants and 100 Croutons). The good news is that the file source replays the last two messages, allowing the Streaming Integrator to recover successfully from the server crash.
11. Restart the SI server and wait for about one minute.
    The following log appears in the SI console:

        INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1565097846807, data=[220.0], isExpired=false}
        INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : LogStream : Event{timestamp=1565097846812, data=[320.0], isExpired=false}

    Note that the file source has replayed the last two messages. This indicates that the sweet production count has been correctly restored.
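The recovery shown above can be reasoned about with a small Python sketch of snapshot-and-replay. It is a conceptual model only. The tutorial states that the file source replays the messages received after the last persisted state. How the SI tracks its read position internally is not described here, so storing an `offset` next to the running total is an assumption made for this illustration, as are the names `run`, `state`, and `snapshot`.

```python
def run(lines, state):
    """Consume lines after state['offset'], keeping a running sum of amounts."""
    outputs = []
    for line in lines[state["offset"]:]:
        state["total"] += float(line.split(",")[1])
        state["offset"] += 1
        outputs.append(state["total"])
    return outputs


# Initial content of productions.csv.
lines = ["Almond cookie,100.0", "Baked alaska,20.0"]
state = {"total": 0.0, "offset": 0}
before = run(lines, state)

# Periodic persistence: a copy of the state is saved (ASSUMED to include the offset).
snapshot = dict(state)

# Two entries are appended, and the server stops before the next snapshot is taken.
lines += ["Croissant,100.0", "Croutons,100.0"]

# On restart, the last snapshot is restored and the unprocessed lines are replayed.
restored = dict(snapshot)
after = run(lines, restored)

print(before)
print(after)
```

Restoring the total without replaying would leave it at 120.0, and replaying without restoring would restart from zero. Combining the two is what yields the 220.0 and 320.0 values seen in the log.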