Events API
All CRUD and Match events from Reltio APIs are sent to queues/topics as messages in JSON format.
The events from Reltio APIs are sent to queues/topics as messages in JSON format. Therefore, the messages that are published to streaming destinations are actually serialized representations of these events.
ovChanged
flag is
always true
forENTITY_CHANGED
events sent for these
related entities due to a reference attribute change. This is a type of optimization to
avoid potential huge negative performance impact in delta calculation in the event some
referenced entity with numerous related entities was changed.- The platform streaming functionality works with at least one message delivery semantic.
- If an attribute is connected to the Reference Data Management (RDM), any change in the RDM mapping leading to changes in the canonical codes or values, will not generate an Entity Changed event in MDM. This result is due to the raw value stored in the attribute, which remains unchanged.
Streaming Message Format
Messages getting published to the streaming destination are serialized representations of events of the following two types:
- CRUD Events
- Match Events
CRUD Events
Headers: tenantId
; eventType
;
sourceObjectUri
.
{
"type": <string>,
"object": <serialized ObjectTO (EntityTO/RelationTO)>,
"ovChanged": <true|false> //only for ENTITY_CHANGED and RELATIONSHIP_CHANGED, optional - might not present when "analyzeOvChanges" streaming property is set to false
}
ObjectTO
is serialized based on the
JMSEventsFilteringFields
streaming property, and only object
fields listed there are added to the final JSON.
RelationTO
is serialized based on the
RelationEventsFilteringFields
streaming property, and only
relation fields listed there are added to the final JSON.
The following types of CRUD events are supported:
ENTITY_CREATED
: This event indicates that the entity was created. This event does not contain any additional fields.ENTITY_REMOVED
: This event indicates that the entity was removed (byDELETE /entity/id
request).This event does not contain any additional fields.ENTITY_REMOVED_GDPR
: This event indicates that the entity was removed (by GDPR request). After removing only two records for every entity, these are left:ENTITY_CREATED
andENTITY_REMOVED_GDPR
. This event does not contain any additional fields.ENTITY_LOST_MERGE
: This event indicates that the entity is a loser in a merge with another entity.This event does not contain any additional fields.ENTITY_CHANGED
: This event indicates that the entity was changed. This event does not contain any additional fields.Note: TheENTITY_CHANGED
event is not created when an entity's attributes are not changed.For example: There are 3 entities - entity A, entity B, and entity C. Entity A references entity B and entity B references entities A and C. Now, when you update entity A, these updates are propagated first to entity B and then to entity C. Since there are no attribute updates done for entities B and C, and only the object version is updated, the
ENTITY_CHANGED
events are not created for these entities.RELATIONSHIP_CREATED
: This event indicates that a relationship was created between this entity and any other entity. In the case of an added Reference Attribute (for example,Address
), theENTITY_CHANGED
event also appears in the result.RELATIONSHIP_REMOVED
: This event indicates that a relationship between the entity and some other entity was removed. In the case of a Reference Attribute (for example,Address
) theENTITY_CHANGED
event also appears in the result.RELATIONSHIP_REMOVED_GDPR
: This event indicates that a relationship between the entity and some other entity was removed (by GDPR request). In the case of a Reference Attribute (for example,Address
), theENTITY_CHANGED
event also appears in the result.RELATIONSHIP_CHANGED
: This event indicates that a relationship between the entity and some other entity was changed. In the case of the Reference Attribute (for example, Address), theENTITY_CHANGED
event also appears in the result.GROUP_CREATED
: Currently, not used.GROUP_REMOVED
: Currently, not used.GROUP_CHANGED
: Currently, not used.INTERACTION_CREATED
: This event indicates that the interaction was created. This event does not contain any additional fields.INTERACTION_REMOVED
: This event indicates that the interaction was removed (byDELETE /entity/id
request).This event does not contain any additional fields.INTERACTION_CHANGED
: This event indicates that the interaction was changed. This event does not contain any additional fields.ENTITIES_SPLITTED
: For the winner entity, this event indicates that some entities were unmerged from it. For an unmerged entity, it indicates that the entity was unmerged from its master.ENTITIES_MATCHES_CHANGED
: This event is fired when list of Potential Matches for an entity was changed.RELATION_LOST_MERGE
: This event is generated when merging relationships for loser relations.RELATIONSHIP_MERGED
: This event is fired when merging relations (likeENTITIES_MERGED
for entities).ENTITY_BUSINESS_PROCESS_CHANGED
: For workflow functionality, there are commonly designated business process objects connected to an entity. For any change to the business process for an entity (even if the business process was created or deleted), the platform fires the event.CHANGE_REQUEST_CREATED
: This event is generated when a Data Change Request (DCR) is created.CHANGE_REQUEST_CHANGED
: This event is generated when a Data Change Request (DCR) is changed.CHANGE_REQUEST_REMOVED
: This event is generated when a Data Change Request (DCR) is deleted.
Example: RELATIONSHIP_CREATED
{
"type": "RELATIONSHIP_CREATED",
"object": {
"uri": "relations/VnEqtJj",
"type": "configuration/relationTypes/HasAddress",
"createdBy": "auto20190828AdminawgMp",
"createdTime": 1566970796552,
"updatedBy": "auto20190828AdminawgMp",
"updatedTime": 1566970796552,
"startRefPinned": false,
"startRefIgnored": false,
"endRefPinned": false,
"endRefIgnored": false,
"attributes": {
"BusinessCode": [
{
"type": "configuration/relationTypes/HasAddress/attributes/BusinessCode",
"ov": true,
"value": "AAA",
"lookupCode": "A",
"lookupRawValue": "A",
"uri": "relations/VnEqtJj/attributes/BusinessCode/1dM9qjo8J"
}
],
"CareOf": [
{
"type": "configuration/relationTypes/HasAddress/attributes/CareOf",
"ov": true,
"value": "B",
"uri": "relations/VnEqtJj/attributes/CareOf/1dM9qjsOZ"
}
],
"AddressType": [
{
"type": "configuration/relationTypes/HasAddress/attributes/AddressType",
"ov": true,
"value": "India",
"lookupCode": "INDIA",
"lookupRawValue": "091",
"uri": "relations/VnEqtJj/attributes/AddressType/1dM9qjwep"
}
],
"SourceCD": [
{
"type": "configuration/relationTypes/HasAddress/attributes/SourceCD",
"ov": true,
"value": "081",
"lookupError": "1003: RDM canonical value mapping not found for value [081] and source [Reltio] in tenant [tst01messaging]",
"uri": "relations/VnEqtJj/attributes/SourceCD/1dM9qk0v5"
}
]
},
"crosswalks": [
{
"uri": "relations/VnEqtJj/crosswalks/1dM9qk5BL",
"type": "configuration/sources/Reltio",
"value": "VnEqtJj",
"reltioLoadDate": "2019-08-28T05:39:56.552Z",
"createDate": "2019-08-28T05:39:56.552Z",
"updateDate": "2019-08-28T05:39:56.552Z",
"attributes": [
"relations/VnEqtJj/attributes/BusinessCode/1dM9qjo8J",
"relations/VnEqtJj/attributes/AddressType/1dM9qjwep",
"relations/VnEqtJj/attributes/SourceCD/1dM9qk0v5",
"relations/VnEqtJj/attributes/CareOf/1dM9qjsOZ"
],
"singleAttributeUpdateDates": {}
}
]
}
}
CHANGE_REQUEST_CREATED
: This event is generated when a Data Change Request (DCR) is created.CHANGE_REQUEST_CHANGED
: This event is generated when a Data Change Request (DCR) is changed.CHANGE_REQUEST_REMOVED
: This event is generated when a Data Change Request (DCR) is deleted.
The ENTITIES_SPLITTED
event type has an extra field containing split
participant URIs:
{
"type": <string>,
"object": <serialized ObjectTO (EntityTO/RelationTO)>,
"ovChanged": <true|false>,
"uris": <List<string>>
}
The following types of CRUD events are not supported:
ACTIVITY_CREATED_UPDATED
ACTIVITY_REMOVED
ENTITY_LOSER_REMOVED
RELATION_WOULD_CHANGE
RELATION_WOULD_DELETE
ENTITIES_CATEGORY_TREE_STRUCTURE_CHANGED
Match Events
Headers: tenantId
; eventType
;
sourceObjectUri
.
{
"type": <string>,
"updatedTime": <timestamp>,
"uris": <List<string>>
}
updateTime
appears only if "JMSIncludeMergeTime":
true
(by default, it is false
) is specified in the
field in the streaming configuration, as shown in this example:
"streamingConfig":{
"streamingEnabled":true,
"analyzeOvChanges":false,
"JMSIncludeMergeTime":true,
"JMSEventsTimeToLive":86400000,
"JMSEventsFilteringFields":[
"uri",
"crosswalks",
"type",
"VeevaSync",
"createdBy",
"createdTime",
"updatedBy",
"updatedTime",
"startObject",
"endObject",
"attributes"
],
"messaging":{
"destinations":[
{
"provider":"default",
"type":"queue",
"name":"queue.MyTenant.allEvents"
},
{
"provider":"default",
"type":"queue",
"name":"queue.MyTenant.dtssEvents",
"dtssQueue":true
}
]
}
}
Only type “ENTITIES_MERGED”
is supported. “uris”
contain uris of all merge participants. The "updatedTime"
contains
the timestamp when the entities were merged.
ovChanged
flag is always true for
ENTITY_CHANGED
events sent for these related entities due to a
reference attribute change. This is a type of optimization to avoid potential huge
negative performance impact in delta calculation in case some referenced entity with
may related entities was changed.Internally, these event types are supported:
ENTITIES_MERGED
: the entity was merged by the platform with some other entities.ENTITIES_MERGED_MANUALLY
: the entity was merged by the user's_sameA
s request.ENTITIES_MERGED_ON_THE_FLY
: a phantom entity was merged with the entity.
ENTITIES_MERGED
in the
typeFilter
property in the Tenant physical configuration
"streamingConfig"-"messaging"->"destinations"
section.
Therefore, even if you indicate ENTITIES_MERGED_MANUALLY
, or
ENTITIES_MERGED_ON_THE_FLY
, the propagated event is
ENTITIES_MERGED
.Uris
contains the URIs of all merge participants.
Support of Entity Type Filtering Using the typeFilter
Property
The typeFilter
attribute is a collection of event type names to
stream, and events of different types are ignored. This attribute is not required,
and the default value is null
.
Support of Entity Type Filtering Using the objectFilter
Property
The objectFilter
attribute is a filter expression for an event
object. An event is not sent to a destination if its object does not match the
filter. The expression structure is the same as for search filters. This attribute
is not required, and the default value is null
.
objectFilter
property
allows messages to be filtered by the entire object content (type
,
attributes
, createdTime
, etc). However, for
MERGE
events, it allows filtering by the following properties
only of the Winner entity: type
, createdTime
,
createdBy
, updatedTime
, or
updatedBy
.objectFilter=equals(type,'configuration/entityTypes/Individual') and startsWith(attributes.FirstName, 'An')
This type of filter does not work properly for MATCHING events, even if the *Winner*
Entity has FirstName
= 'Ann'
or
FirstName
= 'Andy'
. Such an event does not
appear in the queue.
objectFilter=equals(type,'configuration/entityTypes/Individual') and gt(updatedTime, 1567595145000)
Streaming Events That are Larger Than the Queue Size Limit
Whenever the compressed size of an event exceeds the specified queue size limits of
256 KB for AWS SQS and 10MB for Google PubSub, the event streaming processor sets
the exceededQueueSizeLimit
flag to true and then sends the
event.
If event size with compressed payload exceeds the specified limit, then, only the sourceObjectUri
is included in the payload. The client application then makes an additional request to Reltio to get the object content, by using the GET {{baseurl}}/api/{{tenant}}/entities/object_id
request.
Example
{
"messageAttributes": {
"tenantId": "RP79554",
"messageId": "67deac11-31ce-4402-a052-c193c64fe824",
"eventType": "ENTITY_CHANGED",
"contentType": "application/json",
"sourceObjectUri": "entities/1bAuWfOV"
},
"body": {
"type": "ENTITY_CHANGED",
"ovChanged": false,
"exceededQueueSizeLimit": true
}
}
Examples of Events
Examples for CRUD and Match events are given below.
CRUD Event Examples
{
"type": "ENTITY_CREATED",
"object":{
"URI": "entities/01aQuFl",
"type": "configuration/entityTypes/Individual",
"createdBy": "admin",
"createdTime": 1363645383201,
"updatedTime": 1363645383201,
"startDate": 192315600000,
"crosswalks": [
{
"URI": "entities/3n4t7JZ/crosswalks/1UwSn",
"type": "configuration/sources/Reltio",
"value": "3n4t7JZ",
"createDate": "2014-06-20T01:42:48.451Z",
"updateDate": "2014-06-20T01:42:48.451Z",
"attributes": ["entities/3n4t7JZ/attributes/FirstName/1UwWv"],
"singleAttributeUpdateDates": {
"entities/3n4t7JZ/attributes/FirstName/1UwWv": "2014-06-20T01:42:48.451Z"
}
},
{
"URI": "entities/3n4t7JZ/crosswalks/Gun",
"type": "configuration/sources/ASource",
"value": "Individual.100",
"createDate": "2014-06-10T14:43:00.923Z",
"updateDate": "2014-06-10T14:43:00.923Z",
"attributes": ["entities/3n4t7JZ/attributes/FirstName/Gqf", "entities/3n4t7JZ/attributes/LastName/GmX"],
"singleAttributeUpdateDates": {}
}
]
}
}
{
"type":"CHANGE_REQUEST_CHANGED",
"object":{
"uri":"changeRequests/0000Fyz",
"createdBy":"User",
"createdTime":1549473854379,
"updatedBy":"User",
"updatedTime":1549473855260,
"changes":{
"entities/0000Dqr":[
{
"id":"0000jon",
"type":"CREATE_ENTITY",
"createdTime":1549473855260,
"createdBy":"User",
"newValue":{
"uri":"entities/0000Dqr",
"type":"configuration/entityTypes/Location",
"attributes":{
"AddressLine1":[
{
"value":"Line"
}
]
}
}
}
]
},
"type":"configuration/changeRequestTypes/someChangeRequestType",
"state":"AWAITING_REVIEW"
}
}
{
"type":"CHANGE_REQUEST_CREATED",
"object":{
"uri":"changeRequests/0001Deb",
"createdBy":"User",
"createdTime":1549473833725,
"updatedBy":"User",
"updatedTime":1549473833725,
"changes":{
"relations/0000Aef":[
{
"id":"00019OL",
"type":"DELETE_RELATIONSHIP",
"createdTime":1549473833725,
"createdBy":"User"
}
]
},
"type":"configuration/changeRequestTypes/someChangeRequestType",
"state":"AWAITING_REVIEW"
}
}
{
"type":"CHANGE_REQUEST_REMOVED",
"object":{
"uri":"changeRequests/0000Fyz",
"type":null,
"state":"AWAITING_REVIEW"
}
}
{
"type": "ENTITIES_MERGED",
"updatedTime": 1567596147000,
"uris": [
"entities/00MNwCe",
"entities/00MXpxo",
]
}
updatedTime
will be added to the ENTITIES_MERGED
event when a property (JMSIncludeMergeTime
) in a tenant
configuration is enabled (set to true
). Also, the field needs to be
included in the JMSEventsFilteringFields
list.
Consumer Implementation Examples
Amazon SQS
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
<version>1.11.180</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SqsConsumer {
private static final Logger logger = LogManager.getLogger(SqsConsumer.class);
public static void main(String[] args) {
String accessKey = "AccessKey";
String secretKey = "SecretKey";
String region = "us-east-1";
//Create SQS client:
AWSCredentialsProvider provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
AmazonSQS sqs = AmazonSQSClient.builder().withCredentials(provider).withRegion(region).build();
//Resolve queue URL by name:
String queueUrl = sqs.getQueueUrl("QueueName").getQueueUrl();
//Create and start poller:
SqsPoller poller = new SqsPoller(queueUrl, sqs);
poller.start();
//...
//Stop eventually:
poller.stopRunning();
}
private static class SqsPoller extends Thread {
private final String queueUrl;
private final AmazonSQS sqs;
private final ObjectMapper mapper = new ObjectMapper();
private boolean keepRunning = true;
public SqsPoller(String queueUrl, AmazonSQS sqs) {
this.queueUrl = queueUrl;
this.sqs = sqs;
}
@Override
public void run() {
while (keepRunning) {
performReceive();
}
}
public void stopRunning() {
this.keepRunning = false;
}
private void performReceive() {
//Request a batch with up to 10 messages (SQS limitation):
ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10);
List<Message> messages = sqs.receiveMessage(request).getMessages();
List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>();
int item = 0;
for (Message message : messages) {
// Invoke a method to process the message:
onMessage(message);
//Collect the receipts for successfully processed messages so they can be acknowledged at the end:
toDelete.add(new DeleteMessageBatchRequestEntry(String.valueOf(++item), message.getReceiptHandle()));
}
deleteMessages(toDelete);
}
private void deleteMessages(List<DeleteMessageBatchRequestEntry> toDelete) {
if (!toDelete.isEmpty()) {
try {
DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(queueUrl, toDelete);
DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
for (BatchResultErrorEntry error: result.getFailed()) {
logger.error("Cannot acknowledge message: " + error.getMessage());
}
} catch (Exception e) {
logger.error("Cannot acknowledge messages batch", e);
}
}
}
private void onMessage(Message message) {
try {
JsonNode json = mapper.readTree(message.getBody());
String eventType = json.findValue("type").asText();
switch (eventType) {
case "ENTITY_CREATED":
case "ENTITY_CHANGED":
//Handle in some way:
break;
case "RELATIONSHIP_CREATED":
case "RELATIONSHIP_CHANGED":
//Handle in another way:
break;
//...
default:
//Ignore others;
}
} catch (Exception e) {
logger.error(e);
}
}
}
}
Google PubSub
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>0.40.0-beta</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
import java.io.FileInputStream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.threeten.bp.Duration;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
public class PubSubConsumer {
private static final Logger logger = LogManager.getLogger(PubSubConsumer.class);
public static void main(String[] args) throws Exception {
//Load credentials from previously exported ServiceAccount key file:
Credentials credentials = ServiceAccountCredentials.fromStream(new FileInputStream("PathToServiceAccountKeyFile"));
CredentialsProvider provider = FixedCredentialsProvider.create(credentials);
//Create a subscription reader:
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("projectName", "subscriptionId");
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, new MessageHandler()).
setCredentialsProvider(provider).setMaxAckExtensionPeriod(Duration.ofMinutes(1L)).build();
subscriber.startAsync();
//...
//Stop eventually:
subscriber.stopAsync();
}
private static class MessageHandler implements MessageReceiver {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
try {
JsonNode json = mapper.readTree(message.getData().toStringUtf8());
String eventType = json.findValue("type").asText();
switch (eventType) {
case "ENTITY_CREATED":
case "ENTITY_CHANGED":
//Handle in some way:
break;
case "RELATIONSHIP_CREATED":
case "RELATIONSHIP_CHANGED":
//Handle in another way:
break;
//...
default:
//Ignore others;
}
} catch (Exception e) {
logger.error(e);
}
}
}
}