3 changes: 3 additions & 0 deletions .bazelrc
@@ -12,6 +12,9 @@ common:scala_2.13 --define=SCALA_VERSION=2.13.12
common --repo_env=SCALA_VERSION=2.12.18
common --define=SCALA_VERSION=2.12.18

# common --repo_env=SCALA_VERSION=2.13.12
# common --define=SCALA_VERSION=2.13.12

build --java_language_version=11
build --java_runtime_version=11
build --remote_cache=https://storage.googleapis.com/zipline-bazel-cache
@@ -143,6 +143,7 @@ class HopsAggregator(minQueryTs: Long,
.mkString(", ")
logger.info(s"""Left bounds: $readableHopsToBoundsMap
|minQueryTs = ${TsUtils.toStr(minQueryTs)}""".stripMargin)

result
}

@@ -23,7 +23,7 @@ import ai.chronon.api.Extensions.WindowMapping
import ai.chronon.api.Extensions.WindowOps

import java.util
import scala.collection.Seq
import scala.collection.{Seq, mutable}

// Head Sliding, Tail Hopping Window - the effective window size, when plotted against query timestamp,
// will look like the edge of a sawtooth - instead of a straight line.
@@ -139,6 +139,62 @@ class SawtoothAggregator(aggregations: Seq[Aggregation], inputSchema: Seq[(Strin
}
result
}

// method is used to generate head-realtime-ness on top of hops;
// both buffers must be sorted by timestamp ascending - the single monotone input pointer below relies on it
def cumulateAndFinalizeSorted(sortedInputs: mutable.Buffer[Row], // sorted by ts ascending
sortedEndTimes: mutable.Buffer[Row], // sorted by ts ascending
baseIR: Array[Any],
consumer: (Row, Array[Any]) => Unit): Unit = {

if (sortedEndTimes == null || sortedEndTimes.isEmpty) return

if (sortedInputs == null || sortedInputs.isEmpty) {
sortedEndTimes.foreach(query => consumer(query, windowedAggregator.finalize(baseIR)))
return
}

var inputIdx = 0
var queryIdx = 0

var queryIr = if (baseIR == null) {
new Array[Any](windowedAggregator.length)
} else {
baseIR
}

var queryFinalized = if (baseIR == null) {
new Array[Any](windowedAggregator.length)
} else {
windowedAggregator.finalize(queryIr)
}

while (queryIdx < sortedEndTimes.length) {

var didClone = false

while (inputIdx < sortedInputs.length && sortedInputs(inputIdx).ts < sortedEndTimes(queryIdx).ts) {

// clone only if necessary - queryIrs differ between consecutive endTimes
if (!didClone) {
queryIr = windowedAggregator.clone(queryIr)
didClone = true
}

windowedAggregator.update(queryIr, sortedInputs(inputIdx))
inputIdx += 1
}

// re-use the finalized values if there are no events between two query times.
if (didClone) {
queryFinalized = windowedAggregator.finalize(queryIr)
}

consumer(sortedEndTimes(queryIdx), queryFinalized)
queryIdx += 1
}
}

}

private class Entry(var startIndex: Int, var endIndex: Int, var ir: Any) {}
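To make the contract of `cumulateAndFinalizeSorted` concrete, here is a minimal, self-contained sketch of the same two-pointer pattern: one monotone index over timestamp-sorted inputs, a running aggregate that is re-finalized only when new events arrive, and the finalized value reused across end times that saw no new events. The `Event`/`Query` case classes and the plain running sum are illustrative stand-ins, not Chronon's `Row`/`windowedAggregator` API.

```scala
case class Event(ts: Long, value: Long) // stand-in for an input Row
case class Query(ts: Long)              // stand-in for an end-time Row

object TwoPointerCumulateSketch {

  def cumulateAndFinalizeSorted(sortedInputs: IndexedSeq[Event],   // sorted by ts ascending
                                sortedEndTimes: IndexedSeq[Query], // sorted by ts ascending
                                consumer: (Query, Long) => Unit): Unit = {
    var inputIdx = 0
    var runningSum = 0L        // plays the role of the (cloned) IR
    var finalized = runningSum // plays the role of the finalized value

    sortedEndTimes.foreach { query =>
      var updated = false
      while (inputIdx < sortedInputs.length && sortedInputs(inputIdx).ts < query.ts) {
        runningSum += sortedInputs(inputIdx).value
        updated = true
        inputIdx += 1
      }
      // re-finalize only when events landed between consecutive end times
      if (updated) finalized = runningSum
      consumer(query, finalized)
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Vector(Event(1L, 10), Event(5L, 20), Event(9L, 30))
    val queries = Vector(Query(2L), Query(6L), Query(7L), Query(12L))
    cumulateAndFinalizeSorted(events, queries, (q, sum) => println(s"ts=${q.ts} -> $sum"))
    // prints: ts=2 -> 10, ts=6 -> 30, ts=7 -> 30 (reused), ts=12 -> 60
  }
}
```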
@@ -25,7 +25,7 @@ import ai.chronon.api._
import scala.collection.Seq

// This implements the two-stack-lite algorithm
// To understand the intuition behind the algorithm I highly recommend reading the intuition text in the end of this file
// To understand the intuition behind the algorithm I highly recommend reading the snippet in the end of this file
class TwoStackLiteAggregator(inputSchema: StructType,
aggregations: Seq[Aggregation],
resolution: Resolution = FiveMinuteResolution) {
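For readers who have not met the algorithm the comment refers to: a two-stack sliding-window aggregator keeps an "in" stack of new values and an "out" stack of precomputed running aggregates, so every element is combined a constant number of times without requiring the operation to be invertible. The sketch below works over plain `Long`s to stay self-contained; it illustrates the technique only and is not the `TwoStackLiteAggregator` API.

```scala
// Minimal two-stack sliding-window aggregate over an associative op (sum here).
class TwoStackSlidingAgg(combine: (Long, Long) => Long, zero: Long) {
  private var in: List[Long] = Nil          // newly pushed values, newest at head
  private var inAgg: Long = zero            // aggregate of everything in `in`
  private var out: List[(Long, Long)] = Nil // (value, aggregate of this value and all newer ones in `out`)

  def push(v: Long): Unit = {
    in = v :: in
    inAgg = combine(inAgg, v)
  }

  // precondition: the window is non-empty
  def pop(): Unit = {
    if (out.isEmpty) {
      // flip: move the in-stack over, precomputing suffix aggregates so queries stay O(1)
      in.foreach { v =>
        val newer = out.headOption.map(_._2).getOrElse(zero)
        out = (v, combine(v, newer)) :: out
      }
      in = Nil
      inAgg = zero
    }
    out = out.tail // evicts the oldest element
  }

  def query(): Long = combine(out.headOption.map(_._2).getOrElse(zero), inAgg)
}

object TwoStackDemo extends App {
  val w = new TwoStackSlidingAgg(_ + _, 0L)
  val windowSize = 3
  (1L to 5L).foreach { v =>
    w.push(v)
    if (v > windowSize) w.pop() // evict once the window is full
    println(s"after $v -> window sum = ${w.query()}") // 1, 3, 6, 9, 12
  }
}
```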
@@ -32,17 +32,24 @@ class NaiveAggregator(aggregator: RowAggregator,
// initialize the result - convention is to append the timestamp in the end
val results: Array[Array[Any]] = Array.fill(queries.length)(Array.fill(aggregator.length)(null))
if (inputRows == null) return results

for (inputRow <- inputRows) {
for (endTimeIndex <- queries.indices) {

val queryTime = queries(endTimeIndex)

for (col <- aggregator.indices) {

val windowStart = TsUtils.round(queryTime - windows(col).millis, tailHops(col))

if (windowStart <= inputRow.ts && inputRow.ts < TsUtils.round(queryTime, headRoundingMillis)) {
aggregator.columnAggregators(col).update(results(endTimeIndex), inputRow)
}
}

}
}

results
}
}
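The boundary check in the loop above is the heart of the sawtooth window: the tail is snapped down to a hop boundary while the head rounds to the query's finer granularity. A small worked sketch, under the assumption that `TsUtils.round` floors a timestamp to a multiple of the given granularity (an assumption about its behaviour, not a quote of its implementation):

```scala
object SawtoothWindowSketch {
  // Assumed behaviour of TsUtils.round: floor `ts` to a multiple of `granularity`.
  def round(ts: Long, granularity: Long): Long = (ts / granularity) * granularity

  def main(args: Array[String]): Unit = {
    val minute = 60L * 1000
    val hour = 60 * minute
    val day = 24 * hour

    val queryTime = 1700000123456L // arbitrary query timestamp in epoch millis
    val window = 7 * day           // 7-day window
    val tailHop = hour             // tail hops at 1-hour resolution
    val headRounding = 5 * minute  // head rounds to 5 minutes

    val windowStart = round(queryTime - window, tailHop) // inclusive
    val windowEnd = round(queryTime, headRounding)       // exclusive

    println(s"row included iff windowStart ($windowStart) <= ts < windowEnd ($windowEnd)")
    // As queryTime advances, windowEnd moves in 5-minute steps while windowStart moves in
    // 1-hour steps, so the effective window length oscillates - the "sawtooth" shape.
  }
}
```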
94 changes: 94 additions & 0 deletions scripts/perf/run_unionjoin_with_manual_dump.sh
@@ -0,0 +1,94 @@
#!/bin/bash

# UnionJoinTest with Manual Heap Dump Generation

cd /Users/nsimha/repos/chronon
Contributor:
🛠️ Refactor suggestion

Script contains hardcoded user-specific paths.

The script is tied to a specific user's directory structure, limiting reusability.

Make the script more portable:

+# Get the script directory and project root
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
+DUMP_DIR="${PROJECT_ROOT}/.ijwb"

-cd /Users/nsimha/repos/chronon || exit 1
+cd "$PROJECT_ROOT" || exit 1

-RUNFILES_DIR="$(pwd)/bazel-bin/spark/join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala.runfiles"
+RUNFILES_DIR="${PROJECT_ROOT}/bazel-bin/spark/join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala.runfiles"

-HEAP_DUMP_FILE="/Users/nsimha/repos/chronon/.ijwb/unionjoin-heapdump-$(date +%Y%m%d-%H%M%S).hprof"
+HEAP_DUMP_FILE="${DUMP_DIR}/unionjoin-heapdump-$(date +%Y%m%d-%H%M%S).hprof"

Also applies to: 20-21, 38-38, 73-73

🧰 Tools
🪛 Shellcheck (0.10.0)

[warning] 5-5: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.

(SC2164)

🤖 Prompt for AI Agents
In scripts/perf/run_unionjoin_with_manual_dump.sh at lines 5, 20-21, 38, and 73,
the script uses hardcoded user-specific paths which reduce portability. Replace
these absolute paths with relative paths or use environment variables to
dynamically determine the base directory. This will make the script reusable
across different environments and users without modification.

⚠️ Potential issue

Add error handling for directory change.

The cd command should handle failure cases.

Apply this fix:

-cd /Users/nsimha/repos/chronon
+cd /Users/nsimha/repos/chronon || exit 1

🤖 Prompt for AI Agents
In scripts/perf/run_unionjoin_with_manual_dump.sh at line 5, the cd command
lacks error handling for failure cases. Modify the cd command to check if
changing the directory was successful, and if not, print an error message and
exit the script with a non-zero status to prevent further execution in the wrong
directory.

Contributor:
Maybe we add this file in platform instead of chronon?

Contributor (author):
hmm - we can move it later if it indeed belongs there. I am not convinced that it should be far from the code it is meant to profile.


# Build the test target
echo "Building UnionJoinTest..."
bazel build //spark:join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala

# Get the generated script path
BAZEL_SCRIPT="bazel-bin/spark/join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala"

if [[ ! -f "$BAZEL_SCRIPT" ]]; then
echo "Error: Bazel script not found at $BAZEL_SCRIPT"
exit 1
fi

# Set up runfiles directory for classpath resolution
RUNFILES_DIR="$(pwd)/bazel-bin/spark/join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala.runfiles"
export JAVA_RUNFILES="$RUNFILES_DIR"
export TEST_SRCDIR="$RUNFILES_DIR"

# Extract and expand classpath
RUNPATH="${RUNFILES_DIR}/chronon/"
RUNPATH="${RUNPATH#$PWD/}"
RAW_CLASSPATH=$(sed -n '249p' "$BAZEL_SCRIPT" | cut -d'"' -f2)
CLASSPATH=$(echo "$RAW_CLASSPATH" | sed "s|\${RUNPATH}|$RUNPATH|g")

if [[ -z "$CLASSPATH" ]]; then
echo "Error: Failed to extract classpath"
exit 1
fi

echo "Successfully extracted classpath (${#CLASSPATH} characters)"

# Create heap dump file path
HEAP_DUMP_FILE="/Users/nsimha/repos/chronon/.ijwb/unionjoin-heapdump-$(date +%Y%m%d-%H%M%S).hprof"
echo "Heap dump will be saved to: $HEAP_DUMP_FILE"

# JVM settings with more aggressive heap dump options
JVM_OPTS="-Xmx8g -Xms2g" # Reduced max heap to force memory pressure
MODULE_OPENS="--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
SYSTEM_PROPS="-DRULES_SCALA_MAIN_WS_NAME=chronon -DRULES_SCALA_ARGS_FILE=spark/join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala.args"

# Heap dump and GC options
PROFILER_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$HEAP_DUMP_FILE -XX:+PrintGC -XX:+PrintGCDetails -XX:+UseG1GC -Xlog:gc*:unionjoin-gc-$(date +%Y%m%d-%H%M%S).log:time"

echo "Starting UnionJoinTest with heap profiling..."
echo "Memory settings: $JVM_OPTS"
echo "Working directory: $(pwd)"
echo ""

# Run in background and get PID for manual heap dump
java \
$JVM_OPTS \
$MODULE_OPENS \
$PROFILER_OPTS \
$SYSTEM_PROPS \
-classpath "$CLASSPATH" \
io.bazel.rulesscala.scala_test.Runner &

JAVA_PID=$!
echo "Java process started with PID: $JAVA_PID"

# Wait a bit for the process to start and consume some memory
echo "Waiting 30 seconds for test to initialize..."
sleep 30

# Check if process is still running
if kill -0 $JAVA_PID 2>/dev/null; then
echo "Generating manual heap dump..."
MANUAL_DUMP="/Users/nsimha/repos/chronon/.ijwb/unionjoin-manual-$(date +%Y%m%d-%H%M%S).hprof"
jcmd $JAVA_PID GC.run_finalization
jcmd $JAVA_PID GC.run
jcmd $JAVA_PID GC.run
jcmd $JAVA_PID GC.run_finalization
sleep 5
jcmd $JAVA_PID GC.run
echo "Creating heap dump with jcmd..."
jcmd $JAVA_PID GC.heap_dump "$MANUAL_DUMP"
echo "Manual heap dump saved to: $MANUAL_DUMP"
else
echo "Process already terminated"
fi

# Wait for the test to complete
wait $JAVA_PID
EXIT_CODE=$?

echo ""
echo "Test completed with exit code: $EXIT_CODE"
echo "Checking for heap dump files..."
ls -la /Users/nsimha/repos/chronon/.ijwb/*.hprof 2>/dev/null || echo "No heap dump files found"
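If a dump ever needs to be triggered from inside the test process rather than via `jcmd`, the JDK exposes the same capability through the HotSpotDiagnostic MXBean. A small Scala sketch; the output path and call site are illustrative:

```scala
import java.lang.management.ManagementFactory
import com.sun.management.HotSpotDiagnosticMXBean

object SelfHeapDump {
  // liveOnly = true dumps only reachable objects, matching jcmd GC.heap_dump's default (without -all)
  def dump(path: String, liveOnly: Boolean = true): Unit = {
    val bean = ManagementFactory.getPlatformMXBean(classOf[HotSpotDiagnosticMXBean])
    bean.dumpHeap(path, liveOnly) // throws if the file already exists
  }

  def main(args: Array[String]): Unit =
    dump(s"selfdump-${System.currentTimeMillis()}.hprof")
}
```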
27 changes: 26 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -17,7 +17,7 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.{Constants, DateRange, ThriftJsonCodec}
import ai.chronon.api.{Constants, DateRange, PartitionRange, ThriftJsonCodec}
import ai.chronon.api.Constants.MetadataDataset
Comment on lines +20 to 21
Contributor:
⚠️ Potential issue

Remove unused DateRange import

DateRange isn't referenced in this file; will trigger -Ywarn-unused-import.

-import ai.chronon.api.{Constants, DateRange, PartitionRange, ThriftJsonCodec}
+import ai.chronon.api.{Constants, PartitionRange, ThriftJsonCodec}
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/Driver.scala on lines 20 to 21, the
import statement includes `DateRange` which is not used anywhere in the file.
Remove the `DateRange` import to clean up the code and avoid compiler warnings
about unused imports.

import ai.chronon.api.Extensions.{GroupByOps, JoinPartOps, MetadataOps, SourceOps}
import ai.chronon.api.planner.RelevantLeftForJoinPart
@@ -27,6 +27,7 @@ import ai.chronon.online.fetcher.{ConfPathOrName, FetchContext, FetcherMain, Met
import ai.chronon.orchestration.{JoinMergeNode, JoinPartNode}
import ai.chronon.spark.batch._
import ai.chronon.spark.catalog.{Format, TableUtils}
import ai.chronon.spark.join.UnionJoin
import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob}
import ai.chronon.spark.stats.drift.{Summarizer, SummaryPacker, SummaryUploader}
import ai.chronon.spark.streaming.JoinSourceRunner
@@ -275,6 +276,30 @@ object Driver {

def run(args: Args): Unit = {
val tableUtils = args.buildTableUtils()

if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "false").toBoolean) {
logger.info(s" >>> Running join backfill in skew free mode <<< ")
val startPartition = args.startPartitionOverride.toOption.getOrElse(args.joinConf.left.query.startPartition)
val endPartition = args.endDate()

val joinName = args.joinConf.metaData.name
val stepDays = args.stepDays.toOption.getOrElse(1)

logger.info(
s"Filling partitions for join:$joinName, partitions:[$startPartition, $endPartition], steps:$stepDays")

val partitionRange = PartitionRange(startPartition, endPartition)(tableUtils.partitionSpec)
david-zlai (Contributor), May 27, 2025:
We should be careful using tableUtils.partitionSpec because it defaults to the spark.chronon.partition.format here.

Defaulting to do that led to this bug because users can technically define a custom partitionSpec at the Source level.

we could do something like:

        val partitionRange = PartitionRange(startPartition, endPartition)(args.joinConf.left.partitionSpec)

cc @tchow-zlai

Contributor:
do we know if all of their tables follow a consistent partition spec now? I remember we did ask them to staging query that one additional table. so maybe we're good here now

Contributor (author):
this is supposed to be the output partition spec. while reading left & right data we translate to output partition spec.

val partitionSteps = partitionRange.steps(stepDays)

partitionSteps.zipWithIndex.foreach { case (stepRange, idx) =>
logger.info(s"Processing range $stepRange (${idx + 1}/${partitionSteps.length})")
UnionJoin.computeJoinAndSave(args.joinConf, stepRange)(tableUtils)
logger.info(s"Wrote range $stepRange (${idx + 1}/${partitionSteps.length})")
}

return
}

val join = new Join(
args.joinConf,
args.endDate(),
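For context on how the new branch gets exercised: it only runs when `spark.chronon.join.backfill.mode.skewFree` evaluates to true on the session. A hedged sketch of flipping the flag on; the flag name comes from the diff above, while the session setup around it is illustrative:

```scala
import org.apache.spark.sql.SparkSession

object SkewFreeBackfillFlag {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; in practice the flag would be passed to spark-submit / the driver.
    val spark = SparkSession.builder()
      .appName("join-backfill-skew-free")
      .master("local[*]")
      .config("spark.chronon.join.backfill.mode.skewFree", "true") // routes backfill through UnionJoin
      .getOrCreate()

    // Or toggle it on an existing session before the backfill driver reads it:
    spark.conf.set("spark.chronon.join.backfill.mode.skewFree", "true")
    println(spark.conf.get("spark.chronon.join.backfill.mode.skewFree", "false"))

    spark.stop()
  }
}
```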
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/Extensions.scala
@@ -94,6 +94,7 @@ object Extensions {
}

implicit class DataframeOps(df: DataFrame) {

@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
private val tableUtils: TableUtils = TableUtils(df.sparkSession)
private implicit val partitionSpec: PartitionSpec = tableUtils.partitionSpec
41 changes: 41 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/GroupBy.scala
@@ -483,6 +483,47 @@ object GroupBy {
result.setSources(newSources)
}

def inputDf(groupByConfOld: api.GroupBy,
Contributor:
looks similar to def from

Contributor (author):

didn't want to touch that - yet. It is very much in the critical path for everything.

queryRange: PartitionRange,
tableUtils: TableUtils,
computeDependency: Boolean = false): DataFrame = {

logger.info(s"\n----[Processing GroupBy: ${groupByConfOld.metaData.name}]----")

val groupByConf = replaceJoinSource(groupByConfOld, queryRange, tableUtils, computeDependency)

val inputDf = groupByConf.sources.toScala
.map { source =>
sourceDf(groupByConf,
source,
groupByConf.getKeyColumns.toScala,
queryRange,
tableUtils,
groupByConf.maxWindow,
groupByConf.inferredAccuracy)

}
.reduce { (df1, df2) =>
// align the columns by name - when one source has select * the ordering might not be aligned
val columns1 = df1.schema.fields.map(_.name)
df1.union(df2.selectExpr(columns1: _*))
}

def doesNotNeedTime = !Option(groupByConf.getAggregations).exists(_.toScala.needsTimestamp)
def hasValidTimeColumn = inputDf.schema.find(_.name == Constants.TimeColumn).exists(_.dataType == LongType)

require(
doesNotNeedTime || hasValidTimeColumn,
s"Time column, ts doesn't exists (or is not a LONG type) for groupBy ${groupByConf.metaData.name}, but you either have windowed aggregation(s) or time based aggregation(s) like: " +
"first, last, firstK, lastK. \n" +
"Please note that for the entities case, \"ts\" needs to be explicitly specified in the selects."
)

// at least one of the keys should be present in the row.
val nullFilterClause = groupByConf.keyColumns.toScala.map(key => s"($key IS NOT NULL)").mkString(" OR ")
inputDf.filter(nullFilterClause)
}

def from(groupByConfOld: api.GroupBy,
queryRange: PartitionRange,
tableUtils: TableUtils,
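The `reduce` step in `inputDf` guards against a subtle Spark pitfall: `DataFrame.union` is positional, so two sources that select the same columns in different orders would silently mis-align values. A standalone sketch of the same align-by-name trick (column names here are illustrative, not Chronon's schema):

```scala
import org.apache.spark.sql.SparkSession

object UnionAlignByName {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("union-align").master("local[*]").getOrCreate()
    import spark.implicits._

    val df1 = Seq(("u1", "2025-01-01")).toDF("user", "ds")
    val df2 = Seq(("2025-01-02", "u2")).toDF("ds", "user") // same columns, different order

    // A positional union would silently put ds values under the user column;
    // re-selecting df2 in df1's column order keeps the rows aligned.
    val columns1 = df1.schema.fields.map(_.name)
    val aligned = df1.union(df2.selectExpr(columns1: _*))

    aligned.show(false)
    spark.stop()
  }
}
```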