Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions api/thrift/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,9 @@ struct ConfigProperties {
5: optional map<string, string> serving
}

struct TableDependency {
struct TableInfo {
// fully qualified table name
1: optional string table

// DEPENDENCY_RANGE_LOGIC
// 1. get final start_partition, end_partition
// 2. break into step ranges
// 3. for each dependency
// a. dependency_start: max(query.start - startOffset, startCutOff)
// b. dependency_end: min(query.end - endOffset, endCutOff)
2: optional Window startOffset
3: optional Window endOffset
4: optional string startCutOff
5: optional string endCutOff

# if not present we will pull from defaults
// needed to enumerate what partitions are in a range
100: optional string partitionColumn
Expand All @@ -89,6 +77,22 @@ struct TableDependency {
* is sufficient. What this means is that latest available partition prior to end cut off will be used.
**/
200: optional bool isCumulative
}

struct TableDependency {
// fully qualified table name
1: optional TableInfo tableInfo

// DEPENDENCY_RANGE_LOGIC
// 1. get final start_partition, end_partition
// 2. break into step ranges
// 3. for each dependency
// a. dependency_start: max(query.start - startOffset, startCutOff)
// b. dependency_end: min(query.end - endOffset, endCutOff)
2: optional Window startOffset
3: optional Window endOffset
4: optional string startCutOff
5: optional string endCutOff

/**
* JoinParts could use data from batch backfill-s or upload tables when available
Expand Down Expand Up @@ -126,6 +130,7 @@ struct ExecutionInfo {
11: optional i32 stepDays
12: optional bool historicalBackfill
13: optional list<TableDependency> tableDependencies
14: optional TableInfo outputTableInfo

# relevant for streaming jobs
200: optional list<KvDependency> kvDependency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class JoinBackfill(join: Join) extends TabularNode[Join](join) {
dep.setEndOffset(noShift)
dep.setStartCutOff(query.getStartPartition)
dep.setEndCutOff(query.getEndPartition)
dep.setIsCumulative(false)
dep.setTable(bootstrapPart.getTable)
dep.tableInfo.setIsCumulative(false)
dep.tableInfo.setTable(bootstrapPart.getTable)

dep
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class StagingQueryNode(stagingQuery: StagingQuery) extends TabularNode[StagingQu
val result = new TableDependency()
result.setStartOffset(noShift)
result.setEndOffset(noShift)
result.setIsCumulative(false)
result.setTable(tableName)
result.tableInfo.setIsCumulative(false)
result.tableInfo.setTable(tableName)
result
}
}.toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ object DependencyResolver {
if (startCutOff != null) result.setStartCutOff(startCutOff)
if (endCutOff != null) result.setEndCutOff(endCutOff)

result.setIsCumulative(source.isCumulative)
result.setTable(table)
result.tableInfo.setIsCumulative(source.isCumulative)
result.tableInfo.setTable(table)

result
}
Expand All @@ -87,7 +87,7 @@ object DependencyResolver {
return NoPartitions
}

if (tableDep.isCumulative) {
if (tableDep.tableInfo.isCumulative) {
return LatestPartitionInRange(end, tableDep.getEndCutOff)
}

Expand Down