Incremental Data Access

Incremental Data Access lets you read only the files updated within a given time window.

In some cases, you only need the objects that changed (or were deleted) within a given time window, for example, the last hour.

Loading all objects into Spark and then filtering out those modified or deleted before a given point in time is inefficient for large tenants. Incremental Data Access avoids this by reading only the files updated within the requested time window.
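For comparison, the sketch below shows the manual filtering that Incremental Data Access saves you from: every record is scanned, and only those updated within the last N hours are kept. It uses plain Scala collections rather than Spark. The ChangedRecord type and the window boundaries (start excluded, end included) are assumptions for illustration, not part of the Reltio API.

```scala
import java.time.{Duration, Instant}

// Hypothetical record shape (not a Reltio type): an object ID and its last-update time.
final case class ChangedRecord(id: String, updatedTime: Instant)

object ManualDeltaFilter {
  // Keeps records updated in the window (now - window, now].
  // Assumption: the start boundary is exclusive and the end inclusive.
  // Every record is still scanned, which is the cost that
  // Incremental Data Access avoids on large tenants.
  def updatedWithin(records: Seq[ChangedRecord], now: Instant, window: Duration): Seq[ChangedRecord] = {
    val start = now.minus(window)
    records.filter(r => r.updatedTime.isAfter(start) && !r.updatedTime.isAfter(now))
  }
}
```

With a one-hour window, a record updated 10 minutes ago is kept and one updated 2 hours ago is dropped.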

Scope of Incremental Data Access features:

  • Provides seamless access to incremental data in the same format as the original data
  • Provides the data updated within the time window
  • Provides access to entities, relations, and interactions

Usage

Use the following parameters:

  • deltaWindow
    • Specifies the size of the time window in hours. The window is configurable per tenant; the default is 24 hours and the maximum supported size is 105 hours (about 4.38 days).
  • withDeleted
    • Specifies whether deleted objects should be loaded.
    • Takes effect only when deltaWindow is also specified.
    • When withDeleted=true, a deletedTime column is added to the result DataFrame.
      Note: A null value in deletedTime means that the record is not deleted.
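The deltaWindow constraints above can be checked on the client side before a job runs. The following is a hypothetical sketch, not Reltio code: it assumes the value is written as a whole number of hours followed by "h" (the only form shown on this page) and that the smallest meaningful window is 1 hour. It checks against the documented 105-hour maximum by default; a caller could pass a smaller tenant-specific limit instead.

```scala
object DeltaWindowSketch {
  val MaxHours: Int = 105 // documented maximum window size (about 4.38 days)

  // Assumption: the window is written as a whole number of hours, e.g. "1h".
  private val HoursPattern = """(\d+)h""".r

  // Parses a deltaWindow string into hours and checks it against a limit.
  // limitHours defaults to the documented maximum; pass a tenant-specific
  // value (hypothetical) to check against a smaller configured window.
  def parseHours(deltaWindow: String, limitHours: Int = MaxHours): Either[String, Int] =
    deltaWindow.trim match {
      case HoursPattern(digits) =>
        scala.util.Try(digits.toInt).toOption match {
          case Some(h) if h >= 1 && h <= limitHours => Right(h)
          case _ => Left(s"deltaWindow must be between 1h and ${limitHours}h, got: $deltaWindow")
        }
      case _ => Left(s"unsupported deltaWindow format: $deltaWindow")
    }
}
```

For example, parseHours("1h") yields Right(1), while parseHours("106h") and parseHours("1d") are rejected.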

1. Incremental Data Access with deltaWindow

The following example illustrates the use of Incremental Data Access with the deltaWindow option:

val df1: DataFrame = af_framework
  .dataAccess()
  .dataset(
    new EntityDatasetBuilder()
      .ofType("configuration/entityTypes/HCP")
      .select("Id")
      .asTable("hcp")
  )
  .deltaWindow("1h")
  .build()

For Data Access API 2.0:

val entities: DataFrame = framework.entities("configuration/entityTypes/HCP", deltaWindow = "1h")

2. Incremental Data Access with deltaWindow and withDeleted

The following example illustrates the use of Incremental Data Access with the deltaWindow and withDeleted options:

import java.time.temporal.ChronoUnit;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final Dataset<Row> dataframe = framework.dataAccess()
  .dataset(
    EntityDatasetBuilder.withAllFields("configuration/entityTypes/HCP")
  )
  .deltaWindow(1, ChronoUnit.HOURS) // equivalent to .deltaWindow("1h")
  .withDeleted(true)
  .build();

For Data Access API 2.0:

val entities: DataFrame = framework.entities("configuration/entityTypes/HCP", deltaWindow = "1h", withDeleted = true)
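The Java example passes the window as an amount and a java.time.temporal.ChronoUnit, while the other examples use the string "1h". The Scala sketch below shows why the two forms are equivalent by normalizing both to a java.time.Duration. The WindowEquivalence object and its toWindowString helper are hypothetical and assume that the string form is always a whole number of hours.

```scala
import java.time.Duration
import java.time.temporal.ChronoUnit

object WindowEquivalence {
  // Converts an amount/unit pair to a Duration. Duration.of accepts units with
  // an exact duration (such as MINUTES or HOURS) and DAYS, treated as 24 hours.
  def toDuration(amount: Long, unit: ChronoUnit): Duration = Duration.of(amount, unit)

  // Hypothetical: renders a non-negative, whole-hour Duration in the "<n>h" form.
  def toWindowString(d: Duration): String = {
    require(!d.isNegative && d.getSeconds % 3600 == 0 && d.getNano == 0,
      s"not a whole number of hours: $d")
    s"${d.toHours}h"
  }
}
```

With this sketch, (1, ChronoUnit.HOURS) and (60, ChronoUnit.MINUTES) both render as "1h", and (1, ChronoUnit.DAYS) renders as "24h"; 90 minutes is rejected because it is not a whole number of hours.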

3. Incremental Interactions from Updates

The following example reads the ExhibitionEvent interactions updated within the last hour:

val interactionType = "ExhibitionEvent"
val intData: DataFrame = af_framework
  .dataAccess()
  .dataset(
    new InteractionDatasetBuilder()
      .ofType(s"configuration/interactionTypes/$interactionType")
      .select("Id")
      .select("attributes")
  )
  .deltaWindow("1h")
  .build()
intData.count()
intData.printSchema()
intData.show()

For Data Access API 2.0:

val interactionType = "ExhibitionEvent"
// The s prefix is required so that $interactionType is substituted into the URI
val intData: DataFrame =
  af_framework.interactions(s"configuration/interactionTypes/$interactionType", deltaWindow = "1h")
intData.count()
intData.printSchema()
intData.show()
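Both snippets in this section build the type URI from the interactionType variable. In Scala, a $name placeholder is substituted only inside a string that carries the s interpolator prefix; in a plain string literal it stays as literal text, so the request would name a type called "$interactionType". The sketch below illustrates the difference. TypeUriSketch and its interactionTypePath helper are hypothetical and unrelated to MetadataUtils.interactionTypeUri.

```scala
object TypeUriSketch {
  // Hypothetical helper that builds the full interaction type URI.
  def interactionTypePath(interactionType: String): String =
    s"configuration/interactionTypes/$interactionType"

  val interactionType = "ExhibitionEvent"

  // With the s prefix, the variable's value is substituted.
  val interpolated: String = s"configuration/interactionTypes/$interactionType"

  // Without it, the characters "$interactionType" are kept verbatim.
  val literal: String = "configuration/interactionTypes/$interactionType"
}
```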

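4. Incremental Interactions, Deleted Objects in Updates

The following example reads the same interactions with and without deleted objects and compares the counts: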

import com.reltio.analytics.utils.MetadataUtils

// withDeleted(true): deleted interactions are included and a deletedTime column is added
val intData1: DataFrame = af_framework
  .dataAccess()
  .dataset(
    InteractionDatasetBuilder
      .withAllFields(MetadataUtils.interactionTypeUri("configuration/interactionTypes/ExhibitionEvent"))
  )
  .deltaWindow("1h")
  .withDeleted(true)
  .build()

// Without withDeleted: deleted interactions are excluded
val intData2: DataFrame = af_framework
  .dataAccess()
  .dataset(
    InteractionDatasetBuilder
      .withAllFields(MetadataUtils.interactionTypeUri("configuration/interactionTypes/ExhibitionEvent"))
  )
  .deltaWindow("1h")
  .build()

val count1 = intData1.count()
val count2 = intData2.count()

In this example, count1 equals count2 + 1: intData1 additionally contains the one interaction deleted within the window.

For Data Access API 2.0:

val intData1: DataFrame =
  af_framework.interactions("configuration/interactionTypes/ExhibitionEvent", deltaWindow = "1h", withDeleted = true)
val intData2: DataFrame =
  af_framework.interactions("configuration/interactionTypes/ExhibitionEvent", deltaWindow = "1h", withDeleted = false)
val count1 = intData1.count()
val count2 = intData2.count()

In this example, count1 equals count2+1.
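The relationship between the two counts follows from the semantics of withDeleted: assuming both reads see the same window, the withDeleted=true result contains every object in the window, including deleted ones (non-null deletedTime), while the other result omits the deleted ones. The difference is therefore the number of objects deleted within the window. The sketch below models this on plain Scala collections; the WindowRow type and WithDeletedSketch object are hypothetical, and Option[Instant] stands in for the nullable deletedTime column.

```scala
import java.time.Instant

// Hypothetical row shape; deletedTime mirrors the nullable column added by
// withDeleted=true (None corresponds to null, meaning "not deleted").
final case class WindowRow(id: String, deletedTime: Option[Instant])

object WithDeletedSketch {
  // Models the two reads: withDeleted=true keeps every row in the window,
  // withDeleted=false drops rows that have a deletedTime.
  def read(window: Seq[WindowRow], withDeleted: Boolean): Seq[WindowRow] =
    if (withDeleted) window else window.filter(_.deletedTime.isEmpty)

  // Number of objects deleted within the window, derived from the two counts.
  def deletedCount(window: Seq[WindowRow]): Long =
    read(window, withDeleted = true).size.toLong - read(window, withDeleted = false).size.toLong
}
```

On the real DataFrame, the deleted interactions can also be counted directly from the withDeleted=true result, for example with intData1.filter(col("deletedTime").isNotNull).count(), where col is imported from org.apache.spark.sql.functions.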