Skip to content

Commit c0c7dd8

Browse files
committed
join part output alignment
1 parent f0c90a1 commit c0c7dd8

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ case class JoinPartJobContext(leftDf: Option[DfWithStats],
2525
tableProps: Map[String, String],
2626
runSmallMode: Boolean)
2727

28-
class JoinPartJob(node: JoinPartNode, metaData: MetaData, range: DateRange, showDf: Boolean = false)(implicit
28+
// alignOutput forces the job to produce the partitions specified by range.
29+
// legacy behavior was to not align, but that prevents partition-aware orchestration
30+
class JoinPartJob(node: JoinPartNode, metaData: MetaData, range: DateRange, showDf: Boolean = false, alignOutput: Boolean = false)(implicit
2931
tableUtils: TableUtils) {
3032
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
3133
implicit val partitionSpec: PartitionSpec = tableUtils.partitionSpec
@@ -195,7 +197,7 @@ class JoinPartJob(node: JoinPartNode, metaData: MetaData, range: DateRange, show
195197
skewFilteredLeft.select(columns: _*)
196198
}
197199

198-
lazy val shiftedPartitionRange = unfilledPartitionRange.shift(-1)
200+
lazy val shiftedPartitionRange = if(alignOutput) unfilledPartitionRange else unfilledPartitionRange.shift(-1)
199201

200202
val renamedLeftDf = renamedLeftRawDf.select(renamedLeftRawDf.columns.map {
201203
case c if c == tableUtils.partitionColumn =>

spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package ai.chronon.spark.batch
22

33
import ai.chronon.api
4-
import ai.chronon.api.Extensions.{JoinOps, MetadataOps, SourceOps}
4+
import ai.chronon.api.Extensions.{GroupByOps, JoinOps, MetadataOps, SourceOps}
55
import ai.chronon.api.ScalaJavaConversions.IterableOps
6-
import ai.chronon.api.{DateRange, MetaData, PartitionRange}
6+
import ai.chronon.api.{Accuracy, DataModel, DateRange, MetaData, PartitionRange, PartitionSpec}
77
import ai.chronon.planner.{JoinBootstrapNode, JoinDerivationNode, JoinMergeNode, JoinPartNode, SourceWithFilterNode}
88
import ai.chronon.spark.JoinUtils
99
import ai.chronon.spark.catalog.TableUtils
@@ -124,7 +124,15 @@ class ModularMonolith(join: api.Join, dateRange: DateRange)(implicit tableUtils:
124124
.setSkewKeys(join.skewKeys)
125125

126126
StepRunner(dateRange, partMetaData) { stepRange =>
127-
val joinPartJob = new JoinPartJob(joinPartNode, partMetaData, stepRange)
127+
val shiftedRange = if (join.left.dataModel == DataModel.EVENTS && joinPart.groupBy.inferredAccuracy == Accuracy.SNAPSHOT) {
128+
val spec = PartitionSpec.daily
129+
new DateRange()
130+
.setStartDate(spec.before(stepRange.startDate))
131+
.setEndDate(spec.before(stepRange.endDate))
132+
} else {
133+
stepRange
134+
}
135+
val joinPartJob = new JoinPartJob(joinPartNode, partMetaData, shiftedRange, alignOutput = true)
128136
joinPartJob.run(None) // Run without context for now
129137
}
130138
logger.info(s"JoinPartJob completed for: $joinPartGroupByName, output table: $partFullTableName")

0 commit comments

Comments
 (0)