Oracle Cloud Infrastructure Documentation

Consuming Messages

This topic covers how to consume messages from a stream.

Overview

Consuming messages requires you to:

  • Create a cursor
  • Use the cursor to read messages

Using Cursors

A cursor is a pointer to a location in a stream. The location can be a specific offset or time in a partition, or a group's current location.

To consume messages, use the CreateCursor or CreateGroupCursor API to create a cursor on a partition to indicate where to start consuming messages.

There are five supported cursor types:

  • TRIM_HORIZON - Start consuming from the oldest available message in the stream. Create a cursor at the TRIM_HORIZON to consume all messages in a stream.
  • AT_OFFSET - Start consuming at a specified offset. The offset must be greater than or equal to the offset of the oldest message and less than or equal to the latest published offset.
  • AFTER_OFFSET - Start consuming after the given offset. This cursor has the same restrictions as the AT_OFFSET cursor.
  • AT_TIME - Start consuming from a given time. The timestamp of the returned message will be on or after the supplied time.
  • LATEST - Start consuming messages that were published after you created the cursor.
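To make the starting position of each cursor type concrete, the following is a minimal sketch that resolves each type against a small in-memory partition. It does not use the Streaming SDK: CursorStartSketch, StoredMessage, and startOffset are hypothetical names introduced for illustration, and the behavior of AT_TIME when no message is new enough is an assumption.

```java
import java.util.Arrays;
import java.util.List;

public class CursorStartSketch {
    enum CursorType { TRIM_HORIZON, AT_OFFSET, AFTER_OFFSET, AT_TIME, LATEST }

    /** Hypothetical stand-in for a stored message: an offset and a publish time. */
    static final class StoredMessage {
        final long offset;
        final long timestampMillis;

        StoredMessage(long offset, long timestampMillis) {
            this.offset = offset;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Returns the offset of the first message a new cursor would read.
     * messages must be non-empty and sorted by offset. arg is the offset for
     * AT_OFFSET and AFTER_OFFSET, the epoch-millisecond time for AT_TIME,
     * and is ignored for TRIM_HORIZON and LATEST.
     */
    static long startOffset(CursorType type, List<StoredMessage> messages, long arg) {
        long oldest = messages.get(0).offset;
        long latest = messages.get(messages.size() - 1).offset;
        switch (type) {
            case TRIM_HORIZON:
                return oldest;               // oldest available message
            case AT_OFFSET:
                checkRange(arg, oldest, latest);
                return arg;                  // the message at the given offset
            case AFTER_OFFSET:
                checkRange(arg, oldest, latest);
                return arg + 1;              // the message following the given offset
            case AT_TIME:
                for (StoredMessage m : messages) {
                    if (m.timestampMillis >= arg) {
                        return m.offset;     // first message on or after the time
                    }
                }
                return latest + 1;           // assumption: wait for future messages
            case LATEST:
                return latest + 1;           // only messages published from now on
            default:
                throw new IllegalArgumentException("Unknown cursor type: " + type);
        }
    }

    private static void checkRange(long offset, long oldest, long latest) {
        if (offset < oldest || offset > latest) {
            throw new IllegalArgumentException(
                    "Offset " + offset + " is outside [" + oldest + ", " + latest + "]");
        }
    }

    public static void main(String[] args) {
        List<StoredMessage> partition = Arrays.asList(
                new StoredMessage(10, 1000L),
                new StoredMessage(11, 2000L),
                new StoredMessage(12, 3000L));
        System.out.println(startOffset(CursorType.TRIM_HORIZON, partition, 0));  // 10
        System.out.println(startOffset(CursorType.AFTER_OFFSET, partition, 10)); // 11
        System.out.println(startOffset(CursorType.AT_TIME, partition, 1500L));   // 11
        System.out.println(startOffset(CursorType.LATEST, partition, 0));        // 13
    }
}
```

In the sketch, LATEST and an AFTER_OFFSET cursor created at the latest offset both resolve to an offset that does not exist yet, which models a consumer waiting for new messages.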

Consuming Messages

Once you've created a cursor, you can start to consume messages using GetMessages. Each call to GetMessages returns the cursor to use in the next GetMessages call. The returned cursor will never be null and expires in 5 minutes. As long as you keep consuming, you should never have to re-create a cursor.
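The cursor-chaining pattern described above can be sketched without the SDK. The example below is an illustration only: FakePartition, Batch, drain, and isExpired are hypothetical names, the cursor is modeled as a stringified position (real cursors are opaque values), and the rule that a cursor is unusable at exactly five minutes is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CursorLoopSketch {
    /** Cursor lifetime taken from the text above. */
    static final Duration CURSOR_TTL = Duration.ofMinutes(5);

    /** Hypothetical stand-in for a GetMessages response. */
    static final class Batch {
        final List<String> items;
        final String nextCursor; // never null, even when items is empty

        Batch(List<String> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    /** Hypothetical in-memory partition; the cursor is a stringified index. */
    static final class FakePartition {
        private final List<String> messages;

        FakePartition(List<String> messages) {
            this.messages = messages;
        }

        Batch getMessages(String cursor, int limit) {
            int from = Integer.parseInt(cursor);
            int to = Math.min(messages.size(), from + limit);
            return new Batch(new ArrayList<>(messages.subList(from, to)), String.valueOf(to));
        }
    }

    /** True if a cursor issued at issuedAt can no longer be used at now. */
    static boolean isExpired(Instant issuedAt, Instant now) {
        return Duration.between(issuedAt, now).compareTo(CURSOR_TTL) >= 0;
    }

    /**
     * Reads batches until one comes back empty, always passing the cursor
     * returned by the previous call into the next call. A long-running
     * consumer would pause and poll again instead of stopping.
     */
    static List<String> drain(FakePartition partition, String startCursor, int limit) {
        List<String> consumed = new ArrayList<>();
        String cursor = startCursor;
        while (true) {
            Batch batch = partition.getMessages(cursor, limit);
            cursor = batch.nextCursor; // replace the old cursor on every call
            if (batch.items.isEmpty()) {
                break;
            }
            consumed.addAll(batch.items);
        }
        return consumed;
    }

    public static void main(String[] args) {
        FakePartition partition = new FakePartition(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(drain(partition, "0", 2)); // [a, b, c, d, e]

        Instant issuedAt = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(isExpired(issuedAt, issuedAt.plusSeconds(240))); // false
        System.out.println(isExpired(issuedAt, issuedAt.plusSeconds(360))); // true
    }
}
```

Because every response carries a fresh cursor, a consumer that polls more often than the expiry interval keeps a valid cursor without calling the create operation again.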

Consuming Messages using a Consumer Group

Consumers can be configured to consume messages as part of a group. Stream partitions are distributed among members of a group, such that messages from any single partition are only sent to a single consumer.

Partition assignments are rebalanced as consumers join or leave the group. Group consumption is accomplished using the same cursor mechanism as with single consumers, but using a different kind of cursor.
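The following sketch illustrates the property described above: each partition belongs to exactly one group member, and assignments are recomputed when membership changes. The round-robin strategy and the names GroupRebalanceSketch and assign are assumptions made for illustration; the service's actual assignment algorithm is not described in this topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class GroupRebalanceSketch {
    /**
     * Assigns partitions 0..partitionCount-1 to group instances so that each
     * partition has exactly one owner. Round-robin over the sorted instance
     * names is an illustrative choice, not the service's documented behavior.
     */
    static Map<String, List<Integer>> assign(int partitionCount, Collection<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("The group has no members");
        }
        List<String> sorted = new ArrayList<>(new TreeSet<>(instances));
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String name : sorted) {
            assignment.put(name, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = sorted.get(partition % sorted.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two members share four partitions.
        System.out.println(assign(4, Arrays.asList("consumer-1", "consumer-2")));
        // A third member joins: the assignment is recomputed.
        System.out.println(assign(4, Arrays.asList("consumer-1", "consumer-2", "consumer-3")));
        // Two members leave: the remaining member owns every partition.
        System.out.println(assign(4, Arrays.asList("consumer-1")));
    }
}
```

Recomputing the whole assignment on every membership change keeps the sketch short. It also means a partition can move between members during a rebalance, which is why each member keeps using the group cursor returned by its latest call.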

To create a consumer, create a group cursor, providing a group name, an instance name, and a cursor type. Groups are created on the first request to create a cursor; their retention period is the same as that of their assigned stream:

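import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;

// streamClient, streamId, groupName, and instanceName are assumed to be defined by the caller.
CreateGroupCursorRequest groupRequest = CreateGroupCursorRequest.builder()
                .streamId(streamId)
                .createGroupCursorDetails(CreateGroupCursorDetails.builder()
                        .groupName(groupName)
                        .instanceName(instanceName)
                        .type(CreateGroupCursorDetails.Type.TrimHorizon)
                        .commitOnGet(true)
                        .build())
                .build();

CreateGroupCursorResponse groupCursorResponse = streamClient.createGroupCursor(groupRequest);
String groupCursor = groupCursorResponse.getCursor().getValue();
// This groupCursor can be passed to getMessages in the same way as a partition cursor;
// each subsequent getMessages call returns an updated group cursor.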

 

Required IAM Policy

To use Oracle Cloud Infrastructure, you must be given the required type of access in a policy written by an administrator, whether you're using the Console or the REST API with an SDK, CLI, or other tool. If you try to perform an action and get a message that you don’t have permission or are unauthorized, confirm with your administrator the type of access you've been granted and which compartment you should work in.

For administrators: The policy in Let streaming users manage streams lets the specified group do everything with streaming and related Streaming service resources.

If you're new to policies, see Getting Started with Policies and Common Policies. If you want to dig deeper into writing policies for the Streaming service, see Details for the Streaming Service in the IAM policy reference.

Using the Console

You cannot use the Console to consume messages.

Using the API

For information about using the API and signing requests, see REST APIs and Security Credentials. For information about SDKs, see Software Development Kits and Command Line Interface.

Use these API operations to consume messages: