Receiving Custom Text Events via Kafka¶
Purpose:¶
This application demonstrates how to configure WSO2 Streaming Integrator to receive events for the SweetProductionStream
via the Kafka transport in text format using custom mapping, and to log the events arriving at LowProductionAlertStream
on the output console.
Prerequisites:¶
- Set up Kafka.
- The Kafka libs to be added and converted to OSGi from {KafkaHome}/libs are as follows:
    - kafka_2.11-0.10.0.0.jar
    - kafka-clients-0.10.0.0.jar
    - metrics-core-2.2.0.jar
    - scala-library-2.11.8.jar
    - zkclient-0.8.jar
    - zookeeper-3.4.6.jar
- Add the OSGi-converted Kafka libs to {WSO2SIHome}/lib.
- Add the original Kafka libs to {WSO2SIHome}/samples/sample-clients/lib.
- Save this sample. If there is no syntax error, the following message is shown on the console:
    * Siddhi App ReceiveKafkaInTextFormatWithCustomMapping successfully deployed.
Note:¶
To convert the Kafka libs to OSGi:
1. Create a folder (e.g., kafka) and copy into it the Kafka libs listed above from {KafkaHome}/libs.
2. Create another folder (e.g., kafka-osgi) to hold the libs after conversion to OSGi.
3. Navigate to {WSO2SIHome}/bin and issue the following command.
    - For Linux: ./jartobundle.sh <path/kafka> <path/kafka-osgi>
    - For Windows: jartobundle.bat <path/kafka> <path/kafka-osgi>
4. If the conversion is successful, the following message is shown on the terminal for each lib:
    - INFO: Created the OSGi bundle <kafka-lib-name>.jar for JAR file <absolute_path>/kafka/<kafka-lib-name>.jar
5. The OSGi-converted libs are placed in the kafka-osgi folder. Copy them to {WSO2SIHome}/lib.
Executing the Sample:¶
- Navigate to {KafkaHome} and start a ZooKeeper node using bin/zookeeper-server-start.sh config/zookeeper.properties.
- Navigate to {KafkaHome} and start a Kafka server node using bin/kafka-server-start.sh config/server.properties.
- Start the Siddhi application by clicking on 'Run'.
- If the Siddhi application starts successfully, the following messages are shown on the console:
    - ReceiveKafkaInTextFormatWithCustomMapping.siddhi - Started Successfully!
    - Kafka version : 0.10.0.0
    - Kafka commitId : b8642491e78c5a13
    - Adding partition 0 for topic: kafka_sample_topic
    - Adding partitions [0] for topic: kafka_sample_topic
    - Subscribed for topics: [kafka_sample_topic]
    - Kafka Consumer thread starting to listen on topic/s: [kafka_sample_topic] with partition/s: [0]
    - Discovered coordinator 10.100.7.56:9092 (id: 2147483647 rack: null) for group group
Testing the Sample:¶
Navigate to {WSO2SIHome}/samples/sample-clients/kafka-producer and run the ant command as follows:
ant -Dtype=text -DcustomMapping=true
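If you prefer not to use the sample client, the events can also be produced from a second Siddhi application that uses a Kafka sink with a custom text payload. The sketch below is illustrative and not part of this sample: the app name, stream names, and the @payload template are assumptions, chosen so that the emitted text matches the regexes (regex.A='(id):(.*)', regex.B='(amount):([-.0-9]+)') configured in the receiving application.

@App:name("PublishCustomTextToKafkaSketch")
@App:description('Illustrative publisher: sends custom text events to kafka_sample_topic')

define stream SweetProductionInputStream (id string, amount double);

-- Kafka sink with a custom text payload; each attribute is written on its own line
-- so that the consumer's regexes can extract the values that follow 'id:' and 'amount:'.
@sink(type='kafka',
      topic='kafka_sample_topic',
      bootstrap.servers='localhost:9092',
      @map(type='text', @payload("""
id:{{id}}
amount:{{amount}}""")))
define stream SweetProductionOutputStream (id string, amount double);

from SweetProductionInputStream
select *
insert into SweetProductionOutputStream;

Events can then be pushed into SweetProductionInputStream, for example via the Event Simulator of Streaming Integrator Tooling, and they are delivered to the kafka_sample_topic topic.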
Viewing the Results:¶
Messages similar to the following are shown on the console:
- INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveKafkaInTextFormatWithCustomMapping: LowProductionAlertStream : Event{timestamp=1513282182570, data=["Cupcake", 1665.0], isExpired=false}
Note:¶
- Stop this Siddhi application once you are done with the execution.
- Stop the Kafka server and the ZooKeeper server individually by pressing Ctrl+C in their respective terminals.
@App:name("ReceiveKafkaInTextFormatWithCustomMapping")
@App:description('Receive events via Kafka transport in Text format with custom mapping and view the output on the console')
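-- Kafka source: consumes events from the 'kafka_sample_topic' topic (partition 0) and applies the
-- custom text mapping below, where regex group A[2] becomes 'id' and regex group B[2] becomes 'amount'.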
@source(type='kafka',
topic.list='kafka_sample_topic',
partition.no.list='0',
threading.option='single.thread',
group.id="group",
bootstrap.servers='localhost:9092',
@map(type='text',fail.on.missing.attribute='true', regex.A='(id):(.*)', regex.B='(amount):([-.0-9]+)',
@attributes(id = 'A[2]', amount = 'B[2]')))
define stream SweetProductionStream(id string, amount double);
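-- Log sink: prints every event that arrives at LowProductionAlertStream to the console.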
@sink(type='log')
define stream LowProductionAlertStream(id string, amount double);
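-- Pass-through query: forwards all events from SweetProductionStream to LowProductionAlertStream.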
@info(name='query1')
from SweetProductionStream
select *
insert into LowProductionAlertStream;
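The query above simply forwards every event it receives. As a possible extension (not part of this sample), the alert stream could be restricted to genuinely low production runs by adding a filter; the threshold of 100 below is an arbitrary value assumed for illustration.

-- Illustrative variant of query1: forward only events whose production amount
-- falls below an assumed threshold of 100 units.
@info(name='lowProductionFilterQuery')
from SweetProductionStream[amount < 100]
select id, amount
insert into LowProductionAlertStream;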