Message streaming
Message streaming enables the Reltio platform to process events from an internal queue into an external queue or topic in JSON format.
Reltio streams create, read, update, and delete (CRUD) and MATCH events into external message bus queues or topics in JSON format. The following queues are supported:
- Amazon Simple Queue Service (SQS)
- Amazon Simple Notification Service (SNS)
- Google Cloud Pub/Sub (Publisher/Subscriber)
- Microsoft Azure Service Bus
Message streaming supports the following:
- The messages are sent into different queues
- The payloads of messages are filtered
- The messages are filtered by types of event
- The messages are filtered by object values
- The messages are filtered for operational values (OV)
The following image explains how the internal event queues are turned into external queues.
Reltio recommends that you don’t use events in external queues for real-time interactions, because events can take up to 10 minutes to arrive. The data processing activities in Reltio generate events that flow through internal event queues to keep the data up to date in different Reltio databases. Processing of events in the event queue is asynchronous. Events are transferred to a secondary queue after they’re successfully processed by the primary queue. These events support features such as search, matching, history, and interactions. The time it takes to update all databases varies depending on factors such as the queue size and configuration parameters.
Reltio message streaming is NOT FIFO - and here's why
In the Reltio Platform, multiple requests are processed in parallel on multiple nodes. Parallel processing enables fast and efficient scaling, but a request that arrives a few milliseconds after an earlier request can be processed faster and reach the event queue first. Because of various internal processes, Reltio doesn’t guarantee that events are processed in the order in which the requests were added to the queue. FIFO therefore doesn’t apply here. Use the object version and message timestamp to order events instead.
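The ordering advice above can be sketched on the consumer side. This is a minimal sketch, not Reltio code: the field names `uri`, `version`, and `timestamp` are hypothetical placeholders for wherever your payloads carry the object URI, object version, and message timestamp.

```python
from typing import Any

# Hypothetical field names; adjust them to your actual event payloads.
URI_KEY = "uri"
VERSION_KEY = "version"
TIMESTAMP_KEY = "timestamp"


def order_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return events sorted per object by version, then by timestamp."""
    return sorted(
        events,
        key=lambda e: (e[URI_KEY], e[VERSION_KEY], e[TIMESTAMP_KEY]),
    )


def should_apply(event: dict[str, Any], last_seen: dict[str, tuple[int, int]]) -> bool:
    """Accept an event only if it is newer than the last one applied.

    last_seen maps an object URI to the (version, timestamp) pair of the
    most recent event applied for that object, and is updated in place.
    """
    key = (event[VERSION_KEY], event[TIMESTAMP_KEY])
    previous = last_seen.get(event[URI_KEY])
    if previous is not None and key <= previous:
        return False  # an equal or newer state was already applied
    last_seen[event[URI_KEY]] = key
    return True
```

`order_events` suits batch processing, while `should_apply` suits a streaming consumer that must discard late arrivals without buffering.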
Configuration: Tenant configuration
- The skipPayload field for streaming configuration must be added.
- The original JMS attributes JMSEventsTimeToLive and JMSEventsFilteringFields are still used.
Example: Streaming Configuration
"streamingConfig": {
"streamingEnabled": true,
"streamingAPIEnabled": false,
"analyzeOvChanges": false,
"JMSEventsTimeToLive": 86400000,
"largeObjectsSupport": false,
"JMSEventsFilteringFields": [
"uri",
"crosswalks",
"type",
"VeevaSync",
"createdBy",
"createdTime",
"updatedBy",
"updatedTime",
"startObject",
"endObject",
"attributes"
],
"messaging": {
"destinations": [
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.allEvents"
},
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.dtssEvents",
"dtssQueue": true
}
]
}
}
For a description of each property in the streaming configuration, see Table 1: Streaming Configuration Parameters.
Property name | Required | Description | Type |
---|---|---|---|
streamingEnabled | Yes | This parameter enables or disables events publishing. | boolean |
streamingAPIEnabled | No | If the value of the parameter is false, then events from a data load endpoint and a reindex endpoint aren’t sent to JMS. The default value is | boolean |
skipPayload | No | If the value is true, the platform only returns the type of event and the ovChanged property, and not the entire payload. The default value is false. | boolean |
analyzeOvChanges | No | If the value of the parameter is true, then all ENTITY_CHANGED events include an extra boolean attribute ovChanged. This indicates whether The default value is | boolean |
JMSEventsTimeToLive | Yes | This parameter indicates the expiration time for events in milliseconds. | long |
JMSEventsFilteringFields | No | Indicates the entity fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event. | array |
RelationEventsFilteringFields | No | Indicates the entity’s relation fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event. | array |
largeObjectsSupport | No | Enables support for large objects in the external streaming service. When the compressed size of an event exceeds the size limit, the event is sent again with the URI of the object and the exceededQueueSizeLimit: true field instead of the entire message. The default value is | boolean |
JMSIncludeMergeTime | No | If the value of the parameter is true, the updatedTime values are added to the ENTITIES_MERGED event. The default value is | boolean |
emptyStartEndRelationCrosswalks | No | If the value of the parameter is true, then the crosswalks collections of startObject and endObject in relation events are always empty. The default value is false. | boolean |
messaging | No | A list of Messaging Destination configurations. At least one | object array |
Messaging destinations
You can specify the JMSEventsFilteringFields and RelationEventsFilteringFields parameters for each destination. These filters override the main filters. You can also skip the entire payload for each destination using the skipPayload flag.
"streamingConfig": {
"streamingEnabled": true,
"streamingAPIEnabled": true,
"JMSUser": "...",
"JMSPassword": "...",
"JMSBrokerURL": "...",
"JMSDestinationType": "Queue",
"JMSDestinationPrefix": "Reltio.Events",
"JMSIncludeMergeTime": true,
"analyzeOvChanges": false,
"emptyStartEndRelationCrosswalks": false,
"largeObjectsSupport": false,
"JMSEventsTimeToLive": 86400000,
"JMSEventsFilteringFields": [
"uri",
"crosswalks",
"type"
],
"RelationEventsFilteringFields": [
"type",
"uri"
],
"skipPayload": false,
"messaging": {
"destinations": [
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.allEvents",
"JMSEventsFilteringFields": [
"uri",
"type"
],
"RelationEventsFilteringFields": [
"type"
],
"skipPayload": false
},
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.dtssEvents",
"skipPayload": true,
"dtssQueue": true
}
]
}
}
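The override behavior described above can be illustrated with a short sketch. It assumes that a value set on a destination fully replaces the root-level value, and that a destination that omits a key falls back to the root of streamingConfig; the fallback and the helper name `effective_settings` are assumptions for illustration, not documented platform behavior.

```python
from typing import Any

# Keys that the text above says can be set per destination.
OVERRIDABLE_KEYS = (
    "JMSEventsFilteringFields",
    "RelationEventsFilteringFields",
    "skipPayload",
)

# Trimmed copy of the sample configuration shown above.
streaming_config = {
    "JMSEventsFilteringFields": ["uri", "crosswalks", "type"],
    "RelationEventsFilteringFields": ["type", "uri"],
    "skipPayload": False,
    "messaging": {
        "destinations": [
            {
                "name": "queue.MyTenant.allEvents",
                "JMSEventsFilteringFields": ["uri", "type"],
                "RelationEventsFilteringFields": ["type"],
                "skipPayload": False,
            },
            {"name": "queue.MyTenant.dtssEvents", "skipPayload": True},
        ]
    },
}


def effective_settings(config: dict[str, Any], destination: dict[str, Any]) -> dict[str, Any]:
    """Resolve the filter settings that apply to one destination."""
    resolved = {}
    for key in OVERRIDABLE_KEYS:
        if key in destination:
            resolved[key] = destination[key]  # destination-level override
        elif key in config:
            resolved[key] = config[key]  # assumed fallback to the root value
    return resolved


for dest in streaming_config["messaging"]["destinations"]:
    print(dest["name"], effective_settings(streaming_config, dest))
```

Under these assumptions, queue.MyTenant.dtssEvents keeps the root-level filtering fields but skips the payload.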
Include only Operational values (OV)
To stream only operational values, use the ovOnly parameter. A sample is given below:
{
(...),
"streamingConfig": {
(...),
"ovOnly": true|false,
"messaging": {
"destinations": [
{
(...),
"ovOnly": true|false
}
]
}
}
}
The ovOnly parameter is added at the following two levels in the streaming configuration:
- As a boolean value at the root of the streamingConfig object, which defines the global state of this property and defaults to false.
- As a boolean value in the defined messaging destination. If you do not define this value, the value in the streamingConfig root is taken as the default.
If you set this parameter to true at either of these two levels, events are filtered based on the event and payload type. If the payload type is snapshot, only attributes that have operational values are streamed. If the payload type is deltas, all changed events (for example, ENTITY_CHANGED or RELATIONSHIP_CHANGED events) are filtered based on the ovChanged attribute.
Let's see a few examples where events are streamed and where they are not.
- Example 1 - Ignored event that will not be streamed:
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": false, "delta": [ { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/LastName", "newValue": { "value": "LN2", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
- Example 2 - Ignored event that will not be streamed:
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": false, "delta": [ { "type": "ATTRIBUTE_CHANGED", "attributeType": "configuration/entityTypes/HCP/attributes/LastName", "newValue": { "value": "LN2", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] }, "oldValue": { "value": "LN1", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
- Example 3 - Ignored event that will not be streamed:
{ "type": "RELATIONSHIP_CHANGED", "uri": "relations/rel1", "deltas": { "ovChanged": false, "relationType": "HasAddress", "startObjectUri": "entities/hcp1", "endObjectUri": "entities/loc1", "delta": [ { "type": "ATTRIBUTE_REMOVED", "attributeType": "configuration/relationTypes/HasAddress/attributes/AddressType", "oldValue": { "value": "AddressType", "ov": false, "id": "19", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI_REL2", "sourceTable": "source_table" } ] } } ], "startObjectType": "HCP", "endObjectType": "Location" } }
- Example 4 - Filtered event
- Original event that is streamed only if ovValue is false
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": true, "delta": [ { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/Address", "newValue": { "ov": true, "id": "rel1", "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI_REL1", "sourceTable": "source_table" } ] } }, { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/LastName", "newValue": { "value": "LN2", "ov": false, "id": "7", "sources": [ "NPI" ], "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI2", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
- Streamed event after filter
{ "type": "ENTITY_CHANGED", "uri": "entities/hcp1", "deltas": { "ovChanged": true, "delta": [ { "type": "ATTRIBUTE_ADDED", "attributeType": "configuration/entityTypes/HCP/attributes/Address", "newValue": { "ov": true, "id": "rel1", "crosswalks": [ { "type": "configuration/sources/NPI", "value": "NPI_REL1", "sourceTable": "source_table" } ] } } ], "entityType": "HCP" } }
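The four examples suggest a filtering rule for deltas payloads that can be sketched as follows. This is an inferred illustration, not the platform's implementation: the rule that a delta entry is kept when its newValue or oldValue has ov set to true is an assumption drawn from Example 4, and `apply_ov_only` is a hypothetical helper name.

```python
import copy
from typing import Any, Optional


def _touches_ov(change: dict[str, Any]) -> bool:
    """True if either side of a delta entry is an operational value."""
    return any(change.get(side, {}).get("ov") for side in ("newValue", "oldValue"))


def apply_ov_only(event: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the event to stream, or None if the event is ignored."""
    deltas = event.get("deltas")
    if deltas is None:
        return event  # not a deltas payload; outside the scope of this sketch
    if not deltas.get("ovChanged"):
        return None  # Examples 1 to 3: ovChanged is false, nothing is streamed
    filtered = copy.deepcopy(event)  # leave the original event untouched
    filtered["deltas"]["delta"] = [
        change for change in filtered["deltas"].get("delta", []) if _touches_ov(change)
    ]
    return filtered
```

Applied to the original event of Example 4, this sketch drops the LastName entry and keeps the Address entry, which matches the streamed event shown above.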
For a description of the attributes of each destination specified in the tenant configuration, see Table 2: Messaging Destination Attributes.
Attribute | Required | Default | Description |
---|---|---|---|
provider | No | default value | Defines the alias of the provider, as configured for the Reltio environment. |
type | No | queue | Defines the type of destination. The supported types may be provider-specific. |
name | Yes | NA | Defines the name of the destination. |
dtssQueue | No | false | Defines a true or false flag indicating that this queue is used by DTSS. |
enabled | No | true | Indicates whether this destination is used or ignored. |
typeFilter | No | null | Defines the collection of event type names to stream. Events of other types are ignored. |
awsAccountId | No | null | Defines the AWS account identifier for SQS queue. This attribute uniquely identifies an AWS account and provides a full queue for Amazon Resource Name (ARN) within the name field. |
objectFilter | No | null | Defines the filter expression for the event object. The event isn’t sent to a destination if the object doesn’t match the filter. The expression structure is the same as for search filters. For more information, see Filtering Entities. |
format | No | JSON | Defines the format of the events that are sent into the queue. JSON: the message is sent as a JSON string. |
skipPayload | No | false | If the value is true, the platform only returns the type of event and the ovChanged property, and not the entire payload. |
ovValue | No | false | If true, operational values are published in the queue destination. |
payloadType | No | snapshot | Specifies the content payload type, which can be either snapshot (the full object state after it was modified) or deltas (the changed object attributes). |
JMSEventsFilteringFields | No | NA | Indicates the entity fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event. |
RelationEventsFilteringFields | No | NA | Indicates the entity’s relation fields that must be included in the event. If it isn’t specified, then the filtering isn’t done and all content belonging to the entity is added in the event. |
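The enabled and typeFilter attributes in Table 2 can be read as a simple routing check. The sketch below is illustrative only: `should_send` is a hypothetical helper, the defaults are the ones listed in the table, and objectFilter is left out because its expression syntax is documented elsewhere.

```python
from typing import Any


def should_send(event: dict[str, Any], destination: dict[str, Any]) -> bool:
    """Decide whether an event would be routed to a destination."""
    # enabled defaults to true: a disabled destination is ignored.
    if not destination.get("enabled", True):
        return False
    # typeFilter defaults to null: no filter means every event type passes.
    type_filter = destination.get("typeFilter")
    if type_filter is None:
        return True
    return event["type"] in type_filter


destination = {
    "name": "queue.MyTenant.allEvents",
    "typeFilter": ["ENTITY_CHANGED", "RELATIONSHIP_CHANGED"],
}
print(should_send({"type": "ENTITY_CHANGED"}, destination))
```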
Decode and use JSON_ZIP_BASE64 messages
JSON_ZIP_BASE64 messages are UTF-8 JSON messages that are first compressed using gzip and then encoded in Base64. To read a message, reverse these steps:
- Decode the Base64 string to a byte array.
- Decompress the bytes using gzip.
For example, in your console:
echo "H4sIAAAAAAAAAKtWKkmtKFGyUlDySM3JyVcozy/KSVGqBQD//REqFwAAAA==" | base64 --decode | gunzip
{"text": "Hello world"}
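The same two steps can be done in application code. The following is a minimal sketch using only the Python standard library; the helper names are illustrative, and the encoder exists only so that the decoder can be exercised without a real queue message.

```python
import base64
import gzip
import json
from typing import Any


def decode_json_zip_base64(message: str) -> Any:
    """Base64-decode, gunzip, and parse a JSON_ZIP_BASE64 message."""
    compressed = base64.b64decode(message)  # step 1: Base64 string to bytes
    raw = gzip.decompress(compressed)  # step 2: gunzip the bytes
    return json.loads(raw.decode("utf-8"))  # messages are UTF-8 JSON


def encode_json_zip_base64(payload: Any) -> str:
    """Inverse helper, used here only to build a test message."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


sample = encode_json_zip_base64({"text": "Hello world"})
print(decode_json_zip_base64(sample))
```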
Destination API
POST /tenants/{tenantId}/messaging/destinations
{
"provider": "default",
"type": "queue",
"name": "queue.MyTenant.allEvents",
"JMSEventsFilteringFields": [
"uri",
"type"
],
"RelationEventsFilteringFields": [
"type"
],
"skipPayload": false
}
Example Configurations
When the skipPayload flag is set to false, the event includes the object payload:
{
"type": "ENTITY_CHANGED",
"object": {
"uri": "entities/e1",
"roles": [
"configuration/roles/SeniorMember",
"configuration/roles/CTO"
],
"label": "John Doe",
…
},
"ovChanged": true
}
When the skipPayload flag is set to true, the event includes only the event type and the ovChanged property:
{
"type": "ENTITY_CHANGED",
"ovChanged": true
}