6 changes: 3 additions & 3 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -1188,9 +1188,9 @@ object Extensions {
}

def partitionSpec(defaultSpec: PartitionSpec): PartitionSpec = {
val column = Option(query.partitionColumn).getOrElse(defaultSpec.column)
val format = Option(query.partitionFormat).getOrElse(defaultSpec.format)
val interval = Option(query.partitionInterval).getOrElse(WindowUtils.Day)
val column = Option(query).flatMap((q) => Option(q.partitionColumn)).getOrElse(defaultSpec.column)
val format = Option(query).flatMap((q) => Option(q.partitionFormat)).getOrElse(defaultSpec.format)
val interval = Option(query).flatMap((q) => Option(q.partitionInterval)).getOrElse(WindowUtils.Day)
PartitionSpec(column, format, interval.millis)
}
}
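A minimal, self-contained sketch of the null-safe fallback pattern in the hunk above. Query, PartitionSpec, and the hard-coded day interval below are simplified stand-ins for illustration, not the real ai.chronon.api classes:

case class PartitionSpec(column: String, format: String, intervalMillis: Long)
case class Query(partitionColumn: String = null, partitionFormat: String = null)

object PartitionSpecFallback {
  private val DayMillis = 24L * 60 * 60 * 1000

  // Wrapping `query` itself in Option(...) guards against a null Query object,
  // not just null fields inside it; each field falls back to the default spec.
  def partitionSpec(query: Query, defaultSpec: PartitionSpec): PartitionSpec = {
    val column = Option(query).flatMap(q => Option(q.partitionColumn)).getOrElse(defaultSpec.column)
    val format = Option(query).flatMap(q => Option(q.partitionFormat)).getOrElse(defaultSpec.format)
    PartitionSpec(column, format, DayMillis)
  }

  def main(args: Array[String]): Unit = {
    val default = PartitionSpec("ds", "yyyy-MM-dd", DayMillis)
    println(partitionSpec(null, default))                          // PartitionSpec(ds,yyyy-MM-dd,86400000)
    println(partitionSpec(Query(partitionColumn = "dt"), default)) // PartitionSpec(dt,yyyy-MM-dd,86400000)
  }
}

The point of the extra Option(query) wrapper is that a completely absent Query falls back to the default spec instead of throwing a NullPointerException.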
19 changes: 10 additions & 9 deletions spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -17,24 +17,23 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.{Accuracy, AggregationPart, Constants, DataModel, DataType, PartitionRange}
import ai.chronon.api.ColorPrinter.ColorString
import ai.chronon.api.DataModel.{ENTITIES, EVENTS}
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.{Accuracy, AggregationPart, Constants, DataModel, DataType, PartitionRange}
import ai.chronon.online.serde.SparkConversions
import ai.chronon.spark.Driver.parseConf
import ai.chronon.spark.Extensions.QuerySparkOps
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.spark.submission.ItemSketchSerializable
import org.apache.datasketches.frequencies.ErrorType
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.sql.{types, DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, types}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.{immutable, mutable, Seq}
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable, mutable}

class Analyzer(tableUtils: TableUtils,
conf: Any,
@@ -267,10 +266,12 @@ class Analyzer(tableUtils: TableUtils,
JoinUtils.getRangeToFill(joinConf.left, tableUtils, endDate, historicalBackfill = joinConf.historicalBackfill)
logger.info(s"Join range to fill $rangeToFill")
val unfilledRanges = tableUtils
.unfilledRanges(joinConf.metaData.outputTable,
rangeToFill,
Some(Seq(joinConf.left.table)),
inputPartitionColumnNames = Seq(joinConf.left.query.effectivePartitionColumn))
.unfilledRanges(
joinConf.metaData.outputTable,
rangeToFill,
Some(Seq(joinConf.left.table)),
inputPartitionColumnNames = Seq(joinConf.left.query.partitionSpec(tableUtils.partitionSpec).column)
)
.getOrElse(Seq.empty)

joinConf.joinParts.toScala.foreach { part =>
9 changes: 7 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
@@ -181,7 +181,9 @@ object BootstrapInfo {
.foreach(part => {
// practically there should only be one logBootstrapPart per Join, but nevertheless we will loop here
val schema = tableUtils.getSchemaFromTable(part.table)
val missingKeys = part.keys(joinConf, part.query.effectivePartitionColumn).filterNot(schema.fieldNames.contains)
val missingKeys = part
.keys(joinConf, part.query.partitionSpec(tableUtils.partitionSpec).column)
.filterNot(schema.fieldNames.contains)
collectException(assert(
missingKeys.isEmpty,
s"Log table ${part.table} does not contain some specified keys: ${missingKeys.prettyInline}, table schema: ${schema.pretty}"
@@ -203,7 +205,10 @@
val range = PartitionRange(part.startPartition, part.endPartition)
val bootstrapDf =
tableUtils
.scanDf(part.query, part.table, Some(Map(part.query.effectivePartitionColumn -> null)), range = Some(range))
.scanDf(part.query,
part.table,
Some(Map(part.query.partitionSpec(tableUtils.partitionSpec).column -> null)),
range = Some(range))
val schema = bootstrapDf.schema
// We expect partition column and not effectivePartitionColumn because of the scanDf rename
val missingKeys = part.keys(joinConf, tableUtils.partitionColumn).filterNot(schema.fieldNames.contains)
27 changes: 1 addition & 26 deletions spark/src/main/scala/ai/chronon/spark/Extensions.scala
@@ -17,8 +17,7 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.{Constants, PartitionRange, PartitionSpec, TimeRange, Window}
import ai.chronon.api.Extensions.{SourceOps, WindowOps}
import ai.chronon.api.{Constants, PartitionRange, PartitionSpec, TimeRange}
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.online.serde.{AvroConversions, SparkConversions}
import ai.chronon.spark.catalog.TableUtils
@@ -308,28 +307,4 @@ object Extensions {
result
}
}

implicit class SourceSparkOps(source: api.Source)(implicit tableUtils: TableUtils) {

def partitionColumn: String = {
Option(source.query.partitionColumn).getOrElse(tableUtils.partitionColumn)
}

def partitionFormat: String = {
Option(source.query.partitionFormat).getOrElse(tableUtils.partitionFormat)
}

def partitionInterval: Window = {
Option(source.query.partitionInterval).getOrElse(tableUtils.partitionSpec.intervalWindow)
}

def partitionSpec: PartitionSpec = {
PartitionSpec(partitionColumn, partitionFormat, partitionInterval.millis)
}
}

implicit class QuerySparkOps(query: api.Query) {
def effectivePartitionColumn(implicit tableUtils: TableUtils): String =
Option(query).flatMap(q => Option(q.partitionColumn)).getOrElse(tableUtils.partitionColumn)
}
}
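With SourceSparkOps and QuerySparkOps removed, call sites resolve the effective spec straight from the query, using the table-level spec as the default. A stand-alone sketch of that call-site shape; Source, TableUtils, and this partitionSpec body are simplified stand-ins that only mirror the api.Extensions behavior:

case class PartitionSpec(column: String, format: String, intervalMillis: Long)

case class Query(partitionColumn: String = null, partitionFormat: String = null) {
  // Mirrors api.Extensions.partitionSpec: unset query fields fall back to the default spec.
  def partitionSpec(defaultSpec: PartitionSpec): PartitionSpec =
    PartitionSpec(
      Option(partitionColumn).getOrElse(defaultSpec.column),
      Option(partitionFormat).getOrElse(defaultSpec.format),
      defaultSpec.intervalMillis
    )
}

case class Source(table: String, query: Query)
case class TableUtils(partitionSpec: PartitionSpec)

object CallSiteShape {
  def main(args: Array[String]): Unit = {
    val tableUtils = TableUtils(PartitionSpec("ds", "yyyy-MM-dd", 86400000L))
    val source     = Source("db.events", Query(partitionColumn = "dt"))

    // Stands in for the removed source.partitionColumn / query.effectivePartitionColumn helpers:
    val column = source.query.partitionSpec(tableUtils.partitionSpec).column
    println(column) // dt
  }
}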
30 changes: 11 additions & 19 deletions spark/src/main/scala/ai/chronon/spark/GroupBy.scala
@@ -17,8 +17,7 @@
package ai.chronon.spark

import ai.chronon.aggregator.base.TimeTuple
import ai.chronon.aggregator.row.ColumnAggregator
import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.row.{ColumnAggregator, RowAggregator}
import ai.chronon.aggregator.windowing._
import ai.chronon.api
import ai.chronon.api.{
@@ -28,30 +27,23 @@ import ai.chronon.api.{
ParametricMacro,
PartitionRange,
PartitionSpec,
TsUtils,
TimeRange
TimeRange,
TsUtils
}
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.api.DataModel.ENTITIES
import ai.chronon.api.DataModel.EVENTS
import ai.chronon.api.DataModel.{ENTITIES, EVENTS}
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.online.serde.RowWrapper
import ai.chronon.online.serde.SparkConversions
import ai.chronon.online.serde.{RowWrapper, SparkConversions}
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.spark.Extensions._
import ai.chronon.spark.Extensions.SourceSparkOps
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.util.sketch.BloomFilter
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}

import java.util
import scala.collection.Seq
import scala.collection.mutable
import scala.collection.{mutable, Seq}

class GroupBy(val aggregations: Seq[api.Aggregation],
val keyColumns: Seq[String],
@@ -626,8 +618,8 @@ object GroupBy {
window: Option[api.Window]): PartitionRange = {

implicit val tu: TableUtils = tableUtils
val effectiveQueryRange = queryRange.translate(source.partitionSpec)
implicit val sourcePartitionSpec: PartitionSpec = source.partitionSpec
val effectiveQueryRange = queryRange.translate(source.query.partitionSpec(tableUtils.partitionSpec))
implicit val sourcePartitionSpec: PartitionSpec = source.query.partitionSpec(tableUtils.partitionSpec)

// from here on down - the math is based entirely on source partition spec
val PartitionRange(queryStart, queryEnd) = effectiveQueryRange
10 changes: 5 additions & 5 deletions spark/src/main/scala/ai/chronon/spark/JoinBase.scala
@@ -145,7 +145,7 @@ abstract class JoinBase(val joinConfCloned: api.Join,
rangeToFill,
Some(Seq(joinConfCloned.left.table)),
skipFirstHole = skipFirstHole,
inputPartitionColumnNames = Seq(joinConfCloned.left.query.effectivePartitionColumn)
inputPartitionColumnNames = Seq(joinConfCloned.left.query.partitionSpec(tableUtils.partitionSpec).column)
)
.getOrElse(Seq.empty))
}
@@ -291,8 +291,8 @@ abstract class JoinBase(val joinConfCloned: api.Join,
val existingLeftRange = tableUtils.partitions(
joinConfCloned.left.table,
partitionRange = Option(rangeToFill),
tablePartitionSpec = Option(joinConfCloned.left.partitionSpec),
partitionColumnName = joinConfCloned.left.partitionSpec.column
tablePartitionSpec = Option(joinConfCloned.left.query.partitionSpec(tableUtils.partitionSpec)),
partitionColumnName = joinConfCloned.left.query.partitionSpec(tableUtils.partitionSpec).column
)
val requested = rangeToFill.partitions
val fillableRanges = requested.filter(existingLeftRange.contains)
@@ -309,8 +309,8 @@
rangeToFill,
Some(Seq(joinConfCloned.left.table)),
skipFirstHole = skipFirstHole,
inputPartitionColumnNames = Seq(joinConfCloned.left.query.effectivePartitionColumn),
inputPartitionSpecs = Seq(joinConfCloned.left.partitionSpec)
inputPartitionColumnNames = Seq(joinConfCloned.left.query.partitionSpec(tableUtils.partitionSpec).column),
inputPartitionSpecs = Seq(joinConfCloned.left.query.partitionSpec(tableUtils.partitionSpec))
)
.getOrElse(Seq.empty)

4 changes: 2 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
@@ -80,7 +80,7 @@ object JoinUtils {
}

implicit val tu: TableUtils = tableUtils
val effectiveLeftSpec = joinConf.left.partitionSpec
val effectiveLeftSpec = joinConf.left.query.partitionSpec(tableUtils.partitionSpec)
val effectiveLeftRange = range.translate(effectiveLeftSpec)

val partitionColumnOfLeft = effectiveLeftSpec.column
@@ -154,7 +154,7 @@ object JoinUtils {
}

implicit val tu: TableUtils = tableUtils
val leftSpec = leftSource.partitionSpec
val leftSpec = leftSource.query.partitionSpec(tableUtils.partitionSpec)

val firstAvailablePartitionOpt =
tableUtils.firstAvailablePartition(leftSource.table,
@@ -1,6 +1,6 @@
package ai.chronon.spark.batch

import ai.chronon.api.Extensions.{BootstrapPartOps, DateRangeOps, ExternalPartOps, MetadataOps, SourceOps, StringsOps}
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions.ListOps
import ai.chronon.api.{Constants, DateRange, PartitionRange, PartitionSpec, StructField, StructType}
import ai.chronon.online.serde.SparkConversions
@@ -87,7 +87,7 @@ class JoinBootstrapJob(node: JoinBootstrapNode, range: DateRange)(implicit table
var bootstrapDf =
tableUtils.scanDf(part.query,
part.table,
Some(Map(part.query.effectivePartitionColumn -> null)),
Some(Map(part.query.partitionSpec(tableUtils.partitionSpec).column -> null)),
range = Some(bootstrapRange))

// attach semantic_hash for either log or regular table bootstrap