Message streaming
Learn about Reltio's event processing.
Message streaming enables the Reltio platform to process events from an internal queue into an external queue or topic in JSON format.
Reltio streams create, read, update, and delete (CRUD) and MATCH events into external message bus queues or topics in JSON format. The following queues are supported:
- Amazon Simple Queue Service (SQS)
- Amazon Simple Notification Service (SNS)
- Google Cloud Pub/Sub (Publisher/Subscriber)
- Microsoft Azure Service Bus
Message streaming can be configured so that:
- The messages are sent into different queues
- The payloads of messages are filtered
- The messages are filtered by types of event
- The messages are filtered by object values
- The messages are filtered for Operational values
The following image explains how the internal event queues are turned into external queues.
Reltio recommends that you don’t use events in external queues for real-time interactions, because events can take up to 10 minutes to arrive. The data processing activities in Reltio generate events that flow through internal event queues to keep the data up to date in different Reltio databases. Processing of events in the event queue is asynchronous. Events are transferred to a secondary queue after they’re successfully processed by the primary queue. These events support features such as search, matching, history, and interactions. The time it takes to update all databases can vary depending on factors such as the queue size and configuration parameters.
Reltio message streaming is NOT FIFO - and here's why
In Reltio Platform, multiple requests are processed in parallel using multiple nodes. Sending simultaneous requests enables fast and efficient scaling, although it can also create a potential scenario where one request that comes a few milliseconds after an earlier request is processed faster. The second request will then get into the event queue first. Reltio doesn’t guarantee the order of processing in the same order that requests are added to the queue, because of various internal processes. In this scenario, the use of First In, First Out (FIFO) isn’t relevant, and instead, the use of object version and message timestamp is recommended to order events.
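As a minimal sketch of the recommendation above, a consumer can keep the newest state per object by comparing the object version and message timestamp instead of relying on arrival order. The field names version and timestamp and the helper latest_per_object are hypothetical; check the actual event payload for the attributes that carry these values.

```python
from typing import Any, Dict, Iterable, Tuple

Event = Dict[str, Any]


def ordering_key(event: Event) -> Tuple[int, int]:
    # Hypothetical field names: "version" (object version) and
    # "timestamp" (message timestamp in epoch milliseconds).
    return (event["version"], event["timestamp"])


def latest_per_object(events: Iterable[Event]) -> Dict[str, Event]:
    """Keep, for each object URI, the event with the highest (version, timestamp)."""
    latest: Dict[str, Event] = {}
    for event in events:
        uri = event["uri"]
        current = latest.get(uri)
        if current is None or ordering_key(event) > ordering_key(current):
            latest[uri] = event
    return latest


# Events arrive out of order: version 2 is delivered before version 1.
received = [
    {"uri": "entities/e1", "version": 2, "timestamp": 1720773510500, "type": "ENTITY_CHANGED"},
    {"uri": "entities/e1", "version": 1, "timestamp": 1720773510454, "type": "ENTITY_CREATED"},
]
state = latest_per_object(received)
print(state["entities/e1"]["type"])
```

Comparing the version first and the timestamp second means a late-arriving older event never overwrites a newer one, regardless of delivery order.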
Configuration: Tenant configuration
- The skipPayload field for streaming configuration must be added.
- The original JMS attributes JMSEventsTimeToLive and JMSEventsFilteringFields are still used.
Example: Streaming Configuration
"streamingConfig": {
"streamingEnabled": true,
"streamingAPIEnabled": true,
"JMSUser": "...",
"JMSPassword": "...",
"JMSBrokerURL": "...",
"JMSDestinationType": "Queue",
"JMSDestinationPrefix": "Reltio.Events",
"JMSIncludeMergeTime": true,
"analyzeOvChanges": false,
"emptyStartEndRelationCrosswalks": false,
"largeObjectsSupport": false,
"JMSEventsTimeToLive": 86400000,
"JMSEventsFilteringFields": [
"uri",
"crosswalks",
"type"
],
"RelationEventsFilteringFields": [
"type",
"uri"
],
"skipPayload": false,
"messaging": {
"destinations": [
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.allEvents",
"JMSEventsFilteringFields": [
"uri",
"type"
],
"RelationEventsFilteringFields": [
"type"
],
"skipPayload": false
},
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.dtssEvents",
"skipPayload": true,
"dtssQueue": true
}
]
}
}
To learn more about the properties in the streaming configuration, see the following table:
Property name | Required | Description | Type |
---|---|---|---|
streamingEnabled | Yes | This parameter enables or disables events publishing. | boolean |
streamingAPIEnabled | No | If the value of the parameter is false, then events from a data load endpoint and a reindex endpoint aren’t sent to JMS. The default value is | boolean |
skipPayload | No | If the value is true, the platform only returns the type of event and ovChanged properties, and not the entire payload. The default value is false. | boolean |
analyzeOvChanges | No | If the value of the parameter is true, then all ENTITY_CHANGED events include an extra boolean attribute ovChanged. This indicates whether the operational values (OV) changed. The default value is | boolean |
JMSEventsTimeToLive | Yes | This parameter indicates the expiration time for events in milliseconds. | long |
JMSEventsFilteringFields | No | Indicates the entity fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event. | array |
RelationEventsFilteringFields | No | Indicates the entity’s relation fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event. | array |
largeObjectsSupport | No | Enables support of large objects in the External Streaming Service. When the compressed size of the event exceeds the size limit, the event is sent again with the URI of the object and the exceededQueueSizeLimit: true field instead of the entire message. The default value is | boolean |
JMSIncludeMergeTime | No | If the value of the parameter is true, the updatedTime is added to the ENTITIES_MERGED event. The default value is | boolean |
emptyStartEndRelationCrosswalks | No | If the value of the parameter is true, then the crosswalks collections of startObject and endObject in relation events are always empty. The default value is false. | boolean |
messaging | No | A list of Messaging Destination configurations. At least one | object array |
Messaging destinations
You can specify the JMSEventsFilteringFields and RelationEventsFilteringFields parameters for each destination. These filters override the main filters. Additionally, you can skip the entire payload for each destination using the skipPayload flag.
"streamingConfig": {
(...)
"messaging": {
"destinations": [
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.allEvents",
"JMSEventsFilteringFields": [
"uri",
"type"
],
"RelationEventsFilteringFields": [
"type"
],
"skipPayload": false
},
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.dtssEvents",
"skipPayload": true,
"dtssQueue": true
}
]
}
}
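The override rule can be pictured with a small sketch: a destination-level value wins, otherwise the value at the root of streamingConfig applies. This is an illustrative model, not Reltio's implementation. The helper effective_settings is hypothetical, and the fallback of skipPayload to the root value is an assumption.

```python
from typing import Any, Dict

# Settings that can appear both at the root and on a destination.
OVERRIDABLE = ("JMSEventsFilteringFields", "RelationEventsFilteringFields", "skipPayload")


def effective_settings(streaming_config: Dict[str, Any], destination: Dict[str, Any]) -> Dict[str, Any]:
    """Return the settings that apply to one destination (destination value wins)."""
    settings: Dict[str, Any] = {}
    for key in OVERRIDABLE:
        if key in destination:
            settings[key] = destination[key]
        elif key in streaming_config:
            # Assumption: a missing destination value falls back to the root value.
            settings[key] = streaming_config[key]
    return settings


root = {
    "JMSEventsFilteringFields": ["uri", "crosswalks", "type"],
    "RelationEventsFilteringFields": ["type", "uri"],
    "skipPayload": False,
}
all_events = {
    "name": "queue.MyTenant.allEvents",
    "JMSEventsFilteringFields": ["uri", "type"],
    "RelationEventsFilteringFields": ["type"],
    "skipPayload": False,
}
dtss_events = {"name": "queue.MyTenant.dtssEvents", "skipPayload": True, "dtssQueue": True}

print(effective_settings(root, all_events))
print(effective_settings(root, dtss_events))
```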
Attribute | Required | Default | Description |
---|---|---|---|
provider | No | default value | Defines the alias of the provider, as configured for the Reltio environment. |
type | No | queue | Defines the type of destination. The supported types may be provider-specific. |
name | Yes | NA | Defines the name of the destination. |
dtssQueue | No | false | Defines a true or false flag indicating that this queue is used by DTSS. |
enabled | No | true | Indicates whether this destination is used or ignored. |
typeFilter | No | null | Defines the collection of event type names to stream. Events of different types will be ignored. |
awsAccountId | No | null | Defines the AWS account identifier for SQS queue. This attribute uniquely identifies an AWS account and provides a full queue for Amazon Resource Name (ARN) within the name field. |
objectFilter | No | null | Defines the filter expression for the event object. The event isn't sent to a destination if the object doesn't match the filter. The expression structure is the same as for search filters. There are three ways of referencing an attribute in the filter. Note: Not all operators may be used with attributes of referenced attributes. For example, changes(delta.attributes.Organization.Name) is not valid. For more information, see Filtering Entities. |
format | No | JSON | Defines the format of the events that are sent into the queue. JSON - message will be sent in string JSON format. |
skipPayload | No | false | If the value is true, the platform only returns the type of event and ovChanged properties, and not the entire payload. |
ovValue | No | false | If true, operational values are published in the queue destination. |
payloadType | No | SNAPSHOT | Specifies the content payload type, which can be SNAPSHOT, DELTA, or SNAPSHOT_WITH_DELTA. |
JMSEventsFilteringFields | No | NA | Indicates the entity fields that must be included in the event. If it's not specified, then the filtering doesn't occur, and all content belonging to the entity is added in the event. |
RelationEventsFilteringFields | No | NA | Indicates the entity's relation fields that must be included in the event. If it isn't specified, then the filtering doesn't occur, and all content belonging to the entity is added in the event. |
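The enabled and typeFilter attributes in the table above can be read as a per-destination gate. The following sketch models that gate under stated assumptions: typeFilter is treated as a list of event type names, and the helpers accepts and route and the destination name queue.MyTenant.createdOnly are hypothetical. It does not reproduce the platform's actual routing code.

```python
from typing import Any, Dict, List


def accepts(destination: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Return True if the destination would receive the event in this model."""
    # "enabled" defaults to true; a disabled destination is ignored.
    if not destination.get("enabled", True):
        return False
    # "typeFilter" defaults to null, which means no filtering by event type.
    type_filter = destination.get("typeFilter")
    if type_filter is not None and event["type"] not in type_filter:
        return False
    return True


def route(destinations: List[Dict[str, Any]], event: Dict[str, Any]) -> List[str]:
    """List the names of the destinations that accept the event."""
    return [d["name"] for d in destinations if accepts(d, event)]


destinations = [
    {"name": "queue.MyTenant.allEvents"},
    {"name": "queue.MyTenant.createdOnly", "typeFilter": ["ENTITY_CREATED"]},
    {"name": "queue.MyTenant.dtssEvents", "enabled": False},
]
print(route(destinations, {"type": "ENTITY_CHANGED"}))
print(route(destinations, {"type": "ENTITY_CREATED"}))
```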
Ignore event payload
The following is an example of an event when the skipPayload flag is set to false:
{
"type": "ENTITY_CHANGED",
"object": {
"uri": "entities/e1",
"roles": [
"configuration/roles/SeniorMember",
"configuration/roles/CTO"
],
"label": "John Doe",
…
},
"ovChanged": true
}
The following is an example of an event when the skipPayload flag is set to true:
{
"type": "ENTITY_CHANGED",
"ovChanged": true
}
Include only Operational values (OV)
To stream only operational values, use the ovOnly parameter. Example:
{
(...),
"streamingConfig": {
(...),
"ovOnly": [true, false],
"messaging": {
"destinations": [
{
(...),
"ovOnly": [true, false]
}
]
}
}
}
The ovOnly parameter is added at the following two levels in streaming configuration:
- As a boolean value at the root of the streamingConfig object, which defines the global state of this property and defaults to false.
- As a boolean value in the defined messaging destination. If you don't define this value, the value in the streamingConfig root is used by default.
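The two-level lookup described above can be sketched as follows. The helper name effective_ov_only is hypothetical; the fallback order follows the description in the list.

```python
from typing import Any, Dict


def effective_ov_only(streaming_config: Dict[str, Any], destination: Dict[str, Any]) -> bool:
    """Resolve ovOnly for one destination."""
    # The root value defines the global state and defaults to false.
    root_value = streaming_config.get("ovOnly", False)
    # A destination-level value, when defined, takes precedence over the root.
    return destination.get("ovOnly", root_value)


print(effective_ov_only({}, {}))
print(effective_ov_only({"ovOnly": True}, {}))
print(effective_ov_only({"ovOnly": True}, {"ovOnly": False}))
```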
When you set the ovOnly parameter to true, in either of the two levels above, events are filtered based on the event and Payload type:
- Payload type = SNAPSHOT: streams all attributes with OV. Note: For change request events, the attributes section may be empty, because OV is not calculated for DCR.
- Payload type = DELTA: streams all changed events, based on ovChanged (e.g., ENTITY_CHANGED or RELATIONSHIP_CHANGED events). Note: For this Payload type, if there are NO changes to OV properties, the event won't be streamed.
- Payload type = SNAPSHOT_WITH_DELTA: streams the combined results of SNAPSHOT and DELTA payload types.
The following examples illustrate the different Payload types.
This example illustrates Payload type = SNAPSHOT:
{
"type": "ENTITY_CREATED",
"object": {
"uri": "entities/hcp1",
"type": "configuration/entityTypes/HCP",
"createdBy": "test",
"createdTime": 1720773510454,
"updatedBy": "test",
"updatedTime": 1720773510454,
"attributes": {
"FirstName": [
{
"type": "configuration/entityTypes/HCP/attributes/FirstName",
"ov": true,
"value": "FN1",
"uri": "entities/hcp1/attributes/FirstName/0"
}
]
},
"crosswalks": [
{
"uri": "entities/hcp1/crosswalks/2",
"type": "configuration/sources/NPI",
"sourceTable": "source_table",
"value": "NPI_HCP1",
"reltioLoadDate": "2024-07-12T08:38:30.454Z",
"createDate": "2024-07-12T08:38:30.454Z",
"updateDate": "2024-07-12T08:38:30.454Z",
"attributes": [
"entities/hcp1/attributes/FirstName/0",
"entities/hcp1/attributes/LastName/1"
],
"singleAttributeUpdateDates": {}
}
],
"analyticsAttributes": {},
"label": "FN1 LN1",
"secondaryLabel": ""
}
}
{
"type": "RELATIONSHIP_CREATED",
"object": {
"uri": "relations/rel1",
"type": "configuration/relationTypes/HasAddress",
"createdBy": "test",
"createdTime": 1720773510454,
"updatedBy": "test",
"updatedTime": 1720773510454,
"startRefPinned": false,
"startRefIgnored": false,
"endRefPinned": false,
"endRefIgnored": false,
"analyticsAttributes": {},
"crosswalks": [
{
"uri": "relations/rel1/crosswalks/8",
"type": "configuration/sources/NPI",
"sourceTable": "source_table",
"value": "NPI_REL1",
"reltioLoadDate": "2024-07-12T08:38:30.454Z",
"createDate": "2024-07-12T08:38:30.454Z",
"updateDate": "2024-07-12T08:38:30.454Z",
"attributes": [
"relations/rel1/attributes/AddressType/7"
],
"singleAttributeUpdateDates": {}
}
],
"startObject": {
"objectURI": "entities/hcp1",
"crosswalks": [
{
"uri": "entities/hcp1/crosswalks/9",
"type": "configuration/sources/Reltio",
"value": "hcp1"
}
]
},
"endObject": {
"objectURI": "entities/loc1",
"crosswalks": [
{
"uri": "entities/loc1/crosswalks/10",
"type": "configuration/sources/Reltio",
"value": "loc1"
}
]
}
}
}
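One plausible reading of "streams all attributes with OV" for the SNAPSHOT payload type is that only attribute values flagged with ov set to true remain in the attributes section. The sketch below models that reading for top-level attributes only. The helper keep_ov_attributes is hypothetical, nested attributes are not handled, and the platform's actual filtering may differ.

```python
from typing import Any, Dict


def keep_ov_attributes(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a snapshot event that keeps only attribute values with ov == true."""
    obj = event.get("object", {})
    if "attributes" not in obj:
        # Nothing to filter, for example a relation event without attributes.
        return event
    kept: Dict[str, Any] = {}
    for name, values in obj["attributes"].items():
        ov_values = [value for value in values if value.get("ov", False)]
        if ov_values:
            kept[name] = ov_values
    return {**event, "object": {**obj, "attributes": kept}}


snapshot = {
    "type": "ENTITY_CREATED",
    "object": {
        "uri": "entities/hcp1",
        "attributes": {
            "FirstName": [{"ov": True, "value": "FN1"}],
            "LastName": [{"ov": False, "value": "LN0"}, {"ov": True, "value": "LN1"}],
        },
    },
}
filtered = keep_ov_attributes(snapshot)
print(filtered["object"]["attributes"])
```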
These examples illustrate Payload type = DELTA.
- Example 1 - Event that won't be streamed:
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": false, "delta": [ { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/LastName", "newValue": { "value": "LN2", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
- Example 2 - Event that won't be streamed:
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": false, "delta": [ { "type": "ATTRIBUTE_CHANGED", "attributeType": "configuration/entityTypes/HCP/attributes/LastName", "newValue": { "value": "LN2", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] }, "oldValue": { "value": "LN1", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
- Example 3 - Event that won't be streamed:
{ "type": "RELATIONSHIP_CHANGED", "uri": "relations/rel1", "deltas": { "ovChanged": false, "relationType": "HasAddress", "startObjectUri": "entities/hcp1", "endObjectUri": "entities/loc1", "delta": [ { "type": "ATTRIBUTE_REMOVED", "attributeType": "configuration/relationTypes/HasAddress/attributes/AddressType", "oldValue": { "value": "AddressType", "ov": false, "id": "19", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI_REL2", "sourceTable": "source_table" } ] } } ], "startObjectType": "HCP", "endObjectType": "Location" } }
- Example 4 - Filtered event
- Original event that is streamed only if ovOnly is false
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": true, "delta": [ { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/Address", "newValue": { "ov": true, "id": "rel1", "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI_REL1", "sourceTable": "source_table" } ] } }, { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/LastName", "newValue": { "value": "LN2", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
- Streamed event after filter
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": true, "delta": [ { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/Address", "newValue": { "ov": true, "id": "rel1", "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI_REL1", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
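The behavior in Examples 1 to 4 can be modeled with a short sketch: an event whose ovChanged flag is false is dropped, and otherwise only the delta entries that touch an operational value are kept. The helpers touches_ov and filter_delta_event are hypothetical, and treating an entry as relevant when either newValue or oldValue has ov set to true is an assumption, not documented platform logic.

```python
from typing import Any, Dict, Optional


def touches_ov(entry: Dict[str, Any]) -> bool:
    # Assumption: an entry matters if its new or old value is an operational value.
    return any(entry.get(side, {}).get("ov", False) for side in ("newValue", "oldValue"))


def filter_delta_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the event to stream when ovOnly is true, or None if it is not streamed."""
    deltas = event["deltas"]
    if not deltas.get("ovChanged", False):
        return None  # No OV changes: the event is not streamed (Examples 1 to 3).
    kept = [entry for entry in deltas.get("delta", []) if touches_ov(entry)]
    return {**event, "deltas": {**deltas, "delta": kept}}


# Abbreviated version of Example 4.
example_4 = {
    "type": "ENTITY_CHANGED",
    "uri": "entities/hcp1",
    "deltas": {
        "ovChanged": True,
        "delta": [
            {"type": "ATTRIBUTE_ADDED",
             "attributeType": "configuration/entityTypes/HCP/attributes/Address",
             "newValue": {"ov": True, "id": "rel1"}},
            {"type": "ATTRIBUTE_ADDED",
             "attributeType": "configuration/entityTypes/HCP/attributes/LastName",
             "newValue": {"value": "LN2", "ov": False, "id": "7"}},
        ],
        "entityType": "HCP",
    },
}
streamed = filter_delta_event(example_4)
print([entry["attributeType"] for entry in streamed["deltas"]["delta"]])
```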
This example illustrates Payload type = SNAPSHOT_WITH_DELTA:
{
"type": "ENTITY_CREATED",
"object": {
"uri": "entities/hcp1",
"type": "configuration/entityTypes/HCP",
"createdBy": "test",
"createdTime": 1720773510454,
"updatedBy": "test",
"updatedTime": 1720773510454,
"attributes": {
"FirstName": [
{
"type": "configuration/entityTypes/HCP/attributes/FirstName",
"ov": true,
"value": "FN1",
"uri": "entities/hcp1/attributes/FirstName/0"
}
],
"LastName": [
{
"type": "configuration/entityTypes/HCP/attributes/LastName",
"ov": true,
"value": "LN1",
"uri": "entities/hcp1/attributes/LastName/1"
}
]
},
"crosswalks": [
{
"uri": "entities/hcp1/crosswalks/2",
"type": "configuration/sources/NPI",
"sourceTable": "source_table",
"value": "NPI_HCP1",
"reltioLoadDate": "2024-07-12T08:38:30.454Z",
"createDate": "2024-07-12T08:38:30.454Z",
"updateDate": "2024-07-12T08:38:30.454Z",
"attributes": [
"entities/hcp1/attributes/FirstName/0",
"entities/hcp1/attributes/LastName/1"
],
"singleAttributeUpdateDates": {}
}
],
"analyticsAttributes": {},
"label": "FN1 LN1",
"secondaryLabel": ""
},
"uri": "entities/hcp1",
"deltas": {
"ovChanged": false,
"delta": [
{
"type": "ENTITY_CREATED",
"newValue": {
"uri": "entities/hcp1",
"type": "configuration/entityTypes/HCP",
"createdBy": "test",
"createdTime": 1720773510454,
"updatedBy": "test",
"updatedTime": 1720773510454,
"attributes": {
"FirstName": [
{
"type": "configuration/entityTypes/HCP/attributes/FirstName",
"ov": true,
"value": "FN1",
"uri": "entities/hcp1/attributes/FirstName/0"
}
],
"LastName": [
{
"type": "configuration/entityTypes/HCP/attributes/LastName",
"ov": true,
"value": "LN1",
"uri": "entities/hcp1/attributes/LastName/1"
}
]
},
"crosswalks": [
{
"uri": "entities/hcp1/crosswalks/2",
"type": "configuration/sources/NPI",
"sourceTable": "source_table",
"value": "NPI_HCP1",
"reltioLoadDate": "2024-07-12T08:38:30.454Z",
"createDate": "2024-07-12T08:38:30.454Z",
"updateDate": "2024-07-12T08:38:30.454Z",
"attributes": [
"entities/hcp1/attributes/FirstName/0",
"entities/hcp1/attributes/LastName/1"
],
"singleAttributeUpdateDates": {}
}
],
"analyticsAttributes": {},
"label": "FN1 LN1",
"secondaryLabel": ""
}
}
],
"entityType": "HCP"
}
}
Decode and use JSON_ZIP_BASE64 messages
JSON_ZIP_BASE64 messages are JSON messages (UTF-8) that are first compressed using gzip and then encoded to Base64. To decode a message:
- Decode the Base64 string to a byte array.
- Decompress the byte array using gzip.
For example:
echo "H4sIAAAAAAAAAKtWKkmtKFGyUlDySM3JyVcozy/KSVGqBQD//REqFwAAAA==" | base64 --decode | gunzip
{"text": "Hello world"}
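The same two steps can be done in application code. The following is a minimal Python sketch using only the standard library; the function names are hypothetical, and the encoder is included only so the example can produce its own sample input.

```python
import base64
import gzip
import json
from typing import Any, Dict


def decode_json_zip_base64(encoded: str) -> Dict[str, Any]:
    """Base64-decode, gunzip, then parse the UTF-8 JSON text."""
    compressed = base64.b64decode(encoded)
    raw = gzip.decompress(compressed)
    return json.loads(raw.decode("utf-8"))


def encode_json_zip_base64(payload: Dict[str, Any]) -> str:
    """Inverse operation, used here only to build a sample message."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


sample = encode_json_zip_base64({"text": "Hello world"})
print(decode_json_zip_base64(sample))
```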
Destination API
POST /tenants/{tenantId}/messaging/destinations
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.allEvents",
"JMSEventsFilteringFields": [
"uri",
"type"
],
"RelationEventsFilteringFields": [
"type"
],
"skipPayload": false
}
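As a rough illustration, the request above could be prepared in Python as follows. The base URL and tenant ID are placeholders, the helper build_create_destination_request is hypothetical, and authentication is omitted because it isn't described here. The sketch only builds the request object and doesn't send it.

```python
import json
import urllib.request
from typing import Any, Dict

# Placeholder values: replace with your environment's API base URL and tenant ID.
BASE_URL = "https://reltio.example.invalid/api"
TENANT_ID = "MyTenant"

destination = {
    "provider": "default",
    "type": "queue",
    "name": "queue.MyTenant.allEvents",
    "JMSEventsFilteringFields": ["uri", "type"],
    "RelationEventsFilteringFields": ["type"],
    "skipPayload": False,
}


def build_create_destination_request(base_url: str, tenant_id: str, body: Dict[str, Any]) -> urllib.request.Request:
    """Build (but do not send) the POST request for a new messaging destination."""
    url = f"{base_url}/tenants/{tenant_id}/messaging/destinations"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_destination_request(BASE_URL, TENANT_ID, destination)
print(request.get_method(), request.full_url)
```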