Draft
Changes from all commits
29 commits
eaacfa6
code to write daily irs
kambstreat May 30, 2025
40b6cb2
store incremental agg and compute final IRs
kambstreat Jun 3, 2025
a014b6e
Store hops to inc tables
kambstreat Jun 7, 2025
32d559e
add code changes to generate final output from IR for AVG
kambstreat Jun 14, 2025
37293df
change function structure and variable names
kambstreat Jun 19, 2025
6263706
remove unused functions
kambstreat Jun 19, 2025
cb4325b
change function defs
kambstreat Jun 19, 2025
796ef96
make changes
kambstreat Jun 19, 2025
f218b23
change function order
kambstreat Jun 19, 2025
b1d4ee9
add new field is_incremental to python api
kambstreat Jun 20, 2025
2ab7659
get argument for isIncremental in scala spark backend
kambstreat Jun 20, 2025
238c781
add unit test for incremental groupby
kambstreat Jun 20, 2025
8edfd27
reuse table creation
kambstreat Jul 18, 2025
e903683
Update GroupByTest
kambstreat Jul 18, 2025
0bdc4fc
Add GroupByTest for events
kambstreat Jul 18, 2025
7987931
changes for incremental
kambstreat Sep 3, 2025
2b26d45
resolve merge conflicts
kambstreat Sep 3, 2025
7b62a43
add last hole logic for incremental backfill
kambstreat Sep 5, 2025
aeeb5ec
fix syntax
kambstreat Sep 5, 2025
9180d23
fix bug : backfill only for missing holes
kambstreat Sep 6, 2025
ee81672
fix none error for inc Table
kambstreat Sep 7, 2025
29a3f28
add incremental table queryable range
kambstreat Sep 19, 2025
aa16010
add logging for tableUtils
kambstreat Sep 19, 2025
ff41cc9
add log
kambstreat Sep 19, 2025
aa25f9f
fill incremental holes
kambstreat Sep 22, 2025
3efe8cd
modify incremental aggregation parts
kambstreat Oct 2, 2025
a3bece6
remove logs for debugging
kambstreat Oct 7, 2025
32f6ac9
support serializeIR
Oct 10, 2025
6c9e74d
add the actually important file
Oct 10, 2025
@@ -70,6 +70,11 @@ class RowAggregator(val inputSchema: Seq[(String, DataType)], val aggregationPar
.toArray
.zip(columnAggregators.map(_.irType))

val incrementalOutputSchema = aggregationParts
.map(_.incrementalOutputColumnName)
.toArray
.zip(columnAggregators.map(_.irType))

val outputSchema: Array[(String, DataType)] = aggregationParts
.map(_.outputColumnName)
.toArray
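For orientation: the new incrementalOutputSchema keys each aggregator's intermediate representation (IR) by the window-free incrementalOutputColumnName (added in Extensions.scala below), while the existing outputSchema keys finalized values by the windowed name. A rough sketch of the resulting shape — the column names and the simplified type notation are illustrative, not taken from this PR:

// Hypothetical GroupBy with COUNT(click) and AVERAGE(price), one window each.
// The paired type is the aggregator's IR (intermediate state), not the finalized output type.
val incrementalOutputSchema: Array[(String, String)] = Array(
  "click_count"   -> "LongType",                             // COUNT IR is already a primitive
  "price_average" -> "StructType(sum: Double, count: Long)"  // AVERAGE IR carries (sum, count)
)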
@@ -143,7 +143,7 @@ class SawtoothOnlineAggregatorTest extends TestCase {
operation = Operation.HISTOGRAM,
inputColumn = "action",
windows = Seq(
new Window(3, TimeUnit.DAYS),
new Window(3, TimeUnit.DAYS)
)
)
)
@@ -162,15 +162,15 @@

val finalBatchIr = FinalBatchIr(
Array[Any](
null, // collapsed (T-1 -> T)
null // collapsed (T-1 -> T)
),
Array(
Array.empty, // 1‑day hops (not used)
Array( // 1-hour hops
hop(1, 1746745200000L), // 2025-05-08 23:00:00 UTC
hop(1, 1746766800000L), // 2025-05-09 05:00:00 UTC
Array.empty, // 1‑day hops (not used)
Array( // 1-hour hops
hop(1, 1746745200000L), // 2025-05-08 23:00:00 UTC
hop(1, 1746766800000L) // 2025-05-09 05:00:00 UTC
),
Array.empty // 5‑minute hops (not used)
Array.empty // 5‑minute hops (not used)
)
)
val queryTs = batchEndTs + 100
3 changes: 2 additions & 1 deletion api/py/ai/chronon/group_by.py
@@ -362,7 +362,7 @@ def GroupBy(
tags: Optional[Dict[str, str]] = None,
derivations: Optional[List[ttypes.Derivation]] = None,
deprecation_date: Optional[str] = None,
description: Optional[str] = None,
is_incremental: Optional[bool] = False,
**kwargs,
) -> ttypes.GroupBy:
"""
@@ -570,6 +570,7 @@ def _normalize_source(source):
backfillStartDate=backfill_start_date,
accuracy=accuracy,
derivations=derivations,
isIncremental=is_incremental,
)
validate_group_by(group_by)
return group_by
9 changes: 8 additions & 1 deletion api/py/test/sample/scripts/spark_submit.sh
@@ -28,13 +28,14 @@

set -euxo pipefail
CHRONON_WORKING_DIR=${CHRONON_TMPDIR:-/tmp}/${USER}
echo $CHRONON_WORKING_DIR
mkdir -p ${CHRONON_WORKING_DIR}
export TEST_NAME="${APP_NAME}_${USER}_test"
unset PYSPARK_DRIVER_PYTHON
unset PYSPARK_PYTHON
unset SPARK_HOME
unset SPARK_CONF_DIR
export LOG4J_FILE="${CHRONON_WORKING_DIR}/log4j_file"
export LOG4J_FILE="${CHRONON_WORKING_DIR}/log4j.properties"
cat > ${LOG4J_FILE} << EOF
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -47,6 +48,9 @@ EOF
$SPARK_SUBMIT_PATH \
--driver-java-options " -Dlog4j.configuration=file:${LOG4J_FILE}" \
--conf "spark.executor.extraJavaOptions= -XX:ParallelGCThreads=4 -XX:+UseParallelGC -XX:+UseCompressedOops" \
--conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Dlog4j.configuration=file:${LOG4J_FILE}" \
--conf "spark.sql.warehouse.dir=/home/chaitu/projects/chronon/spark-warehouse" \
--conf "javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=/home/chaitu/projects/chronon/hive-metastore/metastore_db;create=true" \
--conf spark.sql.shuffle.partitions=${PARALLELISM:-4000} \
--conf spark.dynamicAllocation.maxExecutors=${MAX_EXECUTORS:-1000} \
--conf spark.default.parallelism=${PARALLELISM:-4000} \
@@ -77,3 +81,6 @@ tee ${CHRONON_WORKING_DIR}/${APP_NAME}_spark.log




#--conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Dlog4j.rootLogger=INFO,console" \

2 changes: 1 addition & 1 deletion api/py/test/sample/teams.json
@@ -5,7 +5,7 @@
},
"common_env": {
"VERSION": "latest",
"SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
"SPARK_SUBMIT_PATH": "spark-submit",
"JOB_MODE": "local[*]",
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
6 changes: 5 additions & 1 deletion api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -98,7 +98,7 @@ object Extensions {
def cleanName: String = metaData.name.sanitize

def outputTable = s"${metaData.outputNamespace}.${metaData.cleanName}"

def incrementalOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_inc"
def preModelTransformsTable = s"${metaData.outputNamespace}.${metaData.cleanName}_pre_mt"
def outputLabelTable = s"${metaData.outputNamespace}.${metaData.cleanName}_labels"
def outputFinalView = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
@@ -179,6 +179,10 @@

def outputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${aggregationPart.window.suffix}${bucketSuffix}"

def incrementalOutputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${bucketSuffix}"

}

implicit class AggregationOps(aggregation: Aggregation) {
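To see how the new names relate to the existing ones, here is a plain-Scala restatement of the two defs above; the input column, operation, and 7-day window are hypothetical:

// Sketch only: a hypothetical SUM(price) aggregation over a 7-day window.
val inputColumn  = "price"
val opSuffix     = "sum"
val windowSuffix = "_7d" // empty for unwindowed aggregations
val bucketSuffix = ""    // non-empty when the aggregation is bucketed

val outputColumnName            = s"${inputColumn}_$opSuffix$windowSuffix$bucketSuffix" // "price_sum_7d"
val incrementalOutputColumnName = s"${inputColumn}_$opSuffix$bucketSuffix"              // "price_sum"
// Table names follow the same pattern: outputTable "ns.group_by_name" vs incrementalOutputTable "ns.group_by_name_inc".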
1 change: 1 addition & 0 deletions api/thrift/api.thrift
@@ -287,6 +287,7 @@ struct GroupBy {
6: optional string backfillStartDate
// Optional derivation list
7: optional list<Derivation> derivations
8: optional bool isIncremental
}

struct JoinPart {
74 changes: 74 additions & 0 deletions online/src/main/scala/ai/chronon/online/SparkConversions.scala
@@ -163,4 +163,78 @@ object SparkConversions {
extraneousRecord
)
}

/**
* Converts a single Spark column value to Chronon normalized IR format.
*
* This is the inverse of toSparkRow() - used when reading pre-computed IR values
* from Spark DataFrames. Each IR column in the DataFrame is converted based on its
* Chronon IR type.
*
* Examples:
* - Count IR: Long → Long (pass-through, primitives stay primitives)
* - Sum IR: Double → Double (pass-through)
* - Average IR: Spark Row(sum, count) → Array[Any](sum, count)
* - UniqueCount IR: Spark Array[T] → java.util.ArrayList[T]
* - Histogram IR: Spark Map[K,V] → java.util.HashMap[K,V]
* - ApproxPercentile IR: Array[Byte] → Array[Byte] (pass-through for binary)
*
* @param sparkValue The value from a Spark DataFrame column
* @param irType The Chronon IR type for this column (from RowAggregator.incrementalOutputSchema)
* @return Normalized IR value ready for denormalize()
*/
def fromSparkValue(sparkValue: Any, irType: api.DataType): Any = {
if (sparkValue == null) return null

(sparkValue, irType) match {
// Primitives - pass through (Count, Sum, Min, Max, Binary sketches)
case (v,
api.IntType | api.LongType | api.ShortType | api.ByteType | api.FloatType | api.DoubleType |
api.StringType | api.BooleanType | api.BinaryType) =>
v

// Spark Row → Array[Any] (Average, Variance, Skew, Kurtosis, FirstK/LastK)
case (row: Row, api.StructType(_, fields)) =>
val arr = new Array[Any](fields.length)
fields.zipWithIndex.foreach {
case (field, idx) =>
arr(idx) = fromSparkValue(row.get(idx), field.fieldType)
}
arr

// Spark mutable.WrappedArray → util.ArrayList (UniqueCount, TopK, BottomK)
case (arr: mutable.WrappedArray[_], api.ListType(elementType)) =>
val result = new util.ArrayList[Any](arr.length)
arr.foreach { elem =>
result.add(fromSparkValue(elem, elementType))
}
result

// Spark native Array → util.ArrayList (alternative array representation)
case (arr: Array[_], api.ListType(elementType)) =>
val result = new util.ArrayList[Any](arr.length)
arr.foreach { elem =>
result.add(fromSparkValue(elem, elementType))
}
result

// Spark scala.collection.Map → util.HashMap (Histogram)
case (map: scala.collection.Map[_, _], api.MapType(keyType, valueType)) =>
val result = new util.HashMap[Any, Any]()
map.foreach {
case (k, v) =>
result.put(
fromSparkValue(k, keyType),
fromSparkValue(v, valueType)
)
}
result

case (value, tpe) =>
throw new IllegalArgumentException(
s"Cannot convert Spark value $value (${value.getClass.getSimpleName}) " +
s"to Chronon IR type $tpe"
)
}
}
}
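To make the conversion concrete, a small usage sketch for the AVERAGE case; the values are hypothetical, and the api.StructType/api.StructField constructors are assumed to match the shapes pattern-matched above:

import org.apache.spark.sql.Row
import ai.chronon.api

// Hypothetical AVERAGE IR read back from an IR table as a Spark struct column (sum, count).
val avgIrType = api.StructType(
  "avg_ir",
  Array(api.StructField("sum", api.DoubleType), api.StructField("count", api.LongType))
)
val sparkValue = Row(42.5, 10L)
val normalized = SparkConversions.fromSparkValue(sparkValue, avgIrType)
// normalized == Array[Any](42.5, 10L): struct fields are converted recursively and
// primitives (DoubleType, LongType) pass through — ready for denormalize().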
5 changes: 5 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/DataRange.scala
@@ -53,6 +53,11 @@ case class PartitionRange(start: String, end: String)(implicit tableUtils: Table
}
}

def daysBetween: Int = {
if (start == null || end == null) 0
else Stream.iterate(start)(tableUtils.partitionSpec.after).takeWhile(_ <= end).size
}

def isSingleDay: Boolean = {
start == end
}
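A worked example of the new helper's counting logic, restated in plain Scala with java.time standing in for tableUtils.partitionSpec (assumes the default daily yyyy-MM-dd partitions; the helper name is hypothetical):

import java.time.LocalDate

def daysBetweenSketch(start: String, end: String): Int = {
  if (start == null || end == null) 0
  else {
    val endDate = LocalDate.parse(end)
    Stream.iterate(LocalDate.parse(start))(_.plusDays(1))
      .takeWhile(d => !d.isAfter(endDate))
      .size
  }
}

// daysBetweenSketch("2025-09-01", "2025-09-01") == 1  (a single day counts as 1)
// daysBetweenSketch("2025-09-01", "2025-09-07") == 7  (inclusive of both endpoints)
// daysBetweenSketch(null, "2025-09-07")         == 0  (null-guarded, like the def above)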
3 changes: 2 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -466,7 +466,8 @@ object Driver {
tableUtils,
args.stepDays.toOption,
args.startPartitionOverride.toOption,
!args.runFirstHole()
!args.runFirstHole(),
args.groupByConf.isIncremental
)

if (args.shouldExport()) {