Kafka Connector Reference

The following operations allow you to work with the Kafka Connector. Click an operation name to see parameter details and samples on how to use it.


To use the Kafka connector, add the <kafkaTransport.init> element in your configuration before carrying out any other Kafka operations. This can be with or without security depending on your requirements.

kafkaTransport.init

You can configure the kafkaTransport.init operation to set up your Kafka producer with or without security.

Parameter Name Description Required
name Unique name to identify the connection. Yes
bootstrapServers The Kafka brokers, given as a comma-separated list in the form host1:port1,host2:port2. Yes
keySerializerClass The serializer class for the key that implements the serializer interface. Yes
valueSerializerClass The serializer class for the value that implements the serializer interface. Yes
schemaRegistryUrl The URL of the Confluent Schema Registry. Applicable only when using the Apache Avro serializer class. Optional
basicAuthCredentialsSource The source of the basic auth credentials (e.g. USER_INFO, URL) when the schema registry is secured with basic auth. Optional
basicAuthUserInfo The relevant basic auth credentials (should be used with basicAuthCredentialsSource). Optional
acks The number of acknowledgments that the producer requires the leader to have received before considering a request complete. Optional
bufferMemory The total bytes of memory the producer can use to buffer records waiting to be sent to the server. Optional
compressionType The compression type for the data generated by the producer. Optional
retries Set a value greater than zero if you want the client to resend any records automatically when a request fails. Optional
sslKeyPassword The password of the private key in the keystore file. Setting this for the client is optional. Optional
sslKeystoreLocation The location of the key store file. Setting this for the client is optional. Set this when you want to have two-way authentication for the client. Optional
sslKeystorePassword The store password for the keystore file. Setting this for the client is optional. Set it only if ssl.keystore.location is configured. Optional
sslTruststoreLocation The location of the trust store file. Optional
sslTruststorePassword The password for the trust store file. Optional
batchSize Specify how many records the producer should batch together when multiple records are sent to the same partition. Optional
clientId The client identifier that you pass to the server when making requests. Optional
connectionsMaxIdleTime The duration in milliseconds after which idle connections should be closed. Optional
lingerTime The time, in milliseconds, to wait before sending a record. Set this property when you want the client to reduce the number of requests sent when the load is moderate. Rather than sending a record immediately, the producer waits up to this delay so that other records can be batched into the same request. Optional
maxBlockTime The maximum time in milliseconds that the KafkaProducer.send() and the KafkaProducer.partitionsFor() methods can be blocked. Optional
maxRequestSize The maximum size of a request in bytes. Optional
partitionerClass The partitioner class that implements the partitioner interface. Optional
receiveBufferBytes The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Optional
requestTimeout The maximum amount of time, in milliseconds, that a client waits for the server to respond. Optional
saslJaasConfig JAAS login context parameters for SASL connections in the format used by JAAS configuration files. Optional
saslKerberosServiceName The Kerberos principal name that Kafka runs as. Optional
securityProtocol The protocol used to communicate with brokers. Optional
sendBufferBytes The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Optional
sslEnabledProtocols The list of protocols enabled for SSL connections. Optional
sslKeystoreType The format of the keystore file. Setting this for the client is optional. Optional
sslProtocol The SSL protocol used to generate the SSLContext. Optional
sslProvider The name of the security provider used for SSL connections. The default value is the default security provider of the JVM. Optional
sslTruststoreType The format of the trust store file. Optional
timeout The maximum amount of time, in milliseconds, that the server waits for the acknowledgments from followers to meet the acknowledgment requirements that the producer has specified with acks configuration. Optional
blockOnBufferFull Set to true to stop accepting new records when the memory buffer is full. When blocking is not desirable, set this property to false, which causes the producer to throw an exception if a record is sent to the memory buffer when it is full. Optional
maxInFlightRequestsPerConnection The maximum number of unacknowledged requests that the client can send via a single connection before blocking. Optional
metadataFetchTimeout The maximum amount of time, in milliseconds, to block and wait for the metadata fetch to succeed before throwing an exception to the client. Optional
metadataMaxAge The period of time, in milliseconds, after which metadata is refreshed even if there were no partition leadership changes, to proactively discover any new brokers or partitions. Optional
metricReporters A list of classes to use as metrics reporters. Optional
metricsNumSamples The number of samples maintained to compute metrics. Optional
metricsSampleWindow The window of time, in milliseconds, that a metrics sample is computed over. Optional
reconnectBackoff The amount of time to wait before attempting to reconnect to a given host. Optional
retryBackoff The amount of time, in milliseconds, to wait before attempting to retry a failed request to a given topic partition. Optional
saslKerberosKinitCmd The Kerberos kinit command path. Optional
saslKerberosMinTimeBeforeRelogin Login thread's sleep time, in milliseconds, between refresh attempts. Optional
saslKerberosTicketRenewJitter Percentage of random jitter added to the renewal time. Optional
saslKerberosTicketRenewWindowFactor The login thread sleeps until the specified window factor of time from the last refresh to the ticket's expiry is reached, after which it will try to renew the ticket. Optional
sslCipherSuites A list of cipher suites. Optional
sslEndpointIdentificationAlgorithm The endpoint identification algorithm to validate the server hostname using a server certificate. Optional
sslKeymanagerAlgorithm The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine. Optional
sslSecureRandomImplementation The SecureRandom PRNG implementation to use for SSL cryptography operations. Optional
sslTrustmanagerAlgorithm The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine. Optional
poolingEnabled Indicates whether or not connection pooling is enabled. Set to 'true' if pooling is enabled and 'false' otherwise. Optional
maxActiveConnections Maximum number of active connections in the pool. Optional
maxIdleConnections Maximum number of idle connections in the pool. Optional
maxWaitTime Maximum time to wait for a pooled connection to become available. Optional
minEvictionTime The minimum amount of time an object may remain idle in the pool before it is eligible for eviction. Optional
evictionCheckInterval The number of milliseconds between runs of the object evictor. Optional
exhaustedAction The behavior of the pool when the pool is exhausted (WHEN_EXHAUSTED_FAIL/WHEN_EXHAUSTED_BLOCK/WHEN_EXHAUSTED_GROW). Optional

Performance Tuning Tip: For better throughput, configure the maxPoolSize parameter as follows in the configuration:

<maxPoolSize>20</maxPoolSize>

If you do not specify the maxPoolSize parameter in the configuration, a Kafka connection is created for each message request.

Sample configuration

Given below is a sample configuration to create a producer without security.

<kafkaTransport.init>
    <name>Sample_Kafka</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
</kafkaTransport.init>
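
The optional parameters listed in the table are added as child elements of the same kafkaTransport.init element. The following is a minimal sketch that extends the sample above with a few producer tuning parameters. The parameter names come from the table; the values are illustrative assumptions, not recommended settings.

```xml
<kafkaTransport.init>
    <name>Sample_Kafka</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <!-- Illustrative value: require acknowledgment from all in-sync replicas -->
    <acks>all</acks>
    <!-- Illustrative value: resend a failed record up to three times -->
    <retries>3</retries>
    <!-- Illustrative value: wait up to 5 ms so that records can be batched -->
    <lingerTime>5</lingerTime>
    <!-- Illustrative value: compress the data generated by the producer -->
    <compressionType>gzip</compressionType>
</kafkaTransport.init>
```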

Security features are available in Kafka version 0.9.0.0 and above. You can configure them using the kafkaTransport.init element as shown in the sample below:

<kafkaTransport.init>
    <name>Sample_Kafka</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <securityProtocol>SSL</securityProtocol>
    <sslTruststoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.truststore.jks</sslTruststoreLocation>
    <sslTruststorePassword>test1234</sslTruststorePassword>
    <sslKeystoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.keystore.jks</sslKeystoreLocation>
    <sslKeystorePassword>test1234</sslKeystorePassword>
    <sslKeyPassword>test1234</sslKeyPassword>
</kafkaTransport.init>
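
The SASL parameters from the table can be combined in a similar way. The following is a sketch only, assuming a Kerberos (GSSAPI) setup: the keytab path and principal are hypothetical placeholders, and it assumes the connector passes saslJaasConfig to the Kafka client unchanged, in the format used by JAAS configuration files.

```xml
<kafkaTransport.init>
    <name>Sample_Kafka</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <!-- SASL authentication over an unencrypted connection -->
    <securityProtocol>SASL_PLAINTEXT</securityProtocol>
    <!-- The Kerberos principal name that the Kafka brokers run as -->
    <saslKerberosServiceName>kafka</saslKerberosServiceName>
    <!-- Hypothetical keytab path and principal; replace with your own -->
    <saslJaasConfig>com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka_client.keytab" principal="kafka-client@EXAMPLE.COM";</saslJaasConfig>
</kafkaTransport.init>
```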

Sample configurations for dealing with Apache Avro Serialization

Given below is a sample configuration to create a producer for Kafka Avro Serialization:

<kafkaTransport.init>
    <name>Sample_Kafka</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
    <valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
    <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
</kafkaTransport.init>

Given below is a sample init configuration for when the Confluent Schema Registry is secured with basic auth:

<kafkaTransport.init>
    <name>Sample_Kafka</name>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
    <valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
    <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
    <basicAuthCredentialsSource>USER_INFO</basicAuthCredentialsSource>
    <basicAuthUserInfo>admin:admin</basicAuthUserInfo>
</kafkaTransport.init>

Publishing messages to Kafka

publishMessages

The publishMessages operation allows you to publish messages to the Kafka brokers via Kafka topics.

Parameter Name Description Required
topic The name of the topic. Yes
partitionNo The partition number of the topic. Yes
key The key of the Kafka message. Optional
keySchema Schema of the provided key (applicable only with Kafka Avro Serialization). Optional
keySchemaId Schema ID of the key schema that is stored in the Confluent Schema Registry (applicable only with Kafka Avro Serialization). Optional
value The value (message) of the Kafka record. Optional
valueSchema Schema of the Kafka value (applicable only with Kafka Avro Serialization). Optional
valueSchemaId Schema ID of the value schema that is stored in the Confluent Schema Registry (applicable only with Kafka Avro Serialization). Optional
Content-Type The Content-Type of the message. Optional

If required, you can add custom headers to the records in the publishMessages operation:

<topic.Content-Type>Value</topic.Content-Type>

You can add the parameter as follows in the publishMessages operation:

<kafkaTransport.publishMessages configKey="kafka_init">
    <topic>topicName</topic>
    <partitionNo>partitionNo</partitionNo>
    <topicName.Content-Type>Value</topicName.Content-Type>
</kafkaTransport.publishMessages>

When dealing with Avro Serialization, the key and value parameters can be configured as:

<kafkaTransport.publishMessages>
   <topic>topicName</topic>
   <key>key of the message</key>
   <keySchema>schema of the configured key</keySchema>
   <value>value of the message</value>
   <valueSchema>schema of the configured value</valueSchema>
</kafkaTransport.publishMessages>

Sample configuration to retrieve the key/value schema from the Confluent Schema Registry:

<kafkaTransport.publishMessages>
   <topic>topicName</topic>
   <key>key of the message</key>
   <keySchemaId>schemaId of the configured key</keySchemaId>
   <value>value of the message</value>
   <valueSchemaId>schemaId of the configured value</valueSchemaId>
</kafkaTransport.publishMessages>
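
As a more concrete illustration of the inline-schema form, the sketch below stores an Avro schema and a matching payload in message context properties and passes them to the operation as expressions. The sequence name, property names, topic, schema, and payload are all hypothetical, and the sketch assumes that the connector accepts the schema and the value as JSON strings.

```xml
<sequence name="PublishAvroSample" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Hypothetical Avro record schema with a single string field -->
    <property name="valueSchema" scope="default" type="STRING"
              value='{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}'/>
    <!-- Hypothetical payload that conforms to the schema above -->
    <property name="value" scope="default" type="STRING"
              value='{"name":"Alice"}'/>
    <kafkaTransport.publishMessages configKey="kafka_init">
        <topic>users</topic>
        <partitionNo>0</partitionNo>
        <!-- Values wrapped in braces are evaluated as expressions -->
        <value>{$ctx:value}</value>
        <valueSchema>{$ctx:valueSchema}</valueSchema>
    </kafkaTransport.publishMessages>
</sequence>
```

Passing the JSON through properties avoids placing a literal that starts with a brace directly inside a connector parameter, where it could be mistaken for an expression.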

Note

With Kafka connector v3.1.2 and above, when an error occurs, one of the following error codes is set in the message context. For details on how to access these error properties, refer to Generic Properties.

Error Code Detail
700501 Connection error.
700502 Invalid configuration.
700503 Error while serializing the Avro message in the producer.
700504 Illegal type is used in an Avro message.
700505 Error while building Avro schemas.
700506 Error while parsing schemas and protocols.
700507 Expected contents of a union cannot be resolved.
700508 The request message cannot be processed.
700509 Any other Kafka related error.
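
As a minimal sketch of how these error details might be inspected, the fault sequence below logs the error code and message from the message context. It assumes the standard Synapse error properties ERROR_CODE and ERROR_MESSAGE; the sequence name and the log property names are hypothetical.

```xml
<sequence name="KafkaFaultSequence" xmlns="http://ws.apache.org/ns/synapse">
    <log level="custom">
        <!-- Expected to hold one of the codes above, e.g. 700501 for a connection error -->
        <property name="kafkaErrorCode" expression="get-property('ERROR_CODE')"/>
        <!-- Human-readable description of the failure -->
        <property name="kafkaErrorMessage" expression="get-property('ERROR_MESSAGE')"/>
    </log>
</sequence>
```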