Sync to Data Pipeline API
Learn more about how to use the syncToDataPipeline API to trigger a full reindex of tenant data and stream the results to the Data Pipeline Hub.
The syncToDataPipeline API triggers a background reindex job that synchronizes all tenant data and streams it to the Data Pipeline Hub. The job processes entities, relations, interactions, potential matches, merges, and activities. You can also scope the reindex to specific data types, entity types, or relation types. The job supports Stop and Pause operations. For more information about this API and to try it out, see Sync to Data Pipeline API.
Background tasks
When the reindex job starts, it creates a background task for each requested data type. Each task scans the entire database and streams the results to the Data Pipeline Hub. To monitor these tasks, use the task APIs described in Monitor the reindex job.
The scan covers the entire database regardless of the updatedSince, dataTypes, entityType, relationType, or distributed parameters specified in the request. As a result, these parameters do not shorten task execution time.
The following background tasks run as part of the job.
| Data type | Background task |
|---|---|
| Entities | ReindexDataTask |
| Deleted entities | ReindexDeletedDataTask |
| Relations | ReindexRelationsTask |
| Deleted relations | ReindexDeletedRelationsTask |
| Interactions | ReindexInteractionsTask |
| Deleted interactions | ReindexDeletedInteractionsTask |
| Potential matches | PotentialMatchesReindexTask |
| Activity | ReindexActivityTask |
| Merges | ReindexMergesTask |
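The job creates one background task per requested data type, so you can predict which tasks should appear in the response. The following Python sketch is a hypothetical helper and is not part of any Reltio SDK. It maps each dataTypes value accepted by the API to the task listed above. It also strips the package prefix from a task's type field. The sketch assumes an explicit dataTypes list and non-distributed mode, because distributed mode can create additional parallel tasks.

```python
from typing import Set

# Hypothetical helper: maps each dataTypes value accepted by the API to the
# background task the reindex job is expected to create for it.
TASK_BY_DATA_TYPE = {
    "entities": "ReindexDataTask",
    "deleted_entities": "ReindexDeletedDataTask",
    "relations": "ReindexRelationsTask",
    "deleted_relations": "ReindexDeletedRelationsTask",
    "interactions": "ReindexInteractionsTask",
    "deleted_interactions": "ReindexDeletedInteractionsTask",
    "matches": "PotentialMatchesReindexTask",
    "activities": "ReindexActivityTask",
    "merges": "ReindexMergesTask",
}


def expected_tasks(data_types: str) -> Set[str]:
    """Return the task names expected for an explicit, comma-separated dataTypes value."""
    names = set()
    for raw in data_types.split(","):
        key = raw.strip()
        if key not in TASK_BY_DATA_TYPE:
            raise ValueError(f"unsupported data type: {key!r}")
        names.add(TASK_BY_DATA_TYPE[key])
    return names


def simple_task_name(task_type: str) -> str:
    """Strip the package prefix from a fully qualified task class name."""
    return task_type.rsplit(".", 1)[-1]


# Example: compare the 'type' values returned in a response with the request.
returned = ["com.reltio.businesslogic.tasks.reindex.ReindexInteractionsTask"]
missing = expected_tasks("interactions,merges") - {simple_task_name(t) for t in returned}
print(missing)  # {'ReindexMergesTask'}
```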
HTTP method and endpoint
POST http(s)://{{environment}}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline
Replace {{environment}} with your tenant environment name.
The following table describes the endpoint path parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| tenantId | String | Yes | ID of the tenant for which to trigger the reindex. Example: ce5627DYnQ6abcD |
Request headers
The following request headers must be included.
| Header | Value | Required |
|---|---|---|
| Authorization | Bearer <access_token> | Yes |
| Content-Type | application/json | Yes |
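You can assemble the endpoint and headers above with the Python standard library. The following is a minimal sketch that builds the request but does not send it. The function name build_sync_request, the HTTPS scheme, and the example environment name are assumptions, and obtaining the access token is out of scope. The optional uris argument is serialized as the JSON array of URIs that the request body accepts.

```python
import json
from typing import Dict, List, Optional
from urllib.parse import quote, urlencode
from urllib.request import Request


def build_sync_request(
    environment: str,
    tenant_id: str,
    access_token: str,
    query: Optional[Dict[str, str]] = None,
    uris: Optional[List[str]] = None,
) -> Request:
    """Build (but do not send) a POST request to syncToDataPipeline (hypothetical helper)."""
    # Endpoint path from this page; HTTPS is assumed.
    url = (
        f"https://{environment}.reltio.com/reltio/api/"
        f"{quote(tenant_id, safe='')}/syncToDataPipeline"
    )
    if query:
        url += "?" + urlencode(query)
    # Optional request body: a JSON array of object URIs to restrict the reindex.
    data = json.dumps(uris).encode("utf-8") if uris is not None else None
    return Request(
        url,
        data=data,
        method="POST",
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
    )


req = build_sync_request("test", "ce5627DYnQ6abcD", "<access_token>", {"dataTypes": "entities"})
print(req.get_method(), req.full_url)
```

To send the request, pass it to urllib.request.urlopen. That function raises urllib.error.HTTPError for error responses such as 400 Bad Request.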
Query parameters
The following table describes the supported query parameters.
| Parameter | Type | Required | Description | Accepted values / Default |
|---|---|---|---|---|
| dataTypes | String | No | Comma-separated list of data types to reindex and stream. If omitted, all data types are processed. | entities, relations, interactions, matches, merges, activities, deleted_entities, deleted_relations, deleted_interactions. Default: all |
| updatedSince | Integer (int64) | No | Reindexes only objects updated after the specified timestamp, in epoch milliseconds. | Default: 0 |
| adapters | String | No | Specifies the target adapter to stream data to. If omitted, all enabled adapters are used. | — |
| reindexDeleted | Boolean | No | If set to true, adds sub-tasks to reindex deleted entities and relations. This parameter is ignored when specific dataTypes values are provided. | true or false. Default: |
| entityType | String | No | Restricts the reindex scope to a specific entity type. Only applies when dataTypes includes entities. | Example: Individual |
| relationType | String | No | Restricts the reindex scope to a specific relation type. Only applies when dataTypes includes relations. | Example: HasAddress |
| distributed | Boolean | No | If set to true, the job runs in distributed mode, splitting the reindex across multiple parallel tasks. | true or false. Default: |
| taskPartsCount | Integer | No | Number of parallel tasks created for distributed reindexing. Each task processes its own subset of objects. Must not exceed the | Default: 2 |
| distributedTaskIndex | Integer | No | Specifies which part of the distributed job to run. The value must be within the range defined by taskPartsCount. For example, if taskPartsCount is 3, valid values are 1, 2, or 3. Use this to rerun a specific part that previously failed. Only applies when | 1 to taskPartsCount. Default: None |
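You can check the constraints in the table above on the client before you send the request. The following Python sketch is hypothetical, and build_query and to_epoch_millis are illustrative names. It performs four checks:

- It rejects unknown dataTypes values.
- It rejects entityType or relationType when dataTypes excludes the matching type.
- It checks that distributedTaskIndex falls between 1 and taskPartsCount, using the documented default of 2.
- It converts a timezone-aware datetime to the epoch milliseconds that updatedSince expects.

The sketch sends booleans as lowercase true, which is an assumption about how the API parses them.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

# dataTypes values listed in the query parameter table.
DATA_TYPES = frozenset({
    "entities", "relations", "interactions", "matches", "merges", "activities",
    "deleted_entities", "deleted_relations", "deleted_interactions",
})
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds for updatedSince."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


def build_query(
    data_types: Optional[List[str]] = None,
    updated_since: Optional[int] = None,
    entity_type: Optional[str] = None,
    relation_type: Optional[str] = None,
    distributed: bool = False,
    task_parts_count: Optional[int] = None,
    distributed_task_index: Optional[int] = None,
) -> Dict[str, str]:
    """Assemble query parameters, checking the constraints documented in the table."""
    query: Dict[str, str] = {}
    if data_types:
        unknown = set(data_types) - DATA_TYPES
        if unknown:
            raise ValueError(f"unsupported dataTypes: {sorted(unknown)}")
        query["dataTypes"] = ",".join(data_types)
    if updated_since is not None:
        query["updatedSince"] = str(updated_since)
    # An omitted dataTypes means all data types, so the scope filters apply then.
    if entity_type:
        if data_types and "entities" not in data_types:
            raise ValueError("entityType only applies when dataTypes includes entities")
        query["entityType"] = entity_type
    if relation_type:
        if data_types and "relations" not in data_types:
            raise ValueError("relationType only applies when dataTypes includes relations")
        query["relationType"] = relation_type
    if distributed:
        query["distributed"] = "true"  # assumption: lowercase boolean
    if task_parts_count is not None:
        query["taskPartsCount"] = str(task_parts_count)
    if distributed_task_index is not None:
        parts = task_parts_count if task_parts_count is not None else 2  # documented default
        if not 1 <= distributed_task_index <= parts:
            raise ValueError(f"distributedTaskIndex must be between 1 and {parts}")
        query["distributedTaskIndex"] = str(distributed_task_index)
    return query
```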
Request body
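To reindex only specific records, provide a JSON array of their URIs in the request body. For example:
["entities/I1", "entities/I2", "entities/I3"]
Example request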
The following table shows common ways to use the Sync to Data Pipeline API. Use the request that matches your reindex requirement.
| Scenario | When to use | Request |
|---|---|---|
| Send all data (default) | Reindex and stream all tenant data to the Data Pipeline Hub. | |
| Include deleted and loser records | Reindex all data, including deleted entities, deleted relations, and loser records from merges. | |
| Sync only entities | Reindex and stream entity data only, excluding all other data types. | |
| Sync only relations | Reindex and stream relation data only, excluding all other data types. | |
| Sync a specific entity type | Restrict the reindex scope to a single entity type. Replace HCP with your entity type. | |
| Sync a specific relation type | Restrict the reindex scope to a single relation type. Replace HCOToHCP with your relation type. | |
| Sync specific entity records by URI | Reindex only a known set of entity records. Include the URIs in the request body. | |
| Sync specific deleted or loser records by URI | Reindex only a known set of deleted or loser entity records. Include the URIs in the request body. | |
| Sync all data for specific entity records | Stream all events for specific records, including entity created or updated events, match events, and merge pairs where the specified entity is the loser. | |
| Sync events after a specific timestamp | Reindex only records updated after a specific point in time. Replace xxxxxxxxxxxx with the timestamp in epoch milliseconds. | |
| Sync in distributed mode with multiple parameters | Run the reindex job across multiple parallel tasks, scoped to specific data types, entity type, relation type, and timestamp. | |
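Several of the scenarios above map directly onto the documented query parameters. The following Python sketch shows one plausible way to express them, but the exact parameter combinations are assumptions rather than the documented requests. Pairing dataTypes=entities with entityType follows the rule that entityType only applies when dataTypes includes entities. HCP, HCOToHCP, and the example URIs come from this page. The updatedSince value is illustrative.

```python
import json
from urllib.parse import urlencode

# Hypothetical reconstruction of several scenarios from the table above.
# Each value is (query parameters, optional JSON request body).
# 1704067200000 (2024-01-01T00:00:00Z) is an illustrative updatedSince value.
SCENARIOS = {
    "Send all data (default)": ({}, None),
    "Sync only entities": ({"dataTypes": "entities"}, None),
    "Sync only relations": ({"dataTypes": "relations"}, None),
    "Sync a specific entity type": ({"dataTypes": "entities", "entityType": "HCP"}, None),
    "Sync a specific relation type": ({"dataTypes": "relations", "relationType": "HCOToHCP"}, None),
    "Sync specific entity records by URI": (
        {"dataTypes": "entities"},
        ["entities/I1", "entities/I2", "entities/I3"],
    ),
    "Sync events after a specific timestamp": ({"updatedSince": "1704067200000"}, None),
}


def render(scenario: str) -> str:
    """Render a scenario as a request line plus an optional JSON body."""
    query, body = SCENARIOS[scenario]
    text = "POST /reltio/api/{tenantId}/syncToDataPipeline"
    if query:
        # Keep commas readable in comma-separated values such as dataTypes.
        text += "?" + urlencode(query, safe=",")
    if body is not None:
        text += "\n\n" + json.dumps(body)
    return text


for name in SCENARIOS:
    print(f"# {name}\n{render(name)}\n")
```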
Response body
The following table describes the fields returned in the response body for each background task created by the reindex job.
| Field | Type | Description |
|---|---|---|
| id | String | Unique identifier of the background task. |
| groupId | String | Unique identifier that groups all background tasks created by the same reindex job. |
| createdTime | Number | Timestamp when the task was created, in epoch milliseconds. |
| createdBy | String | Email address of the user who triggered the reindex job. |
| updatedTime | Number | Timestamp of the last task update, in epoch milliseconds. |
| updatedBy | String | Email address of the user who last updated the task. |
| type | String | Fully qualified class name of the background task. Identifies the data type being reindexed. |
| status | String | Current execution status of the task. Example: SCHEDULED. |
| name | String | Description of the reindex operation. |
| createdOnHost | String | Hostname of the server that created the task. |
| parallelExecution | Boolean | Indicates whether the task runs in parallel execution mode. |
| nodesGroup | String | Node group on which the task runs. |
| parameters.tenantId | String | Tenant ID for which the reindex is running. |
| parameters.updatedSince | String | The updatedSince value applied to the task. |
| parameters.eventQueueProcessors | String | Event queue processors used to stream data. |
| parameters.streamingDestinations | String | Streaming destination queue for the tenant. |
| currentState | Object | Current execution state of the task. Empty when newly scheduled. |
| duration | String | Elapsed execution time of the task. |
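To work with these fields in client code, you can parse each task object into a typed record. This Python sketch keeps only a subset of the documented fields. The BackgroundTask class is a hypothetical illustration. Converting createdTime to a UTC datetime relies on the epoch-millisecond format stated in the table.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class BackgroundTask:
    """Hypothetical record holding a subset of the documented task fields."""
    id: str
    group_id: str
    task_name: str      # simple class name taken from the 'type' field
    status: str
    created: datetime   # createdTime converted from epoch milliseconds (UTC)
    created_by: str
    tenant_id: str

    @classmethod
    def from_json(cls, raw: dict) -> "BackgroundTask":
        millis = raw["createdTime"]
        # Integer arithmetic keeps the millisecond part exact.
        created = datetime.fromtimestamp(millis // 1000, tz=timezone.utc) + timedelta(
            milliseconds=millis % 1000
        )
        return cls(
            id=raw["id"],
            group_id=raw["groupId"],
            task_name=raw["type"].rsplit(".", 1)[-1],
            status=raw["status"],
            created=created,
            created_by=raw["createdBy"],
            tenant_id=raw.get("parameters", {}).get("tenantId", ""),
        )
```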
Example response
[
{
"id": "9916103e-8757-4ad1-8d69-4d6afa17b125",
"groupId": "27856767-5cef-4970-92b9-7403c714b293",
"createdTime": 1665054423545,
"createdBy": "test@reltio.com",
"updatedTime": 1665054423545,
"updatedBy": "test@reltio.com",
"type": "com.reltio.businesslogic.tasks.reindex.ReindexInteractionsTask",
"status": "SCHEDULED",
"name": "Reindexing interactions in tenant LocalTest",
"createdOnHost": "DESKTOP-1N09DOF",
"parallelExecution": false,
"nodesGroup": "default",
"parameters": {
"tenantId": "LocalTest",
"eventQueueProcessors": "crud_streaming,datapipeline_snowflake_processor",
"streamingDestinations": "03ff3091f067027b0d484b4e02e36861_queue_local-datapipeline-events_LocalTest"
},
"currentState": {},
"duration": "0s"
},
{
"id": "5d5b448d-377e-4937-a098-43a6bebacf9b",
"groupId": "21e1fa97-d90b-4609-a0d4-0356e3f2a3fd",
"createdTime": 1665054423545,
"createdBy": "test@reltio.com",
"updatedTime": 1665054423545,
"updatedBy": "test@reltio.com",
"type": "com.reltio.businesslogic.tasks.reindex.PotentialMatchesReindexTask",
"status": "SCHEDULED",
"name": "Reindexing potential matches information for tenant LocalTest",
"createdOnHost": "DESKTOP-1N09DOF",
"parallelExecution": false,
"nodesGroup": "default",
"parameters": {
"tenantId": "LocalTest",
"eventQueueProcessors": "crud_streaming,datapipeline_snowflake_processor",
"streamingDestinations": "03ff3091f067027b0d484b4e02e36861_queue_local-datapipeline-events_LocalTest"
},
"currentState": {},
"duration": "0s"
},
{
"id": "2bb5f03b-8163-46fe-a383-4523f77cf5b4",
"groupId": "7df39c85-b230-4b7d-bc02-1cc10d68b891",
"createdTime": 1665054423545,
"createdBy": "test@reltio.com",
"updatedTime": 1665054423545,
"updatedBy": "test@reltio.com",
"type": "com.reltio.businesslogic.tasks.reindex.ReindexMergesTask",
"status": "SCHEDULED",
"name": "Reindexing of merges in tenant LocalTest",
"createdOnHost": "DESKTOP-1N09DOF",
"parallelExecution": false,
"nodesGroup": "default",
"parameters": {
"tenantId": "LocalTest",
"eventQueueProcessors": "match_streaming,datapipeline_snowflake_processor",
"streamingDestinations": "03ff3091f067027b0d484b4e02e36861_queue_local-datapipeline-events_LocalTest"
},
"currentState": {},
"duration": "0s"
}
]
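A successful call returns a JSON array with one object per background task, as in the example above. A failed call returns a single error object, like the 400 Bad Request example on this page. This hypothetical Python sketch distinguishes the two by checking for an errorCode key, which is an assumption based on these two examples. It then counts tasks by simple task name and status.

```python
import json
from collections import Counter


class SyncRequestError(Exception):
    """Raised when the API returns an error object instead of a task list."""


def summarize_tasks(payload: str) -> Counter:
    """Count returned tasks by (simple task name, status), or raise on an error object."""
    data = json.loads(payload)
    # Assumption: error responses are JSON objects carrying an errorCode field.
    if isinstance(data, dict) and "errorCode" in data:
        raise SyncRequestError(f"{data['errorCode']}: {data.get('errorMessage', '')}")
    return Counter(
        (task["type"].rsplit(".", 1)[-1], task["status"]) for task in data
    )
```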
The following example shows a 400 Bad Request response. Review the error details, correct the request, and try again.
{
"severity": "Error",
"errorMessage": "Failed to parse content as JSON",
"errorCode": 150,
"errorDetailMessage": "Expecting request content in JSON format, failed to parse.",
"innerErrorData": {
"exception": "com.reltio.rest.data.marshalling.MarshallerException"
}
}
Monitor the reindex job
After submitting the request, monitor the reindex job using the following APIs.
| Goal | API |
|---|---|
| Check the status of all active tasks | Get active tasks for tenant |
| Review completed task history | Get task history for tenant |
| Check the status of a specific task by ID | Get task by ID for tenant |
| Verify that processing is complete | Tenant Queue Status API. Confirm that the queue count is 0. |
Note: A non-zero queue count does not always indicate that the sync is still running. Ongoing activity in the tenant can contribute to the count. Check for other active processes before concluding that the sync is incomplete.
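You can combine the task APIs above into a polling loop. The following Python sketch is hypothetical. It takes a caller-supplied get_status function instead of guessing that API's path; for example, get_status could wrap the Get task by ID for tenant API and return the task's status field. The sketch treats COMPLETED, FAILED, and CANCELED as terminal statuses. This page documents only SCHEDULED, so adjust the terminal set to the values your tenant reports.

```python
import time
from typing import Callable, Dict, FrozenSet, Iterable

# Assumption: statuses treated as terminal. Only SCHEDULED is documented on this page.
TERMINAL_STATUSES: FrozenSet[str] = frozenset({"COMPLETED", "FAILED", "CANCELED"})


def wait_for_tasks(
    task_ids: Iterable[str],
    get_status: Callable[[str], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
    terminal: FrozenSet[str] = TERMINAL_STATUSES,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Poll each task until it reaches a terminal status; raise TimeoutError otherwise."""
    pending = set(task_ids)
    final: Dict[str, str] = {}
    deadline = time.monotonic() + timeout_seconds
    while pending:
        for task_id in sorted(pending):
            status = get_status(task_id)
            if status in terminal:
                final[task_id] = status
        pending.difference_update(final)  # drop tasks that have finished
        if not pending:
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(f"tasks still running: {sorted(pending)}")
        sleep(poll_seconds)
    return final
```

The sleep argument is injectable so that you can test the loop without waiting.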