Creating a Connector with a Queue Source

Create a connector in Connector Hub to transfer messages from a queue in the Queue service to a target service.

For more information about the Queue service, see Queue.

A connector that's defined with a Queue source and (optional) function task supports the following targets: Functions, Notifications, Object Storage, and Streaming. For an example of the Connector Hub workflow, see Overview of Connector Hub.

Long Polling

By default, connectors use long polling to read messages from queues. Long polling is useful to prevent tight loop retries when the queue is empty. The read timeout for reading from a queue is 30 seconds. In each move operation, the connector waits for consumable messages to be available in the queue. If the read timeout passes and no messages are available for consumption, then the connector ends the request and retries with another request.

Filters and Consumed Messages

You can filter a queue for specific channels. For API and CLI parameters to filter a queue, see Consuming Messages from a Channel. A message that has been transferred to the connector's target is considered "consumed." To meet requirements of the Queue service, the connector deletes transferred messages from the source queue. For more information, see Consuming Messages.

    1. Open the navigation menu and click Analytics & AI. Under Messaging, click Connector Hub.
    2. On the Connectors page, select a compartment.
    3. Click Create connector.
    4. On the Create connector page, enter a user-friendly name for the new connector and an optional description. Avoid entering confidential information.
    5. Select the compartment where you want to store the new connector.
    6. Under Configure connector, for Source, select Queue.
    7. For Target, select the service that you want to transfer the messages to:
      • Functions: Send messages from a queue to a function.
      • Notifications: Send messages from a queue to a topic. Notifications is supported only when no function task is defined.
      • Object Storage: Send messages from a queue to a bucket.
      • Streaming: Send messages from a queue to a stream.
    8. (Optional) To enable service logs for the new connector, click the Logs switch and provide the following values:
      • Log category: The value Connector Tracking is automatically selected.
      • Compartment: Select the compartment that you want for storing the service logs for the connector.
      • Log group: Select the log group that you want for storing the service logs. To create a new log group, click Create new group and then enter a name.
      • Log name: Optionally enter a name for the log.
      • Show advanced options:
        • Log retention: Optionally specify how long to keep the service logs (default: 30 days).
    9. Under Configure source connection, select the queue that contains the messages that you want:
      • Compartment: Select the compartment that contains the queue that you want.
      • Queue: Select the queue that contains the messages that you want.
      • Channel Filter (under Message filtering) (optional): To filter messages from channels in the queue, enter a value.

        For example, to filter messages by channel ID, enter the channel ID.

        For supported values, see channelFilter at GetMessages (Queue API).

      Note

      A message that has been transferred to the connector's target is considered "consumed." To meet requirements of the Queue service, the connector deletes transferred messages from the source queue. For more information, see Consuming Messages.
    10. (Optional) Under Configure function task, configure a function task to process messages from the queue using the Functions service):
      • Select task: Select Function.
      • Compartment: Select the compartment that contains the function that you want.
      • Function application: Select the name of the function application that includes the function you want.
      • Function: Select the name of the function that you want to use to process the data received from the source.

        For use by the connector as a task, the function must be configured to return one of the following responses:

        • List of JSON entries (must set the response header Content-Type=application/json)
        • Single JSON entry (must set the response header Content-Type=application/json)
        • Single binary object (must set the response header Content-Type=application/octet-stream)
      • Show additional options: Click this link and specify limits for each batch of data sent to the function. If you choose to use manual settings, provide values for batch size limit (KBs) and batch time limit (seconds).

      Considerations for function tasks:

      • Connector Hub doesn't parse the output of the function task. The output of the function task is written as-is to the target. For example, when using a Notifications target with a function task, all messages are sent as raw JSON blobs.
      • Functions are invoked synchronously with 6 MB of data per invocation. If data exceeds 6 MB, then the connector invokes the function again to move the data that's over the limit. Such invocations are handled sequentially.
      • Functions can execute for up to five minutes.
      • Function tasks are limited to scalar functions.
    11. If you selected Functions as the target, under Configure target, configure the function to send the messages from the queue to. Then, skip to step 15.
      • Compartment: Select the compartment that contains the function that you want.
      • Function application: Select the name of the function application that contains the function that you want.
      • Function: Select the name of the function that you want to send the data to.
      • Show additional options: Click this link and specify limits for each batch of data sent to the function. If you choose to use manual settings, provide values for batch size limit (either KBs or number of messages) and batch time limit (seconds).

        For example, limit batch size by selecting either 5,000 kilobytes or 10 messages. An example batch time limit is 5 seconds.

      Considerations for Functions targets:

      • The connector flushes source data as a JSON list in batches. Maximum batch, or payload, size is 6 MB.
      • Functions are invoked synchronously with 6 MB of data per invocation. If data exceeds 6 MB, then the connector invokes the function again to move the data that's over the limit. Such invocations are handled sequentially.
      • Functions can execute for up to five minutes.
      • Don't return data from Functions targets to connectors. Connector Hub doesn't read data returned from Functions targets.
    12. If you selected Notifications as the target, under Configure target, configure the topic to send the messages from the queue to. Then, skip to step 15.
      • Compartment: Select the compartment that contains the topic that you want.
      • Topic: Select the name of the topic that you want to send the data to.
      • Message format: Select the option that you want:
        Note

        Message format options are available for connectors with Logging source only. These options aren't available for connectors with function tasks. When Message format options aren't available, messages are sent as raw JSON blobs.
        • Send formatted messages: Simplified, user-friendly layout.

          To view supported subscription protocols and message types for formatted messages, see Friendly Formatting.

        • Send raw messages: Raw JSON blob.

      Considerations for Notifications targets:

      • The maximum message size for the Notifications target is 128 KB. Any message that exceeds the maximum size is dropped.
      • SMS messages exhibit unexpected results for certain connector configurations. This issue is limited to topics that contain SMS subscriptions for the indicated connector configurations. For more information, see Multiple SMS messages for a single notification.
    13. If you selected Object Storage as the target, under Configure target, configure the bucket to send the messages from the queue to. Then, skip to step 15.
      • Compartment: Select the compartment that contains the bucket that you want.
      • Bucket: Select the name of the bucket that you want to send the data to.
      • Object Name Prefix: Optionally enter a prefix value.
      • Show additional options: Click this link and optionally enter values for batch size (in MBs) and batch time (in milliseconds).

      Considerations for Object Storage targets:

      • Batch rollover details:

        • Batch rollover size: 100 MB
        • Batch rollover time: 7 minutes
      • Files saved to Object Storage are compressed using gzip.

      • Format of data moved from a Monitoring source: Objects. The connector partitions source data from Monitoring by metric namespace and writes the data for each group (namespace) to an object. Each object name includes the following elements.

        <object_name_prefix>/<service_connector_ocid>/<metric_compartment_ocid>/<metric_namespace>/<data_start_timestamp>_<data_end_timestamp>.<sequence_number>.<file_type>.gz

        Within an object, each set of data points is appended to a new line.

    14. If you selected Streaming as the target, under Configure target, configure the stream to send the messages from the queue to.
      • Compartment: Select the compartment that contains the stream that you want.
      • Stream: Select the name of the stream that you want to send the data to.

      Considerations for Streaming targets:

      • Private endpoint configuration isn't supported. For stream pool configuration details, see Creating Stream Pools.
      • Format of data moved from a Monitoring source: Each object is written as a separate message.
    15. To accept default policies, click the Create link provided for each default policy.

      Default policies are offered for any authorization required for this connector to access source, task, and target services.

      You can get this authorization through these default policies or through group-based policies. The default policies are offered whenever you use the Console to create or edit a connector. The only exception is when the exact policy already exists in IAM, in which case the default policy isn't offered. For more information about this authorization requirement, see Authentication and Authorization.

      • If you don't have permissions to accept default policies, contact your administrator.
      • Automatically created policies remain when connectors are deleted. As a best practice, delete associated policies when deleting the connector.

      To review a newly created policy, click the associated view link.

    16. (Optional) Assign tags to the connector. Click Show advanced options and then provide values for the tagging fields.
      • Tag namespace: To add a defined tag, select an existing namespace. If adding a free-from tag, leave blank.
      • Tag key: To add a defined tag, select an existing tag key. To add a free-form tag, type the key name that you want.
      • Tag value: Type the tag value that you want.
      • Add tag: Click to add another tag.
    17. Click Create.
  • Use the oci sch service-connector create command and required parameters to create a connector with a Queue source:

    oci sch service-connector create [...] --source <queue_json_input>

    To pass source details to CLI using a JSON file:

    oci sch service-connector create [...] --source file:queue_source.json

    Example JSON file contents:

    {
        "kind": "plugin",
        "pluginName": "QueueSource",
        "configMap": {
            "queueId": "<queue_json_input>"
        }
    }

    For a complete list of parameters and values for CLI commands, see the CLI Command Reference.

  • Run the CreateServiceConnector operation to create a connector.

    To create a connector with a Queue source, populate source in the request (CreateServiceConnectorDetails) with a connector plugin that uses Queue (pluginName QueueSource, with each queue OCID in configMap). For an example, see PluginSourceDetails.