Kafka Compatibility

This topic covers Kafka compatibility for Streaming.

Overview

Streaming is API-compatible with Apache Kafka. This means that you can use applications written for Kafka with Streaming without rewriting your code.

Use cases for Kafka compatibility include:

  • Moving data from Streaming to Autonomous Data Warehouse via the JDBC Connector to perform advanced analytics and visualization

  • Using Oracle GoldenGate to build an event-driven application

  • Moving data from Streaming to Oracle Object Storage via the HDFS/S3 Connector for long-term storage, or to run Hadoop/Spark jobs

Limitations

Kafka compatibility for Streaming has the following limitations:

Unimplemented APIs

The following Kafka APIs and features are not implemented:

  • Compacted topics
  • Idempotent producers
  • Transactions
  • Kafka Streams
  • Adding partitions to a topic
  • Some administrative APIs
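As a minimal sketch of how a producer configuration can steer clear of two of the unimplemented features above, the following builds properties with idempotence turned off and no transactional ID. The class name `CompatibleProducerSettings` is hypothetical. It is also an assumption that your Kafka client version enables idempotence by default (newer clients do); with older clients the explicit setting may be unnecessary.

```java
import java.util.Properties;

/** Sketch: producer settings that avoid features Streaming does not implement. */
final class CompatibleProducerSettings {

    /** Returns producer properties with idempotence disabled and no transactional ID. */
    static Properties build() {
        Properties props = new Properties();
        // Idempotent producers are listed as unimplemented, so turn the feature off
        // explicitly (assumption: the client in use may enable it by default).
        props.put("enable.idempotence", "false");
        // Deliberately no "transactional.id": transactions are not implemented.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

These properties would be merged with the connection settings described under Kafka Configuration before creating the producer.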

Unique Stream Names

Stream names must be unique within a compartment to use the Kafka compatibility feature. If a compartment contains streams with the same name, Kafka clients fail with an "authentication failed" error until you delete the duplicated streams. If you do not want to delete your streams, contact the Streaming team so we can rename your streams without data loss.
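To check a compartment for this condition, you could compare the stream names you already have on hand. The sketch below assumes you have obtained the list of stream names yourself (for example, from the Console); the class `DuplicateStreamNames` and its method are hypothetical helpers, not part of any SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Sketch: report stream names that occur more than once in a compartment. */
final class DuplicateStreamNames {

    /** Returns the names that appear at least twice, in sorted order. */
    static Set<String> find(List<String> streamNames) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (String name : streamNames) {
            // add() returns false when the name was already present.
            if (!seen.add(name)) {
                duplicates.add(name);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(Arrays.asList("orders", "clicks", "orders"));
        System.out.println(find(names));
    }
}
```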

Stream Pools

A stream pool is a logical group of streams that share the same Kafka configuration, encryption, and access control settings. You can define a stream pool in the Console and specify which streams to put into each pool. There is no limit on the number of streams that can belong to one stream pool.

Load Balancing Connection Recycling

Because the Kafka protocol uses long-lived TCP connections, the Streaming Kafka compatibility layer implements a load-balancing mechanism that periodically closes connections, so that clients reconnect and connections are rebalanced across front-end nodes. Most Kafka SDKs handle these disconnections automatically when consuming, but producing to Streaming through the Kafka API may raise disconnection errors. You can mitigate these errors by retrying requests. Retries are part of the Kafka SDK and are enabled automatically, and you can configure their behavior explicitly.
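The retry behavior mentioned above can be tuned through standard Kafka producer properties. The following is a minimal sketch rather than a recommended configuration: the class name `ProducerRetrySettings` is hypothetical, and the backoff and delivery timeout values are illustrative assumptions that you would adjust for your workload.

```java
import java.util.Properties;

/** Sketch: producer retry settings for riding out periodic connection recycling. */
final class ProducerRetrySettings {

    /** Returns illustrative retry-related producer properties. */
    static Properties build() {
        Properties props = new Properties();
        // Number of times a failed send is retried on transient errors.
        props.put("retries", 5);
        // Wait between retry attempts, in milliseconds (illustrative value).
        props.put("retry.backoff.ms", 500);
        // Upper bound on the total time to report success or failure for a
        // record after send() returns, in milliseconds (illustrative value).
        props.put("delivery.timeout.ms", 120000);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```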

Configuration

This section describes how to configure the Kafka compatibility feature of Streaming.

Endpoints

For bootstrap servers, use your region endpoint on port 9092. For example:

streaming.us-phoenix-1.oci.oraclecloud.com:9092
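If you build the bootstrap server string in code, a small helper like the following can keep the port and host pattern in one place. This is a sketch: the class `StreamingEndpoint` is hypothetical, and the host pattern is inferred from the example above, so it is an assumption that it applies to your region.

```java
/** Sketch: derive the Kafka bootstrap server string from a region identifier. */
final class StreamingEndpoint {

    /** Port used for the Kafka-compatible endpoint, as stated in this topic. */
    static final int KAFKA_PORT = 9092;

    /** Returns "streaming.{region}.oci.oraclecloud.com:9092" for the given region. */
    static String bootstrapServers(String region) {
        if (region == null || region.trim().isEmpty()) {
            throw new IllegalArgumentException("region must not be empty");
        }
        // Host pattern assumed from the us-phoenix-1 example in this topic.
        return "streaming." + region.trim() + ".oci.oraclecloud.com:" + KAFKA_PORT;
    }

    public static void main(String[] args) {
        System.out.println(bootstrapServers("us-phoenix-1"));
    }
}
```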

Authentication

Authentication with the Kafka protocol uses auth tokens. You can generate auth tokens on the user details page in the Console.

It's a good idea to create a dedicated group and user, and to grant that group permission to manage streams in the appropriate compartment or tenancy. You can then generate an auth token for the user you created and use it in your Kafka client configuration.

Your username should be in the format:

tenancyName/username/streamPoolId
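The username format above, together with the auth token, ends up in the SASL settings of the Kafka client. The sketch below assembles both strings; the class `SaslCredentials` and its methods are hypothetical helpers. Escaping backslashes and double quotes inside the quoted values is an assumption about how the JAAS configuration is parsed, so verify it if your token contains such characters.

```java
/** Sketch: build the SASL username and JAAS configuration string for Streaming. */
final class SaslCredentials {

    /** Joins the three parts as tenancyName/username/streamPoolId. */
    static String username(String tenancyName, String userName, String streamPoolId) {
        return tenancyName + "/" + userName + "/" + streamPoolId;
    }

    /** Returns the value for the "sasl.jaas.config" client property. */
    static String jaasConfig(String username, String authToken) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + escape(username) + "\" "
                + "password=\"" + escape(authToken) + "\";";
    }

    /** Escapes backslashes and double quotes (assumed to be honored by the parser). */
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own tenancy, user, pool, and token.
        String user = username("{tenancyName}", "{username}", "{streamPoolId}");
        System.out.println(jaasConfig(user, "{authToken}"));
    }
}
```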

Kafka Configuration

You will need to set up the following properties for your Kafka client.

For the Java SDK:

Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.{region}.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{tenancyName}/{username}/{streamPoolId}\" password=\"{authToken}\";");

Recommended settings for Java SDK producers:

properties.put("retries", 5); // retries on transient errors and load balancing disconnection
properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB

Recommended settings for Java SDK consumers:

properties.put("max.partition.fetch.bytes", 1024 * 1024); // limit request size to 1MB per partition

For the Librdkafka SDK:

'metadata.broker.list': 'streaming.{region}.oci.oraclecloud.com:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '{tenancyName}/{username}/{streamPoolId}',
'sasl.password': '{authToken}'

Recommended settings for Librdkafka SDK producers:

'message.send.max.retries': 5, // retries on transient errors and load balancing disconnection
'max.request.size': 1024 * 1024 // limit request size to 1MB

Recommended settings for Librdkafka SDK consumers:

'max.partition.fetch.bytes': 1024 * 1024 // limit request size to 1MB per partition

Kafka Connect

You can use Kafka Connectors with Streaming to connect with external sources of data, such as databases and file systems.

Note

For more information on Kafka Connect, see the official Kafka Connect documentation.

Configuring Kafka Connectors with Streaming

To use your connectors with Streaming, create a Kafka Connect Harness using the Console or the command line interface. The Streaming service will create the three topics (config, offset, and status) that are required to use Kafka Connect. The topics contain the OCID of the Kafka Connect Harness in the name.

Place these topic names in the properties file for the Kafka Connectors you want to use with Streaming.

For example:

# Relevant Kafka Connect settings
config.storage.topic=[harnessOcid]-config
offset.storage.topic=[harnessOcid]-offset
status.storage.topic=[harnessOcid]-status
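If you generate the connector properties programmatically, the three topic names can be derived from the harness OCID. This is a sketch based on the naming shown above; the class `ConnectHarnessTopics` is hypothetical, and the OCID used in `main` is a made-up placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: derive the Kafka Connect storage topic settings from a harness OCID. */
final class ConnectHarnessTopics {

    /** Returns the three storage topic properties, keyed by Kafka Connect setting name. */
    static Map<String, String> storageTopics(String harnessOcid) {
        Map<String, String> topics = new LinkedHashMap<>();
        topics.put("config.storage.topic", harnessOcid + "-config");
        topics.put("offset.storage.topic", harnessOcid + "-offset");
        topics.put("status.storage.topic", harnessOcid + "-status");
        return topics;
    }

    public static void main(String[] args) {
        // Prints each setting as a key=value line for a properties file.
        storageTopics("[harnessOcid]").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```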

Next, set the bootstrap server in your Kafka Connector configuration to the endpoint for Streaming.

Note

For a list of endpoints for Streaming, see the Streaming section in API Reference and Endpoints.

The following shows an example Kafka Connector configuration file:

bootstrap.servers=streaming.{region}.oci.oraclecloud.com:9092
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
config.storage.topic=[harnessOcid]-config
offset.storage.topic=[harnessOcid]-offset
status.storage.topic=[harnessOcid]-status

Kafka Connector Configuration Limitations

You can use multiple Kafka Connectors with the same Kafka Connect Harness. If you need to produce to or consume from streams in separate compartments, or need more capacity to avoid hitting throttle limits on the Kafka Connect Harness (for example, too many connectors, or connectors with too many workers), you can create more Kafka Connect Harnesses.

The three compacted topics that are created with a Kafka Connect Harness are meant to be used by the Kafka Connect Harness to store configuration and state management data, and should not be used to store your data. To ensure that connectors use these topics only for their intended purpose, hard throttle limits of 50 kb/s and 50 requests per second (rps) are in place for these topics.

Kafka Connect Harnesses created in a given compartment will only work for streams in the same compartment.

For more information on managing Kafka Connect Harnesses using the Console, see Managing Kafka Connect Harnesses.

Required IAM Policy

To use Oracle Cloud Infrastructure, you must be given the required type of access in a policy written by an administrator, whether you're using the Console or the REST API with an SDK, CLI, or other tool. If you try to perform an action and get a message that you don't have permission or are unauthorized, confirm with your administrator the type of access you've been granted and which compartment you should work in.

For administrators: The policy in Let streaming users manage streams lets the specified group do everything with streaming and related Streaming service resources.

If you're new to policies, see Getting Started with Policies and Common Policies. If you want to dig deeper into writing policies for Streaming, see Details for the Streaming Service in the IAM policy reference.

For More Information

See the Apache Kafka documentation.