Unify and manage your data

Message streaming

Learn about Reltio's event processing.

Message streaming enables the Reltio platform to process events from an internal queue into an external queue or topic in JSON format.

When the Reltio Platform processes events from an internal queue, message streaming sends Create/Read/Update/Delete (CRUD) and MATCH events into the following external message bus queues or topics in JSON format. The following queues are supported:
  • Amazon Simple Queue Service (SQS)
  • Amazon Simple Notification Service (SNS)
  • Google Cloud Pub/Sub (Publisher/Subscriber)
  • Microsoft Azure Service Bus
Message Streaming includes the following tasks:
  • The messages are sent into different queues
  • The payloads of messages are filtered
  • The messages are filtered by types of event
  • The messages are filtered by object values
  • The messages are filtered for Operational values

The following image explains how the internal event queues are turned into external queues.

Reltio recommends that you don’t use events in external queues for real-time interactions, which can cause delays of up to 10 minutes in receiving the events. The data processing activities in Reltio generate events that flow through internal event queues to keep the data up to date in different Reltio databases. Processing of events in the event queue is an asynchronous process. Events are transferred to a secondary queue after they’re successfully processed by the primary queue. These events support features such as search, matching, history, and interactions. The time it takes to update all databases can vary depending on factors such as the queue size and configuration parameters.

Reltio message streaming is NOT FIFO - and here's why

In Reltio Platform, multiple requests are processed in parallel using multiple nodes. Sending simultaneous requests enables fast and efficient scaling, although it can also create a potential scenario where one request that comes a few milliseconds after an earlier request is processed faster. The second request will then get into the event queue first. Reltio doesn’t guarantee the order of processing in the same order that requests are added to the queue, because of various internal processes. In this scenario, the use of First In, First Out (FIFO) isn’t relevant, and instead, the use of object version and message timestamp is recommended to order events.

Configuration: Tenant configuration

Note: For the tenant configuration:
  • The skipPayload field for streaming configuration must be added.
  • The original JMS attributes JMSEventsTimeToLive and JMSEventsFilteringFields are still used.
The Streaming Configuration example describes the new format of the streaming configuration.

Example: Streaming Configuration

"streamingConfig": {
    "streamingEnabled": true,
    "streamingAPIEnabled": true,
    "JMSUser": "...",
    "JMSPassword": "...",
    "JMSBrokerURL": "...",
    "JMSDestinationType": "Queue",
    "JMSDestinationPrefix": "Reltio.Events",
    "JMSIncludeMergeTime": true,
    "analyzeOvChanges": false,
    "emptyStartEndRelationCrosswalks": false,
    "largeObjectsSupport": false,
    "JMSEventsTimeToLive": 86400000,
    "JMSEventsFilteringFields": [
        "uri",
        "crosswalks",
        "type"
    ],
    "RelationEventsFilteringFields": [
        "type",
        "uri"
    ],
    "skipPayload": false,
    "messaging": {
        "destinations": [
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.allEvents",
                "JMSEventsFilteringFields": [
                    "uri",
                    "type"
                ],
                "RelationEventsFilteringFields": [
                    "type"                
                ],
                "skipPayload": false
            },
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.dtssEvents",
                "skipPayload": true,
                "dtssQueue": true
            }
        ]
    }
}

To know more about the properties in the streaming configuration, see the following table:

Table 1. Streaming configuration parameters
Property nameRequired DescriptionType
streamingEnabledYesThis parameter enables or disables events publishing.boolean
streamingAPIEnabled NoIf the value of the parameter is false then events from a data load endpoint and a reindex endpoint aren’t sent to JMS.

The default value is true.

boolean
skipPayloadNoIf the value is true, the platform only returns the type of event and ovChanges properties, and not the entire payload. The default value is false.boolean
analyzeOvChangesNoIf the value of the parameter is true, then all ENTITY_CHANGED events will include an extra boolean attribute ovChanged.

This indicates whether ov=true attribute values were changed or not.

The default value is false. Also, you may specify analyzeOvChanges attribute for any REST request to the API. This attribute will override the option from streamingConfig.

boolean
JMSEventsTimeToLiveYesThis parameter indicates the expiration time for events in milliseconds.long
JMSEventsFilteringFieldsNoIndicates the entity fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event.array
RelationEventsFilteringFieldsNoIndicates the entity’s relation fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event.array
largeObjectsSupportNoThe support of large objects in External Streaming Service is implemented. When the compressed size of the event exceeds the following limit, the event will be sent again to include the URI of the object. The exceededQueueSizeLimit:true field instead of the entire message.

The default value is false.

boolean
JMSIncludeMergeTimeNoIf the value of the parameter is true, the updatedTime are added to the ENTITIES_MERGED event.

The default value is false.

boolean
emptyStartEndRelationCrosswalksNoIf the value of the parameter is true then crosswalks collections ofstartObject and endObject in relation events are always empty.

The default value is false.

boolean
messagingNoA list of Messaging Destination configurations.

At least one dest is required.

object array

Messaging destinations

Tenants may have event messages published to multiple destinations. The streaming configuration enables setting values for the JMSEventsFilteringFields and RelationEventsFilteringFields parameters, for each destination. These filters override the main filters. Additionally, you can skip the entire payload for each destination using the skipPayload flag.

"streamingConfig": {
(...)
    "messaging": {
        "destinations": [
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.allEvents",
                "JMSEventsFilteringFields": [
                    "uri",
                    "type"
                ],
                "RelationEventsFilteringFields": [
                    "type"                
                ],
                "skipPayload": false
            },
            {
                "provider": "default",
                "type": "queue",
                "name": "queue.MyTenant.dtssEvents",
                "skipPayload": true,
                "dtssQueue": true
            }
        ]
    }
}
To know about the attribute for each destination that is specified in the tenant configuration, see the following table:
Table 2. Messaging destination attributes
AttributeRequired DefaultDescription
providerNodefault valueDefines the alias of the provider, as configured for the Reltio environment.
typeNoqueueDefines the type of destination. The supported types may be provider-specific.
nameYesNADefines the name of the destination.
dtssQueueNofalseDefines a true or false flag indicating that this queue is used by DTSS.
enabledNotrueIndicates whether this destination is used or ignored.
typeFilterNonullDefines the collection of event type names to stream. Events of different types will be ignored.
awsAccountIdNonullDefines the AWS account identifier for SQS queue. This attribute uniquely identifies an AWS account and provides a full queue for Amazon Resource Name (ARN) within the name field.
objectFilterNonullDefines the filter expression for the event object. The event must not be sent to a destination if the object doesn't match with the filter. The expression structure is the same as for search filters.

There are three ways of referencing an attribute in the filter

  • attributes.<attribute-name> references an attribute present in the current object. For example, equals(attributes.Name, 'Reltio') matches events where the entity currently has Reltio as the value in the Name attribute;

  • '<full-attribute-path>' references an attribute present at the provided path and it must be enclosed in quotes. For example, equals('configuration/entityTypes/Organization/attributes/Name', 'Reltio') matches events where the entity currently has Reltio as the value in the attribute located at configuration/entityTypes/Organization/attributes/Name. The single quotes are only required for the objectFilter property - the filtering of entities does not normally require it;

  • delta.attributes.<attribute-name> references an attribute present in the event's delta collection and matches its value with the newValue value, even if the payload itself does not stream it - which is the case with the SNAPSHOT payload. For example, equals(delta.attributes.Name, 'Reltio') matches events where the delta collection has Reltio in the newValue property for the Name attribute. Only the newValue property located in the delta collection is considered, neither oldValue or attributes contained within the object but not in delta are ever matched.

Note: Not all operators may be used with attributes of referenced attributes. For example, changes(delta.attributes.Organization.Name) is not valid.

For more information, see Filtering Entities.

formatNoJSONDefines the format of the events that are sent into the queue.

JSON - message will be sent in string JSON format. JSON_ZIP_BASE64 is the string JSON that was zipped using gzip and converted to Base64. For more information, see topics Zip files with GZIP and Decode and use JSON_ZIP_BASE64 messages.

skipPayloadNobooleanIf the value is true, the platform only returns the type of event and ovChanges properties, and not the entire payload. The default value is false.
ovValueNofalseIf true, operational values are published in the queue destination.
payloadTypeNoSNAPSHOTSpecifies the content payload type, which can be:
  • SNAPSHOT - the full object state after it was modified
  • DELTA - only the modified object attributes
  • SNAPSHOT_WITH_DELTA - SNAPSHOT and DELTA combined
JMSEventsFilteringFieldsNoNAIndicates the entity fields that must be included in the event. If it's not specified, then the filtering doesn't occur, and all content belonging to the entity is added in the event.
RelationEventsFilteringFieldsNoNAIndicates the entity's relation fields that must be included in the event. If it isn't specified, then the filtering doesn't occur, and all content belonging to the entity is added in the event.

Ignore event payload

This example shows the body of the event when the skipPayload flag is set to false:
{
  "type": "ENTITY_CHANGED",
  "object": {
    "uri": "entities/e1",
    "roles": [
      "configuration/roles/SeniorMember",
      "configuration/roles/CTO"
    ],
    "label": "John Doe",
    …
  },
  "ovChanged": true
}
Example for the body of the event when the skipPayload flag is set to true:
{
  "type": "ENTITY_CHANGED",
  "ovChanged": true
}

Include only Operational values (OV)

Opt to include only the OV in the queue destination by specifying the ovOnly parameter. Example:
{
	(...),
	"streamingConfig": {
		(...),
		"ovOnly": [true, false],
		"messaging": {
			"destinations": [
				{
					(...),
					"ovOnly": [true, false]
				}
			]
		}
	}
}
The ovOnly parameter is added at the following two levels in streaming configuration:
  • As a boolean value at the root of the streamingConfig object, which defines the global state of this property and defaults to false.
  • As a boolean value in the defined messaging destination. If you don't define this value, the value in the streamingConfig root is used by default.
If you set the ovOnly parameter to true, in either of the two levels above, events will be filtered based on the event and Payload type:
  • Payload type = SNAPSHOT: streams all attributes with OV.
  • Payload type = DELTA: streams all changed events, based on ovChanged (e.g., ENTITY_CHANGED or RELATIONSHIP_CHANGED events).
    Note: For this Payload type, if there are NO changes to OV properties, the event won't be streamed.
  • Payload type = SNAPSHOT_WITH_DELTA: streams the combined results of SNAPSHOT and DELTA payload types.

Let's see examples for the different Payload types:

This example illustrates Payload type = SNAPSHOT:

{
  "type": "ENTITY_CREATED",
  "object": {
    "uri": "entities/hcp1",
    "type": "configuration/entityTypes/HCP",
    "createdBy": "test",
    "createdTime": 1720773510454,
    "updatedBy": "test",
    "updatedTime": 1720773510454,
    "attributes": {
      "FirstName": [
        {
          "type": "configuration/entityTypes/HCP/attributes/FirstName",
          "ov": true,
          "value": "FN1",
          "uri": "entities/hcp1/attributes/FirstName/0"
        }
      ]
    },
    "crosswalks": [
      {
        "uri": "entities/hcp1/crosswalks/2",
        "type": "configuration/sources/NPI",
        "sourceTable": "source_table",
        "value": "NPI_HCP1",
        "reltioLoadDate": "2024-07-12T08:38:30.454Z",
        "createDate": "2024-07-12T08:38:30.454Z",
        "updateDate": "2024-07-12T08:38:30.454Z",
        "attributes": [
          "entities/hcp1/attributes/FirstName/0",
          "entities/hcp1/attributes/LastName/1"
        ],
        "singleAttributeUpdateDates": {}
      }
    ],
    "analyticsAttributes": {},
    "label": "FN1 LN1",
    "secondaryLabel": ""
  }
}

{
  "type": "RELATIONSHIP_CREATED",
  "object": {
    "uri": "relations/rel1",
    "type": "configuration/relationTypes/HasAddress",
    "createdBy": "test",
    "createdTime": 1720773510454,
    "updatedBy": "test",
    "updatedTime": 1720773510454,
    "startRefPinned": false,
    "startRefIgnored": false,
    "endRefPinned": false,
    "endRefIgnored": false,
    "analyticsAttributes": {},
    "crosswalks": [
      {
        "uri": "relations/rel1/crosswalks/8",
        "type": "configuration/sources/NPI",
        "sourceTable": "source_table",
        "value": "NPI_REL1",
        "reltioLoadDate": "2024-07-12T08:38:30.454Z",
        "createDate": "2024-07-12T08:38:30.454Z",
        "updateDate": "2024-07-12T08:38:30.454Z",
        "attributes": [
          "relations/rel1/attributes/AddressType/7"
        ],
        "singleAttributeUpdateDates": {}
      }
    ],
    "startObject": {
      "objectURI": "entities/hcp1",
      "crosswalks": [
        {
          "uri": "entities/hcp1/crosswalks/9",
          "type": "configuration/sources/Reltio",
          "value": "hcp1"
        }
      ]
    },
    "endObject": {
      "objectURI": "entities/loc1",
      "crosswalks": [
        {
          "uri": "entities/loc1/crosswalks/10",
          "type": "configuration/sources/Reltio",
          "value": "loc1"
        }
      ]
    }
  }
}
These examples illustrate Payload type = DELTA.
  • Example 1 - Event that won't be streamed:
    {
    	"type": "ENTITY_CHANGED",
    	"uri": "entities/hcp1",
    	"deltas": {
    		"ovChanged": false,
    		"delta": [
    			{
    				"type": "ATTRIBUTE_ADDED",
    				"attributeType": "configuration/entityTypes/HCP/attributes/LastName",
    				"newValue": {
    					"value": "LN2",
    					"ov": false,
    					"id": "7",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI2",
    							"sourceTable": "source_table"
    						}
    					]
    				}
    			}
    		],
    		"entityType": "HCP"
    	}
    }
  • Example 2 - Event that won't be streamed:
    {
    	"type": "ENTITY_CHANGED",
    	"uri": "entities/hcp1",
    	"deltas": {
    		"ovChanged": false,
    		"delta": [
    			{
    				"type": "ATTRIBUTE_CHANGED",
    				"attributeType": "configuration/entityTypes/HCP/attributes/LastName",
    				"newValue": {
    					"value": "LN2",
    					"ov": false,
    					"id": "7",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI2",
    							"sourceTable": "source_table"
    						}
    					]
    				},
    				"oldValue": {
    					"value": "LN1",
    					"ov": false,
    					"id": "7",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI2",
    							"sourceTable": "source_table"
    						}
    					]
    				}
    			}
    		],
    		"entityType": "HCP"
    	}
    }
  • Example 3 - Event that won't be streamed:
    {
    	"type": "RELATIONSHIP_CHANGED",
    	"uri": "relations/rel1",
    	"deltas": {
    		"ovChanged": false,
    		"relationType": "HasAddress",
    		"startObjectUri": "entities/hcp1",
    		"endObjectUri": "entities/loc1",
    		"delta": [
    			{
    				"type": "ATTRIBUTE_REMOVED",
    				"attributeType": "configuration/relationTypes/HasAddress/attributes/AddressType",
    				"oldValue": {
    					"value": "AddressType",
    					"ov": false,
    					"id": "19",
    					"sources": [
    						"NPI"
    					],
    					"crosswalks": [
    						{
    							"type": "configuration/sources/NPI",
    							"value": "NPI_REL2",
    							"sourceTable": "source_table"
    						}
    					]
    				}
    			}
    		],
    		"startObjectType": "HCP",
    		"endObjectType": "Location"
    	}
    }
  • Example 4 - Filtered event
    • Original event that is streamed only if ovOnly is false
      {
      	"type": "ENTITY_CHANGED",
      	"uri": "entities/hcp1",
      	"deltas": {
      		"ovChanged": true,
      		"delta": [
      			{
      				"type": "ATTRIBUTE_ADDED",
      				"attributeType": "configuration/entityTypes/HCP/attributes/Address",
      				"newValue": {
      					"ov": true,
      					"id": "rel1",
      					"crosswalks": [
      						{
      							"type": "configuration/sources/NPI",
      							"value": "NPI_REL1",
      							"sourceTable": "source_table"
      						}
      					]
      				}
      			},
      			{
      				"type": "ATTRIBUTE_ADDED",
      				"attributeType": "configuration/entityTypes/HCP/attributes/LastName",
      				"newValue": {
      					"value": "LN2",
      					"ov": false,
      					"id": "7",
      					"sources": [
      						"NPI"
      					],
      					"crosswalks": [
      						{
      							"type": "configuration/sources/NPI",
      							"value": "NPI2",
      							"sourceTable": "source_table"
      						}
      					]
      				}
      			}
      		],
      		"entityType": "HCP"
      	}
      }
    • Streamed event after filter
      {
      	"type": "ENTITY_CHANGED",
      	"uri": "entities/hcp1",
      	"deltas": {
      		"ovChanged": true,
      		"delta": [
      			{
      				"type": "ATTRIBUTE_ADDED",
      				"attributeType": "configuration/entityTypes/HCP/attributes/Address",
      				"newValue": {
      					"ov": true,
      					"id": "rel1",
      					"crosswalks": [
      						{
      							"type": "configuration/sources/NPI",
      							"value": "NPI_REL1",
      							"sourceTable": "source_table"
      						}
      					]
      				}
      			}
      		],
      		"entityType": "HCP"
      	}
      }

This example illustratesPayload type = SNAPSHOT_WITH_DELTA:

{
	"type": "ENTITY_CREATED",
	"object": {
		"uri": "entities/hcp1",
		"type": "configuration/entityTypes/HCP",
		"createdBy": "test",
		"createdTime": 1720773510454,
		"updatedBy": "test",
		"updatedTime": 1720773510454,
		"attributes": {
			"FirstName": [
				{
					"type": "configuration/entityTypes/HCP/attributes/FirstName",
					"ov": true,
					"value": "FN1",
					"uri": "entities/hcp1/attributes/FirstName/0"
				}
			],
			"LastName": [
				{
					"type": "configuration/entityTypes/HCP/attributes/LastName",
					"ov": true,
					"value": "LN1",
					"uri": "entities/hcp1/attributes/LastName/1"
				}
			]
		},
		"crosswalks": [
			{
				"uri": "entities/hcp1/crosswalks/2",
				"type": "configuration/sources/NPI",
				"sourceTable": "source_table",
				"value": "NPI_HCP1",
				"reltioLoadDate": "2024-07-12T08:38:30.454Z",
				"createDate": "2024-07-12T08:38:30.454Z",
				"updateDate": "2024-07-12T08:38:30.454Z",
				"attributes": [
					"entities/hcp1/attributes/FirstName/0",
					"entities/hcp1/attributes/LastName/1"
				],
				"singleAttributeUpdateDates": {}
			}
		],
		"analyticsAttributes": {},
		"label": "FN1 LN1",
		"secondaryLabel": ""
	},
	"uri": "entities/hcp1",
	"deltas": {
		"ovChanged": false,
		"delta": [
			{
				"type": "ENTITY_CREATED",
				"newValue": {
					"uri": "entities/hcp1",
					"type": "configuration/entityTypes/HCP",
					"createdBy": "test",
					"createdTime": 1720773510454,
					"updatedBy": "test",
					"updatedTime": 1720773510454,
					"attributes": {
						"FirstName": [
							{
								"type": "configuration/entityTypes/HCP/attributes/FirstName",
								"ov": true,
								"value": "FN1",
								"uri": "entities/hcp1/attributes/FirstName/0"
							}
						],
						"LastName": [
							{
								"type": "configuration/entityTypes/HCP/attributes/LastName",
								"ov": true,
								"value": "LN1",
								"uri": "entities/hcp1/attributes/LastName/1"
							}
						]
					},
					"crosswalks": [
						{
							"uri": "entities/hcp1/crosswalks/2",
							"type": "configuration/sources/NPI",
							"sourceTable": "source_table",
							"value": "NPI_HCP1",
							"reltioLoadDate": "2024-07-12T08:38:30.454Z",
							"createDate": "2024-07-12T08:38:30.454Z",
							"updateDate": "2024-07-12T08:38:30.454Z",
							"attributes": [
								"entities/hcp1/attributes/FirstName/0",
								"entities/hcp1/attributes/LastName/1"
							],
							"singleAttributeUpdateDates": {}
						}
					],
					"analyticsAttributes": {},
					"label": "FN1 LN1",
					"secondaryLabel": ""
				}
			}
		],
		"entityType": "HCP"
	}
}

Decode and use JSON_ZIP_BASE64 messages

JSON_ZIP_BASE64 messages are JSON messages encoded with Base64-encoded data. These JSON messages (UTF-8) are first zipped using gzip and then encoded to base64.

To decode and use the message, you must first perform these steps, and then use the JSON file to build integrations:
  1. Run the code to decode the Base64 string to a bytes array in your console.
  2. Unzip the string using gzip.
    For example:
    echo "H4sIAAAAAAAAAKtWKkmtKFGyUlDySM3JyVcozy/KSVGqBQD//REqFwAAAA==" | base64 --decode | gunzip
The unzipped byte sequence is now displayed as a JSON string. For example:
{"text": "Hello world"}

Destination API

Use the following request to post the destination streaming configuration for the tenant:
POST /tenants/{tenantId}/messaging/destinations
Request body:
{
  "provider": "default",
  "type": "queue",
  "name": "queue.MyTenant.allEvents",
  "JMSEventsFilteringFields": [
    "uri",
    "type"
  ],
  "RelationEventsFilteringFields": [
    "type"
  ],
  "skipPayload": false
}