diff --git a/api/thrift/common.thrift b/api/thrift/common.thrift index 73ccc33302..f604403dd7 100644 --- a/api/thrift/common.thrift +++ b/api/thrift/common.thrift @@ -63,21 +63,9 @@ struct ConfigProperties { 5: optional map serving } -struct TableDependency { +struct TableInfo { // fully qualified table name 1: optional string table - - // DEPENDENCY_RANGE_LOGIC - // 1. get final start_partition, end_partition - // 2. break into step ranges - // 3. for each dependency - // a. dependency_start: max(query.start - startOffset, startCutOff) - // b. dependency_end: min(query.end - endOffset, endCutOff) - 2: optional Window startOffset - 3: optional Window endOffset - 4: optional string startCutOff - 5: optional string endCutOff - # if not present we will pull from defaults // needed to enumerate what partitions are in a range 100: optional string partitionColumn @@ -89,6 +77,22 @@ struct TableDependency { * is sufficient. What this means is that latest available partition prior to end cut off will be used. **/ 200: optional bool isCumulative +} + +struct TableDependency { + // fully qualified table name + 1: optional TableInfo tableInfo + + // DEPENDENCY_RANGE_LOGIC + // 1. get final start_partition, end_partition + // 2. break into step ranges + // 3. for each dependency + // a. dependency_start: max(query.start - startOffset, startCutOff) + // b. dependency_end: min(query.end - endOffset, endCutOff) + 2: optional Window startOffset + 3: optional Window endOffset + 4: optional string startCutOff + 5: optional string endCutOff /** * JoinParts could use data from batch backfill-s or upload tables when available @@ -126,6 +130,7 @@ struct ExecutionInfo { 11: optional i32 stepDays 12: optional bool historicalBackfill 13: optional list tableDependencies + 14: optional TableInfo outputTableInfo # relevant for streaming jobs 200: optional list kvDependency diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala b/orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala index fed71f1feb..0c070751f2 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala @@ -34,8 +34,8 @@ class JoinBackfill(join: Join) extends TabularNode[Join](join) { dep.setEndOffset(noShift) dep.setStartCutOff(query.getStartPartition) dep.setEndCutOff(query.getEndPartition) - dep.setIsCumulative(false) - dep.setTable(bootstrapPart.getTable) + dep.tableInfo.setIsCumulative(false) + dep.tableInfo.setTable(bootstrapPart.getTable) dep } diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala b/orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala index 07b2d1d557..65fa09186a 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala @@ -23,8 +23,8 @@ class StagingQueryNode(stagingQuery: StagingQuery) extends TabularNode[StagingQu val result = new TableDependency() result.setStartOffset(noShift) result.setEndOffset(noShift) - result.setIsCumulative(false) - result.setTable(tableName) + result.tableInfo.setIsCumulative(false) + result.tableInfo.setTable(tableName) result } }.toSeq diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala index cd4d9d0232..14d8fb4426 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala @@ -59,8 +59,8 @@ object DependencyResolver { if (startCutOff != null) result.setStartCutOff(startCutOff) if (endCutOff != null) result.setEndCutOff(endCutOff) - result.setIsCumulative(source.isCumulative) - result.setTable(table) + result.tableInfo.setIsCumulative(source.isCumulative) + result.tableInfo.setTable(table) result } @@ -87,7 +87,7 @@ object DependencyResolver { return NoPartitions } - if (tableDep.isCumulative) { + if (tableDep.tableInfo.isCumulative) { return LatestPartitionInRange(end, tableDep.getEndCutOff) }