Message streaming

Message streaming enables the Reltio Platform to stream events from an internal queue to an external queue or topic in JSON format.

When the Reltio Platform processes events from an internal queue, message streaming sends Create/Read/Update/Delete (CRUD) and MATCH events, in JSON format, to external message bus queues or topics. The following message buses are supported:
  • Amazon Simple Queue Service (SQS)
  • Amazon Simple Notification Service (SNS)
  • Google Cloud Pub/Sub (Publisher/Subscriber)
  • Microsoft Azure Service Bus
Message streaming supports the following capabilities:
  • Sending messages to different queues
  • Filtering message payloads
  • Filtering messages by event type
  • Filtering messages by object values

The following image explains how the internal event queues are turned into external queues.

Reltio recommends that you don't rely on events in external queues for real-time interactions, because an event can take up to 10 minutes to arrive. Data processing activities in Reltio generate events that flow through internal event queues to keep the data up to date across the Reltio databases. Events in the event queue are processed asynchronously: an event is transferred to a secondary queue only after the primary queue has processed it successfully. These events support features such as search, matching, history, and interactions. The time it takes to update all databases varies with factors such as queue size and configuration parameters.

Reltio message streaming is NOT FIFO - and here's why

The Reltio Platform processes multiple requests in parallel on multiple nodes. Parallel processing enables fast and efficient scaling, but it also means that a request arriving a few milliseconds after an earlier request can be processed faster, so its event enters the event queue first. Because of this and other internal processes, Reltio doesn't guarantee that events are queued in the order in which the requests were received. FIFO ordering therefore doesn't apply; instead, use the object version and the message timestamp to order events.
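A consumer can restore a usable order on its own side. The following Python sketch keeps only the newest event per object URI, comparing the object version first and the message timestamp as a tie-breaker. It assumes each event has already been flattened into a dictionary with hypothetical uri, version, and timestamp keys; where these values sit in an actual Reltio event isn't described in this section, so map them accordingly.

```python
from typing import Any, Dict, Tuple

Event = Dict[str, Any]


def order_key(event: Event) -> Tuple[int, int]:
    """Sort by object version first, then by message timestamp as a tie-breaker.

    The "version" and "timestamp" keys are hypothetical placeholders.
    """
    return (event["version"], event["timestamp"])


class LatestStateStore:
    """Keeps only the newest event seen for each object URI."""

    def __init__(self) -> None:
        self._latest: Dict[str, Event] = {}

    def apply(self, event: Event) -> bool:
        """Store the event if it is newer than the stored one; return True if stored."""
        current = self._latest.get(event["uri"])
        if current is not None and order_key(event) <= order_key(current):
            return False  # stale or duplicate event that arrived late: ignore it
        self._latest[event["uri"]] = event
        return True

    def get(self, uri: str) -> Event:
        return self._latest[uri]


if __name__ == "__main__":
    store = LatestStateStore()
    # Version 2 arrives before version 1, as can happen without FIFO delivery.
    store.apply({"uri": "entities/e1", "version": 2, "timestamp": 200})
    store.apply({"uri": "entities/e1", "version": 1, "timestamp": 100})
    print(store.get("entities/e1")["version"])  # prints 2
```

Comparing on a (version, timestamp) tuple makes late-arriving older events harmless, which is the property a non-FIFO stream needs.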

Configuration: Tenant configuration

Note: For the tenant configuration:
  • The skipPayload field has been added to the streaming configuration.
  • The original JMS attributes JMSEventsTimeToLive and JMSEventsFilteringFields are still used.
The Streaming Configuration example describes the new format of the streaming configuration.

Example: Streaming Configuration

"streamingConfig": {
  "streamingEnabled": true,
  "streamingAPIEnabled": false,
  "analyzeOvChanges": false,
  "JMSEventsTimeToLive": 86400000,
  "largeObjectsSupport": false,
  "JMSEventsFilteringFields": [
    "uri",
    "crosswalks",
    "type",
    "VeevaSync",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "startObject",
    "endObject",
    "attributes"
  ],
  "messaging": {
    "destinations": [
      {
        "provider": "default",
        "type": "queue",
        "name": "queue.MyTenant.allEvents"
      },
      {
        "provider": "default",
        "type": "queue",
        "name": "queue.MyTenant.dtssEvents",
        "dtssQueue": true
      }
    ]
  }
}
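The JMSEventsFilteringFields list in this example limits which entity fields are included in each event (see Table 1). Reltio applies this filtering on the platform side; the Python sketch below only illustrates the documented semantics on a plain dictionary, under the assumption that the list names top-level entity fields. The function name filter_entity_fields is hypothetical.

```python
from typing import Any, Dict, Iterable, Optional


def filter_entity_fields(entity: Dict[str, Any],
                         fields: Optional[Iterable[str]]) -> Dict[str, Any]:
    """Keep only the listed top-level fields of an entity.

    If no field list is configured, no filtering is done and the
    whole entity is kept (returned as a shallow copy).
    """
    if fields is None:
        return dict(entity)
    wanted = set(fields)
    return {key: value for key, value in entity.items() if key in wanted}
```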

For a description of each property in the streaming configuration, see Table 1: Streaming Configuration Parameters.

Table 1. Streaming configuration parameters
Property name | Required | Description | Type
streamingEnabled | Yes | Enables or disables event publishing. | boolean
streamingAPIEnabled | No | If false, events from the data load endpoint and the reindex endpoint aren't sent to JMS. The default value is true. | boolean
skipPayload | No | If true, the platform returns only the event type and the ovChanged property, not the entire payload. The default value is false. | boolean
analyzeOvChanges | No | If true, all ENTITY_CHANGED events include an additional boolean attribute, ovChanged, which indicates whether any ov=true attribute values changed. The default value is false. You can also specify the analyzeOvChanges attribute on any REST API request; the request-level value overrides the streamingConfig setting. | boolean
JMSEventsTimeToLive | Yes | The expiration time for events, in milliseconds. | long
JMSEventsFilteringFields | No | The entity fields to include in the event. If it isn't specified, no filtering is done and all content belonging to the entity is included in the event. | array
RelationEventsFilteringFields | No | The relation fields to include in relation events. If it isn't specified, no filtering is done and all content belonging to the relation is included in the event. | array
largeObjectsSupport | No | Enables support for large objects in the external streaming service. When the compressed size of an event exceeds the size limit, the event is sent with the URI of the object and the field exceededQueueSizeLimit: true instead of the entire message. The default value is false. | boolean
JMSIncludeMergeTime | No | If true, the updatedTime field is added to ENTITIES_MERGED events. The default value is false. | boolean
emptyStartEndRelationCrosswalks | No | If true, the crosswalks collections of startObject and endObject in relation events are always empty. The default value is false. | boolean
messaging | No | A list of messaging destination configurations. At least one destination is required. | object array
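As a pre-deployment sanity check, the rules in Table 1 (plus the requirement that every destination has a name) can be encoded in a small validator. This is a hypothetical helper, not a Reltio tool: it checks only the types and required properties listed in this section, and Reltio's own validation may accept or reject other inputs.

```python
from typing import Any, Dict, List

# Required top-level properties and their expected Python types (Table 1).
# JMSEventsTimeToLive is a long in milliseconds, which maps to int here.
REQUIRED: Dict[str, type] = {"streamingEnabled": bool, "JMSEventsTimeToLive": int}

# Optional properties documented as boolean in Table 1.
OPTIONAL_BOOLEANS = (
    "streamingAPIEnabled", "skipPayload", "analyzeOvChanges",
    "largeObjectsSupport", "JMSIncludeMergeTime", "emptyStartEndRelationCrosswalks",
)


def validate_streaming_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found; an empty list means none were detected."""
    errors: List[str] = []
    for name, expected in REQUIRED.items():
        if name not in config:
            errors.append(f"missing required property: {name}")
        # Exact type check: rejects the string "true", and rejects bool for int.
        elif type(config[name]) is not expected:
            errors.append(f"{name} must be of type {expected.__name__}")
    for name in OPTIONAL_BOOLEANS:
        if name in config and type(config[name]) is not bool:
            errors.append(f"{name} must be a boolean")
    messaging = config.get("messaging")
    if messaging is not None:
        destinations = messaging.get("destinations") or []
        if not destinations:
            errors.append("messaging must define at least one destination")
        for index, destination in enumerate(destinations):
            if not destination.get("name"):  # name is required for every destination
                errors.append(f"destinations[{index}] is missing required attribute: name")
    return errors
```

The exact type comparison is deliberate: a quoted "true" parses as a JSON string, not a boolean, and would otherwise slip through a truthiness check.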

Messaging destinations

A tenant can publish event messages to multiple destinations. The new streaming configuration lets you set the JMSEventsFilteringFields and RelationEventsFilteringFields parameters for each destination; these per-destination filters override the top-level filters. You can also skip the entire payload for an individual destination by using the skipPayload flag. For example:
"streamingConfig": {
    "streamingEnabled": true,
    "streamingAPIEnabled": true,
    "JMSUser": "...",
    "JMSPassword": "...",
    "JMSBrokerURL": "...",
    "JMSDestinationType": "Queue",
    "JMSDestinationPrefix": "Reltio.Events",
    "JMSIncludeMergeTime": true,
    "analyzeOvChanges": false,
    "emptyStartEndRelationCrosswalks": false,
    "largeObjectsSupport": false,
    "JMSEventsTimeToLive": 86400000,
    "JMSEventsFilteringFields": [
        "uri",
        "crosswalks",
        "type"
    ],
    "RelationEventsFilteringFields": [
        "type",
        "uri"
    ],
    "skipPayload": false,
    "messaging": {
        "destinations": [
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.allEvents",
                "JMSEventsFilteringFields": [
                    "uri",
                    "type"
                ],
                "RelationEventsFilteringFields": [
                    "type"                
                ],
                "skipPayload": false
            },
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.dtssEvents",
                "skipPayload": true,
                "dtssQueue": true
            }
        ]
    }
}
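The override rule can be expressed as a lookup that prefers the destination's value and falls back to the top-level streamingConfig value. The sketch below assumes that a per-destination value replaces the top-level value as a whole (lists aren't merged), which is how this section's wording reads; effective_settings is a hypothetical name.

```python
from typing import Any, Dict

# Settings that a destination may override, per this section.
OVERRIDABLE = ("JMSEventsFilteringFields", "RelationEventsFilteringFields", "skipPayload")


def effective_settings(streaming_config: Dict[str, Any],
                       destination: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve the settings that apply to one destination.

    A value set on the destination wins; otherwise the top-level value is used.
    Keys set in neither place are left out of the result.
    """
    settings: Dict[str, Any] = {}
    for key in OVERRIDABLE:
        if key in destination:
            settings[key] = destination[key]
        elif key in streaming_config:
            settings[key] = streaming_config[key]
    return settings
```

Applied to the example above, queue.MyTenant.allEvents uses its own filters (["uri", "type"] and ["type"]), while queue.MyTenant.dtssEvents inherits the top-level filters and skips the payload.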

For a description of each attribute that you can specify for a destination in the tenant configuration, see Table 2: Messaging Destination Attributes.

Table 2. Messaging destination attributes
Attribute | Required | Default | Description
provider | No | default | Defines the alias of the provider, as configured for the Reltio environment.
type | No | queue | Defines the type of destination. The supported types may be provider-specific.
name | Yes | NA | Defines the name of the destination.
dtssQueue | No | false | Indicates whether this queue is used by DTSS.
enabled | No | true | Indicates whether this destination is used or ignored.
typeFilter | No | null | Defines the collection of event type names to stream. Events of other types are ignored.
awsAccountId | No | null | Defines the AWS account identifier for an SQS queue. This attribute uniquely identifies an AWS account. Alternatively, you can provide the full Amazon Resource Name (ARN) of the queue in the name field.
objectFilter | No | null | Defines the filter expression for the event object. An event isn't sent to the destination if its object doesn't match the filter. The expression structure is the same as for search filters. For more information, see Filtering Entities.
format | No | JSON | Defines the format of the events that are sent to the queue. JSON: the message is sent as a JSON string. JSON_ZIP_BASE64: the JSON string is compressed with gzip and then encoded to Base64. For more information, see the topics Zip files with GZIP and Decode and use JSON_ZIP_BASE64 messages.
skipPayload | No | false | If true, the platform returns only the event type and the ovChanged property, not the entire payload.
payloadType | No | snapshot | Specifies the content payload type: snapshot (the full object state after it was modified) or deltas (only the changed object attributes).
JMSEventsFilteringFields | No | NA | The entity fields to include in the event. If it isn't specified, no filtering is done and all content belonging to the entity is included in the event.
RelationEventsFilteringFields | No | NA | The relation fields to include in relation events. If it isn't specified, no filtering is done and all content belonging to the relation is included in the event.
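The enabled and typeFilter attributes decide which destinations receive a given event. The following Python sketch models only those two attributes and their documented defaults (enabled defaults to true; a null typeFilter streams every event type). objectFilter uses the search filter syntax and isn't modeled here. The function is a hypothetical illustration, not Reltio's routing code.

```python
from typing import Any, Dict, List


def destinations_for_event(event_type: str,
                           destinations: List[Dict[str, Any]]) -> List[str]:
    """Return the names of the destinations that would receive this event type."""
    selected: List[str] = []
    for destination in destinations:
        if not destination.get("enabled", True):
            continue  # disabled destinations are ignored
        type_filter = destination.get("typeFilter")  # None (null): stream every type
        if type_filter is not None and event_type not in type_filter:
            continue  # event type not in this destination's typeFilter
        selected.append(destination["name"])
    return selected
```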

Decode and use JSON_ZIP_BASE64 messages

A JSON_ZIP_BASE64 message is a UTF-8 JSON string that has been compressed with gzip and then encoded to Base64.

To decode a message, perform the following steps, and then use the resulting JSON to build integrations:
  1. Decode the Base64 string to a byte array.
  2. Decompress the bytes with gzip.
    For example, in a shell:
    echo "H4sIAAAAAAAAAKtWKkmtKFGyUlDySM3JyVcozy/KSVGqBQD//REqFwAAAA==" | base64 --decode | gunzip
The decompressed byte sequence is displayed as a JSON string. For example:
{"text": "Hello world"}
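The same two steps can be done in Python with the standard base64, gzip, and json modules. The helper names are hypothetical; the encoder is included only so the decoder can be exercised without a live queue.

```python
import base64
import gzip
import json
from typing import Any


def decode_json_zip_base64(message: str) -> Any:
    """Decode a JSON_ZIP_BASE64 message into a Python object."""
    compressed = base64.b64decode(message)   # step 1: Base64 text -> gzip bytes
    raw = gzip.decompress(compressed)        # step 2: gunzip -> UTF-8 JSON bytes
    return json.loads(raw.decode("utf-8"))   # parse the JSON text


def encode_json_zip_base64(payload: Any) -> str:
    """Inverse operation, useful for local testing: JSON -> gzip -> Base64."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")
```

In a consumer, call decode_json_zip_base64 on the message body received from the queue before handing the event to the rest of the integration.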

Destination API

Use the following request to post a messaging destination configuration for a tenant:
POST /tenants/{tenantId}/messaging/destinations
The request body contains the destination attributes described in Table 2. For example:
{
  "provider": "default",
  "type": "queue",
  "name": "queue.MyTenant.allEvents",
  "JMSEventsFilteringFields": [
    "uri",
    "type"
  ],
  "RelationEventsFilteringFields": [
    "type"
  ],
  "skipPayload": false
}
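The following Python sketch builds this request with the standard urllib.request module. The base URL and the bearer-token Authorization header are assumptions, since this section doesn't specify the API host or the authentication scheme; replace them with the values for your environment. Pass the resulting request to urllib.request.urlopen to send it.

```python
import json
import urllib.request
from typing import Any, Dict


def build_destination_request(base_url: str, tenant_id: str,
                              destination: Dict[str, Any],
                              access_token: str) -> urllib.request.Request:
    """Build (but don't send) POST /tenants/{tenantId}/messaging/destinations."""
    url = f"{base_url.rstrip('/')}/tenants/{tenant_id}/messaging/destinations"
    body = json.dumps(destination).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",  # assumed auth scheme
        },
    )
```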

Example Configurations

The following example shows the body of an event when the skipPayload flag is set to false:
{
  "type": "ENTITY_CHANGED",
  "object": {
    "uri": "entities/e1",
    "roles": [
      "configuration/roles/SeniorMember",
      "configuration/roles/CTO"
    ],
    "label": "John Doe",
    …
  },
  "ovChanged": true
}
The following example shows the body of an event when the skipPayload flag is set to true:
{
  "type": "ENTITY_CHANGED",
  "ovChanged": true
}
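Because the ovChanged flag appears in both forms of the event (when analyzeOvChanges is enabled), a consumer can use it to decide whether to act, whichever skipPayload setting is in effect. The sketch below is hypothetical: the action labels it returns are placeholders, and how a consumer retrieves the object when the payload is skipped isn't covered in this section.

```python
from typing import Any, Dict


def handle_event(event: Dict[str, Any]) -> str:
    """Return a placeholder action label for an incoming event."""
    if event.get("type") != "ENTITY_CHANGED":
        return "ignore"
    if not event.get("ovChanged", False):
        return "ignore"  # no ov=true attribute values changed
    if "object" in event:
        # skipPayload=false: the full object is in the event body.
        return "update:" + event["object"]["uri"]
    # skipPayload=true: only type and ovChanged arrive, so the object
    # would have to be obtained some other way (not covered here).
    return "refresh"
```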