Message streaming

Message streaming enables the Reltio platform to process events from an internal queue into an external queue or topic in JSON format.

When the Reltio Platform processes events from an internal queue, message streaming sends Create/Read/Update/Delete (CRUD) and MATCH events, in JSON format, to external message bus queues or topics. The following message buses are supported:
  • Amazon Simple Queue Service (SQS)
  • Amazon Simple Notification Service (SNS)
  • Google Cloud Pub/Sub (Publisher/Subscriber)
  • Microsoft Azure Service Bus
Message streaming supports the following tasks:
  • Sending messages to different queues
  • Filtering the payloads of messages
  • Filtering messages by event type
  • Filtering messages by object values
  • Filtering messages for operational values

The following image explains how the internal event queues are turned into external queues.

Reltio recommends that you don’t use events in external queues for real-time interactions, because events can take up to 10 minutes to arrive. Data processing activities in Reltio generate events that flow through internal event queues to keep the data up to date across the different Reltio databases. Event processing is asynchronous: events are transferred to a secondary queue after they’re successfully processed in the primary queue. These events support features such as search, matching, history, and interactions. The time it takes to update all databases varies with factors such as the queue size and configuration parameters.

Reltio message streaming is NOT FIFO - and here's why

In the Reltio Platform, multiple nodes process requests in parallel. Parallel processing enables fast and efficient scaling, but it also means that a request that arrives a few milliseconds after an earlier request can finish processing first and reach the event queue first. Because of various internal processes, Reltio doesn’t guarantee that events are processed in the order in which requests are added to the queue. FIFO ordering therefore doesn’t apply; use the object version and message timestamp to order events instead.
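A consumer can apply this recommendation roughly as follows. This is a minimal sketch, not Reltio's implementation: the field names `version` and `timestamp` are hypothetical placeholders for wherever the object version and message timestamp appear in your events, and should be adjusted to the actual payload.

```python
from typing import Dict, List


def latest_per_object(events: List[dict]) -> Dict[str, dict]:
    """Keep, for each object URI, the event with the highest (version, timestamp)."""
    latest: Dict[str, dict] = {}
    for event in events:
        # "version" and "timestamp" are assumed field names, not confirmed ones.
        key = (event["version"], event["timestamp"])
        current = latest.get(event["uri"])
        if current is None or key > (current["version"], current["timestamp"]):
            latest[event["uri"]] = event
    return latest


# Events can arrive out of order: here version 2 is received before version 1.
received = [
    {"uri": "entities/e1", "version": 2, "timestamp": 1700000000200},
    {"uri": "entities/e1", "version": 1, "timestamp": 1700000000100},
]

# Restore the logical order before applying the events downstream.
ordered = sorted(received, key=lambda e: (e["version"], e["timestamp"]))
newest = latest_per_object(received)
```

Comparing on the version first and the timestamp second means a late-arriving older event never overwrites a newer state.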

Configuration: Tenant configuration

Note: For the tenant configuration:
  • The skipPayload field for streaming configuration must be added.
  • The original JMS attributes JMSEventsTimeToLive and JMSEventsFilteringFields are still used.
The Streaming Configuration example describes the new format of the streaming configuration.

Example: Streaming Configuration

"streamingConfig": {
  "streamingEnabled": true,
  "streamingAPIEnabled": false,
  "analyzeOvChanges": false,
  "JMSEventsTimeToLive": 86400000,
  "largeObjectsSupport": false,
  "JMSEventsFilteringFields": [
    "uri",
    "crosswalks",
    "type",
    "VeevaSync",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "startObject",
    "endObject",
    "attributes"
  ],
  "messaging": {
    "destinations": [
      {
        "provider": "default",
        "type": "queue",
        "name": "queue.MyTenant.allEvents"
      },
      {
        "provider": "default",
        "type": "queue",
        "name": "queue.MyTenant.dtssEvents",
        "dtssQueue": true
      }
    ]
  }
}

For details about the properties in the streaming configuration, see Table 1: Streaming configuration parameters.

Table 1. Streaming configuration parameters
| Property name | Required | Description | Type |
| --- | --- | --- | --- |
| streamingEnabled | Yes | Enables or disables event publishing. | boolean |
| streamingAPIEnabled | No | If false, events from a data load endpoint and a reindex endpoint aren’t sent to JMS. The default value is true. | boolean |
| skipPayload | No | If true, the platform returns only the event type and the ovChanged property, not the entire payload. The default value is false. | boolean |
| analyzeOvChanges | No | If true, all ENTITY_CHANGED events include an extra boolean attribute, ovChanged, which indicates whether ov=true attribute values were changed. The default value is false. You can also specify the analyzeOvChanges attribute for any REST request to the API; this attribute overrides the option from streamingConfig. | boolean |
| JMSEventsTimeToLive | Yes | Indicates the expiration time for events, in milliseconds. | long |
| JMSEventsFilteringFields | No | Indicates the entity fields that must be included in the event. If it isn’t specified, no filtering is done and all content belonging to the entity is added to the event. | array |
| RelationEventsFilteringFields | No | Indicates the entity’s relation fields that must be included in the event. If it isn’t specified, no filtering is done and all content belonging to the entity is added to the event. | array |
| largeObjectsSupport | No | Enables support for large objects in the external streaming service. When the compressed size of an event exceeds the size limit, the event is sent again with the URI of the object and the exceededQueueSizeLimit:true field instead of the entire message. The default value is false. | boolean |
| JMSIncludeMergeTime | No | If true, the updatedTime is added to the ENTITIES_MERGED event. The default value is false. | boolean |
| emptyStartEndRelationCrosswalks | No | If true, the crosswalks collections of startObject and endObject in relation events are always empty. The default value is false. | boolean |
| messaging | No | A list of messaging destination configurations. At least one destination is required. | object array |
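Before you submit a configuration, it can help to check it against Table 1. The sketch below is an illustrative client-side check only: the function names are hypothetical, "long" is mapped to a Python int and "array" to a list, and the platform's own validation may differ.

```python
from typing import List

# (required, expected type) for each property, transcribed from Table 1.
PARAMETERS = {
    "streamingEnabled": (True, bool),
    "streamingAPIEnabled": (False, bool),
    "skipPayload": (False, bool),
    "analyzeOvChanges": (False, bool),
    "JMSEventsTimeToLive": (True, int),
    "JMSEventsFilteringFields": (False, list),
    "RelationEventsFilteringFields": (False, list),
    "largeObjectsSupport": (False, bool),
    "JMSIncludeMergeTime": (False, bool),
    "emptyStartEndRelationCrosswalks": (False, bool),
}


def has_type(value, expected) -> bool:
    """Type check that rejects True/False where an integer is expected."""
    if expected is int:
        return isinstance(value, int) and not isinstance(value, bool)
    return isinstance(value, expected)


def check_streaming_config(config: dict) -> List[str]:
    """Return a list of problems; an empty list means none were found."""
    problems = []
    for name, (required, expected) in PARAMETERS.items():
        if name not in config:
            if required:
                problems.append(f"{name} is required")
        elif not has_type(config[name], expected):
            problems.append(f"{name} must be of type {expected.__name__}")
    messaging = config.get("messaging")
    if messaging is not None and not messaging.get("destinations"):
        problems.append("messaging needs at least one destination")
    return problems


# A quoted "true" is a string, not a boolean, so it is reported.
problems = check_streaming_config(
    {"streamingEnabled": "true", "JMSEventsTimeToLive": 86400000}
)
```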

Messaging destinations

Tenants can publish event messages to multiple destinations. In the new streaming configuration, you can set the JMSEventsFilteringFields and RelationEventsFilteringFields parameters for each destination; these per-destination filters override the main filters. You can also skip the entire payload for a destination by using the skipPayload flag. For example:
"streamingConfig": {
    "streamingEnabled": true,
    "streamingAPIEnabled": true,
    "JMSUser": "...",
    "JMSPassword": "...",
    "JMSBrokerURL": "...",
    "JMSDestinationType": "Queue",
    "JMSDestinationPrefix": "Reltio.Events",
    "JMSIncludeMergeTime": true,
    "analyzeOvChanges": false,
    "emptyStartEndRelationCrosswalks": false,
    "largeObjectsSupport": false,
    "JMSEventsTimeToLive": 86400000,
    "JMSEventsFilteringFields": [
        "uri",
        "crosswalks",
        "type"
    ],
    "RelationEventsFilteringFields": [
        "type",
        "uri"
    ],
    "skipPayload": false,
    "messaging": {
        "destinations": [
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.allEvents",
                "JMSEventsFilteringFields": [
                    "uri",
                    "type"
                ],
                "RelationEventsFilteringFields": [
                    "type"                
                ],
                "skipPayload": false
            },
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.dtssEvents",
                "skipPayload": true,
                "dtssQueue": true
            }
        ]
    }
}
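To see which settings apply to each destination in a configuration like the one above, the override rule can be sketched as a small lookup. This is an illustration, not platform code: the helper name is hypothetical, and it assumes that a destination without its own value inherits the value from the root of streamingConfig.

```python
# Settings that a destination may override, per the paragraph above.
OVERRIDABLE = (
    "JMSEventsFilteringFields",
    "RelationEventsFilteringFields",
    "skipPayload",
)


def effective_settings(streaming_config: dict) -> dict:
    """Map each destination name to the settings that apply to it."""
    destinations = streaming_config.get("messaging", {}).get("destinations", [])
    return {
        dest["name"]: {
            # Destination value wins; otherwise fall back to the root value (assumption).
            key: dest.get(key, streaming_config.get(key))
            for key in OVERRIDABLE
        }
        for dest in destinations
    }


config = {
    "JMSEventsFilteringFields": ["uri", "crosswalks", "type"],
    "RelationEventsFilteringFields": ["type", "uri"],
    "skipPayload": False,
    "messaging": {
        "destinations": [
            {
                "name": "queue.MyTenant.allEvents",
                "JMSEventsFilteringFields": ["uri", "type"],
                "RelationEventsFilteringFields": ["type"],
                "skipPayload": False,
            },
            {"name": "queue.MyTenant.dtssEvents", "skipPayload": True, "dtssQueue": True},
        ]
    },
}

settings = effective_settings(config)
```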

Include only Operational values (OV)

You can also opt to include only the operational values in the queue destination by specifying the ovOnly parameter. A sample is given below:
{
	(...),
	"streamingConfig": {
		(...),
		"ovOnly": true|false,
		"messaging": {
			"destinations": [
				{
					(...),
					"ovOnly": true|false
				}
			]
		}
	}
}
The ovOnly parameter is added at the following two levels in streaming configuration:
  • As a boolean value at the root of the streamingConfig object, which defines the global state of this property and defaults to false.
  • As a boolean value in the defined messaging destination. If you do not define this value, the value in the streamingConfig root is taken as default.

If you set this parameter to true at either of these two levels, events are filtered based on the event type and the payload type. If the payload type is snapshot, only attributes that have operational values are streamed. If the payload type is deltas, changed events (for example, ENTITY_CHANGED or RELATIONSHIP_CHANGED events) are filtered based on the ovChanged attribute.

The following examples show events that are ignored and an event that is streamed after filtering.

  • Example 1 - Ignored event that will not be streamed:
    {
    	"type": "ENTITY_CHANGED",
    	"uri": "entities/hcp1",
    	"deltas": {
    		"ovChanged": false,
    		"delta": [
    			{
    				"type": "ATTRIBUTE_ADDED",
    				"attributeType": "configuration/entityTypes/HCP/attributes/LastName",
    				"newValue": {
    					"value": "LN2",
    					"ov": false,
    					"id": "7",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI2",
    							"sourceTable": "source_table"
    						}
    					]
    				}
    			}
    		],
    		"entityType": "HCP"
    	}
    }
  • Example 2 - Ignored event that will not be streamed
    {
    	"type": "ENTITY_CHANGED",
    	"uri": "entities/hcp1",
    	"deltas": {
    		"ovChanged": false,
    		"delta": [
    			{
    				"type": "ATTRIBUTE_CHANGED",
    				"attributeType": "configuration/entityTypes/HCP/attributes/LastName",
    				"newValue": {
    					"value": "LN2",
    					"ov": false,
    					"id": "7",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI2",
    							"sourceTable": "source_table"
    						}
    					]
    				},
    				"oldValue": {
    					"value": "LN1",
    					"ov": false,
    					"id": "7",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI2",
    							"sourceTable": "source_table"
    						}
    					]
    				}
    			}
    		],
    		"entityType": "HCP"
    	}
    }
  • Example 3 - Ignored event that will not be streamed
    {
    	"type": "RELATIONSHIP_CHANGED",
    	"uri": "relations/rel1",
    	"deltas": {
    		"ovChanged": false,
    		"relationType": "HasAddress",
    		"startObjectUri": "entities/hcp1",
    		"endObjectUri": "entities/loc1",
    		"delta": [
    			{
    				"type": "ATTRIBUTE_REMOVED",
    				"attributeType": "configuration/relationTypes/HasAddress/attributes/AddressType",
    				"oldValue": {
    					"value": "AddressType",
    					"ov": false,
    					"id": "19",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI_REL2",
    							"sourceTable": "source_table"
    						}
    					]
    				}
    			}
    		],
    		"startObjectType": "HCP",
    		"endObjectType": "Location"
    	}
    }
  • Example 4 - Filtered event
    • Original event that is streamed only if ovOnly is false
      {
      	"type": "ENTITY_CHANGED",
      	"uri": "entities/hcp1",
      	"deltas": {
      		"ovChanged": true,
      		"delta": [
      			{
      				"type": "ATTRIBUTE_ADDED",
      				"attributeType": "configuration/entityTypes/HCP/attributes/Address",
      				"newValue": {
      					"ov": true,
      					"id": "rel1",
      					"crosswalks": [
      						{
      							"type": "configuration/sources/NPI",
      							"value": "NPI_REL1",
      							"sourceTable": "source_table"
      						}
      					]
      				}
      			},
      			{
      				"type": "ATTRIBUTE_ADDED",
      				"attributeType": "configuration/entityTypes/HCP/attributes/LastName",
      				"newValue": {
      					"value": "LN2",
      					"ov": false,
      					"id": "7",
      					"sources": [
      						"NPI"
      					],
      					"crosswalks": [
      						{
      							"type": "configuration/sources/NPI",
      							"value": "NPI2",
      							"sourceTable": "source_table"
      						}
      					]
      				}
      			}
      		],
      		"entityType": "HCP"
      	}
      }
    • Streamed event after filter
      {
      	"type": "ENTITY_CHANGED",
      	"uri": "entities/hcp1",
      	"deltas": {
      		"ovChanged": true,
      		"delta": [
      			{
      				"type": "ATTRIBUTE_ADDED",
      				"attributeType": "configuration/entityTypes/HCP/attributes/Address",
      				"newValue": {
      					"ov": true,
      					"id": "rel1",
      					"crosswalks": [
      						{
      							"type": "configuration/sources/NPI",
      							"value": "NPI_REL1",
      							"sourceTable": "source_table"
      						}
      					]
      				}
      			}
      		],
      		"entityType": "HCP"
      	}
      }
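The behavior shown in Examples 1 to 4 can be approximated on the consumer side with the sketch below. It is an interpretation of the examples, not the platform's filter: it assumes a deltas payload, drops events whose ovChanged is false, and keeps only the delta entries whose newValue or oldValue has ov set to true. The function name is hypothetical.

```python
from typing import Optional


def filter_ov_only(event: dict) -> Optional[dict]:
    """Return the filtered event, or None if the event would be ignored."""
    deltas = event.get("deltas", {})
    if not deltas.get("ovChanged", False):
        return None  # Examples 1 to 3: no operational value changed.
    kept = [
        change
        for change in deltas.get("delta", [])
        if change.get("newValue", {}).get("ov") or change.get("oldValue", {}).get("ov")
    ]
    filtered = dict(event)
    filtered["deltas"] = {**deltas, "delta": kept}
    return filtered


# Shortened versions of the events in Example 1 and Example 4.
ignored = {
    "type": "ENTITY_CHANGED",
    "uri": "entities/hcp1",
    "deltas": {
        "ovChanged": False,
        "delta": [{"type": "ATTRIBUTE_ADDED", "newValue": {"value": "LN2", "ov": False}}],
    },
}
original = {
    "type": "ENTITY_CHANGED",
    "uri": "entities/hcp1",
    "deltas": {
        "ovChanged": True,
        "delta": [
            {"type": "ATTRIBUTE_ADDED", "attributeType": "Address", "newValue": {"ov": True}},
            {"type": "ATTRIBUTE_ADDED", "attributeType": "LastName", "newValue": {"ov": False}},
        ],
    },
}

streamed = filter_ov_only(original)
```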

For details about the attributes that you can specify for each destination in the tenant configuration, see Table 2: Messaging destination attributes.

Table 2. Messaging destination attributes
| Attribute | Required | Default | Description |
| --- | --- | --- | --- |
| provider | No | default value | Defines the alias of the provider, as configured for the Reltio environment. |
| type | No | queue | Defines the type of destination. The supported types may be provider-specific. |
| name | Yes | NA | Defines the name of the destination. |
| dtssQueue | No | false | Defines a true or false flag indicating that this queue is used by DTSS. |
| enabled | No | true | Indicates whether this destination is used or ignored. |
| typeFilter | No | null | Defines the collection of event type names to stream. Events of other types are ignored. |
| awsAccountId | No | null | Defines the AWS account identifier for the SQS queue. This attribute uniquely identifies an AWS account and provides a full queue Amazon Resource Name (ARN) within the name field. |
| objectFilter | No | null | Defines the filter expression for the event object. The event isn’t sent to a destination if the object doesn’t match the filter. The expression structure is the same as for search filters. For more information, see Filtering Entities. |
| format | No | JSON | Defines the format of the events that are sent into the queue. JSON: the message is sent as a JSON string. JSON_ZIP_BASE64: the JSON string is zipped using gzip and converted to Base64. For more information, see topics Zip files with GZIP and Decode and use JSON_ZIP_BASE64 messages. |
| skipPayload | No | false | If true, the platform returns only the event type and the ovChanged property, not the entire payload. |
| ovValue | No | false | If true, operational values are published in the queue destination. |
| payloadType | No | snapshot | Specifies the content payload type, which can be either snapshot (the full object state after it was modified) or deltas (the changed object attributes). |
| JMSEventsFilteringFields | No | NA | Indicates the entity fields that must be included in the event. If it isn’t specified, no filtering is done and all content belonging to the entity is added to the event. |
| RelationEventsFilteringFields | No | NA | Indicates the entity’s relation fields that must be included in the event. If it isn’t specified, no filtering is done and all content belonging to the entity is added to the event. |
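As an illustration of how the enabled and typeFilter attributes interact, the following sketch decides whether an event would go to a destination. It covers only these two attributes and their documented defaults; the function name is hypothetical, and objectFilter expressions are not evaluated here.

```python
def accepts(destination: dict, event: dict) -> bool:
    """Return True if the destination is enabled and its typeFilter allows the event."""
    if not destination.get("enabled", True):  # enabled defaults to true
        return False
    type_filter = destination.get("typeFilter")  # typeFilter defaults to null
    if type_filter is None:
        return True  # no filter: every event type is streamed
    return event["type"] in type_filter


all_events = {"name": "queue.MyTenant.allEvents"}
changes_only = {"name": "queue.MyTenant.changes", "typeFilter": ["ENTITY_CHANGED"]}
switched_off = {"name": "queue.MyTenant.unused", "enabled": False}

changed = {"type": "ENTITY_CHANGED", "uri": "entities/e1"}
merged = {"type": "ENTITIES_MERGED", "uri": "entities/e1"}
```

The queue names queue.MyTenant.changes and queue.MyTenant.unused are made up for this example.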

Decode and use JSON_ZIP_BASE64 messages

JSON_ZIP_BASE64 messages are JSON messages (UTF-8) that are first zipped using gzip and then encoded to Base64.

To decode and use the message, perform the following steps, and then use the resulting JSON to build integrations:
  1. Decode the Base64 string to a byte array in your console.
  2. Unzip the byte array using gzip.
    For example:
    echo "H4sIAAAAAAAAAKtWKkmtKFGyUlDySM3JyVcozy/KSVGqBQD//REqFwAAAA==" | base64 --decode | gunzip
The unzipped byte sequence is now displayed as a JSON string. For example:
{"text": "Hello world"}
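The same two steps can be performed in code. The following is a minimal sketch that uses only the Python standard library; the function names are illustrative, and the encoder is included only so that the example can produce its own test message.

```python
import base64
import gzip
import json


def decode_json_zip_base64(message: str) -> dict:
    """Base64-decode the message, gunzip the bytes, and parse the UTF-8 JSON."""
    compressed = base64.b64decode(message)
    raw = gzip.decompress(compressed)
    return json.loads(raw.decode("utf-8"))


def encode_json_zip_base64(payload: dict) -> str:
    """Inverse operation, used here only to build a sample message."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


sample = encode_json_zip_base64({"text": "Hello world"})
decoded = decode_json_zip_base64(sample)
```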

Destination API

Use the following request to post the destination streaming configuration for the tenant:
POST /tenants/{tenantId}/messaging/destinations
The request body is as shown:
{
  "provider": "default",
  "type": "queue",
  "name": "queue.MyTenant.allEvents",
  "JMSEventsFilteringFields": [
    "uri",
    "type"
  ],
  "RelationEventsFilteringFields": [
    "type"
  ],
  "skipPayload": false
}
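A client could assemble this request as sketched below. Only the path and the body come from this page; the base URL, the tenant ID, the bearer-token Authorization header, and the function name are assumptions for illustration, so substitute the values and authentication scheme that apply to your environment. The request is built but not sent.

```python
import json
import urllib.request


def build_destination_request(base_url: str, tenant_id: str, token: str,
                              destination: dict) -> urllib.request.Request:
    """Build (without sending) the POST request for a messaging destination."""
    url = f"{base_url.rstrip('/')}/tenants/{tenant_id}/messaging/destinations"
    return urllib.request.Request(
        url,
        data=json.dumps(destination).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # assumed auth scheme
        },
        method="POST",
    )


request = build_destination_request(
    "https://example.invalid/api",  # hypothetical base URL
    "MyTenant",                     # hypothetical tenant ID
    "<access-token>",               # placeholder token
    {
        "provider": "default",
        "type": "queue",
        "name": "queue.MyTenant.allEvents",
        "JMSEventsFilteringFields": ["uri", "type"],
        "RelationEventsFilteringFields": ["type"],
        "skipPayload": False,
    },
)
# To send it: urllib.request.urlopen(request)
```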

Example Configurations

The following example shows the body of the event when the skipPayload flag is set to false:
{
  "type": "ENTITY_CHANGED",
  "object": {
    "uri": "entities/e1",
    "roles": [
      "configuration/roles/SeniorMember",
      "configuration/roles/CTO"
    ],
    "label": "John Doe",
    …
  },
  "ovChanged": true
}
The following example shows the body of the event when the skipPayload flag is set to true:
{
  "type": "ENTITY_CHANGED",
  "ovChanged": true
}