Using Oracle Cloud Infrastructure SDKs with Streaming

This topic provides guidance on and examples of how to use Oracle Cloud Infrastructure Software Development Kits (SDKs) with Streaming.

Overview

Oracle Cloud Infrastructure provides SDKs so that you can interact with Streaming without having to create a framework. The SDKs let you manage streams, stream pools, and Kafka Connect configurations, and publish and consume messages. Refer to the Streaming Service Overview for key concepts and additional information.

This section includes examples that use the SDK for Java, but basic Streaming usage examples are included with all our SDKs. For more information about using the SDKs, see the SDK Guides.

Streaming Clients

The SDKs encapsulate the Streaming service in two clients: the StreamAdminClient and the StreamClient.

StreamAdminClient

The StreamAdminClient incorporates the control plane operations of Streaming. You can use it to create, delete, update, modify, and list streams.

To instantiate the StreamAdminClient object:

StreamAdminClient adminClient = new StreamAdminClient([authProvider]);
adminClient.setEndpoint("<streaming_endpoint>"); // You cannot use the setRegion method

StreamClient

The StreamClient is used to publish and consume messages.

To instantiate a StreamClient object:

// First you have to get the stream you want to consume from/publish to.
// You can either make a CreateStream, GetStream, or ListStream call. They all return a "messagesEndpoint" as part of a Stream object.
// That endpoint needs to be used when creating the StreamClient object.
GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(streamId).build();
Stream stream = adminClient.getStream(getStreamRequest).getStream();
 
StreamClient streamClient = new StreamClient([authProvider]);
streamClient.setEndpoint(stream.getMessagesEndpoint());

Managing Streams

Creating streams

To create a stream, use the createStream method of StreamAdminClient.

Creating a stream is an asynchronous operation. You can check on the completion of the create operation by checking that the lifecycleStateDetails property of your new stream is either Active or Failed. See Managing Streams for more information.

The following is an example showing how to create a stream:

// No error handling
CreateStreamDetails createStreamDetails =  CreateStreamDetails.builder()
   .partitions(5) // number of partitions you want in your stream
   .name("myStream") // the name of the stream - only used in the console
   .compartmentId(tenancy) // the compartment id you want your stream to live in
   .build();

// You can also add tags to the createStreamDetails object.
CreateStreamRequest createStreamRequest =
   CreateStreamRequest.builder()
     .createStreamDetails(createStreamDetails)
     .build();
 
Stream stream = adminClient.createStream(createStreamRequest).getStream();

while (stream.getLifecycleState() != Stream.LifecycleState.Active && stream.getLifecycleState() != Stream.LifecycleState.Failed) {  
   GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(stream.getId()).build();
   stream = adminClient.getStream(getStreamRequest).getStream();
}
 
// Handle stream Failure
Listing streams

Use the listStreams method to return a list of streams for a given compartment.

You can filter the returned list by OCID, life cycle state, and name.

The results can be sorted in ascending or descending order by name or creation time.

The results are passed back in a paginated list. A token is passed back with each page of results; pass this token back to the getOpcNextPage method to retrieve the next page of results. A null token returned from getOpcNextPage indicates that no more results are available.

For example:

// No error handling
ListStreamsRequest listStreamsRequest =
        ListStreamsRequest.builder()
                .compartmentId(tenancy)
                .build();
// You can filter by OCID (exact match only) [builder].id(streamId) -> This will return 0..1 item
// You can filter by name (exact match only) [builder].name(name) -> This will return 0..n items
// You can order the result per TimeCreated or Name [builder].sortBy(SortBy.[TimeCreated|Name])
// You can change the ordering [builder].sortOrder(SortOrder.[Asc|Desc])
// You can filter by lifecycleState [builder].lifecycleState(lifecycleState)
 
String page;
do {
    ListStreamsResponse listStreamsResponse = adminClient.listStreams(listStreamsRequest);
    List<StreamSummary> streams = listStreamsResponse.getItems();
    // Do something with the streams
    page = listStreamsResponse.getOpcNextPage();
} while (page != null);
Retrieving stream details

To get details about a stream, use the getStream method and then examine the properties of the stream. For example:

// No error handling
GetStreamRequest getStreamRequest =
        GetStreamRequest.builder()
                .streamId(streamId)
                .build();
 
Stream stream = adminClient.getStream(getStreamRequest).getStream();
Deleting streams

To delete a stream, use the deleteStream method API of the StreamAdminClient. Deleting a stream is an asynchronous operation; the stream state changes to Deleted once the delete operation is finished. During the deletion process, the stream can't be used for consuming or producing messages.

The following example shows how to use the deleteStream method to delete a stream:

// No error handling
DeleteStreamRequest deleteStreamRequest =   

    DeleteStreamRequest.builder()     

      .streamId(stream.getId())
       .build();

adminClient.deleteStream(deleteStreamRequest);

Managing Kafka Connect Configurations

In order to use Kafka Connect with Streaming, you need a Kafka Connect configuration, or Kafka Connect harness. You can retrieve the OCID for a harness when you create a new harness or use an existing one. For more information, see Using Kafka Connect.

Creating a Kafka Connect harness

The following code example shows how to create a Kafka Connect harness:

CreateConnectHarnessDetails createConnectHarnessDetails = CreateConnectHarnessDetails.builder()
    .compartmentId(compartment) //compartment where you want to create connect harness
    .name("myConnectHarness") //connect harness name
    .build();
 
CreateConnectHarnessRequest connectHarnessRequest = CreateConnectHarnessRequest.builder()
    .createConnectHarnessDetails(createConnectHarnessDetails)
    .build();
 
CreateConnectHarnessResponse createConnectHarnessResponse = streamAdminClient.createConnectHarness(connectHarnessRequest);
ConnectHarness connectHarness = createConnectHarnessResponse.getConnectHarness();
 
while (connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Active && connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Failed) {
    GetConnectHarnessRequest getConnectHarnessRequest = GetConnectHarnessRequest.builder().connectHarnessId(connectHarness.getId()).build();
     connectHarness = streamAdminClient.getConnectHarness(getConnectHarnessRequest).getConnectHarness();
}
Listing Kafka Connect harnesses

The following code example shows how to list Kafka Connect harnesses:

ListConnectHarnessesRequest listConnectHarnessesRequest = ListConnectHarnessesRequest.builder()
    .compartmentId(comaprtment) // compartment id to list all the connect harnesses.
    .lifecycleState(ConnectHarnessSummary.LifecycleState.Active)
    .build();
 
ListConnectHarnessesResponse listConnectHarnessesResponse = streamAdminClient.listConnectHarnesses(listConnectHarnessesRequest);
List<ConnectHarnessSummary> items = listConnectHarnessesResponse.getItems();

Publishing Messages

Once a stream is created and active, you can publish messages using the streamClient.putMessages method. See Publishing Messages for more information about publishing.

The following code example shows how to publish a message:

// No error handling
List<PutMessagesDetailsEntry> messages = new ArrayList<>();
 
for (int i = 0; i < 40; i++) {
    byte[] key = "<myKey>".getBytes(Charsets.UTF_8); // In this case, all messages will go on the same partition since the key is the same.
    byte[] value = UUID.randomUUID().toString().getBytes(Charsets.UTF_8);
    messages.add(new PutMessagesDetailsEntry(key, value));
}
 
PutMessagesDetails putMessagesDetails =
        PutMessagesDetails.builder()
                .messages(messages)
                .build();
 
PutMessagesRequest putMessagesRequest =
        PutMessagesRequest.builder()
                .putMessagesDetails(putMessagesDetails)
                .build();
 
PutMessagesResult putMessagesResult = streamClient.putMessages(putMessagesRequest).getPutMessagesResult();
// It's not because the call didn't fail that the messages were successfully published!
int failures = putMessagesResult.getFailures();
// If failures is > 0, it means we have a partial-success call.
List<PutMessagesResultEntry> entries = putMessagesResult.getEntries();
// entries is a list of the same size as the list of messages you sent.
// It is guaranteed that the order of the messages is the same as when you sent them.
// Each entry contains either "offset/partition/timestamp" if the message was successfully published
// or "error/errorMessage" if it failed.
if (failures != 0) {
    entries.forEach(entry -> {
        if (StringUtils.isNotEmpty(entry.getError())) {
            // That particular message failed to get published.
            // It could be a throttle error and in that case the error would be "429" and errorMessage would contain a meaningful message.
            // Or it could be an internal error and the error would be "500".

            // Possible solution would be to republish only failed messages.
        }
    });
}

Consuming Messages

Consuming messages requires the use of a cursor, which is a pointer to an offset into a partition. First you must create a cursor, then you must use the cursor to get messages. For more information, see Using Cursors and Getting Messages.

Cursors

This example creates a TRIM_HORIZON cursor, which starts consuming starting from the oldest available message:

// No error handling
CreateCursorDetails createCursorDetails =
        CreateCursorDetails.builder()
                .type(Type.TrimHorizon)
                .partition("0")
                .build();
// If using AT_OFFSET or AFTER_OFFSET you need to specify the offset [builder].offset(offset)
// If using AT_TIME you need to specify the time [builder].time(new Date(xxx))
 
CreateCursorRequest createCursorRequest =
        CreateCursorRequest.builder()
                .createCursorDetails(createCursorDetails)
                .build();
 
String cursor = streamClient.createCursor(createCursorRequest).getCursor().getValue();
// Cursor will then be used to get messages from the stream.

GetMessages

Once you've created a cursor, you can start to consume messages using the GetMessages method. Each call to GetMessages returns the cursor to use in the next GetMessages call.

Here's an example of using a cursor to retrieve messages:

// No error handling (there is a high chance of getting a throttling error using a tight loop)
while (true) { // or your own exit condition
    GetMessagesRequest getMessagesRequest =
            GetMessagesRequest.builder()
                    .cursor(cursor)
                    .build();
 
    GetMessagesResponse getMessagesResponse = streamClient.getMessages(getMessagesRequest);
 
    // This could be empty, but we will always return an updated cursor
    getMessagesResponse.getItems().forEach(message -> {
        // Process the message
    });
 
    cursor = getMessagesResponse.getOpcNextCursor();Consuming Messages

Consuming Messages as a Group

Consumers can be configured to consume messages as part of a group. Stream partitions are distributed among members of a consumer group so that messages from any single partition are only sent to a single consumer. Group consumption is accomplished using the same cursor mechanism as with single consumers, but using a different kind of cursor.

For more information, see Using Consumer Groups.

To create a consumer group, create a group cursor, providing a group, instance name, and cursor type. Group cursors support the following cursor types:

  • TRIM_HORIZON - The group will start consuming from the oldest available message in the stream.
  • AT_TIME - The group will start consuming from a given time. The timestamp of the returned message will be on or after the supplied time.
  • LATEST - The group will start consuming messages that were published after you created the cursor.

Consumer groups are created on the first request to create a cursor. For example:

CreateGroupCursorRequest groupRequest = CreateGroupCursorRequest.builder()
                .streamId(streamId)
                .createGroupCursorDetails(CreateGroupCursorDetails.builder()
                        .groupName(groupName)
                        .instanceName(instanceName)
                        .type(CreateGroupCursorDetails.Type.TrimHorizon)
                        .commitOnGet(true)
                        .build())
                .build());
 
CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(groupRequest);
String groupCursor = groupCursorResponse.getCursor().getValue();
// this groupCursor can be used in the same message loop a described above; subsequent getMessages calls return an updated groupCursor.

Once you've created a group cursor, you can start to consume messages using GetMessages.