Using Consumer Groups

Consumers can be configured to consume messages as part of a group. In a production environment with multiple partitions, using a consumer group is our recommended method of consuming Streaming messages.

Each stream partition is assigned to a member of a consumer group. An individual member of a consumer group is called an instance. Each instance in a consumer group receives messages from one or more partitions, unless there are more instances than partitions. Instances in excess of the partition count for the stream do not receive messages.

Consumer groups handle the coordination that is required for multiple consumers to share the consumption of a stream. A consumer group automatically:

  • Assigns one or more partitions to an instance
  • Tracks the messages received by the group and manages commits
  • Requests the proper partition(s) and offset(s) on behalf of each instance
  • Balances the group as instances join or leave

Up to 50 consumer groups can read from a single stream. Each consumer group receives all of the messages in the stream at least once.

Consumer groups are ephemeral. A group that isn't used for the stream's retention period disappears.

Creating a Consumer Group

A consumer group is created on the first CreateGroupCursor request. Group cursors define a group name/instance name pair. When you create your group cursor, you should provide the ID of the stream, a group name, an instance name, and one of the following supported cursor types:

  • TRIM_HORIZON - The group will start consuming from the oldest available message in the stream.
  • AT_TIME - The group will start consuming from a given time. The timestamp of the returned message will be on or after the supplied time.
  • LATEST - The group will start consuming messages that were published after you created the cursor.

Group cursor types are ignored on CreateGroupCursor calls that include the name of an existing group. That group's committed offsets are used instead of the provided cursor type.
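The rules above for choosing where a group starts reading can be modeled in a few lines. The sketch below is a local simulation, not a Streaming API call. `Message` and `resolve_start_index` are hypothetical names. It assumes that an existing group's committed offset always takes precedence over the cursor type, and that reading resumes at the first message after that offset.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Message:
    offset: int
    timestamp: float  # publish time in seconds (illustrative units)


def resolve_start_index(
    partition: Sequence[Message],
    cursor_type: str,
    committed_offset: Optional[int] = None,
    at_time: Optional[float] = None,
) -> int:
    """Return the index in `partition` where a group would start reading.

    Hypothetical model: an existing group's committed offset wins and the
    cursor type is ignored; otherwise the cursor type decides.
    """
    if committed_offset is not None:
        # Existing group: resume after the last committed offset.
        return next(
            (i for i, m in enumerate(partition) if m.offset > committed_offset),
            len(partition),
        )
    if cursor_type == "TRIM_HORIZON":
        return 0  # oldest available message
    if cursor_type == "LATEST":
        return len(partition)  # only messages published after this point
    if cursor_type == "AT_TIME":
        if at_time is None:
            raise ValueError("AT_TIME requires a timestamp")
        # First message whose timestamp is on or after the supplied time.
        return next(
            (i for i, m in enumerate(partition) if m.timestamp >= at_time),
            len(partition),
        )
    raise ValueError(f"unsupported cursor type: {cursor_type}")


partition = [Message(10, 100.0), Message(11, 200.0), Message(14, 300.0)]
print(resolve_start_index(partition, "TRIM_HORIZON"))                 # 0
print(resolve_start_index(partition, "AT_TIME", at_time=150.0))       # 1
print(resolve_start_index(partition, "LATEST", committed_offset=10))  # 1: type ignored
```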

Streaming uses the instance name to identify members of the group when managing offsets. Use unique instance names for each instance of the consumer group.

If you want the Streaming service to handle committing offsets, you should leave the group cursor's commitOnGet value set to true. We recommend using this method to reduce application complexity since your application does not have to handle commits.
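As a sketch of what a CreateGroupCursor request carries, the helper below builds a JSON body with commitOnGet left at true. The helper name is hypothetical. The field names (`groupName`, `instanceName`, `type`, `time`, `commitOnGet`) are assumed to match the REST API's CreateGroupCursorDetails model, so check them against the API reference before relying on them. No request is sent.

```python
import json
from typing import Optional

CURSOR_TYPES = {"TRIM_HORIZON", "AT_TIME", "LATEST"}


def group_cursor_body(
    group_name: str,
    instance_name: str,
    cursor_type: str = "TRIM_HORIZON",
    time: Optional[str] = None,
    commit_on_get: bool = True,
) -> dict:
    """Build a CreateGroupCursor request body (field names are assumptions)."""
    if cursor_type not in CURSOR_TYPES:
        raise ValueError(f"unsupported cursor type: {cursor_type}")
    # This helper only accepts a time for AT_TIME, and requires one there.
    if (cursor_type == "AT_TIME") != (time is not None):
        raise ValueError("time must be set if and only if cursor_type is AT_TIME")
    body = {
        "groupName": group_name,
        "instanceName": instance_name,  # use a unique name per instance
        "type": cursor_type,
        "commitOnGet": commit_on_get,   # True: the service commits for you
    }
    if time is not None:
        body["time"] = time  # assumed to be an RFC 3339 timestamp string
    return body


print(json.dumps(group_cursor_body("orders-group", "worker-1")))
```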

Consuming as a Group

After your instances join the consumer group, they can read messages from the stream using GetMessages. Each call to GetMessages returns the cursor to use in the next GetMessages call as the opc-next-cursor header value. The returned cursor is never null, but it expires after five minutes. As long as you keep consuming, you never need to re-create a cursor.
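The loop below illustrates the cursor-chaining pattern: the cursor returned by each call feeds the next call, so the consumer never re-creates one. It is a sketch against a hypothetical in-memory stand-in (`make_fake_get_messages`), not the OCI SDK; `consume` and the cursor string format are made up for illustration.

```python
from typing import Callable, List, Tuple

# A GetMessages call modeled as: cursor in, (messages, next cursor) out.
# In a real client, the next cursor comes from the opc-next-cursor header.
GetMessages = Callable[[str], Tuple[List[str], str]]


def consume(get_messages: GetMessages, cursor: str,
            handle: Callable[[str], None], max_calls: int) -> str:
    """Read in a loop, always passing the cursor returned by the previous call."""
    for _ in range(max_calls):  # a real consumer would loop indefinitely
        messages, cursor = get_messages(cursor)
        for message in messages:  # may be empty when the partition is caught up
            handle(message)
    return cursor


def make_fake_get_messages(batches: List[List[str]]):
    """Hypothetical in-memory stand-in for the service, for illustration only."""
    seen_cursors: List[str] = []

    def get_messages(cursor: str) -> Tuple[List[str], str]:
        seen_cursors.append(cursor)
        i = int(cursor.split("-")[1])
        batch = batches[i] if i < len(batches) else []
        return batch, f"cursor-{i + 1}"

    return get_messages, seen_cursors


fake, seen_cursors = make_fake_get_messages([["a", "b"], ["c"]])
received: List[str] = []
last = consume(fake, "cursor-0", received.append, max_calls=3)
print(received, seen_cursors, last)
```

The third call returns an empty batch, which the loop simply skips before continuing with the new cursor.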

When Streaming receives a request for messages from an instance, the service:

  • Checks to see whether a group rebalance is necessary
  • Commits the offset(s) from that instance's previous request, if any
  • Responds with the messages defined by the request's cursor

GetMessages batch sizes are based on the average size of the messages published to the stream. By default, the service returns as many messages as possible. You can use the limit parameter to request any number of messages up to 10,000, but take your average message size into account so that you don't exceed the stream's throughput or cause timeouts.

If there are no more unread messages in the partition, Streaming returns an empty list of messages.

Consumer groups remove instances that stop consuming messages for more than 30 seconds. If processing a batch could take longer than that, request fewer messages per call, or extend the timeout by calling ConsumerHeartbeat.
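Both remedies can be sketched together: size the batch from an estimated per-message processing time, and send heartbeats while a batch is being processed. `choose_limit` and `process_with_heartbeats` are hypothetical helpers; `heartbeat` stands in for a call that would wrap ConsumerHeartbeat, and the 20-second budget and 10-second interval are assumed safety margins, not service settings.

```python
import math
import time
from typing import Callable, Iterable

MAX_LIMIT = 10_000  # upper bound for the limit parameter, per the text above


def choose_limit(seconds_per_message: float, budget_s: float = 20.0) -> int:
    """Pick a batch size expected to finish well within the 30-second timeout."""
    if seconds_per_message <= 0:
        raise ValueError("seconds_per_message must be positive")
    return max(1, min(MAX_LIMIT, math.floor(budget_s / seconds_per_message)))


def process_with_heartbeats(
    messages: Iterable[str],
    handle: Callable[[str], None],
    heartbeat: Callable[[], None],
    interval_s: float = 10.0,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Handle messages, calling heartbeat() whenever interval_s has elapsed.

    Returns the number of heartbeats sent.
    """
    last = clock()
    beats = 0
    for message in messages:
        handle(message)
        now = clock()
        if now - last >= interval_s:
            heartbeat()  # hypothetical wrapper around ConsumerHeartbeat
            beats += 1
            last = now
    return beats


# Simulated clock: each message "takes" 4 seconds to process.
t = [0.0]
def slow_handle(_message: str) -> None:
    t[0] += 4.0

sent = []
beats = process_with_heartbeats(
    [str(i) for i in range(10)], slow_handle,
    lambda: sent.append(t[0]), clock=lambda: t[0],
)
print(choose_limit(0.5), beats)  # 40 3
```

Heartbeats are only sent between messages, so a single message that takes longer than the timeout would still cause the instance to be removed.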

A partition cannot be assigned to multiple instances within the same consumer group. If you have more instances than partitions, the unassigned instances can send GetMessages requests, but they won't receive any messages. They remain idle until the consumer group needs to replace an instance, for example when an existing member of the group does not act within the timeout period.

If you need to manually update the group's position, you can use UpdateGroup to reset the location of all consumers in the group to the specified location in the stream.

Offsets and Commits

Offsets indicate the location of a message within a partition. If a consumer restarts or you need to recover from a failure, you can use the offset to restart reading from the stream.

When you use a consumer group, Streaming handles offsets automatically. With the default commitOnGet=true, each GetMessages call commits the offsets returned by the instance's previous request. For example:

For consumer A:

  • A calls GetMessages and receives messages from an arbitrary partition, with offsets of 1–100.
  • A processes all 100 messages successfully.
  • A calls GetMessages, and the Streaming service commits offset 100 and returns messages with offsets 101–200.
  • A processes 15 messages, and then goes offline unexpectedly (for more than 30 seconds).

A new consumer B:

  • B calls GetMessages, and the Streaming service uses the latest committed offset and returns messages with offsets 101–200.
  • B continues the message loop.

In this example, the 15 messages that A processed from the second batch are processed again by B. Those messages are processed more than once, but no data is lost.
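The consumer A/consumer B sequence can be replayed with a toy model. `CommitOnGetPartition` is a hypothetical in-memory class, not part of any SDK. It assumes a single partition and a fixed batch size, and it models only when commits happen under commitOnGet=true; timeouts and rebalancing are reduced to an explicit `remove_instance` call.

```python
from typing import Dict, List, Optional


class CommitOnGetPartition:
    """Toy model of one partition consumed with commitOnGet=true."""

    def __init__(self, offsets: List[int], batch_size: int):
        self.offsets = offsets
        self.batch_size = batch_size
        self.committed: Optional[int] = None  # group's last committed offset
        self.in_flight: Dict[str, int] = {}   # instance -> last offset handed out

    def get_messages(self, instance: str) -> List[int]:
        # Commit the offsets returned by this instance's previous request.
        if instance in self.in_flight:
            self.committed = self.in_flight.pop(instance)
        # Return the next batch after the group's committed offset.
        batch = [o for o in self.offsets
                 if self.committed is None or o > self.committed][: self.batch_size]
        if batch:
            self.in_flight[instance] = batch[-1]
        return batch

    def remove_instance(self, instance: str) -> None:
        # An instance that goes offline loses its uncommitted batch.
        self.in_flight.pop(instance, None)


p = CommitOnGetPartition(list(range(1, 301)), batch_size=100)
a1 = p.get_messages("A")  # offsets 1-100
a2 = p.get_messages("A")  # commits 100, returns 101-200
p.remove_instance("A")    # A processed 15 messages, then went offline
b1 = p.get_messages("B")  # resumes after committed offset 100
print(a2[0], a2[-1], p.committed, b1[0], b1[-1])  # 101 200 100 101 200
```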

Streaming provides "at-least-once" semantics for consumer groups, so consider when offsets are committed in your message loop. If a consumer goes offline before committing a batch of messages, that batch might be given to another consumer. When a partition is given to another consumer, that consumer starts from the latest committed offset and doesn't receive messages before it. We recommend that consumer applications handle duplicate messages.
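One common way to handle duplicates is to make processing idempotent by remembering which messages were already applied. Offsets are per partition, so a partition ID plus an offset identifies a message; the sketch uses that pair as its key. `IdempotentHandler` is hypothetical, and it keeps keys in memory only. Across instances and restarts, the keys would have to live in a shared, persistent store, ideally updated together with the effects of processing.

```python
from typing import Callable, Set, Tuple


class IdempotentHandler:
    """Apply each (partition, offset) at most once; skip redeliveries."""

    def __init__(self, apply: Callable[[bytes], None]):
        self.apply = apply
        self.seen: Set[Tuple[str, int]] = set()  # in memory only (assumption)

    def __call__(self, partition: str, offset: int, value: bytes) -> bool:
        key = (partition, offset)
        if key in self.seen:
            return False  # duplicate delivery: skip
        self.apply(value)
        self.seen.add(key)  # record only after apply() succeeded
        return True


applied = []
handler = IdempotentHandler(applied.append)
handler("0", 101, b"x")  # first delivery
handler("0", 101, b"x")  # same message delivered again: skipped
print(applied)           # [b'x']
```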

Note

Message offsets aren't dense. Offsets are monotonically increasing numbers. They do not decrease, and sometimes they increase by more than one. For example, if you publish two messages to the same partition, the first message could have an offset of 42 and the second message could have an offset of 45 (offsets 43 and 44 being non-existent).
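Because offsets can skip values, code that tracks progress should compare offsets rather than do arithmetic on them. The helper names below are hypothetical; the point is that looking up `last + 1` or counting messages with `last - first + 1` gives wrong answers for sparse offsets.

```python
from typing import List


def offsets_valid(offsets: List[int]) -> bool:
    """Offsets in one partition must increase; gaps are allowed."""
    return all(later > earlier for earlier, later in zip(offsets, offsets[1:]))


def unread_after(offsets: List[int], last_read: int) -> List[int]:
    """Select unread messages by comparison instead of assuming last_read + 1 exists."""
    return [o for o in offsets if o > last_read]


partition_offsets = [42, 45, 46]
print(offsets_valid(partition_offsets))     # True: the 42 -> 45 gap is normal
print(unread_after(partition_offsets, 42))  # [45, 46]
print(len(partition_offsets), 46 - 42 + 1)  # 3 messages, not 5
```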

If you want to override the default offset behavior and implement a custom offset commit mechanism, set commitOnGet to false when creating the group cursor. You can use ConsumerCommit to commit messages without reading more messages. ConsumerCommit returns a cursor for you to use in your next request.
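A manual-commit loop might look like the sketch below: process a whole batch first, then commit, then continue with the cursor the commit returns. It assumes ConsumerCommit is called with the cursor from the most recent GetMessages response. `consume_with_manual_commits` and `FakeStream` are hypothetical in-memory stand-ins, not the SDK.

```python
from typing import Callable, List, Tuple


def consume_with_manual_commits(
    get_messages: Callable[[str], Tuple[List[str], str]],
    commit: Callable[[str], str],
    cursor: str,
    handle: Callable[[str], None],
    max_batches: int,
) -> str:
    """commitOnGet=false loop: process a whole batch, then commit explicitly."""
    for _ in range(max_batches):
        messages, cursor = get_messages(cursor)
        if not messages:
            continue  # nothing new to process or commit
        for message in messages:
            handle(message)  # if this raises, the batch stays uncommitted
        # Commit only after the batch succeeded, then use the returned cursor.
        cursor = commit(cursor)
    return cursor


class FakeStream:
    """Hypothetical in-memory stand-in; cursors encode a read position."""

    def __init__(self, data: List[str], batch_size: int):
        self.data = data
        self.batch_size = batch_size
        self.commits: List[int] = []

    def get_messages(self, cursor: str) -> Tuple[List[str], str]:
        pos = int(cursor.split(":")[1])
        batch = self.data[pos:pos + self.batch_size]
        return batch, f"pos:{pos + len(batch)}"

    def commit(self, cursor: str) -> str:
        self.commits.append(int(cursor.split(":")[1]))  # everything before pos
        return cursor


stream = FakeStream(["a", "b", "c", "d", "e"], batch_size=2)
handled: List[str] = []
final = consume_with_manual_commits(
    stream.get_messages, stream.commit, "pos:0", handled.append, max_batches=4
)
print(handled, stream.commits, final)
```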

Caution

Writing custom commit logic is complicated and prone to race conditions. In many cases the group's internal state changes (for example, when partitions are reassigned during a rebalance), and your client is required to handle the situation.

Balancing and Rebalancing

Streaming considers the number of partitions in the stream and the number of instances in the consumer group when assessing balance. Group balancing is automatic. Each consumer is assigned one or more partitions based on the following calculation:

(nPartitions / nConsumers) ± 1

For example, if there are eight partitions in the stream and four consumers in the group, each consumer is assigned to two partitions. If there are 10 partitions in the stream and four consumers in the group, two consumers are assigned to two partitions, and two consumers are assigned to three partitions.
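Both examples can be reproduced with a simple round-robin assignment, which gives each instance either floor(P/N) or ceil(P/N) partitions. Round-robin is an illustrative choice; the service's actual assignment algorithm isn't described here, and `assign_partitions` is a hypothetical helper.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], instances: List[str]) -> Dict[str, List[int]]:
    """Round-robin partitions over instances.

    Each instance ends up with floor(P/N) or ceil(P/N) partitions; instances
    beyond the partition count get none and stay idle.
    """
    if not instances:
        raise ValueError("need at least one instance")
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    for i, partition in enumerate(partitions):
        assignment[instances[i % len(instances)]].append(partition)
    return assignment


consumers = ["c1", "c2", "c3", "c4"]
print({k: len(v) for k, v in assign_partitions(list(range(8)), consumers).items()})
# {'c1': 2, 'c2': 2, 'c3': 2, 'c4': 2}
print({k: len(v) for k, v in assign_partitions(list(range(10)), consumers).items()})
# {'c1': 3, 'c2': 3, 'c3': 2, 'c4': 2}
```

With more instances than partitions (for example, three partitions and five instances), the same function leaves two instances with empty lists, matching the idle instances described earlier.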

As instances join or leave a consumer group and requests are made for messages, partition assignments are reassessed. If a new instance joins and the stream has more partitions than the group has instances, partitions are reassigned across all instances, including the new one. If an instance in the group stops consuming messages for more than 30 seconds, or fails to send a ConsumerHeartbeat within 30 seconds, that instance is removed from the consumer group and its partitions are reassigned, if possible, to other instances.

These events are called rebalancing. The instances in the group are not aware of the rebalancing process, but when it completes, the instances own mutually exclusive sets of partitions in the stream.

At the end of a successful rebalance operation for a consumer group, every partition within the stream is owned by an instance within the group.
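The eviction rule can be sketched as a local model. `evict_and_reassign` is hypothetical: it drops instances whose last activity (a GetMessages or ConsumerHeartbeat call, tracked here in an assumed `last_seen` map) is more than 30 seconds old, and moves only their partitions to the least-loaded survivors. That is one simple policy consistent with the description above; the service's actual rebalancing may move other partitions too.

```python
from typing import Dict, List

TIMEOUT_S = 30.0  # inactivity limit described above


def evict_and_reassign(
    assignment: Dict[str, List[int]],
    last_seen: Dict[str, float],
    now: float,
    timeout_s: float = TIMEOUT_S,
) -> Dict[str, List[int]]:
    """Drop instances idle for more than timeout_s and hand their partitions
    to the least-loaded survivors (ties broken by name)."""
    live = {name: list(parts) for name, parts in assignment.items()
            if now - last_seen[name] <= timeout_s}
    orphaned = sorted(p for name, parts in assignment.items()
                      if name not in live for p in parts)
    if not live:
        return {}  # no instance left to own the partitions
    for partition in orphaned:
        target = min(live, key=lambda name: (len(live[name]), name))
        live[target].append(partition)
    return live


before = {"a": [0, 1], "b": [2, 3], "c": [4, 5]}
after = evict_and_reassign(before, {"a": 100.0, "b": 90.0, "c": 60.0}, now=100.0)
print(after)  # {'a': [0, 1, 4], 'b': [2, 3, 5]}
```

After the call, every partition is still owned by exactly one live instance, which is the end state described above.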

In this way, you can scale the number of instances up to the number of partitions, at which point each instance consumes messages from only one partition. This configuration maximizes your stream's available throughput. Beyond that point, any new instance that joins the group remains idle and isn't assigned any partition.