Streaming Integrator Connectors Overview¶
ESB Streaming Integrator is powered by Siddhi. Siddhi supports an extension architecture to enhance its functionality by incorporating other libraries in a seamless manner.
Purpose¶
Streaming Integration use cases may require a wide range of functionalities. Extensions are supported because it is not possible to have all this functionality within the Siddhi core. If the functionality covered by the supported extensions have gaps when addressing the requirements of your use cases, you can write a custom extension.
All extensions have a namespace. This is used to identify the relevant extensions together, and to let you specifically call the extension.
Syntax¶
The syntax of an extension is as follows:
<namespace>:<function name>(<parameter>, <parameter>, ... )
The following parameters are configured when referring a script function.
| Parameter | Description |
|---|---|
namespace |
Allows Siddhi to identify the extension without conflict. |
function name |
The name of the function referred. |
parameter |
The function input parameter for function execution. |
Extension Types¶
Siddhi supports the following extension types:
-
Function
For each event, this consumes zero or more parameters as input parameters and returns a single attribute. This can be used to manipulate existing event attributes to generate new attributes like any Function operation. This is implemented by extending
io.siddhi.core.executor.function.FunctionExecutor.e.g.,
math:sin(x)Here, the
sinfunction of math extension returns the sin value for thexparameter. -
Aggregate Function
For each event, this consumes zero or more parameters as input parameters and returns a single attribute with aggregated results. This can be used in conjunction with a window in order to find the aggregated results based on the given window like any Aggregate Function operation.
This is implemented by extending
io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregatorExecutor.e.g.,
custom:std(x)Here, the std aggregate function of the
customextension returns the standard deviation of thexvalue based on its assigned window query. -
Window
This allows events to be collected, generated, dropped, and expired anytime without altering the event format based on the given input parameters, similar to any other Window operator.
This is implemented by extending
io.siddhi.core.query.processor.stream.window.WindowProcessor.e.g.,
custom:unique(key)Here, the
uniquewindow of thecustomextension retains one event for each uniquekeyparameter. -
Stream Function
This allows events to be generated or dropped only during event arrival. It also allows events to be altered by adding one or more attributes to it.
This is implemented by extending
io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor.e.g.,
custom:pol2cart(theta,rho)Here, the
pol2cartfunction of thecustomextension returns all the events by calculating the cartesian coordinatesx&yand adding them as new attributes to the events. -
Stream Processor
This allows events to be collected, generated, dropped, and expired anytime by altering the event format. Altering the event format is done by adding one or more attributes to it based on the given input parameters.
This is implemented by extending
io.siddhi.core.query.processor.stream.StreamProcessor.e.g.,
custom:perMinResults(<parameter>, <parameter>, ...)Here, the
perMinResultsfunction of thecustomextension returns all events by adding one or more attributes to the events based on the conversion logic. Altered events are sent as output every minute regardless of event arrivals. -
Sink
Sinks provide a way to publish Siddhi events to external systems in the preferred data format. Sinks publish events from the streams to external endpoints via multiple transports and in various data formats.
This is implemented by extending
io.siddhi.core.stream.output.sink.Sink.e.g., @sink(type='sink_type', static_option_key1='static_option_value1')
To configure a stream to publish events via a sink, connect the sink configuration to a stream definition by adding the
@sinkannotation with the required parameter values. The sink syntax is as given above. -
Source
Source allows Siddhi to consume events from external systems and map the events to adhere to the associated stream. Sources receive events via multiple transports and in various data formats, and direct them into streams for processing.
This is implemented by extending
io.siddhi.core.stream.input.source.Source.e.g.,
@source(type='source_type', static.option.key1='static_option_value1')To configure a stream that consumes events via a source, connect a source configuration to a stream definition by adding the
@sourceannotation with the required parameter values. The source syntax is as given above. -
Store
You can use the Store extension type to work with data/events stored in various data stores through the table abstraction.
This is implemented by extending
io.siddhi.core.table.record.AbstractRecordTable. -
Script
Scripts allow you to define a function operation that is not provided in Siddhi core or its extension. It is not required to write an extension to define the function logic. Scripts allow you to write functions in other programming languages and execute them within Siddhi queries. Functions defined via scripts can be accessed in queries similar to any other inbuilt function.
This is implemented by extending
io.siddhi.core.function.Script. -
Source Mapper
Each
@sourceconfiguration has a mapping denoted by the@mapannotation that converts the format of incoming messages to Siddhi events. Thetypeparameter of@mapdefines the map type to be used to map the data. The other parameters to be configured depends on the mapper selected. Some of these parameters are optional.This is implemented by extending
io.siddhi.core.stream.output.sink.SourceMapper.e.g.,
@map(type='map_type', static_option_key1='static_option_value1') -
Sink Mapper
Each
@sinkconfiguration has a mapping denoted by the@mapannotation that converts the outgoing Siddhi events to the format of the configured message. Thetypeparameter of@mapdefines the map type to be used to map the data. The other parameters to be configured depends on the mapper selected. Some of these parameters are optional.This is implemented by extending
io.siddhi.core.stream.output.sink.SinkMapper.e.g.,
@map(type='map_type', static_option_key1='static_option_value1')
Example¶
A window extension created with fooas the namespace and unique as the function name can be referred in a Siddhi query as shown below.
from StockExchangeStream[price >= 20]#window.foo:unique(symbol)
select symbol, price
insert into StockQuote
Available Extensions¶
For the complete list of pre-written Siddhi extensions that are currently available, see Siddhi Query Guide - Extensions.
Further References¶
- For instructions to download and install a Siddhi extension, see Downloading and Installing Siddhi Extensions.
- If you want to install/uninstall one or more Siddhi extensions in Streaming Integrator Tooling, see Installing Siddhi Extensions.