Create Reltio object type processing tasks in Snowflake
Learn how to create a task that moves data from the landing table to the staging table using scripts.
Processing tasks move data from the landing table to the staging table, which ensures that you always see the latest Reltio data in Snowflake. Create processing tasks for each Reltio object type: entities, relationships, interactions, matches, merges, and links.
- In the Snowflake application worksheet area, create a new SQL worksheet.
- In the SQL worksheet, run these commands to create a processing task for each Reltio object type. Run each command individually and then check the result in step 3 before creating a processing task for the next object type:
Entities processing task Standard
Legacy (optional)
-- Entities processing task.
-- Every minute, when "entitiesStream" has data, merges the newest stream row
-- per "uri" (highest "version", then latest "timestamp") into "entities".
-- A matched row is only overwritten by a strictly newer version, or by the
-- same version with a later timestamp.
-- USER_TASK_TIMEOUT_MS = 86400000 allows a single run up to 24 hours.
CREATE TASK "<database_name>"."<schema_name>"."entitiesTask"
    WAREHOUSE = <warehouse_name>
    SCHEDULE = '1 minute'
    USER_TASK_TIMEOUT_MS = 86400000
WHEN
    SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."entitiesStream"')
AS
MERGE INTO "<database_name>"."<schema_name>"."entities" AS target
USING (
    SELECT
        staging."uri",
        staging."version",
        staging."timestamp",
        staging."type",
        staging."json",
        staging."deleted",
        staging."linked"
    FROM "<database_name>"."<schema_name>"."entitiesStream" AS staging
    INNER JOIN (
        -- Highest version seen in the stream for each uri.
        SELECT
            "uri",
            MAX("version") AS "version"
        FROM "<database_name>"."<schema_name>"."entitiesStream"
        GROUP BY "uri"
    ) AS latest
        ON staging."uri" = latest."uri"
        AND staging."version" = latest."version"
    WHERE staging."objectType" = 'entity'
    -- Guarantees a single source row per uri so the MERGE is deterministic.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY staging."uri"
        ORDER BY staging."version" DESC, staging."timestamp" DESC
    ) = 1
) AS staging_dedup
    ON target."uri" = staging_dedup."uri"
-- Existing entity, newer active payload: refresh every payload column.
WHEN MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."timestamp" = staging_dedup."timestamp",
    target."version" = staging_dedup."version",
    target."type" = staging_dedup."type",
    target."attributes" = staging_dedup."json":attributes,
    target."crosswalks" = staging_dedup."json":crosswalks,
    target."analyticsAttributes" = staging_dedup."json":analyticsAttributes,
    target."updatedBy" = staging_dedup."json":updatedBy,
    target."updatedTime" = staging_dedup."json":updatedTime,
    target."startDate" = staging_dedup."json":startDate,
    target."endDate" = staging_dedup."json":endDate,
    target."active" = TRUE,
    target."tags" = staging_dedup."json":tags,
    target."commitTime" = staging_dedup."json":commitTime
-- Existing entity, newer deleted/linked event: blank the payload, mark inactive.
WHEN MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."timestamp" = staging_dedup."timestamp",
    target."version" = staging_dedup."version",
    target."attributes" = NULL,
    target."crosswalks" = NULL,
    target."analyticsAttributes" = NULL,
    target."updatedBy" = staging_dedup."json":updatedBy,
    target."updatedTime" = staging_dedup."json":updatedTime,
    target."startDate" = staging_dedup."json":startDate,
    target."endDate" = staging_dedup."json":endDate,
    target."active" = FALSE,
    target."tags" = staging_dedup."json":tags,
    target."commitTime" = staging_dedup."json":commitTime
-- New active entity: insert the full payload.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "type",
    "attributes",
    "crosswalks",
    "analyticsAttributes",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "commitTime",
    "startDate",
    "endDate",
    "active",
    "tags"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."type",
    staging_dedup."json":attributes,
    staging_dedup."json":crosswalks,
    staging_dedup."json":analyticsAttributes,
    staging_dedup."json":createdBy,
    staging_dedup."json":createdTime,
    staging_dedup."json":updatedBy,
    staging_dedup."json":updatedTime,
    staging_dedup."json":commitTime,
    staging_dedup."json":startDate,
    staging_dedup."json":endDate,
    TRUE,
    staging_dedup."json":tags
)
-- New entity first seen as deleted/linked: insert an inactive row without payload.
WHEN NOT MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "type",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "commitTime",
    "startDate",
    "endDate",
    "active",
    "tags"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."type",
    staging_dedup."json":createdBy,
    staging_dedup."json":createdTime,
    staging_dedup."json":updatedBy,
    staging_dedup."json":updatedTime,
    staging_dedup."json":commitTime,
    staging_dedup."json":startDate,
    staging_dedup."json":endDate,
    FALSE,
    staging_dedup."json":tags
);
-- Per-entity-type processing task: replace <entityType> with one Reltio entity type.
-- Every minute, when "entities<entityType>OvStream" has data, merges the newest
-- stream row per "uri" for that entity type into "entity_<entityType>_ov".
-- The stored "json" is the output of "SINGLE_VALUE_TRANSFORMATION" applied to
-- the stream's "json" column.
CREATE TASK "<database_name>"."<schema_name>"."entities<entityType>OvTask"
    WAREHOUSE = <warehouse_name>
    SCHEDULE = '1 minute'
    USER_TASK_TIMEOUT_MS = 86400000
WHEN
    SYSTEM$STREAM_HAS_DATA('<database_name>.<schema_name>."entities<entityType>OvStream"')
AS
MERGE INTO <database_name>.<schema_name>."entity_<entityType>_ov" AS target
USING (
    SELECT
        staging."uri",
        staging."version",
        staging."timestamp",
        "SINGLE_VALUE_TRANSFORMATION"(staging."json") AS "json",
        staging."deleted",
        staging."linked"
    FROM <database_name>.<schema_name>."entities<entityType>OvStream" AS staging
    INNER JOIN (
        -- Highest version seen in the stream for each uri.
        SELECT
            "uri",
            MAX("version") AS "version"
        FROM <database_name>.<schema_name>."entities<entityType>OvStream"
        GROUP BY "uri"
    ) AS latest
        ON staging."uri" = latest."uri"
        AND staging."version" = latest."version"
    WHERE staging."objectType" = 'entity'
        AND "type" = 'configuration/entityTypes/<entityType>'
    -- One source row per uri: newest version, then newest timestamp.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY staging."uri"
        ORDER BY staging."version" DESC, staging."timestamp" DESC
    ) = 1
) AS staging_dedup
    ON target."uri" = staging_dedup."uri"
-- Existing row, newer active payload: overwrite the stored json.
WHEN MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."timestamp" = staging_dedup."timestamp",
    target."version" = staging_dedup."version",
    target."json" = staging_dedup."json",
    target."deleted" = staging_dedup."deleted",
    target."linked" = staging_dedup."linked",
    target."endDate" = staging_dedup."json":endDate
-- Existing row, newer deleted/linked event: clear the stored json.
WHEN MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."timestamp" = staging_dedup."timestamp",
    target."version" = staging_dedup."version",
    target."json" = NULL,
    target."deleted" = staging_dedup."deleted",
    target."linked" = staging_dedup."linked",
    target."endDate" = staging_dedup."json":endDate
-- New active row: insert including json.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "json",
    "deleted",
    "linked",
    "endDate"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."json",
    staging_dedup."deleted",
    staging_dedup."linked",
    staging_dedup."json":endDate
)
-- New row first seen as deleted/linked: insert without json.
WHEN NOT MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "deleted",
    "linked",
    "endDate"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."deleted",
    staging_dedup."linked",
    staging_dedup."json":endDate
);
Relations processing task
-- Every minute, when "relationsStream" has data, merges the newest stream row
-- per "uri" (highest "version", then latest "timestamp") into "relations".
-- A matched row is only overwritten by a strictly newer version, or by the
-- same version with a later timestamp.
CREATE TASK "<database_name>"."<schema_name>"."relationsTask"
    WAREHOUSE = <warehouse_name>
    SCHEDULE = '1 minute'
    USER_TASK_TIMEOUT_MS = 86400000
WHEN
    SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."relationsStream"')
AS
MERGE INTO "<database_name>"."<schema_name>"."relations" AS target
USING (
    SELECT
        staging."uri",
        staging."version",
        staging."timestamp",
        staging."type",
        staging."json",
        staging."deleted",
        staging."linked"
    FROM "<database_name>"."<schema_name>"."relationsStream" AS staging
    INNER JOIN (
        -- Highest version seen in the stream for each uri.
        SELECT
            "uri",
            MAX("version") AS "version"
        FROM "<database_name>"."<schema_name>"."relationsStream"
        GROUP BY "uri"
    ) AS latest
        ON staging."uri" = latest."uri"
        AND staging."version" = latest."version"
    WHERE staging."objectType" = 'relation'
    -- One source row per uri: newest version, then newest timestamp.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY staging."uri"
        ORDER BY staging."version" DESC, staging."timestamp" DESC
    ) = 1
) AS staging_dedup
    ON target."uri" = staging_dedup."uri"
-- Existing relation, newer active payload: refresh every payload column.
WHEN MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."version" = staging_dedup."version",
    target."timestamp" = staging_dedup."timestamp",
    target."type" = staging_dedup."type",
    target."attributes" = staging_dedup."json":attributes,
    target."crosswalks" = staging_dedup."json":crosswalks,
    target."startObject" = staging_dedup."json":startObject,
    target."endObject" = staging_dedup."json":endObject,
    target."startRefPinned" = staging_dedup."json":startRefPinned,
    target."startRefIgnored" = staging_dedup."json":startRefIgnored,
    target."endRefPinned" = staging_dedup."json":endRefPinned,
    target."endRefIgnored" = staging_dedup."json":endRefIgnored,
    target."updatedBy" = staging_dedup."json":updatedBy,
    target."updatedTime" = staging_dedup."json":updatedTime,
    target."startDate" = staging_dedup."json":startDate,
    target."endDate" = staging_dedup."json":endDate,
    target."active" = TRUE,
    target."commitTime" = staging_dedup."json":commitTime
-- Existing relation, newer deleted/linked event: blank the payload, mark inactive.
WHEN MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."version" = staging_dedup."version",
    target."timestamp" = staging_dedup."timestamp",
    target."attributes" = NULL,
    target."crosswalks" = NULL,
    target."startObject" = NULL,
    target."endObject" = NULL,
    target."startRefPinned" = NULL,
    target."startRefIgnored" = NULL,
    target."endRefPinned" = NULL,
    target."endRefIgnored" = NULL,
    target."updatedBy" = staging_dedup."json":updatedBy,
    target."updatedTime" = staging_dedup."json":updatedTime,
    target."startDate" = staging_dedup."json":startDate,
    target."endDate" = staging_dedup."json":endDate,
    target."active" = FALSE,
    target."commitTime" = staging_dedup."json":commitTime
-- New active relation: insert the full payload.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "type",
    "attributes",
    "crosswalks",
    "startObject",
    "endObject",
    "startRefPinned",
    "startRefIgnored",
    "endRefPinned",
    "endRefIgnored",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "commitTime",
    "startDate",
    "endDate",
    "active"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."type",
    staging_dedup."json":attributes,
    staging_dedup."json":crosswalks,
    staging_dedup."json":startObject,
    staging_dedup."json":endObject,
    staging_dedup."json":startRefPinned,
    staging_dedup."json":startRefIgnored,
    staging_dedup."json":endRefPinned,
    staging_dedup."json":endRefIgnored,
    staging_dedup."json":createdBy,
    staging_dedup."json":createdTime,
    staging_dedup."json":updatedBy,
    staging_dedup."json":updatedTime,
    staging_dedup."json":commitTime,
    staging_dedup."json":startDate,
    staging_dedup."json":endDate,
    TRUE
)
-- New relation first seen as deleted/linked: insert an inactive row without payload.
WHEN NOT MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "type",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "commitTime",
    "startDate",
    "endDate",
    "active"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."type",
    staging_dedup."json":createdBy,
    staging_dedup."json":createdTime,
    staging_dedup."json":updatedBy,
    staging_dedup."json":updatedTime,
    staging_dedup."json":commitTime,
    staging_dedup."json":startDate,
    staging_dedup."json":endDate,
    FALSE
);
Interactions task
-- Every minute, when "interactionsStream" has data, merges the newest stream
-- row per "uri" (highest "version", then latest "timestamp") into "interactions".
-- A matched row is only overwritten by a strictly newer version, or by the
-- same version with a later timestamp.
-- Both UPDATE branches also store the incoming "timestamp": the MATCHED
-- conditions compare against target."timestamp", so it must track the row
-- that was last applied.
CREATE TASK "<database_name>"."<schema_name>"."interactionsTask"
    WAREHOUSE = <warehouse_name>
    SCHEDULE = '1 minute'
    USER_TASK_TIMEOUT_MS = 86400000
WHEN
    SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."interactionsStream"')
AS
MERGE INTO "<database_name>"."<schema_name>"."interactions" AS target
USING (
    SELECT
        staging."uri",
        staging."version",
        staging."timestamp",
        staging."type",
        staging."json",
        staging."deleted",
        staging."linked"
    FROM "<database_name>"."<schema_name>"."interactionsStream" AS staging
    INNER JOIN (
        -- Highest version seen in the stream for each uri.
        SELECT
            "uri",
            MAX("version") AS "version"
        FROM "<database_name>"."<schema_name>"."interactionsStream"
        GROUP BY "uri"
    ) AS latest
        ON staging."uri" = latest."uri"
        AND staging."version" = latest."version"
    WHERE staging."objectType" = 'interaction'
    -- One source row per uri: newest version, then newest timestamp.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY staging."uri"
        ORDER BY staging."version" DESC, staging."timestamp" DESC
    ) = 1
) AS staging_dedup
    ON target."uri" = staging_dedup."uri"
-- Existing interaction, newer active payload: refresh every payload column.
WHEN MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."version" = staging_dedup."version",
    target."timestamp" = staging_dedup."timestamp",
    target."type" = staging_dedup."type",
    target."attributes" = staging_dedup."json":attributes,
    target."crosswalks" = staging_dedup."json":crosswalks,
    target."members" = staging_dedup."json":members,
    target."updatedBy" = staging_dedup."json":updatedBy,
    target."updatedTime" = staging_dedup."json":updatedTime,
    target."active" = TRUE,
    target."commitTime" = staging_dedup."json":commitTime
-- Existing interaction, newer deleted/linked event: blank the payload, mark inactive.
WHEN MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
    AND (
        staging_dedup."version" > target."version"
        OR (
            staging_dedup."version" = target."version"
            AND staging_dedup."timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."version" = staging_dedup."version",
    target."timestamp" = staging_dedup."timestamp",
    target."attributes" = NULL,
    target."crosswalks" = NULL,
    target."members" = NULL,
    target."updatedBy" = staging_dedup."json":updatedBy,
    target."updatedTime" = staging_dedup."json":updatedTime,
    target."active" = FALSE,
    target."commitTime" = staging_dedup."json":commitTime
-- New active interaction: insert the full payload.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."linked" = FALSE
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "type",
    "attributes",
    "crosswalks",
    "members",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "commitTime",
    "active"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."type",
    staging_dedup."json":attributes,
    staging_dedup."json":crosswalks,
    staging_dedup."json":members,
    staging_dedup."json":createdBy,
    staging_dedup."json":createdTime,
    staging_dedup."json":updatedBy,
    staging_dedup."json":updatedTime,
    staging_dedup."json":commitTime,
    TRUE
)
-- New interaction first seen as deleted/linked: insert an inactive row without payload.
WHEN NOT MATCHED
    AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE)
THEN INSERT (
    "uri",
    "version",
    "timestamp",
    "type",
    "createdBy",
    "createdTime",
    "updatedBy",
    "updatedTime",
    "commitTime",
    "active"
) VALUES (
    staging_dedup."uri",
    staging_dedup."version",
    staging_dedup."timestamp",
    staging_dedup."type",
    staging_dedup."json":createdBy,
    staging_dedup."json":createdTime,
    staging_dedup."json":updatedBy,
    staging_dedup."json":updatedTime,
    staging_dedup."json":commitTime,
    FALSE
);
Matches processing task
-- Every minute, when "matchesStream" has data, merges one stream row per
-- "json":entityId into "matches". Candidate rows are those carrying the
-- maximum "json":timestamp for their entityId; ties are broken by
-- "json":version, then "json":timestamp, both descending.
-- A matched row is only overwritten by a strictly newer version, or by the
-- same version with a later match timestamp.
CREATE TASK "<database_name>"."<schema_name>"."matchesTask"
    WAREHOUSE = <warehouse_name>
    SCHEDULE = '1 minute'
    USER_TASK_TIMEOUT_MS = 86400000
WHEN
    SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."matchesStream"')
AS
MERGE INTO "<database_name>"."<schema_name>"."matches" AS target
USING (
    SELECT
        staging."json":entityId AS "entityId",
        staging."json":timestamp AS "match_timestamp",
        staging."type",
        staging."json",
        staging."deleted",
        staging."linked"
    FROM "<database_name>"."<schema_name>"."matchesStream" AS staging
    INNER JOIN (
        -- Latest match timestamp seen in the stream for each entityId.
        SELECT
            "json":entityId AS "entityId",
            MAX("json":timestamp) AS "match_timestamp"
        FROM "<database_name>"."<schema_name>"."matchesStream"
        GROUP BY "entityId"
    ) AS latest
        ON staging."json":entityId = latest."entityId"
        AND staging."json":timestamp = latest."match_timestamp"
    WHERE staging."objectType" = 'match'
    -- One source row per entityId.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY staging."json":entityId
        ORDER BY staging."json":version DESC, staging."json":timestamp DESC
    ) = 1
) AS staging_dedup
    ON target."entityId" = staging_dedup."entityId"
-- Existing entity, newer active match document: refresh the match lists.
WHEN MATCHED
    AND staging_dedup."deleted" = FALSE
    AND (
        staging_dedup."json":version > target."version"
        OR (
            staging_dedup."json":version = target."version"
            AND staging_dedup."match_timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."timestamp" = staging_dedup."match_timestamp",
    target."potential_matches" = staging_dedup."json":potential_matches,
    target."not_matches" = staging_dedup."json":not_matches,
    target."manual_matches" = staging_dedup."json":manual_matches,
    target."active" = TRUE,
    target."version" = staging_dedup."json":version
-- Existing entity, newer deleted event: clear the match lists, mark inactive.
WHEN MATCHED
    AND staging_dedup."deleted" = TRUE
    AND (
        staging_dedup."json":version > target."version"
        OR (
            staging_dedup."json":version = target."version"
            AND staging_dedup."match_timestamp" > target."timestamp"
        )
    )
THEN UPDATE SET
    target."timestamp" = staging_dedup."match_timestamp",
    target."potential_matches" = NULL,
    target."not_matches" = NULL,
    target."manual_matches" = NULL,
    target."active" = FALSE,
    target."version" = staging_dedup."json":version
-- New active match document: insert with all match lists.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = FALSE
THEN INSERT (
    "entityId",
    "timestamp",
    "potential_matches",
    "not_matches",
    "manual_matches",
    "version",
    "active"
) VALUES (
    staging_dedup."json":entityId,
    staging_dedup."match_timestamp",
    staging_dedup."json":potential_matches,
    staging_dedup."json":not_matches,
    staging_dedup."json":manual_matches,
    staging_dedup."json":version,
    TRUE
)
-- New match document first seen as deleted: insert an inactive row without lists.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = TRUE
THEN INSERT (
    "entityId",
    "timestamp",
    "version",
    "active"
) VALUES (
    staging_dedup."json":entityId,
    staging_dedup."match_timestamp",
    staging_dedup."json":version,
    FALSE
);
Merges task
-- Every minute, when "mergesStream" has data, merges one stream row per
-- "json":loserId (the row with the latest "json":timestamp) into "merges".
-- A matched row is only overwritten by an event with a later timestamp.
-- NOTE(review): unlike the tasks that set USER_TASK_TIMEOUT_MS, this task
-- specifies no timeout - confirm that is intended.
CREATE TASK "<database_name>"."<schema_name>"."mergesTask"
    WAREHOUSE = <warehouse_name>
    SCHEDULE = '1 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."mergesStream"')
AS
MERGE INTO "<database_name>"."<schema_name>"."merges" AS target
USING (
    SELECT
        staging."json":mergeKey AS mergeKey,
        staging."json":winnerId AS winnerId,
        staging."json":loserId AS loserId,
        staging."json":matchRules AS matchRules,
        staging."json":mergeRulesUris AS mergeRulesUris,
        staging."json":timestamp AS "merge_timestamp",
        staging."type",
        staging."json",
        staging."deleted",
        staging."json":directWinner AS directWinner
    FROM "<database_name>"."<schema_name>"."mergesStream" AS staging
    INNER JOIN (
        -- Latest merge timestamp seen in the stream for each loserId.
        SELECT
            "json":loserId AS loserId,
            MAX("json":timestamp) AS "merge_timestamp"
        FROM "<database_name>"."<schema_name>"."mergesStream"
        WHERE "objectType" = 'merge'
        GROUP BY loserId
    ) AS latest
        ON staging."json":loserId = latest.loserId
        AND staging."json":timestamp = latest."merge_timestamp"
    WHERE staging."objectType" = 'merge'
    -- One source row per loserId.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY staging."json":loserId
        ORDER BY staging."json":timestamp DESC
    ) = 1
) AS staging_dedup
    ON target."loserId" = staging_dedup.loserId
-- Existing loser, newer active merge: refresh winner and rule details.
WHEN MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."merge_timestamp" > target."timestamp"
THEN UPDATE SET
    target."timestamp" = staging_dedup."merge_timestamp",
    target."winnerId" = staging_dedup.winnerId,
    target."matchRules" = staging_dedup.matchRules,
    target."mergeRulesUris" = staging_dedup.mergeRulesUris,
    target."type" = staging_dedup."type",
    target."active" = TRUE,
    target."directWinner" = staging_dedup.directWinner
-- Existing loser, newer deleted event: clear the details, mark inactive.
WHEN MATCHED
    AND staging_dedup."deleted" = TRUE
    AND staging_dedup."merge_timestamp" > target."timestamp"
THEN UPDATE SET
    target."timestamp" = staging_dedup."merge_timestamp",
    target."winnerId" = NULL,
    target."matchRules" = NULL,
    target."mergeRulesUris" = NULL,
    target."type" = NULL,
    target."active" = FALSE,
    target."directWinner" = NULL
-- New active merge: insert the full record.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = FALSE
THEN INSERT (
    "timestamp",
    "mergeKey",
    "winnerId",
    "loserId",
    "matchRules",
    "mergeRulesUris",
    "type",
    "active",
    "directWinner"
) VALUES (
    staging_dedup."merge_timestamp",
    staging_dedup.mergeKey,
    staging_dedup.winnerId,
    staging_dedup.loserId,
    staging_dedup.matchRules,
    staging_dedup.mergeRulesUris,
    staging_dedup."type",
    TRUE,
    staging_dedup.directWinner
)
-- New merge first seen as deleted: insert an inactive row with keys only.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = TRUE
THEN INSERT (
    "timestamp",
    "mergeKey",
    "loserId",
    "active"
) VALUES (
    staging_dedup."merge_timestamp",
    staging_dedup.mergeKey,
    staging_dedup.loserId,
    FALSE
);
Links task
-- Every minute, when "linksStream" has data, merges one stream row per
-- "json":loserId (the row with the latest "json":timestamp) into "links".
-- A matched row is only overwritten by an event with a later timestamp.
-- NOTE(review): unlike the tasks that set USER_TASK_TIMEOUT_MS, this task
-- specifies no timeout - confirm that is intended.
CREATE TASK "<database_name>"."<schema_name>"."linksTask"
    WAREHOUSE = <warehouse_name>
    SCHEDULE = '1 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."linksStream"')
AS
MERGE INTO "<database_name>"."<schema_name>"."links" AS target
USING (
    SELECT
        staging."json":winnerId AS winnerId,
        staging."json":loserId AS loserId,
        staging."json":timestamp AS "merge_timestamp",
        staging."json",
        staging."deleted"
    FROM "<database_name>"."<schema_name>"."linksStream" AS staging
    INNER JOIN (
        -- Latest link timestamp seen in the stream for each loserId.
        SELECT
            "json":loserId AS loserId,
            MAX("json":timestamp) AS "merge_timestamp"
        FROM "<database_name>"."<schema_name>"."linksStream"
        WHERE "objectType" = 'links'
        GROUP BY loserId
    ) AS latest
        ON staging."json":loserId = latest.loserId
        AND staging."json":timestamp = latest."merge_timestamp"
    WHERE staging."objectType" = 'links'
    -- One source row per loserId.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY staging."json":loserId
        ORDER BY staging."json":timestamp DESC
    ) = 1
) AS staging_dedup
    ON target."loserId" = staging_dedup.loserId
-- Existing loser, newer active link: refresh the winner.
WHEN MATCHED
    AND staging_dedup."deleted" = FALSE
    AND staging_dedup."merge_timestamp" > target."timestamp"
THEN UPDATE SET
    target."timestamp" = staging_dedup."merge_timestamp",
    target."winnerId" = staging_dedup.winnerId,
    target."active" = TRUE
-- Existing loser, newer deleted event: clear the winner, mark inactive.
WHEN MATCHED
    AND staging_dedup."deleted" = TRUE
    AND staging_dedup."merge_timestamp" > target."timestamp"
THEN UPDATE SET
    target."timestamp" = staging_dedup."merge_timestamp",
    target."winnerId" = NULL,
    target."active" = FALSE
-- New active link: insert winner and loser.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = FALSE
THEN INSERT (
    "timestamp",
    "winnerId",
    "loserId",
    "active"
) VALUES (
    staging_dedup."merge_timestamp",
    staging_dedup.winnerId,
    staging_dedup.loserId,
    TRUE
)
-- New link first seen as deleted: insert an inactive row without a winner.
WHEN NOT MATCHED
    AND staging_dedup."deleted" = TRUE
THEN INSERT (
    "timestamp",
    "loserId",
    "active"
) VALUES (
    staging_dedup."merge_timestamp",
    staging_dedup.loserId,
    FALSE
);
where:
- <warehouse_name>: Is the name of the Snowflake warehouse you created in Create a Snowflake warehouse.
- <database_name>: Is the name of the database you created in Create a Snowflake database.
- <schema_name>: Is the name of the schema you created in Create a Snowflake schema.
- <entityType>: Is the Reltio entity type you want to create or run a legacy processing task for.
- In the SQL worksheet, view the Successfully created notification message.
For general information, see Create snowflake processing tasks in the Snowflake SQL Command Reference.