Oracle Cloud Infrastructure Documentation

Using the Streaming SDK

This topic covers how to use the Streaming SDK.

Getting Started

For information on installing and configuring the Oracle Cloud Infrastructure SDKs, see Developer Tools.

Architecture Overview

Streaming contains the following key building blocks:

  • Stream: A partitioned, append-only log of messages.
  • Partition: A section of a stream. Partitions allow you to distribute a stream by splitting messages across multiple nodes. Each partition can be placed on a separate machine, allowing multiple consumers to read from a stream in parallel.
  • Producer: An entity that publishes messages to a stream.
  • Consumer: An entity that reads messages from a stream.
  • Consumer group: A group of consumers that independently read messages from separate partitions of a stream.

Streaming Clients

The Streaming SDK is encapsulated in two clients: the StreamAdminClient and the StreamClient.

The StreamAdminClient incorporates the control plane operations of the streaming service. You can use it to create, delete, update, and list streams.

To instantiate the StreamAdminClient object:

StreamAdminClient adminClient = new StreamAdminClient([authProvider]);
adminClient.setEndpoint("https://streaming.r2.oracleiaas.com"); // You cannot use the setRegion method

 

The StreamClient is used to publish and consume messages.

To instantiate a StreamClient object:

// First you have to get the stream you want to consume or publish to.
// A CreateStream, GetStream, or ListStreams call all return a "messagesEndpoint" as part of the returned object.
// That endpoint must be used when creating the StreamClient object.
GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(streamId).build();
Stream stream = adminClient.getStream(getStreamRequest).getStream();
 
StreamClient streamClient = new StreamClient([authProvider]);
streamClient.setEndpoint(stream.getMessagesEndpoint());

Creating a Stream

To create a stream, use the createStream method of StreamAdminClient. Creating a stream is an asynchronous operation. You can check for completion by polling the stream until its lifecycleState property is either Active or Failed.

The following is an example showing how to create a stream:

// No error handling
CreateStreamDetails createStreamDetails =
        CreateStreamDetails.builder()
                .partitions(5) // number of partitions you want in your stream
                .name("myStream") // the name of the stream - only used in the console
                .compartmentId(tenancy) // the compartment id you want your stream to live in
                .build();
// You can also add tags to the createStreamDetails object.

CreateStreamRequest createStreamRequest =
        CreateStreamRequest.builder()
                .createStreamDetails(createStreamDetails)
                .build();

Stream stream = adminClient.createStream(createStreamRequest).getStream();

while (stream.getLifecycleState() != Stream.LifecycleState.Active
        && stream.getLifecycleState() != Stream.LifecycleState.Failed) {
    GetStreamRequest getStreamRequest = GetStreamRequest.builder().streamId(stream.getId()).build();
    stream = adminClient.getStream(getStreamRequest).getStream();
}
// Handle stream Failure

Deleting a Stream

To delete a stream, use the deleteStream method of the StreamAdminClient. Deleting a stream is an asynchronous operation; the stream state changes to Deleted once the delete operation is finished. During the deletion process, the stream can't be used for consuming or producing messages.

The following example shows how to use the deleteStream method to delete a stream:

// No error handling
DeleteStreamRequest deleteStreamRequest =
        DeleteStreamRequest.builder()
                .streamId(stream.getId())
                .build();
adminClient.deleteStream(deleteStreamRequest);

Listing Streams

Use the listStreams method to return a list of streams for a given compartment.

You can filter the returned list by OCID, life cycle state, and name.

The results can be sorted in ascending or descending order by name or creation time.

The results are returned in a paginated list. Each page of results includes a token, available from the getOpcNextPage method of the response; pass this token in the page parameter of your next request to retrieve the next page. A null token from getOpcNextPage indicates that no more results are available.

For example:

// No error handling
ListStreamsRequest listStreamsRequest =
        ListStreamsRequest.builder()
                .compartmentId(tenancy)
                .build();
// You can filter by OCID (exact match only) [builder].id(streamId) -> This will return 0..1 item
// You can filter by name (exact match only) [builder].name(name) -> This will return 0..n items
// You can order the result per TimeCreated or Name [builder].sortBy(SortBy.[TimeCreated|Name])
// You can change the ordering [builder].sortOrder(SortOrder.[Asc|Desc])
// You can filter by lifecycleState [builder].lifecycleState(lifecycleState)
 
String page = null;
do {
    if (page != null) {
        // Rebuild the request with the token from the previous page of results
        listStreamsRequest =
                ListStreamsRequest.builder()
                        .compartmentId(tenancy)
                        .page(page)
                        .build();
    }
    ListStreamsResponse listStreamsResponse = adminClient.listStreams(listStreamsRequest);
    List<StreamSummary> streams = listStreamsResponse.getItems();
    // Do something with the streams
    page = listStreamsResponse.getOpcNextPage();
} while (page != null);

Retrieving Stream Details

To get details about a stream, use the getStream method and then examine the properties of the stream. For example:

// No error handling
GetStreamRequest getStreamRequest =
        GetStreamRequest.builder()
                .streamId(streamId)
                .build();
 
Stream stream = adminClient.getStream(getStreamRequest).getStream();

Publishing Messages

Once a stream is created and active, you can publish messages using the streamClient.putMessages method.

A message is composed of a key (which can be null) and a value. Both key and value are byte arrays.

The message is published to a partition in the stream. If the stream has more than one partition, the target partition is derived from the message's key; if the key is null, it is derived from a subset of the value. Messages with a null key effectively land on a random partition, and because the partitioning scheme may change, you should not expect messages with the same value to land on the same partition. To ensure that a set of messages goes to the same partition, give them the same key.
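To see why equal keys always land on the same partition, consider a simplified sketch of deterministic key hashing. This is not the service's actual partitioning algorithm, only an illustration of the principle:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Simplified illustration only -- NOT the service's actual algorithm:
    // a deterministic hash of the key maps every message with the same key
    // to the same partition.
    static int partitionFor(byte[] key, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "myKey".getBytes(StandardCharsets.UTF_8);
        // Two messages with the same key always map to the same partition.
        boolean samePartition =
                partitionFor(key, 5) == partitionFor("myKey".getBytes(StandardCharsets.UTF_8), 5);
        System.out.println(samePartition); // prints "true"
    }
}
```

Because the mapping is a function of the key alone, any producer can compute it independently; this is what makes per-key ordering possible without coordination.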

The following code shows how to publish a message:

// No error handling
List<PutMessagesDetailsEntry> messages = new ArrayList<>();
 
for (int i = 0; i < 40; i++) {
    byte[] key = "myKey".getBytes(Charsets.UTF_8); // Since the key is the same, all messages will go to the same partition.
    byte[] value = UUID.randomUUID().toString().getBytes(Charsets.UTF_8);
    messages.add(new PutMessagesDetailsEntry(key, value));
}
 
PutMessagesDetails putMessagesDetails =
        PutMessagesDetails.builder()
                .messages(messages)
                .build();
 
PutMessagesRequest putMessagesRequest =
        PutMessagesRequest.builder()
                .putMessagesDetails(putMessagesDetails)
                .build();
 
PutMessagesResult putMessagesResult = streamClient.putMessages(putMessagesRequest).getPutMessagesResult();
// A successful call does not mean that every message was published!
int failures = putMessagesResult.getFailures();
// If failures is > 0, it means we have a partial-success call.
List<PutMessagesResultEntry> entries = putMessagesResult.getEntries();
// entries is a list of the same size as the list of messages you sent.
// It is guaranteed that the order of the messages is the same as when you sent them.
// Each entry contains either "offset/partition/timestamp" if the message was successfully published
// or "error/errorMessage" if it failed.
if (failures != 0) {
    entries.forEach(entry -> {
        if (StringUtils.isNotEmpty(entry.getError())) {
            // That particular message failed to get published.
            // It could be a throttle error and in that case error would be "429" and errorMessage would contain a meaningful message.
            // Or it could be an internal error on our side and error would be "500".
             
            // Possible solution would be to republish only failed messages.
        }
    });
}

Consuming Messages

Consuming messages requires the use of a cursor, which is a pointer to an offset into a partition.

There are five supported cursor types:

  • TRIM_HORIZON - Start consuming from the oldest available message in the stream. Create a cursor at the TRIM_HORIZON to consume all messages in a stream.
  • AT_OFFSET - Start consuming at a specified offset. The offset must be greater than or equal to the offset of the oldest message and less than or equal to the latest published offset.
  • AFTER_OFFSET - Start consuming after the given offset. This cursor has the same restrictions as the AT_OFFSET cursor.
  • AT_TIME - Start consuming from a given time. The timestamp of the returned message will be on or after the supplied time.
  • LATEST - Start consuming messages that were published after you created the cursor.

To create a TRIM_HORIZON cursor, which starts consuming from the oldest available message:

// No error handling
CreateCursorDetails createCursorDetails =
        CreateCursorDetails.builder()
                .type(Type.TrimHorizon)
                .partition("0")
                .build();
// If using AT_OFFSET or AFTER_OFFSET you need to specify the offset [builder].offset(offset)
// If using AT_TIME you need to specify the time [builder].time(new Date(xxx))
 
CreateCursorRequest createCursorRequest =
        CreateCursorRequest.builder()
                .createCursorDetails(createCursorDetails)
                .build();
 
String cursor = streamClient.createCursor(createCursorRequest).getCursor().getValue();
// Cursor will then be used to get messages from the stream.

Once you've created a cursor, you can start consuming messages using the getMessages method. Each call to getMessages returns the cursor to use in the next getMessages call. The returned cursor is never null and expires in 5 minutes. As long as you keep consuming, you should never have to re-create a cursor.

Here's an example of using a cursor to retrieve messages:

// No error handling (there is a high chance of getting a throttling error using a tight loop)
while (true) { // or your own exit condition
    GetMessagesRequest getMessagesRequest =
            GetMessagesRequest.builder()
                    .cursor(cursor)
                    .build();
 
    GetMessagesResponse getMessagesResponse = streamClient.getMessages(getMessagesRequest);
 
    // This could be empty, but we will always return an updated cursor
    getMessagesResponse.getItems().forEach(message -> {
        // Process the message
    });
 
    cursor = getMessagesResponse.getOpcNextCursor();
}

Using Consumer Groups

Consumers can be configured to consume messages as part of a group. Stream partitions are distributed among members of a group, such that messages from any single partition are only sent to a single consumer.

Partition assignments are rebalanced as consumers join or leave the group. Group consumption is accomplished using the same cursor mechanism as with single consumers, but using a different kind of cursor.

How Consumer Groups Work

A consumer group is a set of instances that coordinate to consume messages from all of the partitions in a stream. Instances maintain membership in a group through interaction; a lack of interaction for a period of time results in a timeout that removes the instance from the group. Partitions are reserved for specific instances in a group; reservations are rebalanced in response to group events, such as an instance joining the group or an instance timing out.

While an instance maintains membership in a group, calls to get messages return messages only from the partitions reserved for that instance. Partition reservation attempts to ensure that messages are processed by only a single instance.

The set of instances in a group is expected to change over time; transience can occur for many reasons, commonly because of server failure, scaling patterns, or operational management.

Persisting consumer processing state beyond the lifetime of an instance allows instances to come and go, picking up where appropriate within the context of the group.

An instance commits offsets of processed messages to ensure that they are not processed again by other instances in the same group.
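The commit described above can be sketched with the Streaming service's consumerCommit operation. This is a hedged sketch: the exact builder fields below are assumptions based on the REST API shape, and note that the group cursor example that follows sets commitOnGet(true), in which case offsets are committed automatically as messages are retrieved, so an explicit commit like this is only needed when commit-on-get is disabled:

```java
// Hedged sketch -- builder fields are assumptions, not verified signatures.
// Committing records the group's progress so that another instance taking
// over this partition does not reprocess the committed messages.
ConsumerCommitRequest commitRequest =
        ConsumerCommitRequest.builder()
                .streamId(streamId)
                .cursor(groupCursor) // cursor positioned after the messages you processed
                .build();

// The response carries a fresh cursor to continue consuming with.
Cursor committed = streamClient.consumerCommit(commitRequest).getCursor();
```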

Consuming Messages with a Consumer Group

To create a consumer group, create a group cursor, providing a group, instance name, and cursor type. Groups are created on the first request to create a cursor; their retention period is the same as their assigned stream:

CreateGroupCursorRequest groupRequest = CreateGroupCursorRequest.builder()
                .streamId(streamId)
                .createGroupCursorDetails(CreateGroupCursorDetails.builder()
                        .groupName(groupName)
                        .instanceName(instanceName)
                        .type(CreateGroupCursorDetails.Type.TrimHorizon)
                        .commitOnGet(true)
                        .build())
                .build();
 
CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(groupRequest);
String groupCursor = groupCursorResponse.getCursor().getValue();
// this groupCursor can be used in the same message loop as described above; subsequent getMessages calls return an updated groupCursor.

 

Once you've created a group cursor, you can start consuming messages using getMessages. Each call to getMessages returns the cursor to use in the next getMessages call. The returned cursor is never null and expires in 5 minutes. As long as you keep consuming, you should never have to re-create a cursor.