Creating a Connector with a Streaming Source

Create a connector in Connector Hub to transfer stream data from the Streaming service to a target service.

For more information about the Streaming service, see Streaming.

A connector that's defined with a Streaming source and an optional function task supports the following targets: Functions, Logging Analytics, Object Storage, and Streaming. The Notifications target is supported only when no function task is used.

For an example of the Connector Hub workflow, see Overview of Connector Hub. For an example of a connector that uses Monitoring as the source, see Scenario: Sending Metrics to Object Storage.

The retention period for the Streaming source in Connector Hub is customer-defined. See Limits on Streaming Resources. For more information about delivery, see Delivery Details.

Together with the retention period, the Streaming source's read position determines where in the stream to start moving data. You specify the read position when you specify the source connection.

Note

  • For stream input schema, see Message Reference.
  • For a Notifications target with Streaming source, all messages are sent as raw JSON blobs.
    1. Open the navigation menu and click Analytics & AI. Under Messaging, click Connector Hub.
    2. On the Connectors page, select a compartment.
    3. Click Create connector.
    4. On the Create connector page, enter a user-friendly name for the new connector and an optional description. Avoid entering confidential information.
    5. Select the compartment where you want to store the new connector.
    6. Under Configure connector, for Source, select Streaming.
    7. For Target, select the service that you want to transfer the stream data to:
      • Functions: Send stream data to a function.
      • Logging Analytics: Send stream data to a log group.
      • Notifications: Send stream data to a topic. Notifications is supported only when no function task is defined.
      • Object Storage: Send stream data to a bucket.
      • Streaming: Send stream data to a stream.
    8. (Optional) To enable service logs for the new connector, click the Logs switch and provide the following values:
      • Log category: The value Connector Tracking is automatically selected.
      • Compartment: Select the compartment that you want for storing the service logs for the connector.
      • Log group: Select the log group that you want for storing the service logs. To create a new log group, click Create new group and then enter a name.
      • Log name: Optionally enter a name for the log.
      • Show advanced options:
        • Log retention: Optionally specify how long to keep the service logs (default: 30 days).
    9. Under Configure source connection, select the source stream:
      • Compartment: Select the compartment that contains the stream that you want.
      • Stream pool: Select the stream pool that contains the stream that you want.

        Private endpoint configuration isn't supported. For stream pool configuration details, see Creating Stream Pools.

      • Stream: Select the name of the stream that you want to receive data from.
      • Read position: Specify the cursor position from which to start reading the stream.
        • Latest: Starts reading messages published after creating the connector.
          • If the first run of a new connector with this configuration is successful, then it moves data from the connector's creation time. If the first run fails (such as with missing policies), then after resolution the connector either moves data from the connector's creation time or, if the creation time is outside the retention period, the oldest available data in the stream. For example, consider a connector created at 10 a.m. for a stream with a two-hour retention period. If failed runs are resolved at 11 a.m., then the connector moves data from 10 a.m. If failed runs are resolved at 1 p.m., then the connector moves the oldest available data in the stream.
          • Later runs move data from the next position in the stream. If a later run fails, then after resolution the connector moves data from the next position in the stream or the oldest available data in the stream, depending on the stream's retention period.
        • Trim Horizon: Starts reading from the oldest available message in the stream.
          • If the first run of a new connector with this configuration is successful, then it moves data from the oldest available data in the stream. If the first run fails (such as with missing policies), then after resolution the connector moves the oldest available data in the stream, regardless of the stream's retention period.
          • Later runs move data from the next position in the stream. If a later run fails, then after resolution the connector moves data from the next position in the stream or the oldest available data in the stream, depending on the stream's retention period.
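
The first-run behavior described for the two read positions can be sketched in Python. This is only an illustration of the rules stated above, not the service's implementation. The function name, the return labels, and the handling of the exact boundary (creation time equal to the oldest retained time) are assumptions.

```python
from datetime import datetime, timedelta


def first_run_start(read_position: str, created_at: datetime,
                    resolved_at: datetime, retention: timedelta) -> str:
    """Return where the first successful run starts moving data.

    resolved_at is the time of the first successful run. It equals
    created_at when the first run doesn't fail.
    """
    if read_position == "TRIM_HORIZON":
        # Trim Horizon always starts from the oldest available data.
        return "OLDEST_AVAILABLE"
    if read_position != "LATEST":
        raise ValueError(f"unknown read position: {read_position}")
    # Oldest point in time that the stream still retains at resolution.
    oldest_retained = resolved_at - retention
    if created_at >= oldest_retained:
        # Creation time is still inside the retention period.
        return "CREATION_TIME"
    # Creation time has aged out of the stream.
    return "OLDEST_AVAILABLE"


if __name__ == "__main__":
    created = datetime(2024, 1, 1, 10, 0)   # connector created at 10 a.m.
    two_hours = timedelta(hours=2)          # stream retention period
    print(first_run_start("LATEST", created, datetime(2024, 1, 1, 11, 0), two_hours))
    print(first_run_start("LATEST", created, datetime(2024, 1, 1, 13, 0), two_hours))
```

With the values from the example above (created at 10 a.m., two-hour retention), resolving at 11 a.m. yields the creation time and resolving at 1 p.m. yields the oldest available data.
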
    10. (Optional) Under Configure function task, configure a function task to process stream data using the Functions service:
      • Select task: Select Function.
      • Compartment: Select the compartment that contains the function that you want.
      • Function application: Select the name of the function application that includes the function you want.
      • Function: Select the name of the function that you want to use to process the data received from the source.

        For use by the connector as a task, the function must be configured to return one of the following responses:

        • List of JSON entries (must set the response header Content-Type=application/json)
        • Single JSON entry (must set the response header Content-Type=application/json)
        • Single binary object (must set the response header Content-Type=application/octet-stream)
      • Show additional options: Click this link and specify limits for each batch of data sent to the function. If you choose to use manual settings, provide values for batch size limit (KBs) and batch time limit (seconds).

      Considerations for function tasks:

      • Connector Hub doesn't parse the output of the function task. The output of the function task is written as-is to the target. For example, when using a Notifications target with a function task, all messages are sent as raw JSON blobs.
      • Functions are invoked synchronously with 6 MB of data per invocation. If data exceeds 6 MB, then the connector invokes the function again to move the data that's over the limit. Such invocations are handled sequentially.
      • Functions can execute for up to five minutes.
      • Function tasks are limited to scalar functions.
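
As a rough sketch of the response requirement for function tasks, the following Python helper takes a batch of stream messages and returns a list of JSON entries together with the Content-Type header that the connector expects. The input field names (value, offset) and the base64 encoding of value are assumptions made for illustration; check Message Reference for the actual schema. The helper name transform_batch is hypothetical, and a deployed function would call logic like this from its handler and set the header on the response it returns.

```python
import base64
import json


def transform_batch(raw_body: bytes) -> tuple[bytes, dict[str, str]]:
    """Decode each message and return a JSON list plus response headers."""
    messages = json.loads(raw_body)
    entries = []
    for message in messages:
        # Assumption: "value" holds base64-encoded message content.
        value = message.get("value")
        payload = base64.b64decode(value).decode("utf-8") if value else None
        entries.append({"offset": message.get("offset"), "payload": payload})
    body = json.dumps(entries).encode("utf-8")
    # A list of JSON entries must be returned as application/json.
    return body, {"Content-Type": "application/json"}


if __name__ == "__main__":
    sample = json.dumps([
        {"offset": 0, "value": base64.b64encode(b"hello").decode("ascii")},
    ]).encode("utf-8")
    body, headers = transform_batch(sample)
    print(headers, body.decode("utf-8"))
```

Because Connector Hub writes the task output as-is to the target, whatever shape the function returns is the shape that the target receives.
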
    11. If you selected Functions as the target, under Configure target, configure the function to send the stream data to. Then, skip to step 16.
      • Compartment: Select the compartment that contains the function that you want.
      • Function application: Select the name of the function application that contains the function that you want.
      • Function: Select the name of the function that you want to send the data to.
      • Show additional options: Click this link and specify limits for each batch of data sent to the function. If you choose to use manual settings, provide values for batch size limit (either KBs or number of messages) and batch time limit (seconds).

        For example, limit batch size by selecting either 5,000 kilobytes or 10 messages. An example batch time limit is 5 seconds.

      Considerations for Functions targets:

      • The connector flushes source data as a JSON list in batches. Maximum batch, or payload, size is 6 MB.
      • Functions are invoked synchronously with 6 MB of data per invocation. If data exceeds 6 MB, then the connector invokes the function again to move the data that's over the limit. Such invocations are handled sequentially.
      • Functions can execute for up to five minutes.
      • Don't return data from Functions targets to connectors. Connector Hub doesn't read data returned from Functions targets.
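
To make the 6 MB payload limit concrete, the following Python sketch groups messages into JSON-list payloads that each stay under a size limit, so that data over the limit ends up in a later payload. It illustrates the idea only and isn't the connector's actual batching algorithm. The function name is hypothetical, and treating 6 MB as 6 * 1024 * 1024 bytes is an assumption.

```python
import json

MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # assumption: 6 MB counted as 6 MiB


def split_into_payloads(messages, limit=MAX_PAYLOAD_BYTES):
    """Group messages, in order, into lists whose JSON encoding fits in limit bytes.

    A single message larger than limit still gets its own payload.
    """
    payloads = []
    current = []
    current_size = 2  # the enclosing "[]" of a JSON list
    for message in messages:
        size = len(json.dumps(message).encode("utf-8"))
        # json.dumps separates list items with ", " (2 bytes).
        extra = size + (2 if current else 0)
        if current and current_size + extra > limit:
            payloads.append(current)
            current = []
            current_size = 2
            extra = size
        current.append(message)
        current_size += extra
    if current:
        payloads.append(current)
    return payloads


if __name__ == "__main__":
    batch = [{"v": "x" * 10} for _ in range(5)]
    # Each payload would be sent in its own invocation, one after another.
    for payload in split_into_payloads(batch, limit=50):
        print(len(payload), len(json.dumps(payload)))
```
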
    12. If you selected Logging Analytics as the target, under Configure target, configure the log group to send the stream data to. Then, skip to step 16.
      • Compartment: Select the compartment that contains the log group that you want.
      • Log group: Select the log group that you want.
      • Log source identifier (for Streaming source only): Select the log source.
    13. If you selected Notifications as the target, under Configure target, configure the topic to send the stream data to. Then, skip to step 16.
      • Compartment: Select the compartment that contains the topic that you want.
      • Topic: Select the name of the topic that you want to send the data to. For a Streaming source, messages are sent as raw JSON blobs.

      Considerations for Notifications targets:

      • The Notifications target is supported with the Streaming source only when no function task is used.
      • The maximum message size for the Notifications target is 128 KB. Any message that exceeds the maximum size is dropped.
      • SMS messages exhibit unexpected results for certain connector configurations. This issue is limited to topics that contain SMS subscriptions for the indicated connector configurations. For more information, see Multiple SMS messages for a single notification.
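
Because oversized messages are dropped rather than truncated, it can help to check message sizes before publishing to the source stream. The following Python sketch separates messages that fit within the limit from those that would exceed it. The helper name is hypothetical, 128 KB is assumed to mean 128 * 1024 bytes, and the service might measure size differently (for example, after adding its own envelope), so treat the result as an estimate.

```python
MAX_NOTIFICATION_BYTES = 128 * 1024  # assumption: 128 KB counted as 131,072 bytes


def partition_by_size(messages: list[str], limit: int = MAX_NOTIFICATION_BYTES):
    """Split messages into (deliverable, oversized) by UTF-8 encoded size."""
    deliverable = []
    oversized = []
    for message in messages:
        if len(message.encode("utf-8")) <= limit:
            deliverable.append(message)
        else:
            oversized.append(message)
    return deliverable, oversized


if __name__ == "__main__":
    ok, too_big = partition_by_size(["small", "x" * (MAX_NOTIFICATION_BYTES + 1)])
    print(len(ok), len(too_big))
```
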
    14. If you selected Object Storage as the target, under Configure target, configure the bucket to send the stream data to. Then, skip to step 16.
      • Compartment: Select the compartment that contains the bucket that you want.
      • Bucket: Select the name of the bucket that you want to send the data to.
      • Object Name Prefix: Optionally enter a prefix value.
      • Show additional options: Click this link and optionally enter values for batch size (in MBs) and batch time (in milliseconds).

      Considerations for Object Storage targets:

      • Batch rollover details:

        • Batch rollover size: 100 MB
        • Batch rollover time: 7 minutes
      • Files saved to Object Storage are compressed using gzip.
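
Since the files are gzip-compressed, a consumer needs to decompress them before reading. The following Python sketch decompresses the bytes of a downloaded object by using the standard gzip module. Downloading the object isn't shown. The assumption that the content is UTF-8 text with one record per line is for illustration only and should be verified against an actual object.

```python
import gzip


def read_connector_object(compressed: bytes) -> list[str]:
    """Decompress a gzip object and return its non-empty lines."""
    # Assumption: the decompressed content is UTF-8 text, one record per line.
    text = gzip.decompress(compressed).decode("utf-8")
    return [line for line in text.splitlines() if line.strip()]


if __name__ == "__main__":
    # Simulate a downloaded object with locally compressed sample data.
    sample = gzip.compress(b'{"a": 1}\n{"a": 2}\n')
    for record in read_connector_object(sample):
        print(record)
```
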

    15. If you selected Streaming as the target, under Configure target, configure the stream to send the stream data to.
      • Compartment: Select the compartment that contains the stream that you want.
      • Stream: Select the name of the stream that you want to send the data to.

      Private endpoint configuration isn't supported. For stream pool configuration details, see Creating Stream Pools.

    16. To accept default policies, click the Create link provided for each default policy.

      Default policies are offered for any authorization required for this connector to access source, task, and target services.

      You can get this authorization through these default policies or through group-based policies. The default policies are offered whenever you use the Console to create or edit a connector. The only exception is when the exact policy already exists in IAM, in which case the default policy isn't offered. For more information about this authorization requirement, see Authentication and Authorization.

      • If you don't have permissions to accept default policies, contact your administrator.
      • Automatically created policies remain when connectors are deleted. As a best practice, delete associated policies when deleting the connector.

      To review a newly created policy, click the associated view link.

    17. (Optional) Assign tags to the connector. Click Show advanced options and then provide values for the tagging fields.
      • Tag namespace: To add a defined tag, select an existing namespace. If adding a free-form tag, leave blank.
      • Tag key: To add a defined tag, select an existing tag key. To add a free-form tag, type the key name that you want.
      • Tag value: Type the tag value that you want.
      • Add tag: Click to add another tag.
    18. Click Create.
  • Use the oci sch service-connector create command and required parameters to create a connector with a Streaming source:

    oci sch service-connector create --display-name "<display_name>" --compartment-id <compartment_OCID> --source [<stream_source_in_JSON>] --target [<target_in_JSON>]

    For a complete list of parameters and values for CLI commands, see the CLI Command Reference.

  • Run the CreateServiceConnector operation to create a connector.

    To create a connector with a Streaming source, populate source in the request (CreateServiceConnectorDetails) with streaming details. For an example, see StreamingSourceDetails.
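
As a minimal sketch of what the source and target JSON might look like, the following Python helper assembles the arguments for the CLI command shown above, with an Object Storage target. The JSON field names (kind, streamId, cursor, bucketName) and the cursor values LATEST and TRIM_HORIZON are assumptions based on the shape of StreamingSourceDetails; verify them against the API reference before use. The helper name and all OCIDs and names in the usage example are hypothetical placeholders.

```python
import json
import shlex


def build_create_command(display_name: str, compartment_id: str,
                         stream_id: str, bucket_name: str,
                         read_position: str = "LATEST") -> list[str]:
    """Build the argument list for 'oci sch service-connector create'."""
    if read_position not in ("LATEST", "TRIM_HORIZON"):
        raise ValueError(f"unknown read position: {read_position}")
    # Assumed field names; confirm against StreamingSourceDetails.
    source = {
        "kind": "streaming",
        "streamId": stream_id,
        "cursor": {"kind": read_position},
    }
    # Assumed field names for an Object Storage target.
    target = {"kind": "objectStorage", "bucketName": bucket_name}
    return [
        "oci", "sch", "service-connector", "create",
        "--display-name", display_name,
        "--compartment-id", compartment_id,
        "--source", json.dumps(source),
        "--target", json.dumps(target),
    ]


if __name__ == "__main__":
    command = build_create_command(
        "my-connector",                      # hypothetical display name
        "ocid1.compartment.oc1..example",    # placeholder OCID
        "ocid1.stream.oc1..example",         # placeholder OCID
        "my-bucket",                         # hypothetical bucket name
    )
    # Print a shell-quoted command line for review; nothing is executed.
    print(shlex.join(command))
```

The same source and target dictionaries correspond to the source and target fields of CreateServiceConnectorDetails when calling the API directly.
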

Confirm That the New Connector Moves Data

After you create the connector, confirm that it's moving data.

  • Enable logs for the connector to get details on data flow.
  • Check for expected results at the target service.

Confirming that data is moved helps you avoid automatic deactivation, which happens when a connector fails for a long time.