Create Reltio object type processing tasks in Snowflake

Learn how to create a task that moves data between the staging and landing tables using scripts.

Before you create processing tasks in Snowflake, you must create the Reltio object type streams. For more information, see Create Reltio object type streams in Snowflake.
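
Before creating the tasks, it can help to confirm that the streams exist. The following is a minimal sketch, assuming the stream names used in the task definitions on this page (for example, entitiesStream) and the same <database_name> and <schema_name> placeholders. Adjust the names to match your environment.

```sql
-- List the streams in the schema; the Reltio object type streams
-- (for example, entitiesStream) should appear in the result.
SHOW STREAMS IN SCHEMA "<database_name>"."<schema_name>";

-- Returns TRUE when the stream currently holds change records to process.
SELECT SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."entitiesStream"');
```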

Processing tasks move data from the landing table to the staging table, which ensures that you always see the latest Reltio data in Snowflake. Create a processing task for each Reltio object type: entities, relationships, interactions, matches, merges, and links.
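
Each task definition on this page keeps only the newest record for each object before merging it into the target table, using QUALIFY with ROW_NUMBER(). The sketch below illustrates that pattern in isolation. The sample rows and the alias sample_stream are hypothetical stand-ins for a stream's contents and are not part of the Reltio schema.

```sql
-- Hypothetical sample rows: two versions of entities/1 and one of entities/2.
SELECT "uri", "version", "timestamp"
FROM (VALUES
        ('entities/1', 1, 100),
        ('entities/1', 2, 200),
        ('entities/2', 1, 150)
     ) AS sample_stream ("uri", "version", "timestamp")
-- Keep one row per uri: the highest version, then the latest timestamp.
QUALIFY ROW_NUMBER() OVER (
          PARTITION BY "uri"
          ORDER BY "version" DESC, "timestamp" DESC
        ) = 1;
```

The query keeps a single row per uri, so the MERGE statement never receives two source rows for the same target row.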

To create a Reltio object type processing task in Snowflake:
  1. In the Snowflake application worksheet area, create a new SQL worksheet.
  2. In the SQL worksheet, run these commands to create a processing task for each Reltio object type. Run each command individually and then check the result in step 3 before creating a processing task for the next object type:
    Entities processing task
    Standard
    CREATE TASK "<database_name>"."<schema_name>"."entitiesTask"
      WAREHOUSE = <warehouse_name>
      SCHEDULE = '1 minute'
      USER_TASK_TIMEOUT_MS = 86400000
    WHEN
      SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."entitiesStream"')
    AS
      MERGE INTO "<database_name>"."<schema_name>"."entities" as target USING
          (SELECT staging."uri", staging."version", staging."timestamp", staging."type", staging."json", staging."deleted", staging."linked"
           FROM "<database_name>"."<schema_name>"."entitiesStream" as staging,
           (SELECT "uri", MAX ("version") as "version"
              FROM "<database_name>"."<schema_name>"."entitiesStream"
              GROUP BY "uri") as latest
            WHERE staging."uri" = latest."uri" AND staging."version" = latest."version" AND staging."objectType" = 'entity'
            QUALIFY ROW_NUMBER() OVER (PARTITION BY staging."uri" ORDER BY staging."version" DESC, staging."timestamp" DESC) = 1) as staging_dedup
        on target."uri" = staging_dedup."uri"
        WHEN MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."timestamp" = staging_dedup."timestamp", target."version" = staging_dedup."version", target."type" = staging_dedup."type", target."attributes" = staging_dedup."json":attributes, target."crosswalks" = staging_dedup."json":crosswalks, target."analyticsAttributes" = staging_dedup."json":analyticsAttributes, target."updatedBy" = staging_dedup."json":updatedBy, target."updatedTime" = staging_dedup."json":updatedTime, target."startDate" = staging_dedup."json":startDate, target."endDate" = staging_dedup."json":endDate, target."active" = TRUE, target."tags" = staging_dedup."json":tags, target."commitTime" = staging_dedup."json":commitTime
        WHEN MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."timestamp" = staging_dedup."timestamp", target."version" = staging_dedup."version", target."attributes" = NULL, target."crosswalks" = NULL, target."analyticsAttributes" = NULL, target."updatedBy" = staging_dedup."json":"updatedBy", target."updatedTime" = staging_dedup."json":updatedTime, target."startDate" = staging_dedup."json":startDate, target."endDate" = staging_dedup."json":endDate, target."active" = FALSE, target."tags" = staging_dedup."json":tags, target."commitTime" = staging_dedup."json":commitTime
        WHEN NOT MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE THEN INSERT ("uri", "version", "timestamp", "type", "attributes", "crosswalks", "analyticsAttributes", "createdBy", "createdTime", "updatedBy", "updatedTime", "commitTime", "startDate", "endDate", "active", "tags") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."type", staging_dedup."json":attributes, staging_dedup."json":crosswalks, staging_dedup."json":analyticsAttributes, staging_dedup."json":createdBy, staging_dedup."json":createdTime, staging_dedup."json":updatedBy, staging_dedup."json":updatedTime, staging_dedup."json":commitTime, staging_dedup."json":startDate, staging_dedup."json":endDate, TRUE, staging_dedup."json":tags)
        WHEN NOT MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) THEN INSERT ("uri", "version", "timestamp", "type", "createdBy", "createdTime", "updatedBy", "updatedTime", "commitTime", "startDate", "endDate", "active", "tags") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."type", staging_dedup."json":createdBy, staging_dedup."json":createdTime, staging_dedup."json":updatedBy, staging_dedup."json":updatedTime, staging_dedup."json":commitTime, staging_dedup."json":startDate, staging_dedup."json":endDate, FALSE, staging_dedup."json":tags);
    
    Legacy (optional)
    CREATE TASK "<database_name>"."<schema_name>"."entities<entityType>OvTask"
      WAREHOUSE = <warehouse_name>
      SCHEDULE = '1 minute'
      USER_TASK_TIMEOUT_MS = 86400000
    WHEN
      SYSTEM$STREAM_HAS_DATA('<database_name>.<schema_name>."entities<entityType>OvStream"')
    AS
      MERGE INTO <database_name>.<schema_name>."entity_<entityType>_ov" as target USING
          (SELECT staging."uri", staging."version", staging."timestamp", "SINGLE_VALUE_TRANSFORMATION"(staging."json") as "json", staging."deleted", staging."linked"
           FROM <database_name>.<schema_name>."entities<entityType>OvStream" as staging,
           (SELECT "uri", MAX ("version") as "version"
              FROM <database_name>.<schema_name>."entities<entityType>OvStream"
              GROUP BY "uri") as latest
            WHERE staging."uri" = latest."uri" AND staging."version" = latest."version" AND staging."objectType" = 'entity' AND "type" = 'configuration/entityTypes/<entityType>'
            QUALIFY ROW_NUMBER() OVER (PARTITION BY staging."uri" ORDER BY staging."version" DESC, staging."timestamp" DESC) = 1) as staging_dedup on target."uri" = staging_dedup."uri"
        WHEN MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."timestamp" = staging_dedup."timestamp", target."version" = staging_dedup."version", target."json" = staging_dedup."json", target."deleted" = staging_dedup."deleted", target."linked" = staging_dedup."linked", target."endDate" = staging_dedup."json":endDate
        WHEN MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."timestamp" = staging_dedup."timestamp", target."version" = staging_dedup."version", target."json" = NULL, target."deleted" = staging_dedup."deleted", target."linked" = staging_dedup."linked", target."endDate" = staging_dedup."json":endDate
        WHEN NOT MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE THEN INSERT ("uri", "version", "timestamp", "json", "deleted", "linked", "endDate") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."json", staging_dedup."deleted", staging_dedup."linked", staging_dedup."json":endDate)
        WHEN NOT MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) THEN INSERT ("uri", "version", "timestamp", "deleted", "linked", "endDate") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."deleted", staging_dedup."linked", staging_dedup."json":endDate);
    Relations processing task
    CREATE TASK "<database_name>"."<schema_name>"."relationsTask"
      WAREHOUSE = <warehouse_name>
      SCHEDULE = '1 minute'
      USER_TASK_TIMEOUT_MS = 86400000
    WHEN
      SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."relationsStream"')
    AS
      MERGE INTO "<database_name>"."<schema_name>"."relations" as target USING
          (SELECT staging."uri", staging."version", staging."timestamp", staging."type", staging."json", staging."deleted", staging."linked"
           FROM "<database_name>"."<schema_name>"."relationsStream" as staging,
           (SELECT "uri", MAX ("version") as "version"
              FROM "<database_name>"."<schema_name>"."relationsStream"
              GROUP BY "uri") as latest
            WHERE staging."uri" = latest."uri" AND staging."version" = latest."version" AND staging."objectType" = 'relation'
            QUALIFY ROW_NUMBER() OVER (PARTITION BY staging."uri" ORDER BY staging."version" DESC, staging."timestamp" DESC) = 1) as staging_dedup
        on target."uri" = staging_dedup."uri"
        WHEN MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."version" = staging_dedup."version", target."timestamp" = staging_dedup."timestamp", target."type" = staging_dedup."type", target."attributes" = staging_dedup."json":attributes, target."crosswalks" = staging_dedup."json":crosswalks, target."startObject" = staging_dedup."json":startObject, target."endObject" = staging_dedup."json":endObject, target."startRefPinned" = staging_dedup."json":startRefPinned, target."startRefIgnored" = staging_dedup."json":startRefIgnored, target."endRefPinned" = staging_dedup."json":endRefPinned, target."endRefIgnored" = staging_dedup."json":endRefIgnored, target."updatedBy" = staging_dedup."json":updatedBy, target."updatedTime" = staging_dedup."json":updatedTime, target."startDate" = staging_dedup."json":startDate, target."endDate" = staging_dedup."json":endDate, target."active" = TRUE, target."commitTime" = staging_dedup."json":commitTime
        WHEN MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."version" = staging_dedup."version", target."timestamp" = staging_dedup."timestamp", target."attributes" = NULL, target."crosswalks" = NULL, target."startObject" = NULL, target."endObject" = NULL, target."startRefPinned" = NULL, target."startRefIgnored" = NULL, target."endRefPinned" = NULL, target."endRefIgnored" = NULL, target."updatedBy" = staging_dedup."json":updatedBy, target."updatedTime" = staging_dedup."json":updatedTime, target."startDate" = staging_dedup."json":startDate, target."endDate" = staging_dedup."json":endDate, target."active" = FALSE, target."commitTime" = staging_dedup."json":commitTime
        WHEN NOT MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE THEN INSERT ("uri", "version", "timestamp", "type", "attributes", "crosswalks", "startObject", "endObject", "startRefPinned", "startRefIgnored", "endRefPinned", "endRefIgnored", "createdBy", "createdTime", "updatedBy", "updatedTime", "commitTime", "startDate", "endDate", "active") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."type", staging_dedup."json":attributes, staging_dedup."json":crosswalks, staging_dedup."json":startObject, staging_dedup."json":endObject, staging_dedup."json":startRefPinned, staging_dedup."json":startRefIgnored, staging_dedup."json":endRefPinned, staging_dedup."json":endRefIgnored, staging_dedup."json":createdBy, staging_dedup."json":createdTime, staging_dedup."json":updatedBy, staging_dedup."json":updatedTime, staging_dedup."json":commitTime, staging_dedup."json":startDate, staging_dedup."json":endDate, TRUE)
        WHEN NOT MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) THEN INSERT ("uri", "version", "timestamp", "type", "createdBy", "createdTime", "updatedBy", "updatedTime", "commitTime", "startDate", "endDate", "active") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."type", staging_dedup."json":createdBy, staging_dedup."json":createdTime, staging_dedup."json":updatedBy, staging_dedup."json":updatedTime, staging_dedup."json":commitTime, staging_dedup."json":startDate, staging_dedup."json":endDate, FALSE);
    Interactions processing task
    CREATE TASK "<database_name>"."<schema_name>"."interactionsTask"
      WAREHOUSE = <warehouse_name>
      SCHEDULE = '1 minute'
      USER_TASK_TIMEOUT_MS = 86400000
    WHEN
      SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."interactionsStream"')
    AS
      MERGE INTO "<database_name>"."<schema_name>"."interactions" as target USING
          (SELECT staging."uri", staging."version", staging."timestamp", staging."type", staging."json", staging."deleted", staging."linked"
           FROM "<database_name>"."<schema_name>"."interactionsStream" as staging,
           (SELECT "uri", MAX ("version") as "version"
              FROM "<database_name>"."<schema_name>"."interactionsStream"
              GROUP BY "uri") as latest
            WHERE staging."uri" = latest."uri" AND staging."version" = latest."version" AND staging."objectType" = 'interaction'
            QUALIFY ROW_NUMBER() OVER (PARTITION BY staging."uri" ORDER BY staging."version" DESC, staging."timestamp" DESC) = 1) as staging_dedup
        on target."uri" = staging_dedup."uri"
        WHEN MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."version" = staging_dedup."version", target."type" = staging_dedup."type", target."attributes" = staging_dedup."json":attributes, target."crosswalks" = staging_dedup."json":crosswalks, target."members" = staging_dedup."json":members, target."updatedBy" = staging_dedup."json":updatedBy, target."updatedTime" = staging_dedup."json":updatedTime, target."active" = TRUE, target."commitTime" = staging_dedup."json":commitTime
        WHEN MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) AND (staging_dedup."version" > target."version" OR (staging_dedup."version" = target."version" AND staging_dedup."timestamp" > target."timestamp")) THEN UPDATE SET target."version" = staging_dedup."version", target."attributes" = NULL, target."crosswalks" = NULL, target."members" = NULL, target."updatedBy" = staging_dedup."json":"updatedBy", target."updatedTime" = staging_dedup."json":updatedTime, target."active" = FALSE, target."commitTime" = staging_dedup."json":commitTime
        WHEN NOT MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."linked" = FALSE THEN INSERT ("uri", "version", "timestamp", "type", "attributes", "crosswalks", "members", "createdBy", "createdTime", "updatedBy", "updatedTime", "commitTime", "active") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."type", staging_dedup."json":attributes, staging_dedup."json":crosswalks, staging_dedup."json":members, staging_dedup."json":createdBy, staging_dedup."json":createdTime, staging_dedup."json":updatedBy, staging_dedup."json":updatedTime, staging_dedup."json":commitTime, TRUE)
        WHEN NOT MATCHED AND (staging_dedup."deleted" = TRUE OR staging_dedup."linked" = TRUE) THEN INSERT ("uri", "version", "timestamp", "type", "createdBy", "createdTime", "updatedBy", "updatedTime", "commitTime", "active") values (staging_dedup."uri", staging_dedup."version", staging_dedup."timestamp", staging_dedup."type", staging_dedup."json":createdBy, staging_dedup."json":createdTime, staging_dedup."json":updatedBy, staging_dedup."json":updatedTime, staging_dedup."json":commitTime, FALSE);
    Matches processing task
    CREATE TASK "<database_name>"."<schema_name>"."matchesTask"
      WAREHOUSE = <warehouse_name>
      SCHEDULE = '1 minute'
      USER_TASK_TIMEOUT_MS = 86400000
    WHEN
      SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."matchesStream"')
    AS
      MERGE INTO "<database_name>"."<schema_name>"."matches" as target USING
          (SELECT staging."json":entityId as "entityId", staging."json":timestamp as "match_timestamp", staging."type", staging."json", staging."deleted", staging."linked"
           FROM "<database_name>"."<schema_name>"."matchesStream" as staging,
           (SELECT "json":entityId as "entityId", MAX ("json":timestamp) as "match_timestamp"
              FROM "<database_name>"."<schema_name>"."matchesStream"
              GROUP BY "entityId") as latest
            WHERE staging."json":entityId = latest."entityId" AND staging."json":timestamp = latest."match_timestamp" AND staging."objectType" = 'match'
            QUALIFY ROW_NUMBER() OVER (PARTITION BY staging."json":entityId ORDER BY staging."json":version DESC,  staging."json":timestamp DESC) = 1) as staging_dedup
        on target."entityId" = staging_dedup."entityId"
        WHEN MATCHED AND staging_dedup."deleted" = FALSE AND (staging_dedup."json":version > target."version" OR (staging_dedup."json":version = target."version" AND staging_dedup."match_timestamp" > target."timestamp")) THEN UPDATE SET target."timestamp" = staging_dedup."match_timestamp", target."potential_matches" = staging_dedup."json":potential_matches, target."not_matches" = staging_dedup."json":not_matches, target."manual_matches" = staging_dedup."json":manual_matches, target."active" = TRUE, target."version" = staging_dedup."json":version
        WHEN MATCHED AND staging_dedup."deleted" = TRUE AND (staging_dedup."json":version > target."version" OR (staging_dedup."json":version = target."version" AND staging_dedup."match_timestamp" > target."timestamp")) THEN UPDATE SET target."timestamp" = staging_dedup."match_timestamp", target."potential_matches" = NULL, target."not_matches" = NULL, target."manual_matches" = NULL, target."active" = FALSE, target."version" = staging_dedup."json":version
        WHEN NOT MATCHED AND staging_dedup."deleted" = FALSE THEN INSERT ("entityId", "timestamp", "potential_matches", "not_matches", "manual_matches", "version", "active") values (staging_dedup."json":entityId, staging_dedup."match_timestamp", staging_dedup."json":potential_matches, staging_dedup."json":not_matches, staging_dedup."json":manual_matches, staging_dedup."json":version, TRUE)
        WHEN NOT MATCHED AND staging_dedup."deleted" = TRUE THEN INSERT ("entityId", "timestamp", "version", "active") values (staging_dedup."json":entityId, staging_dedup."match_timestamp", staging_dedup."json":version, FALSE);
    
    Merges processing task
    CREATE TASK "<database_name>"."<schema_name>"."mergesTask"
      WAREHOUSE = <warehouse_name>
      SCHEDULE = '1 minute'
    WHEN
      SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."mergesStream"')
    AS
      MERGE INTO "<database_name>"."<schema_name>"."merges" as target USING
          (SELECT staging."json":mergeKey as mergeKey, staging."json":winnerId as winnerId, staging."json":loserId as loserId, staging."json":matchRules as matchRules, staging."json":mergeRulesUris as mergeRulesUris, staging."json":timestamp as "merge_timestamp", staging."type", staging."json", staging."deleted", staging."json":directWinner as directWinner
           FROM "<database_name>"."<schema_name>"."mergesStream" as staging,
           (SELECT "json":loserId as loserId, MAX ("json":timestamp) as "merge_timestamp"
              FROM "<database_name>"."<schema_name>"."mergesStream"
              where "objectType" = 'merge'
              GROUP BY loserId) as latest
            WHERE staging."json":loserId = latest.loserId AND staging."json":timestamp = latest."merge_timestamp" AND staging."objectType" = 'merge'
            QUALIFY ROW_NUMBER() OVER (PARTITION BY staging."json":loserId ORDER BY staging."json":timestamp DESC) = 1) as staging_dedup
        on target."loserId" = staging_dedup.loserId
        WHEN MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."merge_timestamp" > target."timestamp" THEN UPDATE SET target."timestamp" = staging_dedup."merge_timestamp", target."winnerId" = staging_dedup.winnerId, target."matchRules" = staging_dedup.matchRules, target."mergeRulesUris" = staging_dedup.mergeRulesUris, target."type" = staging_dedup."type", target."active" = TRUE, target."directWinner" = staging_dedup.directWinner
        WHEN MATCHED AND staging_dedup."deleted" = TRUE AND staging_dedup."merge_timestamp" > target."timestamp" THEN UPDATE SET target."timestamp" = staging_dedup."merge_timestamp", target."winnerId" = NULL, target."matchRules" = NULL, target."mergeRulesUris" = NULL, target."type" = NULL, target."active" = FALSE, target."directWinner" = NULL
        WHEN NOT MATCHED AND staging_dedup."deleted" = FALSE THEN INSERT ("timestamp", "mergeKey", "winnerId", "loserId", "matchRules", "mergeRulesUris", "type", "active", "directWinner") values (staging_dedup."merge_timestamp", staging_dedup.mergeKey, staging_dedup.winnerId, staging_dedup.loserId, staging_dedup.matchRules, staging_dedup.mergeRulesUris, staging_dedup."type", TRUE, staging_dedup.directWinner)
        WHEN NOT MATCHED AND staging_dedup."deleted" = TRUE THEN INSERT ("timestamp", "mergeKey", "loserId", "active") values (staging_dedup."merge_timestamp", staging_dedup.mergeKey,  staging_dedup.loserId, FALSE);
    
    Links processing task
    CREATE TASK "<database_name>"."<schema_name>"."linksTask"
      WAREHOUSE = <warehouse_name>
      SCHEDULE = '1 minute'
    WHEN
      SYSTEM$STREAM_HAS_DATA('"<database_name>"."<schema_name>"."linksStream"')
    AS
      MERGE INTO "<database_name>"."<schema_name>"."links" as target USING
          (SELECT staging."json":winnerId as winnerId, staging."json":loserId as loserId, staging."json":timestamp as "merge_timestamp", staging."json", staging."deleted"
           FROM "<database_name>"."<schema_name>"."linksStream" as staging,
           (SELECT "json":loserId as loserId, MAX ("json":timestamp) as "merge_timestamp"
              FROM "<database_name>"."<schema_name>"."linksStream"
              where "objectType" = 'links'
              GROUP BY loserId) as latest
            WHERE staging."json":loserId = latest.loserId AND staging."json":timestamp = latest."merge_timestamp" AND staging."objectType" = 'links'
            QUALIFY ROW_NUMBER() OVER (PARTITION BY staging."json":loserId ORDER BY staging."json":timestamp DESC) = 1) as staging_dedup
        on target."loserId" = staging_dedup.loserId
        WHEN MATCHED AND staging_dedup."deleted" = FALSE AND staging_dedup."merge_timestamp" > target."timestamp" THEN UPDATE SET target."timestamp" = staging_dedup."merge_timestamp", target."winnerId" = staging_dedup.winnerId, target."active" = TRUE
        WHEN MATCHED AND staging_dedup."deleted" = TRUE AND staging_dedup."merge_timestamp" > target."timestamp" THEN UPDATE SET target."timestamp" = staging_dedup."merge_timestamp", target."winnerId" = NULL, target."active" = FALSE
        WHEN NOT MATCHED AND staging_dedup."deleted" = FALSE THEN INSERT ("timestamp", "winnerId", "loserId", "active") values (staging_dedup."merge_timestamp", staging_dedup.winnerId, staging_dedup.loserId, TRUE)
        WHEN NOT MATCHED AND staging_dedup."deleted" = TRUE THEN INSERT ("timestamp", "loserId", "active") values (staging_dedup."merge_timestamp",  staging_dedup.loserId, FALSE);
    
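    where:
      <database_name> is the name of your Snowflake database.
      <schema_name> is the name of the schema that contains the Reltio tables and streams.
      <warehouse_name> is the name of the warehouse that runs the task.
      <entityType> (legacy task only) is the Reltio entity type name, as used in configuration/entityTypes/<entityType>.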
  3. In the SQL worksheet, confirm that the result shows the Successfully created notification message.
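
Snowflake creates tasks in a suspended state, so a newly created task does not run on its schedule until it is resumed. The sketch below assumes the entitiesTask name from step 2 and a role with the privileges needed to operate the task. If your setup resumes tasks in a separate procedure, treat this only as a way to check the task state and its recent runs.

```sql
-- Check that the task exists and see its state (suspended or started).
SHOW TASKS LIKE 'entitiesTask' IN SCHEMA "<database_name>"."<schema_name>";

-- Start the task so that it runs on its one-minute schedule.
ALTER TASK "<database_name>"."<schema_name>"."entitiesTask" RESUME;

-- Review recent runs of the task, newest first.
SELECT name, state, scheduled_time, error_message
FROM TABLE("<database_name>".INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'entitiesTask'))
ORDER BY scheduled_time DESC
LIMIT 10;
```

Repeat the same statements with the other task names (relationsTask, interactionsTask, matchesTask, mergesTask, linksTask) as needed.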

For general information, see Create snowflake processing tasks in the Snowflake SQL Command Reference.