feat: batch node runner #870
Conversation
Warning: Rate limit exceeded. @tchow-zlai has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 0 minutes and 19 seconds before requesting another review; once the wait time has elapsed, a review can be triggered again. We recommend spacing out commits to avoid hitting the rate limit. CodeRabbit enforces hourly rate limits for each developer per organization; paid plans have higher rate limits than the trial, open-source, and free plans, and further reviews are re-allowed after a brief timeout. See the FAQ for more information.

Walkthrough
The changes remove the generic and context-based design from the `NodeRunner` abstraction.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Test as BatchNodeRunnerTest
    participant Runner as BatchNodeRunner
    participant Spark as SparkSession
    participant UnionJoin as UnionJoin
    Test->>Runner: run(metadata, nodeContent, partitionRange)
    Runner->>Spark: Initialize session
    Runner->>Runner: Inspect nodeContent
    alt MonolithJoin node
        Runner->>UnionJoin: computeJoinAndSave(...)
        UnionJoin-->>Runner: Join results saved
        Runner->>Spark: Create Join instance, computeJoin
        Spark-->>Runner: DataFrame (join results)
        Runner->>Runner: Show sample results
    else Unsupported node
        Runner->>Runner: Throw exception
    end
```
Actionable comments posted: 1
🧹 Nitpick comments (4)
spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala (1)
12-16: `readFiles` is a no-op with an unsafe return type
The stub returns `Seq[Any]`, losing type safety and silently hiding the TODO.

```diff
- def readFiles(folderPath: String): Seq[Any] = {
-   // read files from folder using metadata
-   Seq.empty
- }
+ // TODO: implement
+ def readFiles(folderPath: String): Seq[YourTypedRow] = Seq.empty
```
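For illustration, a minimal sketch of a typed version, assuming a hypothetical `NodeFile` case class as the row type (the real type isn't defined in this PR):

```scala
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

// Hypothetical row type standing in for whatever the runner ultimately needs.
case class NodeFile(path: String, contents: String)

// Reads every regular file under folderPath into a typed sequence,
// instead of hiding the TODO behind an untyped Seq[Any].
def readFiles(folderPath: String): Seq[NodeFile] =
  Files
    .list(Paths.get(folderPath))
    .iterator()
    .asScala
    .filter(p => Files.isRegularFile(p))
    .map(p => NodeFile(p.toString, new String(Files.readAllBytes(p))))
    .toSeq
```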
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (2)

18-19: `spark` val unused
The built session isn't referenced afterwards. Either use it or drop it.

```diff
- val spark = SparkSessionBuilder.build(f"node-batch-${metadata.name}")
+ SparkSessionBuilder.build(f"node-batch-${metadata.name}") // returns implicit session
```
22-30: Typos in log strings
Extra `)` at the end of messages.

```diff
- logger.info(s"Processing range $range)")
+ logger.info(s"Processing range $range")
  ...
- logger.info(s"Wrote range $range)")
+ logger.info(s"Wrote range $range")
```

spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala (1)
76-84: Double compute may waste minutes
The test runs `computeJoinAndSave` and then reruns the same join via `BatchNodeRunner`; the second run isn't needed for the assertion.

```diff
- UnionJoin.computeJoinAndSave(joinConf, dateRange)
- val batchNodeRunner = new BatchNodeRunner()
+ val batchNodeRunner = new BatchNodeRunner()
@@
- batchNodeRunner.run(joinConf.metaData, joinNodeContent, dateRange, tableUtils)
+ batchNodeRunner.run(joinConf.metaData, joinNodeContent, dateRange, tableUtils) // either/or
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala (0 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
- api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: flink_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: online_tests
- GitHub Check: join_tests
- GitHub Check: aggregator_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
```scala
class BatchNodeRunner extends NodeRunner {

  override def run(metadata: MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit =
    conf.getSetField match {
      case NodeContent._Fields.MONOLITH_JOIN => {
        val monolithJoin = conf.getMonolithJoin
        require(monolithJoin.isSetJoin, "MonolithJoinNode must have a join set")
        val spark = SparkSessionBuilder.build(f"node-batch-${metadata.name}")
        val joinConf = monolithJoin.join
        val joinName = joinConf.metaData.name

        if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "false").toBoolean) {
          logger.info(s" >>> Running join backfill in skew free mode <<< ")

          logger.info(s"Filling partitions for join:$joinName, partitions:[${range.start}, ${range.end}]")

          logger.info(s"Processing range $range)")
          UnionJoin.computeJoinAndSave(joinConf, range)(tableUtils)
          logger.info(s"Wrote range $range)")
        }

        val join = new Join(
          joinConf,
          range.end,
          tableUtils
        )

        val df = join.computeJoin(overrideStartPartition = Option(range.start))

        df.show(numRows = 3, truncate = 0, vertical = true)
        logger.info(s"\nShowing three rows of output above.\nQuery table `${joinName}` for more.\n")
      }
```
🛠️ Refactor suggestion
Session never stopped
`SparkSessionBuilder.build` starts a new session but `run` never calls `stop`, leaking executors.
```diff
@@
- SparkSessionBuilder.build(f"node-batch-${metadata.name}")
+ val spark = SparkSessionBuilder.build(f"node-batch-${metadata.name}")
@@
  logger.info(s"\nShowing three rows of output above.\nQuery table `${joinName}` for more.\n")
+
+ spark.stop()
```

Committable suggestion skipped: line range outside the PR's diff.
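A small sketch of one way to guarantee teardown, passing the builder call in by name so the only assumption is that it yields a standard `SparkSession`:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative helper: build a session, run the body, and always stop the session,
// even when the body throws.
def withSession[T](build: => SparkSession)(body: SparkSession => T): T = {
  val spark = build
  try body(spark)
  finally spark.stop()
}

// e.g. withSession(SparkSessionBuilder.build(f"node-batch-${metadata.name}")) { spark => ... }
```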
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala between
lines 11 and 44, the SparkSession created by SparkSessionBuilder.build is never
stopped, causing resource leaks. To fix this, ensure that after all processing
with the SparkSession is complete, you call spark.stop() to properly release
resources. Add spark.stop() at the end of the run method or use a try-finally
block to guarantee the session is stopped even if exceptions occur.
Actionable comments posted: 2
♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (1)
54-56: SparkSession never stopped
The previous review still applies: executors leak. Wrap the public `run` in `try … finally` and call `spark.stop()`.

```diff
- run(metadata, conf, range, tableUtils(metadata.name))
+ val spark = SparkSessionBuilder.build(s"batch-node-runner-${metadata.name}")
+ try {
+   val utils = TableUtils(spark)
+   run(metadata, conf, range, utils)
+ } finally {
+   spark.stop()
+ }
```
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (1)
32-34: Log string typo
The extra closing `)` muddles the message.

```diff
- logger.info(s"Processing range $range)")
+ logger.info(s"Processing range $range")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: online_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: api_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala (1)
61-61: Duplicating dataset doubles row-count silently
`itemQueriesDf.union(itemQueriesDf)` inflates the data while the final assertion only checks equality with the duplicated DF, masking accidental blow-ups. Consider using the original count or explicit deduping.
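One way to keep the blow-up visible, sketched against Spark's DataFrame API using the test's own `itemQueriesDf` (the surrounding assertion names are assumed):

```scala
// Pin the expected size to the original count so an accidental extra union fails loudly.
val originalCount = itemQueriesDf.count()
val duplicatedDf  = itemQueriesDf.union(itemQueriesDf)
assert(duplicatedDf.count() == 2 * originalCount, "duplicated input should be exactly twice the original")
```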
```scala
BatchNodeRunner.run(joinConf.metaData, joinNodeContent, dateRange, tableUtils)
```
🛠️ Refactor suggestion
Test bypasses public API
Calling the package-private overload hides the Spark leak fixed above. Invoke `BatchNodeRunner.run(metadata, content, range)` instead.
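A sketch of the suggested call, assuming the three-argument overload is the public entry point and the test's existing values are passed through:

```scala
// Hypothetical: exercise the public overload so session setup/teardown is covered too.
BatchNodeRunner.run(joinConf.metaData, joinNodeContent, dateRange)
```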
🤖 Prompt for AI Agents
In spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala at
lines 82 to 83, the test calls a package-private overload of
BatchNodeRunner.run, bypassing the public API and hiding a Spark leak fix.
Replace the call to BatchNodeRunner.run(joinConf.metaData, joinNodeContent,
dateRange, tableUtils) with the public API method BatchNodeRunner.run(metadata,
content, range) to ensure the test uses the intended public interface and
includes the leak fix.
Actionable comments posted: 0
🧹 Nitpick comments (2)
api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (2)
6-6: Prefer non-deprecated CollectionConverters
`scala.collection.JavaConverters._` is deprecated since 2.13; switch to `scala.jdk.CollectionConverters._` to avoid deprecation noise.

```diff
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
```
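For reference, the call sites compile unchanged under the newer import; a quick standalone sketch (the list contents are made up):

```scala
import scala.jdk.CollectionConverters._

val javaDeps = new java.util.ArrayList[String]()
javaDeps.add("table_a")
javaDeps.add("table_b")

// Same .asScala usage as before, just without the deprecation warning on 2.13+.
val scalaDeps: Seq[String] = javaDeps.asScala.toSeq
```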
11-13: Minor: simplify Option handling
A tiny simplification: using `fold` removes the intermediate `Option`.

```diff
- Option(stagingQuery.tableDependencies)
-   .map(_.asScala.toSeq)
-   .getOrElse(Seq.empty)
+ Option(stagingQuery.tableDependencies)
+   .fold(Seq.empty[api.TableInfo])(_.asScala.toSeq)
```

Not critical, but it trims a map/getOrElse pair.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (1)
api/src/main/java/ai/chronon/api/thrift/Option.java (1)
Option(25-143)
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: streaming_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: batch_tests
- GitHub Check: service_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
```scala
import ai.chronon.api
import ai.chronon.api.PartitionRange
import ai.chronon.planner.NodeContent

trait NodeRunner
```
Is this meant to be the same runner to trigger non-batch workloads too? If so, we'll probably want range to be optional.
Yeah, we'll extend it for non-batch workloads, so I'll make it optional. Although right now we use `PartitionRange(null, null)` to represent something unbounded.
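A speculative sketch of what the trait could look like once the range becomes optional (the current PR keeps a required `PartitionRange`; `None` here would replace the `PartitionRange(null, null)` convention):

```scala
import ai.chronon.api.{MetaData, PartitionRange}
import ai.chronon.planner.NodeContent

// Speculative shape only: None models an unbounded run explicitly.
trait NodeRunner {
  def run(metadata: MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit
}
```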
```scala
val joinName = metadata.name

if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "false").toBoolean) {
  logger.info(s" >>> Running join backfill in skew free mode <<< ")
```
probably worth including these / similar log lines in the skew join case too?
done
Summary
Checklist
Summary by CodeRabbit