Kafka Inbound Endpoint Example¶
The Kafka inbound endpoint acts as a message consumer. It creates a connection to ZooKeeper and requests messages for either a topic/s or topic filters.
What you'll build¶
This sample demonstrates how one way message bridging from Kafka to HTTP can be done using the inbound Kafka endpoint. See Configuring Kafka connector for more information.
The following diagram illustrates all the required functionality of the Kafka service that you are going to build. In this example, you only need to consider about the scenario of message consuming.
If you do not want to configure this yourself, you can simply get the project and run it.
Set up Kafka¶
Before you begin, set up Kafka by following the instructions in Setting up Kafka.
Configure inbound endpoint using ESB Integration Studio¶
-
Download ESB Integration Studio. Create an Integration Project as below.
-
Right click on Source -> main -> synapse-config -> inbound-endpoints and add a new custom inbound endpoint.
-
Click on Inbound Endpoint in the design view and under the
properties
tab, update the class name toorg.wso2.carbon.inbound.kafka.KafkaMessageConsumer
. -
Navigate to the source view and update it with the following configuration as required.
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="KAFKAListenerEP" sequence="kafka_process_seq" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="sequential">true</parameter>
<parameter name="interval">10</parameter>
<parameter name="coordination">true</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="topic.name">test</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="bootstrap.servers">localhost:9092</parameter>
<parameter name="group.id">hello</parameter>
<parameter name="contentType">application/json</parameter>
<parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
</parameters>
</inboundEndpoint>
Sequence to process the message:
In this example for simplicity we will just log the message, but in a real world use case, this can be any type of message mediation.
<?xml version="1.0" encoding="ISO-8859-1"?>
<sequence xmlns="http://ws.apache.org/ns/synapse" name="kafka_process_seq">
<log level="full"/>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd" name="partitionNo" expression="get-property('partitionNo')"/>
</log>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd" name="messageValue" expression="get-property('messageValue')"/>
</log>
<log level="custom">
<property xmlns:ns="http://org.apache.synapse/xsd" name="offset" expression="get-property('offset')"/>
</log>
</sequence>
Exporting Integration Logic as a CApp¶
CApp (Carbon Application) is the deployable artefact on the integration runtime. Let us see how we can export integration logic we developed into a CApp. To export the Solution Project
as a CApp, a Composite Application Project
needs to be created. Usually, when a solution project is created, this project is automatically created by Integration Studio. If not, you can specifically create it by navigating to File -> New -> Other -> WSO2 -> Distribution -> Composite Application Project.
-
Right click on Composite Application Project and click on Export Composite Application Project.
-
Select an Export Destination where you want to save the .car file.
-
In the next Create a deployable CAR file screen, select inbound endpoint and sequence artifacts and click Finish. The CApp will get created at the specified location provided in the previous step.
Get the project¶
You can download the ZIP file and extract the contents to get the project code.
Deployment¶
-
Navigate to the connector store and search for
Kafka
. Click onKafka Inbound Endpoint
and download the .jar file by clicking onDownload Inbound Endpoint
. Copy this .jar file into/lib folder. -
Copy the exported carbon application to the
/repository/deployment/server/carbonapps folder. -
Start the integration server.
Testing¶
Sample request
Run the following on the Kafka command line to create a topic named test with a single partition and only one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Run the following on the Kafka command line to send a message to the Kafka brokers. You can also use the ESB Kafka Producer connector to send the message to the Kafka brokers.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Executing the above command will open up the console producer. Send the following message using the console:
{"test":"wso2"}
Expected response
You can see the following Message content in the Micro Integrator:
[2020-02-19 12:39:59,331] INFO {org.apache.synapse.mediators.builtin.LogMediator} - To: , MessageID: d130fb8f-5d77-43f8-b6e0-85b98bf0f8c1, Direction: request, Payload: {"test":"wso2"}
[2020-02-19 12:39:59,335] INFO {org.apache.synapse.mediators.builtin.LogMediator} - partitionNo = 0
[2020-02-19 12:39:59,336] INFO {org.apache.synapse.mediators.builtin.LogMediator} - messageValue = {"test":"wso2"}
[2020-02-19 12:39:59,336] INFO {org.apache.synapse.mediators.builtin.LogMediator} - offset = 6
The Kafka inbound endpoint gets the messages from the Kafka brokers and logs the messages in the Micro Integrator.
Configure inbound endpoint with Kafka Avro message¶
You can setup ESB Micro Integrator inbound endpoint with Kafka Avro messaging format as well. Follow the instructions on Setting up Kafka to setup Kafka on the Micro Integrator. In inbound endpoint XML configurations, change the value.deserializer
parameter to io.confluent.kafka.serializers.KafkaAvroDeserializer
and key.deserializer
parameter to io.confluent.kafka.serializers.KafkaAvroDeserializer
. Add new parameter schema.registry.url
and add schema registry URL in there. The following is the modiefied sample of the Kafka inbound endpoint:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="KAFKAListenerEP" sequence="kafka_process_seq" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="sequential">true</parameter>
<parameter name="interval">10</parameter>
<parameter name="coordination">true</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="value.deserializer">io.confluent.kafka.serializers.KafkaAvroDeserializer</parameter>
<parameter name="topic.name">test</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="bootstrap.servers">localhost:9092</parameter>
<parameter name="group.id">hello</parameter>
<parameter name="contentType">text/plain</parameter>
<parameter name="key.deserializer">io.confluent.kafka.serializers.KafkaAvroDeserializer</parameter>
<parameter name="schema.registry.url">http://localhost:8081/</parameter>
</parameters>
</inboundEndpoint>
Add following configs when the Confluent Schema Registry is secured with basic auth,
<parameter name="basic.auth.credentials.source">source_of_basic_auth_credentials</parameter>
<parameter name="basic.auth.user.info">username:password</parameter>
Make sure to start Kafka Schema Registry before starting up the Micro Integrator.
What's next¶
- To customize this example for your own scenario, see Kafka Connector Configuration documentation.