Kafka Inbound

Introduction

The Kafka inbound endpoint lets you consume messages from the Kafka messaging system. Kafka maintains feeds of messages in topics: producers write data to topics and consumers read from them. The Kafka inbound endpoint acts as a message consumer. It creates a connection to ZooKeeper and requests messages for a topic, a set of topics, or a topic filter.

Properties

Listed below are the properties used for creating a Kafka inbound endpoint.

Required Properties

The following properties are required when creating a Kafka inbound endpoint.

Property Description
zookeeper.connect The host and port of a ZooKeeper server (hostname:port).
consumer.type The consumer configuration type. This can either be simple or highlevel.
interval The interval at which the inbound endpoint polls for messages.
coordination If set to true in a clustered setup, the inbound endpoint runs on only a single worker node. The default value is true.
sequential Whether messages are polled and injected into the sequence sequentially. The default value is true. Set this property to false to run the Kafka inbound endpoint in non-sequential mode, which gives better performance than sequential mode.
topics The topic (category) from which messages are consumed. A high-level Kafka configuration can have more than one topic. Specify multiple topic names as comma-separated values.
content.type The content type of the message. The possible values are application/xml and application/json.
group.id The ID of the consumer group. If all the consumer instances have the same consumer group, this works as a traditional queue, balancing the load over the consumers.
topic.filter The name of the topic filter.
filter.from.allowlist If this is set to true, messages are consumed from the allowlist (include). If this is set to false, messages are consumed from the denylist (exclude).
coordination This optional property applies only in a clustered environment, where an inbound endpoint is executed only on worker nodes. If set to true, the inbound endpoint runs on only a single worker node. If the running worker goes down, the inbound endpoint starts on another available worker in the cluster. By default, coordination is enabled.
sequential Whether the messages need to be polled and injected sequentially or not.
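
The following Python sketch illustrates how the properties above relate to each other. It is not product code: the helpers `build_params` and `select_topics`, the sample values (`localhost:2181`, `orders,payments`, `sample-group`), and the choice of which properties to treat as mandatory are all assumptions made for illustration. It also assumes that topic.filter is a regular expression matched against whole topic names.

```python
import re

# Properties listed above without a documented default. Treating exactly
# these as mandatory for a high-level consumer is an assumption of this sketch.
REQUIRED = ("zookeeper.connect", "consumer.type", "interval",
            "topics", "content.type", "group.id")

# Defaults stated in the table above.
DEFAULTS = {"coordination": "true", "sequential": "true"}


def build_params(user_params):
    """Overlay user values on the documented defaults and check required keys."""
    params = {**DEFAULTS, **user_params}
    missing = [key for key in REQUIRED if not params.get(key)]
    if missing:
        raise ValueError("missing required properties: " + ", ".join(missing))
    if params["consumer.type"] not in ("simple", "highlevel"):
        raise ValueError("consumer.type must be simple or highlevel")
    return params


def select_topics(available, topic_filter, from_allowlist):
    """Apply topic.filter as an include list (True) or an exclude list (False).

    Assumes the filter is a regular expression matched against whole names.
    """
    pattern = re.compile(topic_filter)
    if from_allowlist:
        return [t for t in available if pattern.fullmatch(t)]
    return [t for t in available if not pattern.fullmatch(t)]


params = build_params({
    "zookeeper.connect": "localhost:2181",  # sample hostname:port
    "consumer.type": "highlevel",
    "interval": "1000",                     # sample polling interval
    "topics": "orders,payments",            # comma-separated topic names
    "content.type": "application/json",
    "group.id": "sample-group",             # sample consumer group
})
topics = params["topics"].split(",")
print(select_topics(topics, "ord.*", True))   # ['orders']
print(select_topics(topics, "ord.*", False))  # ['payments']
```

In an actual deployment these values are supplied as parameters of the inbound endpoint definition rather than through code; the dictionary form is used here only to make the defaults and the include/exclude behaviour easy to see.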

Optional Properties

The following optional properties can be configured when creating a Kafka inbound endpoint.

Property Name Description
thread.count The number of threads. The default value is 1.
consumer.id The ID of the consumer. The default value is null.
socket.timeout.ms The socket timeout for network requests, in milliseconds. The default value is 30 * 1000.
socket.receive.buffer.bytes The socket receive buffer for network requests, in bytes. The default value is 64 * 1024.
fetch.message.max.bytes The number of bytes of messages that the system should attempt to fetch for each topic-partition in each fetch request. The default value is 1024 * 1024.
num.consumer.fetchers The number of fetcher threads used to fetch data. The default value is 1.
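auto.commit.enable If set to true, the offset of messages already fetched by the consumer is committed periodically to ZooKeeper. When the process fails, this committed offset is used as the position from which the new consumer begins. The default value is true.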
auto.commit.interval.ms The frequency (in milliseconds) at which the consumer offsets are committed to ZooKeeper. The default value is 60 * 1000.
queued.max.message.chunks The maximum number of message chunks buffered for consumption. Each chunk can go up to the value specified in fetch.message.max.bytes. The default value is 2.
rebalance.max.retries The maximum number of rebalance attempts before giving up. The default value is 4.
fetch.min.bytes The minimum amount of data the server should return for a fetch request. The default value is 1.
fetch.wait.max.ms The maximum amount of time the server will stay blocked before responding to the fetch request when sufficient data is not available to immediately serve fetch.min.bytes. The default value is 100.
rebalance.backoff.ms The backoff time between retries during rebalance. The default value is 2000.
refresh.leader.backoff.ms The backoff time to wait before trying to determine the leader of a partition that has just lost its leader. The default value is 200.
auto.offset.reset The action to take when there is no initial offset in ZooKeeper or when an offset is out of range. Set this to one of the following values:
  • smallest: Automatically reset the offset to the smallest offset.
  • largest: Automatically reset the offset to the largest offset.
  • anything else: Throw an exception to the consumer.
The default value is largest.
consumer.timeout.ms The timeout interval after which a timeout exception is to be thrown to the consumer if no message is available for consumption. It is a good practice to set this value lower than the interval of the Kafka inbound endpoint. The default value is 2000.
exclude.internal.topics Whether messages from internal topics (such as offsets) are excluded from the consumer. Set to true to exclude them. The default value is true.
partition.assignment.strategy The partition assignment strategy to be used when assigning partitions to consumer streams. Possible values are range and roundrobin. The default value is range.
client.id The user-specified string sent in each request to help trace calls.
zookeeper.session.timeout.ms The ZooKeeper session timeout value in milliseconds. The default value is 6000.
zookeeper.connection.timeout.ms The maximum time in milliseconds that the client should wait while establishing a connection to ZooKeeper. The default value is 6000.
zookeeper.sync.time.ms The maximum time in milliseconds that a ZooKeeper follower can be behind a ZooKeeper leader. The default value is 2000.
offsets.storage The offsets storage location. Possible values are zookeeper and kafka. The default value is zookeeper.
offsets.channel.backoff.ms The backoff period in milliseconds when reconnecting the offsets channel or retrying failed offset fetch/commit requests. The default value is 1000.
offsets.channel.socket.timeout.ms The socket timeout in milliseconds when reading responses for offset fetch/commit requests. The default value is 10000.
offsets.commit.max.retries The maximum number of retry attempts allowed for an offset commit. If a consumer metadata request fails for any reason, it is retried, but the retry does not count against this limit. The default value is 5.
dual.commit.enabled If offsets.storage is set to kafka, offsets can also be committed to ZooKeeper (dual commit). Set this to true when migrating from ZooKeeper-based offset storage to Kafka-based offset storage. The default value is true.
simple.topic The topic (category) from which messages are consumed.
simple.brokers The specific Kafka broker name.
simple.port The specific Kafka server port number.
simple.partition The partition of the topic.
simple.max.messages.to.read The maximum number of messages to retrieve.
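
Several of the optional properties above interact. The Python sketch below shows three of those relationships using the documented defaults: the upper bound on buffered data implied by queued.max.message.chunks and fetch.message.max.bytes, the guideline that consumer.timeout.ms should be lower than the polling interval, and the auto.offset.reset decision. The helper names are hypothetical, and the sketch assumes the polling interval is expressed in milliseconds so that it can be compared with consumer.timeout.ms.

```python
# A subset of the defaults from the table above, in the units the table uses.
DEFAULTS = {
    "fetch.message.max.bytes": 1024 * 1024,
    "queued.max.message.chunks": 2,
    "consumer.timeout.ms": 2000,
    "auto.offset.reset": "largest",
}


def with_defaults(overrides):
    """Return the documented defaults overlaid with user-supplied values."""
    return {**DEFAULTS, **overrides}


def max_buffered_bytes(params):
    """Upper bound on buffered data: chunk count times the maximum chunk size."""
    return params["queued.max.message.chunks"] * params["fetch.message.max.bytes"]


def timeout_below_interval(params, interval_ms):
    """Check the guideline that consumer.timeout.ms stays below the interval."""
    return params["consumer.timeout.ms"] < interval_ms


def resolve_reset_offset(params, smallest, largest):
    """Pick the offset to use when no valid offset is stored."""
    policy = params["auto.offset.reset"]
    if policy == "smallest":
        return smallest
    if policy == "largest":
        return largest
    # Any other value: the consumer receives an exception.
    raise ValueError("no valid offset and auto.offset.reset=" + repr(policy))


params = with_defaults({"consumer.timeout.ms": 500})
print(max_buffered_bytes(params))            # 2097152
print(timeout_below_interval(params, 1000))  # True
print(resolve_reset_offset(params, 0, 42))   # 42
```

With the defaults left unchanged, a polling interval of 1000 would fail the timeout check, because the default consumer.timeout.ms of 2000 exceeds it.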
