Create a connector in Connector Hub to transfer stream data from the Streaming service to a target service.
For more information about the Streaming service, see Streaming.
A connector that's defined with a Streaming source and (optional) function task supports the following targets: Functions, Logging Analytics, Object Storage, and Streaming. The Notifications target is supported only when no function task is used.
Together with the retention period, the Streaming source's read position determines where in the stream to start moving data. You specify the read position when you specify the source connection.
Private Endpoint Prerequisites for Streams 🔗
Private endpoint configuration is supported for source and target streams. Following are prerequisites for accessing streams that use private endpoints.
Ensure that you're granted permissions according to the following policies:
allow group <group-name> to manage virtual-network-family in compartment id <compartment-ocid>
where any {request.operation='CreatePrivateEndpoint',
request.operation='UpdatePrivateEndpoint',
request.operation='DeletePrivateEndpoint',
request.operation='ChangePrivateEndpointCompartment',
request.operation='EnableReverseConnection',
request.operation='ModifyReverseConnection',
request.operation='DisableReverseConnection'
}
allow group <group-name> to read stream-family in compartment id <compartment-ocid>
Compartment: Select the compartment that contains the stream that you want.
Stream pool: Select the stream pool that contains the stream that you want.
Stream: Select the name of the stream that you want to receive data from.
Read position: Specify the cursor position from which to start reading the stream.
Latest: Starts reading messages published after creating the connector.
If the first run of a new connector with this configuration is successful, then it moves data from the connector's creation time. If the first run fails (such as with missing policies), then after resolution the connector moves data either from the connector's creation time or, if the creation time is outside the retention period, from the oldest available message in the stream. For example, consider a connector created at 10 a.m. for a stream with a two-hour retention period. If failed runs are resolved at 11 a.m., then the connector moves data from 10 a.m. If failed runs are resolved at 1 p.m., then the connector moves data from the oldest available message in the stream.
Later runs move data from the next position in the stream. If a later run fails, then after resolution the connector moves data either from the next position in the stream or from the oldest available message in the stream, depending on the stream's retention period.
Trim Horizon: Starts reading from the oldest available message in the stream.
If the first run of a new connector with this configuration is successful, then it moves data starting from the oldest available message in the stream. If the first run fails (such as with missing policies), then after resolution the connector moves data from the oldest available message in the stream, regardless of the stream's retention period.
Later runs move data from the next position in the stream. If a later run fails, then after resolution the connector moves data either from the next position in the stream or from the oldest available message in the stream, depending on the stream's retention period.
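The read-position rules above can be modeled as a small helper. This is an illustrative sketch of the documented behavior, not an API call; the function name and times are hypothetical:

```python
from datetime import datetime, timedelta

def resolve_start_position(created_at, resolved_at, retention, read_position):
    """Model where a connector starts reading after a failed first run.

    read_position is "LATEST" or "TRIM_HORIZON". Returns "OLDEST" when
    only the oldest retained message is available, otherwise the datetime
    the connector reads from. (Illustrative model only.)
    """
    if read_position == "TRIM_HORIZON":
        # Trim Horizon always resumes from the oldest retained message.
        return "OLDEST"
    # Latest: resume from creation time if it is still within retention.
    oldest_retained = resolved_at - retention
    if created_at >= oldest_retained:
        return created_at
    return "OLDEST"

# Worked example from the text: connector created at 10 a.m.,
# stream retention two hours.
created = datetime(2024, 1, 1, 10, 0)
retention = timedelta(hours=2)

# Failures resolved at 11 a.m.: creation time is still within retention.
print(resolve_start_position(created, datetime(2024, 1, 1, 11, 0), retention, "LATEST"))
# Failures resolved at 1 p.m.: creation time has aged out of retention.
print(resolve_start_position(created, datetime(2024, 1, 1, 13, 0), retention, "LATEST"))
```

The first call returns the 10 a.m. creation time; the second falls back to the oldest retained data, matching the example in the text.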
(Optional)
Under Configure function task, configure a function task to process stream data using the Functions service:
Select task: Select Function.
Compartment: Select the compartment that contains the function that you want.
Function application: Select the name of the function application that includes the function you want.
Function: Select the name of the function that you want to use to process the data received from the source.
For use by the connector as a task, the function must be configured to return one of the following responses:
List of JSON entries (must set the response header Content-Type=application/json)
Single JSON entry (must set the response header Content-Type=application/json)
Single binary object (must set the response header Content-Type=application/octet-stream)
Show additional options: Select this link and specify limits for each batch of data sent to the function. To use manual settings, provide values for batch size limit (KBs) and batch time limit (seconds).
Considerations for function tasks:
Connector Hub doesn't parse the output of the function task. The output of the function task is written as-is to the target. For example, when using a Notifications target with a function task, all messages are sent as raw JSON blobs.
Functions are invoked synchronously with 6 MB of data per invocation. If data exceeds 6 MB, then the connector invokes the function again to move the data that's over the limit. Such invocations are handled sequentially.
Functions can execute for up to five minutes. See Delivery Details.
Function tasks are limited to scalar functions.
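The three response shapes a task function must return can be sketched as a helper that builds a body and headers pair. This is illustrative only: the helper and its transform step are hypothetical, and in a real function you would set the body and Content-Type header on your function framework's response object.

```python
import json

def transform(record):
    # Hypothetical per-record processing step.
    return {"processed": record}

def build_task_response(records, mode="json_list"):
    """Build (body, headers) matching the response shapes a connector
    function task must return: a JSON list, a single JSON entry, or a
    single binary object. Illustrative sketch only."""
    if mode == "json_list":
        body = json.dumps([transform(r) for r in records])
        headers = {"Content-Type": "application/json"}
    elif mode == "json_single":
        body = json.dumps(transform(records[0]))
        headers = {"Content-Type": "application/json"}
    elif mode == "binary":
        body = b"".join(r.encode() for r in records)
        headers = {"Content-Type": "application/octet-stream"}
    else:
        raise ValueError(f"unknown mode: {mode}")
    return body, headers
```

Whichever shape you return, the matching Content-Type header is required; Connector Hub writes the output as-is to the target.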
If you selected Functions as the target, under Configure target, configure the function to send the data to. Then, skip to step 15.
Compartment: Select the compartment that contains the function that you want.
Function application: Select the name of the function application that contains the function that you want.
Function: Select the name of the function that you want to send the data to.
Show additional options: Select this link and specify limits for each batch of data sent to the function. To use manual settings, provide values for batch size limit (either KBs or number of messages) and batch time limit (seconds).
For example, limit batch size by selecting either 5,000 kilobytes or 10 messages. An example batch time limit is 5 seconds.
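The batch limits above can be modeled as a flush predicate: a batch is sent as soon as any limit is reached. This is a sketch of how the knobs interact, using the example values from the text, not connector internals:

```python
import time

def should_flush(batch_bytes, batch_count, batch_started,
                 size_limit_kb=5000, count_limit=10, time_limit_s=5):
    """Return True when the current batch should be sent: it has reached
    the size limit (KBs), the message-count limit, or the time limit
    (seconds), whichever comes first. Illustrative model only."""
    return (batch_bytes >= size_limit_kb * 1024
            or batch_count >= count_limit
            or time.monotonic() - batch_started >= time_limit_s)
```

For example, with these defaults a batch of 10 small messages flushes on count, a single 5,000 KB message flushes on size, and an idle batch flushes after 5 seconds.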
Considerations for Functions targets:
The connector flushes source data as a JSON list in batches. Maximum batch, or payload, size is 6 MB.
Functions are invoked synchronously with 6 MB of data per invocation. If data exceeds 6 MB, then the connector invokes the function again to move the data that's over the limit. Such invocations are handled sequentially.
Functions can execute for up to five minutes. See Delivery Details.
Don't return data from Functions targets to connectors. Connector Hub doesn't read data returned from Functions targets.
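The sequential over-limit invocations described above amount to size-based chunking of the payload. The sketch below models that behavior; the 6 MB figure comes from the text, and everything else is illustrative:

```python
def plan_invocations(records, limit_bytes=6 * 1024 * 1024):
    """Split a batch of byte records into sequential function invocations,
    each carrying at most limit_bytes of data. Illustrative model of the
    documented 6 MB-per-invocation behavior, not connector internals."""
    invocations, current, size = [], [], 0
    for record in records:
        record_size = len(record)
        if current and size + record_size > limit_bytes:
            # Current invocation is full; the remainder is handled by
            # a follow-up invocation, processed sequentially.
            invocations.append(current)
            current, size = [], 0
        current.append(record)
        size += record_size
    if current:
        invocations.append(current)
    return invocations
```

With a shrunken limit for demonstration, `plan_invocations([b"aaaa", b"bb", b"ccc"], limit_bytes=5)` yields two invocations: the first carries `b"aaaa"`, the second `b"bb"` and `b"ccc"`.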
If you selected Logging Analytics as the target, under Configure target, configure the log group to send the log data to. Then, skip to step 15.
Compartment: Select the compartment that contains the log group that you want.
If you selected Notifications as the target, under Configure target, configure the topic to send the data to. Then, skip to step 15.
Compartment: Select the compartment that contains the topic that you want.
Topic: Select the name of the topic that you want to send the data to. For Streaming sources, messages are sent as raw JSON blobs.
Considerations for Notifications targets:
The Notifications target is supported with the Streaming source only when no function task is used.
The maximum message size for the Notifications target is 128 KB. Any message that exceeds the maximum size is dropped.
SMS messages exhibit unexpected results for certain connector configurations. This issue is limited to topics that contain SMS subscriptions for the indicated connector configurations. For more information, see Multiple SMS messages for a single notification.
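Because oversized messages are dropped rather than truncated, a pre-check against the 128 KB limit can be useful before data reaches the topic. A minimal sketch (the limit comes from the text; the helper is hypothetical):

```python
# Maximum message size for the Notifications target, per the text.
MAX_NOTIFICATION_BYTES = 128 * 1024

def deliverable(message: bytes) -> bool:
    """Return True if a message fits within the Notifications limit.
    Messages over the limit are dropped by the connector, so producers
    may want to check sizes upstream. Illustrative pre-check only."""
    return len(message) <= MAX_NOTIFICATION_BYTES
```

For example, a message of exactly 128 KB is deliverable, while one a single byte larger would be dropped.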
If you selected Object Storage as the target, under Configure target, configure the bucket to send the data to. Then, skip to step 15.
Compartment: Select the compartment that contains the bucket that you want.
Bucket: Select the name of the bucket that you want to send the data to.
Object Name Prefix: Optionally enter a prefix value.
Show additional options: Select this link and optionally enter values for batch size (in MBs) and batch time (in milliseconds).
Considerations for Object Storage targets:
Batch rollover details:
Batch rollover size: 100 MB
Batch rollover time: 7 minutes
Files saved to Object Storage are compressed using gzip.
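Because objects written to the bucket are gzip-compressed, consumers must decompress them before parsing. The sketch below round-trips locally compressed bytes standing in for a downloaded object; the one-JSON-document-per-line payload format shown here is an assumption for illustration:

```python
import gzip
import io
import json

def read_connector_object(raw_bytes):
    """Decompress a gzip object written by the connector and parse each
    non-empty line as JSON. The line-delimited JSON layout is assumed
    for this example, not guaranteed by the documentation."""
    with gzip.open(io.BytesIO(raw_bytes), "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Local stand-in for an object downloaded from the target bucket.
payload = gzip.compress(b'{"a": 1}\n{"b": 2}\n')
print(read_connector_object(payload))  # [{'a': 1}, {'b': 2}]
```

The same approach applies regardless of batch rollover settings; rollover only controls when a new object starts (at 100 MB or 7 minutes).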
If you selected Streaming as the target, under Configure target, configure the stream to send the data to.
Compartment: Select the compartment that contains the stream that you want.
Stream: Select the name of the stream that you want to send the data to.
To accept default policies, select the Create link provided for each default policy.
Default policies are offered for any authorization required for this connector to access source, task, and target services.
You can get this authorization through these default policies or through group-based policies. The default policies are offered whenever you use the Console to create or edit a connector. The only exception is when the exact policy already exists in IAM, in which case the default policy isn't offered. For more information about this authorization requirement, see Authentication and Authorization.
If you don't have permissions to accept default policies, contact your administrator.
Automatically created policies remain when connectors are deleted. As a best practice, delete associated policies when deleting the connector.
To review a newly created policy, select the associated view link.
(Optional)
Add one or more tags to the connector: Select Show Advanced Options to show the Add Tags section.
If you have permissions to create a resource, then you also have permissions to apply free-form tags to that resource. To apply a defined tag, you must have permissions to use the tag namespace. For more information about tagging, see Resource Tags. If you're not sure whether to apply tags, skip this option or ask an administrator. You can apply tags later.
Select Create.
The creation process begins, and its progress is displayed. On completion, the connector's details page opens.
Private Endpoints: Configure Ingress and Egress Rules 🔗
If you created a connector with a stream that uses a private endpoint, then configure ingress and egress rules to let the connector access that private endpoint.
Get the IP address for the private endpoint of the stream pool by viewing the stream pool's details in the Console. See Listing Streams and Stream Pools.
Allow primary traffic: Add an ingress rule to the NSG or security list with the following configuration.
To change the source or target to use a different private stream, or to use a source or target other than Streaming, re-create the connector with the source and target that you want. An example of the need for a different private stream is a stream that was moved to a different stream pool. In that case, re-create the connector using the moved stream. Ensure that you deactivate or delete the old connector with the stream source or target that you don't want any more.
Confirm That the New Connector Moves Data 🔗
After you create the connector, confirm that it's moving data.
Enable logs for the connector to get details on data flow.
Check for expected results at the target service.
Confirming that data is moved helps you avoid automatic deactivation, which happens when a connector fails for a long time.