Events API

All CRUD and Match events from Reltio APIs are sent to queues/topics as messages in JSON format.

The messages published to streaming destinations are therefore serialized representations of these events.

Important: If changes occur inside a relation or an entity that is part of a reference attribute for some related entities, the ovChanged flag is always true for ENTITY_CHANGED events sent for these related entities due to the reference attribute change. This optimization avoids a potentially large performance penalty in delta calculation when a referenced entity with numerous related entities is changed.
Note:
  • The platform streaming functionality provides at-least-once message delivery semantics, so a consumer can receive the same message more than once.
  • If an attribute is connected to Reference Data Management (RDM), a change in the RDM mapping that changes the canonical codes or values does not generate an Entity Changed event in MDM, because the raw value stored in the attribute remains unchanged.
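
Because delivery is at-least-once, a consumer may want to skip messages it has already processed. The following is a minimal sketch of one way to do that, assuming each message carries a unique messageId (as in the messageAttributes of the oversized-event example later in this section). The class name SeenMessageIds and the fixed-size window are hypothetical choices for illustration and are not part of the Reltio API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers recent message IDs so redelivered messages can be skipped. */
final class SeenMessageIds {
    private final Map<String, Boolean> seen;

    SeenMessageIds(final int capacity) {
        // Insertion order (accessOrder = false) gives simple first-in, first-out eviction.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is offered, false for a duplicate still in the window. */
    synchronized boolean firstTime(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

A consumer would call firstTime(messageId) before processing and acknowledge duplicates without processing them again. An in-memory window only protects a single consumer process; a shared store would be needed when several consumers read the same queue.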

Streaming Message Format

Messages published to the streaming destination are serialized representations of events of the following two types:

  • CRUD Events
  • Match Events

CRUD Events

Headers: tenantId; eventType; sourceObjectUri.

{
	"type": <string>,
	"object": <serialized ObjectTO (EntityTO/RelationTO)>,
	"ovChanged": <true|false> //only for ENTITY_CHANGED and RELATIONSHIP_CHANGED; optional, might not be present when the "analyzeOvChanges" streaming property is set to false
}

ObjectTO is serialized based on the JMSEventsFilteringFields streaming property, and only object fields listed there are added to the final JSON.

RelationTO is serialized based on the RelationEventsFilteringFields streaming property, and only relation fields listed there are added to the final JSON.
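
Since ovChanged is optional, consumer code has to decide what a missing flag means. The sketch below assumes the event body has already been parsed into a Map (for example, with Jackson) and treats a missing flag as "possibly changed", which is the conservative choice. The class and method names are hypothetical.

```java
import java.util.Map;

/** Hypothetical helper for reading the optional ovChanged flag of a CRUD event. */
final class OvChange {
    /**
     * Returns false only when the event explicitly says "ovChanged": false.
     * A missing flag (for example, when analyzeOvChanges is false) is treated as "may have changed".
     */
    static boolean mayHaveChangedOv(Map<String, Object> event) {
        Object flag = event.get("ovChanged");
        return !(flag instanceof Boolean) || ((Boolean) flag).booleanValue();
    }
}
```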

The following types of CRUD events are supported:

  • ENTITY_CREATED: This event indicates that the entity was created. This event does not contain any additional fields.
  • ENTITY_REMOVED: This event indicates that the entity was removed (by DELETE /entity/id request). This event does not contain any additional fields.
  • ENTITY_REMOVED_GDPR: This event indicates that the entity was removed (by GDPR request). After the removal, only two records remain for the entity: ENTITY_CREATED and ENTITY_REMOVED_GDPR. This event does not contain any additional fields.
  • ENTITY_LOST_MERGE: This event indicates that the entity is a loser in a merge with another entity. This event does not contain any additional fields.
  • ENTITY_CHANGED: This event indicates that the entity was changed. This event does not contain any additional fields.
    Note: The ENTITY_CHANGED event is not created when an entity's attributes are not changed.

    For example: There are 3 entities - entity A, entity B, and entity C. Entity A references entity B and entity B references entities A and C. Now, when you update entity A, these updates are propagated first to entity B and then to entity C. Since there are no attribute updates done for entities B and C, and only the object version is updated, the ENTITY_CHANGED events are not created for these entities.

  • RELATIONSHIP_CREATED: This event indicates that a relationship was created between this entity and any other entity. In the case of an added Reference Attribute (for example, Address), the ENTITY_CHANGED event also appears in the result.
  • RELATIONSHIP_REMOVED: This event indicates that a relationship between the entity and some other entity was removed. In the case of a Reference Attribute (for example, Address) the ENTITY_CHANGED event also appears in the result.
  • RELATIONSHIP_REMOVED_GDPR: This event indicates that a relationship between the entity and some other entity was removed (by GDPR request). In the case of a Reference Attribute (for example, Address), the ENTITY_CHANGED event also appears in the result.
  • RELATIONSHIP_CHANGED: This event indicates that a relationship between the entity and some other entity was changed. In the case of the Reference Attribute (for example, Address), the ENTITY_CHANGED event also appears in the result.
  • GROUP_CREATED: Currently, not used.
  • GROUP_REMOVED: Currently, not used.
  • GROUP_CHANGED: Currently, not used.
  • INTERACTION_CREATED: This event indicates that the interaction was created. This event does not contain any additional fields.
  • INTERACTION_REMOVED: This event indicates that the interaction was removed (by DELETE /entity/id request). This event does not contain any additional fields.
  • INTERACTION_CHANGED: This event indicates that the interaction was changed. This event does not contain any additional fields.
  • ENTITIES_SPLITTED: For the winner entity, this event indicates that some entities were unmerged from it. For an unmerged entity, it indicates that the entity was unmerged from its master.
  • ENTITIES_MATCHES_CHANGED: This event is fired when the list of potential matches for an entity changes.
  • RELATION_LOST_MERGE: This event is generated when merging relationships for loser relations.
  • RELATIONSHIP_MERGED: This event is fired when merging relations (like ENTITIES_MERGED for entities).
  • ENTITY_BUSINESS_PROCESS_CHANGED: For workflow functionality, there are commonly designated business process objects connected to an entity. For any change to the business process for an entity (even if the business process was created or deleted), the platform fires the event.
  • CHANGE_REQUEST_CREATED: This event is generated when a Data Change Request (DCR) is created.
  • CHANGE_REQUEST_CHANGED: This event is generated when a Data Change Request (DCR) is changed.
  • CHANGE_REQUEST_REMOVED: This event is generated when a Data Change Request (DCR) is deleted.
Note: Lookup code and value in attributes are serialized to event JSON. Refer to this example:

Example: RELATIONSHIP_CREATED

{
  "type": "RELATIONSHIP_CREATED",
  "object": {
    "uri": "relations/VnEqtJj",
    "type": "configuration/relationTypes/HasAddress",
    "createdBy": "auto20190828AdminawgMp",
    "createdTime": 1566970796552,
    "updatedBy": "auto20190828AdminawgMp",
    "updatedTime": 1566970796552,
    "startRefPinned": false,
    "startRefIgnored": false,
    "endRefPinned": false,
    "endRefIgnored": false,
    "attributes": {
      "BusinessCode": [
        {
          "type": "configuration/relationTypes/HasAddress/attributes/BusinessCode",
          "ov": true,
          "value": "AAA",
          "lookupCode": "A",
          "lookupRawValue": "A",
          "uri": "relations/VnEqtJj/attributes/BusinessCode/1dM9qjo8J"
        }
      ],
      "CareOf": [
        {
          "type": "configuration/relationTypes/HasAddress/attributes/CareOf",
          "ov": true,
          "value": "B",
          "uri": "relations/VnEqtJj/attributes/CareOf/1dM9qjsOZ"
        }
      ],
      "AddressType": [
        {
          "type": "configuration/relationTypes/HasAddress/attributes/AddressType",
          "ov": true,
          "value": "India",
          "lookupCode": "INDIA",
          "lookupRawValue": "091",
          "uri": "relations/VnEqtJj/attributes/AddressType/1dM9qjwep"
        }
      ],
      "SourceCD": [
        {
          "type": "configuration/relationTypes/HasAddress/attributes/SourceCD",
          "ov": true,
          "value": "081",
          "lookupError": "1003: RDM canonical value mapping not found for value [081] and source [Reltio] in tenant [tst01messaging]",
          "uri": "relations/VnEqtJj/attributes/SourceCD/1dM9qk0v5"
        }
      ]
    },
    "crosswalks": [
      {
        "uri": "relations/VnEqtJj/crosswalks/1dM9qk5BL",
        "type": "configuration/sources/Reltio",
        "value": "VnEqtJj",
        "reltioLoadDate": "2019-08-28T05:39:56.552Z",
        "createDate": "2019-08-28T05:39:56.552Z",
        "updateDate": "2019-08-28T05:39:56.552Z",
        "attributes": [
          "relations/VnEqtJj/attributes/BusinessCode/1dM9qjo8J",
          "relations/VnEqtJj/attributes/AddressType/1dM9qjwep",
          "relations/VnEqtJj/attributes/SourceCD/1dM9qk0v5",
          "relations/VnEqtJj/attributes/CareOf/1dM9qjsOZ"
        ],
        "singleAttributeUpdateDates": {}
      }
    ]
  }
}
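
The example above shows three shapes of attribute value: a resolved lookup (lookupCode and lookupRawValue present), a plain value, and a failed lookup (lookupError present). The following is a small sketch of how a consumer might tell them apart once an attribute value has been parsed into a Map. The class and method names are hypothetical, and the "code: message" layout of lookupError is an assumption drawn from the single example above.

```java
import java.util.Map;

/** Hypothetical helper for reading lookup fields from one serialized attribute value. */
final class LookupValues {
    /** Returns lookupCode, or null when the lookup failed or the attribute is not a lookup. */
    static String canonicalCode(Map<String, Object> attributeValue) {
        if (attributeValue.containsKey("lookupError")) {
            return null;
        }
        Object code = attributeValue.get("lookupCode");
        return code == null ? null : code.toString();
    }

    /** Returns the prefix before the first colon of lookupError (assumed format), or null. */
    static String errorCode(Map<String, Object> attributeValue) {
        Object error = attributeValue.get("lookupError");
        if (error == null) {
            return null;
        }
        String text = error.toString();
        int colon = text.indexOf(':');
        return colon > 0 ? text.substring(0, colon).trim() : null;
    }
}
```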
Attention: The event message payload for the following event types contains only the event name and no other information.
  • CHANGE_REQUEST_CREATED: This event is generated when a Data Change Request (DCR) is created.
  • CHANGE_REQUEST_CHANGED: This event is generated when a Data Change Request (DCR) is changed.
  • CHANGE_REQUEST_REMOVED: This event is generated when a Data Change Request (DCR) is deleted.

The ENTITIES_SPLITTED event type has an extra field containing split participant URIs:

{
	"type": <string>,
	"object": <serialized ObjectTO (EntityTO/RelationTO)>,
	"ovChanged": <true|false>,
	"uris": <List<string>>
}

The following types of CRUD events are not supported:

  • ACTIVITY_CREATED_UPDATED
  • ACTIVITY_REMOVED
  • ENTITY_LOSER_REMOVED
  • RELATION_WOULD_CHANGE
  • RELATION_WOULD_DELETE
  • ENTITIES_CATEGORY_TREE_STRUCTURE_CHANGED
Note: DCR events are fully supported.
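
A consumer usually routes events by family rather than by individual type. The sketch below groups event type names by prefix. This is an illustrative design choice rather than anything the platform prescribes, and the class, enum, and method names are hypothetical. Because it only looks at prefixes, it also assigns a family to names from the unsupported list (for example, RELATION_WOULD_CHANGE), so it should not be used to decide whether a type is supported.

```java
/** Hypothetical helper that groups streaming event types by name prefix. */
final class EventCategories {
    enum Category { ENTITY, RELATIONSHIP, INTERACTION, CHANGE_REQUEST, GROUP, UNKNOWN }

    static Category of(String eventType) {
        if (eventType == null) {
            return Category.UNKNOWN;
        }
        // ENTITIES_ covers ENTITIES_SPLITTED, ENTITIES_MATCHES_CHANGED and ENTITIES_MERGED.
        if (eventType.startsWith("ENTITY_") || eventType.startsWith("ENTITIES_")) {
            return Category.ENTITY;
        }
        // RELATION_ covers RELATION_LOST_MERGE.
        if (eventType.startsWith("RELATIONSHIP_") || eventType.startsWith("RELATION_")) {
            return Category.RELATIONSHIP;
        }
        if (eventType.startsWith("INTERACTION_")) {
            return Category.INTERACTION;
        }
        if (eventType.startsWith("CHANGE_REQUEST_")) {
            return Category.CHANGE_REQUEST;
        }
        if (eventType.startsWith("GROUP_")) {
            return Category.GROUP;
        }
        return Category.UNKNOWN;
    }
}
```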

Match Events

Headers: tenantId; eventType; sourceObjectUri.

{
	"type": <string>,
	"updatedTime": <timestamp>,
	"uris": <List<string>>
}

updatedTime appears only if "JMSIncludeMergeTime": true is specified in the streaming configuration (the default is false), as shown in this example:

"streamingConfig":{
   "streamingEnabled":true,
   "analyzeOvChanges":false,
   "JMSIncludeMergeTime":true,
   "JMSEventsTimeToLive":86400000,
   "JMSEventsFilteringFields":[
      "uri",
      "crosswalks",
      "type",
      "VeevaSync",
      "createdBy",
      "createdTime",
      "updatedBy",
      "updatedTime",
      "startObject",
      "endObject",
      "attributes"
   ],
   "messaging":{
      "destinations":[
         {
            "provider":"default",
            "type":"queue",
            "name":"queue.MyTenant.allEvents"
         },
         {
            "provider":"default",
            "type":"queue",
            "name":"queue.MyTenant.dtssEvents",
            "dtssQueue":true
         }
      ]
   }
}

Only the "ENTITIES_MERGED" type is supported. The "uris" field contains the URIs of all merge participants. The "updatedTime" field contains the timestamp when the entities were merged.

Internally, these event types are supported:

  • ENTITIES_MERGED: the entity was merged by the platform with some other entities.
  • ENTITIES_MERGED_MANUALLY: the entity was merged by the user's _sameAs request.
  • ENTITIES_MERGED_ON_THE_FLY: a phantom entity was merged with the entity.
Note: When the system sends these event types to the external queue, it always converts the event type to ENTITIES_MERGED. Therefore, even if you specify ENTITIES_MERGED_MANUALLY or ENTITIES_MERGED_ON_THE_FLY in the typeFilter property ("streamingConfig" -> "messaging" -> "destinations" section of the tenant physical configuration), the propagated event is ENTITIES_MERGED.

The uris field contains the URIs of all merge participants.

Support of Entity Type Filtering Using the typeFilter Property

The typeFilter attribute is a collection of event type names to stream; events of other types are ignored. This attribute is not required, and the default value is null.

Support of Entity Type Filtering Using the objectFilter Property

The objectFilter attribute is a filter expression for an event object. An event is not sent to a destination if its object does not match the filter. The expression structure is the same as for search filters. This attribute is not required, and the default value is null.

Important: For CRUD events, the objectFilter property allows messages to be filtered by the entire object content (type, attributes, createdTime, and so on). However, for Match events, it allows filtering only by the following properties of the winner entity: type, createdTime, createdBy, updatedTime, or updatedBy.
For example, the following filter is valid for CRUD Events:
objectFilter=equals(type,'configuration/entityTypes/Individual') and startsWith(attributes.FirstName, 'An')

This type of filter does not work properly for Match events, even if the winner entity has FirstName = 'Ann' or FirstName = 'Andy'. Such an event does not appear in the queue.

However, the following filter works properly with both CRUD and Match events:
objectFilter=equals(type,'configuration/entityTypes/Individual') and gt(updatedTime, 1567595145000)
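
Before a filter is applied to a destination that also receives Match events, it can be checked against the restriction described above. The sketch below is a naive, regex-based check that extracts the first argument of each function-style condition and compares it with the five allowed properties. The class and method names are hypothetical, and the pattern assumes the simple function(property, value) shape used in the two examples above; it does not parse quoted values that themselves contain parentheses.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical pre-flight check for objectFilter expressions used with Match events. */
final class MatchFilterCheck {
    // Properties the section above lists as filterable for Match events.
    private static final Set<String> ALLOWED = new HashSet<String>(Arrays.asList(
            "type", "createdTime", "createdBy", "updatedTime", "updatedBy"));

    // Captures the first argument of each condition, e.g. "type" in equals(type, ...).
    private static final Pattern CONDITION = Pattern.compile("\\w+\\(\\s*([\\w.]+)\\s*[,)]");

    /** Returns true when every referenced property is one of the allowed ones. */
    static boolean usableForMatchEvents(String objectFilter) {
        Matcher m = CONDITION.matcher(objectFilter);
        while (m.find()) {
            if (!ALLOWED.contains(m.group(1))) {
                return false;
            }
        }
        return true;
    }
}
```

With the two filters shown above, the first (which references attributes.FirstName) is rejected and the second (type and updatedTime only) is accepted.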

Streaming Events That are Larger Than the Queue Size Limit

Whenever the compressed size of an event exceeds the queue size limit (256 KB for AWS SQS and 10 MB for Google PubSub), the event streaming processor sets the exceededQueueSizeLimit flag to true and then sends the event.

If the size of the event with a compressed payload exceeds the limit, only the sourceObjectUri is included in the payload. The client application then makes an additional request to Reltio to get the object content, by using the GET {{baseurl}}/api/{{tenant}}/entities/object_id request.

Example

{
 "messageAttributes": {
  "tenantId": "RP79554",
  "messageId": "67deac11-31ce-4402-a052-c193c64fe824",
  "eventType": "ENTITY_CHANGED",
  "contentType": "application/json",
  "sourceObjectUri": "entities/1bAuWfOV"
 },
 "body": {
  "type": "ENTITY_CHANGED", 
  "ovChanged": false, 
  "exceededQueueSizeLimit": true 
 }
}
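
On the consumer side, handling such a message comes down to detecting the flag and building the follow-up request URL from sourceObjectUri. The following is a minimal sketch, assuming the body and attributes have already been parsed. The class and method names are hypothetical, the base URL in the usage note is a placeholder, and only the entities form of the request is covered because that is the only one documented above.

```java
import java.util.Map;

/** Hypothetical helper for events whose content was dropped because of the queue size limit. */
final class OversizedEvents {
    /** Returns true when the parsed message body carries "exceededQueueSizeLimit": true. */
    static boolean bodyWasDropped(Map<String, Object> body) {
        return Boolean.TRUE.equals(body.get("exceededQueueSizeLimit"));
    }

    /** Builds {baseUrl}/api/{tenant}/{sourceObjectUri} for an entity URI such as "entities/1bAuWfOV". */
    static String fetchUrl(String baseUrl, String tenant, String sourceObjectUri) {
        if (!sourceObjectUri.startsWith("entities/")) {
            // Only the entities request is described in this section.
            throw new IllegalArgumentException("Not an entity URI: " + sourceObjectUri);
        }
        String base = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        return base + "/api/" + tenant + "/" + sourceObjectUri;
    }
}
```

For the example message above, fetchUrl("https://example.invalid", "RP79554", "entities/1bAuWfOV") yields https://example.invalid/api/RP79554/entities/1bAuWfOV, which the client would then request with its usual Reltio credentials.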

Examples of Events

Examples for CRUD and Match events are given below.

CRUD Event Examples

Example 1: ENTITY_CREATED
{
    "type": "ENTITY_CREATED",
    "object":{
        "URI": "entities/01aQuFl",
        "type": "configuration/entityTypes/Individual",
        "createdBy": "admin",
        "createdTime": 1363645383201,     
        "updatedTime": 1363645383201,
        "startDate": 192315600000,
        "crosswalks": [
            {
                "URI": "entities/3n4t7JZ/crosswalks/1UwSn",
                "type": "configuration/sources/Reltio",
                "value": "3n4t7JZ",
                "createDate": "2014-06-20T01:42:48.451Z",
                "updateDate": "2014-06-20T01:42:48.451Z",
                "attributes": ["entities/3n4t7JZ/attributes/FirstName/1UwWv"],
                "singleAttributeUpdateDates": {
                    "entities/3n4t7JZ/attributes/FirstName/1UwWv": "2014-06-20T01:42:48.451Z"
                }
            },
            {
                "URI": "entities/3n4t7JZ/crosswalks/Gun",
                "type": "configuration/sources/ASource",
                "value": "Individual.100",
                "createDate": "2014-06-10T14:43:00.923Z",
                "updateDate": "2014-06-10T14:43:00.923Z",
                "attributes": ["entities/3n4t7JZ/attributes/FirstName/Gqf", "entities/3n4t7JZ/attributes/LastName/GmX"],
                "singleAttributeUpdateDates": {}
            }
        ]
    }
}                   
                    
Example 2: CHANGE_REQUEST_CHANGED
{
   "type":"CHANGE_REQUEST_CHANGED",
   "object":{
      "uri":"changeRequests/0000Fyz",
      "createdBy":"User",
      "createdTime":1549473854379,
      "updatedBy":"User",
      "updatedTime":1549473855260,
      "changes":{
         "entities/0000Dqr":[
            {
               "id":"0000jon",
               "type":"CREATE_ENTITY",
               "createdTime":1549473855260,
               "createdBy":"User",
               "newValue":{
                  "uri":"entities/0000Dqr",
                  "type":"configuration/entityTypes/Location",
                  "attributes":{
                     "AddressLine1":[
                        {
                           "value":"Line"
                        }
                     ]
                  }
               }
            }
         ]
      },
      "type":"configuration/changeRequestTypes/someChangeRequestType",
      "state":"AWAITING_REVIEW"
   }
}
Example 3: CHANGE_REQUEST_CREATED
{
   "type":"CHANGE_REQUEST_CREATED",
   "object":{
      "uri":"changeRequests/0001Deb",
      "createdBy":"User",
      "createdTime":1549473833725,
      "updatedBy":"User",
      "updatedTime":1549473833725,
      "changes":{
         "relations/0000Aef":[
            {
               "id":"00019OL",
               "type":"DELETE_RELATIONSHIP",
               "createdTime":1549473833725,
               "createdBy":"User"
            }
         ]
      },
      "type":"configuration/changeRequestTypes/someChangeRequestType",
      "state":"AWAITING_REVIEW"
   }
}
Example 4: CHANGE_REQUEST_REMOVED
{  
   "type":"CHANGE_REQUEST_REMOVED",
   "object":{  
      "uri":"changeRequests/0000Fyz",
      "type":null,
      "state":"AWAITING_REVIEW"
   }
}
Match Event Example: ENTITIES_MERGED
{
    "type": "ENTITIES_MERGED",
    "updatedTime": 1567596147000,
    "uris": [
        "entities/00MNwCe",
        "entities/00MXpxo"
    ]
} 
 

updatedTime is added to the ENTITIES_MERGED event when the JMSIncludeMergeTime property in the tenant configuration is set to true. The updatedTime field must also be included in the JMSEventsFilteringFields list.

Consumer Implementation Examples

Amazon SQS

Maven Dependencies
<dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-sqs</artifactId>
            <version>1.11.180</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
</dependencies>                
            
SQS Consumer
import java.util.ArrayList;
import java.util.List;
 
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
 
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
 
public class SqsConsumer {
 
    private static final Logger logger = LogManager.getLogger(SqsConsumer.class);
 
    public static void main(String[] args) {
        String accessKey = "AccessKey";
        String secretKey = "SecretKey";
        String region = "us-east-1";
 
        //Create SQS client:
        AWSCredentialsProvider provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
        AmazonSQS sqs = AmazonSQSClient.builder().withCredentials(provider).withRegion(region).build();
 
        //Resolve queue URL by name:
        String queueUrl = sqs.getQueueUrl("QueueName").getQueueUrl();
 
        //Create and start poller:
        SqsPoller poller = new SqsPoller(queueUrl, sqs);
        poller.start();
 
        //...
 
        //Stop eventually:
        poller.stopRunning();
    }
 
    private static class SqsPoller extends Thread {
        private final String queueUrl;
        private final AmazonSQS sqs;
        private final ObjectMapper mapper = new ObjectMapper();
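        //volatile so that the change made by stopRunning() from another thread is visible to run():
        private volatile boolean keepRunning = true;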
 
        public SqsPoller(String queueUrl, AmazonSQS sqs) {
            this.queueUrl = queueUrl;
            this.sqs = sqs;
        }
 
        @Override
        public void run() {
            while (keepRunning) {
                performReceive();
            }
        }
 
        public void stopRunning() {
            this.keepRunning = false;
        }
 
        private void performReceive() {
            //Request a batch with up to 10 messages (SQS limitation), long polling for up to 20 seconds to avoid a busy loop:
            ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10).withWaitTimeSeconds(20);
            List<Message> messages = sqs.receiveMessage(request).getMessages();
 
            List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>();
            int item = 0;
            for (Message message : messages) {
                // Invoke a method to process the message:
                onMessage(message);
 
                //Collect the receipt handles so the messages can be acknowledged at the end (onMessage logs processing errors and does not rethrow them):
                toDelete.add(new DeleteMessageBatchRequestEntry(String.valueOf(++item), message.getReceiptHandle()));
            }
            deleteMessages(toDelete);
        }
 
        private void deleteMessages(List<DeleteMessageBatchRequestEntry> toDelete) {
            if (!toDelete.isEmpty()) {
                try {
                    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(queueUrl, toDelete);
                    DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
                    for (BatchResultErrorEntry error: result.getFailed()) {
                        logger.error("Cannot acknowledge message: " + error.getMessage());
                    }
                } catch (Exception e) {
                    logger.error("Cannot acknowledge messages batch", e);
                }
            }
        }
 
        private void onMessage(Message message) {
            try {
                JsonNode json = mapper.readTree(message.getBody());
                //Read the top-level "type" field only; the nested object has its own "type":
                String eventType = json.path("type").asText();
                switch (eventType) {
                    case "ENTITY_CREATED":
                    case "ENTITY_CHANGED":
                        //Handle in some way:
                        break;
                    case "RELATIONSHIP_CREATED":
                    case "RELATIONSHIP_CHANGED":
                        //Handle in another way:
                        break;
                    //...
                    default:
                        //Ignore others;
                }
            } catch (Exception e) {
                logger.error("Cannot process message " + message.getMessageId(), e);
            }
        }
    }
}                    
                

Google PubSub

Maven Dependencies
<dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-pubsub</artifactId>
            <version>0.40.0-beta</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
</dependencies>                    
                
PubSub Consumer
import java.io.FileInputStream;
 
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.threeten.bp.Duration;
 
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
 
public class PubSubConsumer {
 
    private static final Logger logger = LogManager.getLogger(PubSubConsumer.class);
 
    public static void main(String[] args) throws Exception {
        //Load credentials from previously exported ServiceAccount key file:
        Credentials credentials = ServiceAccountCredentials.fromStream(new FileInputStream("PathToServiceAccountKeyFile"));
        CredentialsProvider provider = FixedCredentialsProvider.create(credentials);
 
        //Create a subscription reader:
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("projectName", "subscriptionId");
        Subscriber subscriber = Subscriber.newBuilder(subscriptionName, new MessageHandler()).
            setCredentialsProvider(provider).setMaxAckExtensionPeriod(Duration.ofMinutes(1L)).build();
        subscriber.startAsync();
 
        //...
 
        //Stop eventually:
        subscriber.stopAsync();
    }
 
    private static class MessageHandler implements MessageReceiver {
 
        private final ObjectMapper mapper = new ObjectMapper();
 
        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
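            try {
                JsonNode json = mapper.readTree(message.getData().toStringUtf8());
                //Read the top-level "type" field only; the nested object has its own "type":
                String eventType = json.path("type").asText();
                switch (eventType) {
                    case "ENTITY_CREATED":
                    case "ENTITY_CHANGED":
                        //Handle in some way:
                        break;
                    case "RELATIONSHIP_CREATED":
                    case "RELATIONSHIP_CHANGED":
                        //Handle in another way:
                        break;
                    //...
                    default:
                        //Ignore others;
                }
                //Acknowledge the message so that it is not redelivered:
                consumer.ack();
            } catch (Exception e) {
                logger.error("Cannot process message " + message.getMessageId(), e);
                //Negative acknowledgement: ask for the message to be redelivered:
                consumer.nack();
            }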
        }
    }
}