Sync to Data Pipeline API

Learn more about how to use the syncToDataPipeline API to trigger a full reindex of tenant data and stream the results to the Data Pipeline Hub.

The syncToDataPipeline API triggers a background reindex job that synchronizes all tenant data and streams it to the Data Pipeline Hub. The job processes entities, relations, interactions, potential matches, merges, and activities. You can scope the reindex to specific data types, entity types, or relation types. Stop and Pause operations are supported. For more information about this API and to try it out, see Sync to Data Pipeline API.

Background tasks

When the reindex job starts, it creates a background task for each requested data type. Each task scans the entire database and streams the results to the Data Pipeline Hub. Monitor these tasks in the Console > Tenant Management > Jobs.

Each task scans the entire database regardless of the updatedSince, dataTypes, entityType, relationType, or distributed parameters specified in the request. Task execution time remains the same regardless of the parameter values.

The following background tasks run as part of the job.

Data type | Background task
Entities | ReindexDataTask
Deleted entities | ReindexDeletedDataTask
Relations | ReindexRelationsTask
Deleted relations | ReindexDeletedRelationsTask
Interactions | ReindexInteractionsTask
Deleted interactions | ReindexDeletedInteractionsTask
Potential matches | PotentialMatchesReindexTask
Activity | ReindexActivityTask
Merges | ReindexMergesTask

HTTP method and endpoint

Use the following HTTP method and endpoint path to submit the request:
POST http(s)://{{environment}}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline

Replace {{environment}} with your tenant environment name.

The following table describes the endpoint path parameters:

Parameter | Type | Required | Description
tenantId | String | Yes | ID of the tenant for which to trigger the reindex. Example: ce5627DYnQ6abcD

Request headers

The following request headers must be included.

Header | Value | Required
Authorization | Bearer <access_token> | Yes
Content-Type | application/json | Yes

Query parameters

The following table describes the supported query parameters.

Parameter | Type | Required | Description | Accepted values / Default
dataTypes | String | No | Comma-separated list of data types to reindex and stream. If omitted, all data types are processed. | entities, relations, interactions, matches, merges, activities, deleted_entities, deleted_relations, deleted_interactions. Default: all
updatedSince | Integer (int64) | No | Reindexes objects updated after the specified timestamp in epoch milliseconds. | Default: 0
adapters | String | No | Specifies the target adapter to stream data to. If omitted, all enabled adapters are used. | Default: all enabled adapters
reindexDeleted | Boolean | No | If set to true, adds sub-tasks to reindex deleted entities and relations. This parameter is ignored when specific dataTypes values are provided. | true or false. Default: false
entityType | String | No | Restricts the reindex scope to a specific entity type. Only applies when dataTypes includes entities. | Example: Individual
relationType | String | No | Restricts the reindex scope to a specific relation type. Only applies when dataTypes includes relations. | Example: HasAddress
distributed | Boolean | No | If set to true, the job runs in distributed mode, splitting the reindex across multiple parallel tasks. | true or false. Default: false
taskPartsCount | Integer | No | Number of parallel tasks created for distributed reindexing. Each task processes its own subset of objects. Must not exceed the maxTaskPartsCount property in your tenant configuration. Only applies when distributed=true. | Default: 2
distributedTaskIndex | Integer | No | Specifies which part of the distributed job to run. The value must be within the range defined by taskPartsCount. For example, if taskPartsCount is 3, valid values are 1, 2, or 3. Use this to rerun a specific part that previously failed. Only applies when distributed=true. | 1 to taskPartsCount. Default: None
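
The query parameters above can be assembled programmatically. The following Python sketch is illustrative only: the function name build_query and its argument names are hypothetical, and it covers only a subset of the parameters. It checks dataTypes against the accepted values listed in the table and omits taskPartsCount unless distributed is true, because the table states that the parameter only applies in distributed mode.

```python
from urllib.parse import urlencode

# Accepted dataTypes values, copied from the query parameters table.
ACCEPTED_DATA_TYPES = {
    "entities", "relations", "interactions", "matches", "merges",
    "activities", "deleted_entities", "deleted_relations",
    "deleted_interactions",
}


def build_query(data_types=None, updated_since=None, entity_type=None,
                relation_type=None, distributed=False, task_parts_count=None):
    """Return a query string for syncToDataPipeline (hypothetical helper)."""
    params = {}
    if data_types:
        unknown = sorted(set(data_types) - ACCEPTED_DATA_TYPES)
        if unknown:
            raise ValueError("Unsupported dataTypes: " + ", ".join(unknown))
        params["dataTypes"] = ",".join(data_types)
    if updated_since is not None:
        # Epoch milliseconds, as described for updatedSince.
        params["updatedSince"] = int(updated_since)
    if entity_type:
        params["entityType"] = entity_type
    if relation_type:
        params["relationType"] = relation_type
    if distributed:
        params["distributed"] = "true"
        if task_parts_count is not None:
            params["taskPartsCount"] = int(task_parts_count)
    # Keep commas unescaped so the dataTypes list stays readable.
    return urlencode(params, safe=",")


print(build_query(data_types=["entities"], entity_type="HCP"))
```

Calling build_query with no arguments returns an empty string, which corresponds to the default request that processes all data types.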

Request body

The syncToDataPipeline API accepts an optional request body containing a JSON array of entity URIs. Use the request body to scope the reindex to a specific set of entity records instead of reindexing all records of a given data type. When entity URIs are provided, the reindex processes only those records. When no body is provided, the reindex processes all records matching the query parameters.
["entities/I1", "entities/I2", "entities/I3"]
Note: The request body is supported only for entity URIs. It cannot be used to scope the reindex to specific relation, interaction, or other object types.
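
As a minimal sketch of preparing this request body in Python, the helper below serializes a list of entity URIs to JSON. The function name build_entity_body is hypothetical, and the client-side check that each URI starts with entities/ is an assumption based on the example above and the note that only entity URIs are supported. The API may apply its own validation.

```python
import json


def build_entity_body(entity_uris):
    """Serialize entity URIs into a JSON array body (hypothetical helper)."""
    uris = list(entity_uris)
    # Assumption: entity URIs look like "entities/<id>", as in the example.
    invalid = [uri for uri in uris if not uri.startswith("entities/")]
    if invalid:
        raise ValueError("Only entity URIs are supported: " + ", ".join(invalid))
    return json.dumps(uris).encode("utf-8")


print(build_entity_body(["entities/I1", "entities/I2", "entities/I3"]).decode("utf-8"))
```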

Example request

The following scenarios show common ways to use the Sync to Data Pipeline API. Use the request that matches your reindex requirement.

Scenario: Send all data (default)
When to use: Reindex and stream all tenant data to the Data Pipeline Hub.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline

Scenario: Include deleted and loser records
When to use: Reindex all data, including deleted entities and relations and loser records from merges.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?reindexDeleted=true

Scenario: Sync only entities
When to use: Reindex and stream entity data only, excluding all other data types.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?dataTypes=entities

Scenario: Sync only relations
When to use: Reindex and stream relation data only, excluding all other data types.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?dataTypes=relations

Scenario: Sync a specific entity type
When to use: Restrict the reindex scope to a single entity type. Replace HCP with your entity type.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?dataTypes=entities&entityType=HCP

Scenario: Sync a specific relation type
When to use: Restrict the reindex scope to a single relation type. Replace HCOToHCP with your relation type.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?dataTypes=relations&relationType=HCOToHCP

Scenario: Sync specific entity records by URI
When to use: Reindex only a known set of entity records. Include the URIs in the request body.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?dataTypes=entities

["entities/I1", "entities/I2", "entities/I3"]

Scenario: Sync specific deleted or loser records by URI
When to use: Reindex only a known set of deleted or loser entity records. Include the URIs in the request body.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?dataTypes=deleted_entities

["entities/I1", "entities/I2", "entities/I3"]

Scenario: Sync all data for specific entity records
When to use: Stream all events for specific records, including entity created or updated events, match events, and merge pairs where the specified entity is the loser.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline

["entities/I1", "entities/I2", "entities/I3"]

Scenario: Sync events after a specific timestamp
When to use: Reindex only records updated after a specific point in time. Replace xxxxxxxxxxxx with the timestamp in epoch milliseconds.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?updatedSince=xxxxxxxxxxxx

Scenario: Sync in distributed mode with multiple parameters
When to use: Run the reindex job across multiple parallel tasks, scoped to specific data types, entity type, relation type, and timestamp.
Request:
POST https://{env}.reltio.com/reltio/api/{tenantId}/syncToDataPipeline?dataTypes=matches,merges,interactions&distributed=true&distributedTaskIndex=0&taskPartsCount=4&updatedSince=123456&reindexDeleted=true&entityType=HCP&relationType=Location

["entities/I1", "entities/I2", "entities/I3"]
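
To show how the endpoint, headers, query parameters, and optional body fit together, the following Python sketch builds a request object with the standard library. It only constructs the request and does not send it. The function name build_sync_request, the environment value test, and the token placeholder are hypothetical and used for illustration only.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_sync_request(environment, tenant_id, access_token,
                       params=None, entity_uris=None):
    """Build (but do not send) a syncToDataPipeline POST request."""
    url = (f"https://{environment}.reltio.com/reltio/api/"
           f"{tenant_id}/syncToDataPipeline")
    if params:
        # Keep commas unescaped for comma-separated values such as dataTypes.
        url += "?" + urlencode(params, safe=",")
    # The body is optional: a JSON array of entity URIs, or no body at all.
    data = json.dumps(list(entity_uris)).encode("utf-8") if entity_uris else None
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    return Request(url, data=data, headers=headers, method="POST")


# Example: the "Sync specific entity records by URI" scenario.
request = build_sync_request(
    "test", "ce5627DYnQ6abcD", "<access_token>",
    params={"dataTypes": "entities"},
    entity_uris=["entities/I1", "entities/I2", "entities/I3"],
)
print(request.get_method(), request.full_url)
```

Passing the returned object to urllib.request.urlopen would submit the request. That step is left out here so the sketch runs without network access or credentials.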

Response body

The following table describes the fields returned in the response body for each background task created by the reindex job.

Field | Type | Description
id | String | Unique identifier of the background task.
groupId | String | Unique identifier that groups all background tasks created by the same reindex job.
createdTime | Number | Timestamp when the task was created, in epoch milliseconds.
createdBy | String | Email address of the user who triggered the reindex job.
updatedTime | Number | Timestamp of the last task update, in epoch milliseconds.
updatedBy | String | Email address of the user who last updated the task.
type | String | Fully qualified class name of the background task. Identifies the data type being reindexed.
status | String | Current execution status of the task. Example: SCHEDULED.
name | String | Description of the reindex operation.
createdOnHost | String | Hostname of the server that created the task.
parallelExecution | Boolean | Indicates whether the task runs in parallel execution mode.
nodesGroup | String | Node group on which the task runs.
parameters.tenantId | String | Tenant ID for which the reindex is running.
parameters.updatedSince | String | The updatedSince value applied to the task.
parameters.eventQueueProcessors | String | Event queue processors used to stream data.
parameters.streamingDestinations | String | Streaming destination queue for the tenant.
currentState | Object | Current execution state of the task. Empty when newly scheduled.
duration | String | Elapsed execution time of the task.

Example response

All requests return the same response structure: an array of background tasks created for the reindex job. The following example shows a response for a request that reindexes interactions, potential matches, and merges.
[
  {
    "id": "9916103e-8757-4ad1-8d69-4d6afa17b125",
    "groupId": "27856767-5cef-4970-92b9-7403c714b293",
    "createdTime": 1665054423545,
    "createdBy": "test@reltio.com",
    "updatedTime": 1665054423545,
    "updatedBy": "test@reltio.com",
    "type": "com.reltio.businesslogic.tasks.reindex.ReindexInteractionsTask",
    "status": "SCHEDULED",
    "name": "Reindexing interactions in tenant LocalTest",
    "createdOnHost": "DESKTOP-1N09DOF",
    "parallelExecution": false,
    "nodesGroup": "default",
    "parameters": {
      "tenantId": "LocalTest",
      "eventQueueProcessors": "crud_streaming,datapipeline_snowflake_processor",
      "streamingDestinations": "03ff3091f067027b0d484b4e02e36861_queue_local-datapipeline-events_LocalTest"
    },
    "currentState": {},
    "duration": "0s"
  },
  {
    "id": "5d5b448d-377e-4937-a098-43a6bebacf9b",
    "groupId": "21e1fa97-d90b-4609-a0d4-0356e3f2a3fd",
    "createdTime": 1665054423545,
    "createdBy": "test@reltio.com",
    "updatedTime": 1665054423545,
    "updatedBy": "test@reltio.com",
    "type": "com.reltio.businesslogic.tasks.reindex.PotentialMatchesReindexTask",
    "status": "SCHEDULED",
    "name": "Reindexing potential matches information for tenant LocalTest",
    "createdOnHost": "DESKTOP-1N09DOF",
    "parallelExecution": false,
    "nodesGroup": "default",
    "parameters": {
      "tenantId": "LocalTest",
      "eventQueueProcessors": "crud_streaming,datapipeline_snowflake_processor",
      "streamingDestinations": "03ff3091f067027b0d484b4e02e36861_queue_local-datapipeline-events_LocalTest"
    },
    "currentState": {},
    "duration": "0s"
  },
  {
    "id": "2bb5f03b-8163-46fe-a383-4523f77cf5b4",
    "groupId": "7df39c85-b230-4b7d-bc02-1cc10d68b891",
    "createdTime": 1665054423545,
    "createdBy": "test@reltio.com",
    "updatedTime": 1665054423545,
    "updatedBy": "test@reltio.com",
    "type": "com.reltio.businesslogic.tasks.reindex.ReindexMergesTask",
    "status": "SCHEDULED",
    "name": "Reindexing of merges in tenant LocalTest",
    "createdOnHost": "DESKTOP-1N09DOF",
    "parallelExecution": false,
    "nodesGroup": "default",
    "parameters": {
      "tenantId": "LocalTest",
      "eventQueueProcessors": "match_streaming,datapipeline_snowflake_processor",
      "streamingDestinations": "03ff3091f067027b0d484b4e02e36861_queue_local-datapipeline-events_LocalTest"
    },
    "currentState": {},
    "duration": "0s"
  }
]
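
A response like the one above can be reduced to a short per-task summary. The following Python sketch is one possible approach, not part of the API: the names TASK_DATA_TYPES and summarize_tasks are hypothetical. It takes the class name from the end of each type value and maps it to a data type using the background tasks listed in this topic.

```python
import json

# Background task class names and the data type each one reindexes,
# copied from the background tasks table in this topic.
TASK_DATA_TYPES = {
    "ReindexDataTask": "Entities",
    "ReindexDeletedDataTask": "Deleted entities",
    "ReindexRelationsTask": "Relations",
    "ReindexDeletedRelationsTask": "Deleted relations",
    "ReindexInteractionsTask": "Interactions",
    "ReindexDeletedInteractionsTask": "Deleted interactions",
    "PotentialMatchesReindexTask": "Potential matches",
    "ReindexActivityTask": "Activity",
    "ReindexMergesTask": "Merges",
}


def summarize_tasks(response_text):
    """Return id, data type, and status for each task in the response."""
    summary = []
    for task in json.loads(response_text):
        # The type field is a fully qualified class name; keep the last part.
        class_name = task["type"].rsplit(".", 1)[-1]
        summary.append({
            "id": task["id"],
            "dataType": TASK_DATA_TYPES.get(class_name, "Unknown"),
            "status": task["status"],
        })
    return summary


sample = json.dumps([{
    "id": "9916103e-8757-4ad1-8d69-4d6afa17b125",
    "type": "com.reltio.businesslogic.tasks.reindex.ReindexInteractionsTask",
    "status": "SCHEDULED",
}])
print(summarize_tasks(sample))
```

The collected id values can then be used with the task monitoring APIs.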
If the request is invalid, the API returns a 400 Bad Request response. Review the error details, correct the request, and try again.
{
  "severity": "Error",
  "errorMessage": "Failed to parse content as JSON",
  "errorCode": 150,
  "errorDetailMessage": "Expecting request content in JSON format, failed to parse.",
  "innerErrorData": {
    "exception": "com.reltio.rest.data.marshalling.MarshallerException"
  }
}
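
When handling this error in client code, it can help to condense the body into one line for logs. The following Python sketch assumes the error body has the fields shown in the example above. The function name describe_error is hypothetical.

```python
import json


def describe_error(body_text):
    """Return a one-line summary of an error response body."""
    error = json.loads(body_text)
    code = error.get("errorCode")
    message = error.get("errorMessage", "Unknown error")
    detail = error.get("errorDetailMessage")
    summary = f"[{code}] {message}"
    return f"{summary}: {detail}" if detail else summary


example = json.dumps({
    "severity": "Error",
    "errorMessage": "Failed to parse content as JSON",
    "errorCode": 150,
    "errorDetailMessage": "Expecting request content in JSON format, failed to parse.",
})
print(describe_error(example))
```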

Monitor the reindex job

After submitting the request, monitor the reindex job using the following APIs.

Goal | API
Check the status of all active tasks | Get active tasks for tenant
Review completed task history | Get task history for tenant
Check the status of a specific task by ID | Get task by ID for tenant
Verify that processing is complete | Tenant Queue Status API. Confirm the queue count is 0.
Note: A non-zero queue count does not always indicate that the sync is still running. Ongoing activity in the tenant may contribute to the count. Check for other active processes before concluding the sync is incomplete.
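
The monitoring steps above could be automated with a simple polling loop. The following Python sketch is a rough outline under stated assumptions: the function wait_until_drained and its arguments are hypothetical, and the two callables stand in for client code that calls the active tasks API and the Tenant Queue Status API, whose endpoints are not covered in this topic. Because other tenant activity can keep the queue count above zero, a False result means the wait timed out, not that the sync failed.

```python
import time


def wait_until_drained(get_active_task_count, get_queue_count,
                       poll_seconds=30.0, max_polls=120, sleep=time.sleep):
    """Poll until no tasks are active and the queue count is 0.

    Both callables are supplied by the caller and must return an integer.
    Returns True when both counts reach 0, or False after max_polls checks.
    """
    for _ in range(max_polls):
        active = get_active_task_count()
        queued = get_queue_count()
        if active == 0 and queued == 0:
            return True
        sleep(poll_seconds)
    return False


# Usage with stand-in values instead of real API calls.
active_counts = iter([2, 0, 0])
queue_counts = iter([5, 3, 0])
done = wait_until_drained(lambda: next(active_counts),
                          lambda: next(queue_counts),
                          sleep=lambda seconds: None)
print(done)
```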