Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class SawtoothAggregator(aggregations: Seq[Aggregation], inputSchema: Seq[(Strin
if (sortedEndTimes == null || sortedEndTimes.isEmpty) return

if (sortedInputs == null || sortedInputs.isEmpty) {
sortedEndTimes.foreach(query => consumer(query, windowedAggregator.finalize(baseIR)))
val finalized = windowedAggregator.finalize(baseIR)
sortedEndTimes.foreach(query => consumer(query, finalized))
return
}

Expand All @@ -163,34 +164,16 @@ class SawtoothAggregator(aggregations: Seq[Aggregation], inputSchema: Seq[(Strin
baseIR
}

var queryFinalized = if (baseIR == null) {
new Array[Any](windowedAggregator.length)
} else {
windowedAggregator.finalize(queryIr)
}

while (queryIdx < sortedEndTimes.length) {

var didClone = false

while (inputIdx < sortedInputs.length && sortedInputs(inputIdx).ts < sortedEndTimes(queryIdx).ts) {

// clone only if necessary - queryIrs differ between consecutive endTimes
if (!didClone) {
queryIr = windowedAggregator.clone(queryIr)
didClone = true
}

windowedAggregator.update(queryIr, sortedInputs(inputIdx))
queryIr = windowedAggregator.update(queryIr, sortedInputs(inputIdx))
inputIdx += 1
}

// re-use the finalized values if there are no events between two query times.
if (didClone) {
queryFinalized = windowedAggregator.finalize(queryIr)
}
val clonedIr = windowedAggregator.clone(queryIr)
consumer(sortedEndTimes(queryIdx), windowedAggregator.finalize(clonedIr))

consumer(sortedEndTimes(queryIdx), queryFinalized)
queryIdx += 1
}
}
Expand Down
6 changes: 5 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/JoinBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,11 @@ abstract class JoinBase(val joinConfCloned: api.Join,

val wholeRange = PartitionRange(unfilledRanges.minBy(_.start).start, unfilledRanges.maxBy(_.end).end)

val runSmallMode = JoinUtils.runSmallMode(tableUtils, leftDf(joinConfCloned, wholeRange, tableUtils).get)
val leftDataOpt = leftDf(joinConfCloned, wholeRange, tableUtils)
require(leftDataOpt.nonEmpty,
s"left side of the join ${joinConfCloned.metaData.name} produced empty data in range $wholeRange")

val runSmallMode = JoinUtils.runSmallMode(tableUtils, leftDataOpt.get)

val effectiveRanges = if (runSmallMode) {
Seq(wholeRange)
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object JoinUtils {
.getOrElse(df)

if (!allowEmpty && result.isEmpty) {
logger.info(s"Left side query below produced 0 rows in range $effectiveLeftRange, and allowEmpty=false.")
logger.info(s"Left side query produced 0 rows in range $effectiveLeftRange, and allowEmpty=false.")
return None
}

Expand Down
14 changes: 0 additions & 14 deletions spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ case class AggregationInfo(hopsAggregator: HopsAggregator,

// Alias for the Spark-provided wrapper around a row array; this is the runtime
// representation Spark hands to UDFs for array-of-struct columns.
// NOTE(review): assumes Spark passes ArrayType columns as WrappedArray — confirm against Spark version in use.
type Rows = mutable.WrappedArray[SparkRow]

@transient lazy val aggregatingUdf: UserDefinedFunction = {
val parent = this

udf(
new UDF2[Rows, Rows, Array[CGenericRow]] {
override def call(leftRows: Rows, rightRows: Rows): Array[CGenericRow] = {
val result = sawtoothAggregate(parent)(leftRows, rightRows)
result
}
},
spark.ArrayType(outputSparkSchema)
)
}

/** Runs the sawtooth aggregation over the given left (query) rows and right (input event)
  * rows, using this AggregationInfo as the aggregation configuration.
  *
  * @param leftRows  rows on the left/query side of the join
  * @param rightRows rows on the right/event side of the join
  * @return one aggregated output row per left row
  */
def aggregate(leftRows: Rows, rightRows: Rows): Array[CGenericRow] =
  sawtoothAggregate(this)(leftRows, rightRows)
Expand Down
2 changes: 2 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ object SawtoothUdf {
// compute windows up-to 5min accuracy for the queries
val nonRealtimeIrs = sawtoothAggregator.computeWindows(hops, headStartTimes)

assert(headStartTimes.length == nonRealtimeIrs.length)

// STEP-4. join tailAccurate - Irs with headTimeStamps and headEvents
// to achieve realtime accuracy
val result = Array.fill[CGenericRow](leftRows.size)(null)
Expand Down
Loading
Loading