Conversation

Collaborator

@varant-zlai varant-zlai commented Feb 20, 2025

Summary

Reasons why we need this:

  1. Compute sharing -- we no longer have joinPart tables; instead there is a table per (left, group_by)
  2. Different job topology:
     • Left Source Job (persisted as a table)
       • Bootstrap Job (iff there are bootstraps or external parts)
       • Parallelized joinPart jobs
         • JoinPartsJoinJob/MergeJob (minus derivations)
           • DerivationJob (now this is a separate table)

Notes on this PR:

  1. The current "monolithic" join is refactored to use the new JoinPartJob and BootstrapJob.
  2. However, it is not yet refactored to use the DerivationJob.

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced new processing jobs to enhance data joining, merging, bootstrap operations, and derivation workflows.
    • Expanded API capabilities with additional configuration options, including a new date range format.
    • Added new structures for job arguments and configurations, improving the flexibility of data processing tasks.
    • Added a new DateRange structure to represent a range of dates.
  • Refactor

    • Restructured data processing and filtering workflows for improved performance and maintainability.
    • Updated import statements to streamline dependencies and improve code organization.
  • Tests

    • Enhanced test coverage to validate the robustness of join, aggregation, and source operations.

These updates provide a more flexible, reliable, and efficient data processing experience for our users.

Contributor

coderabbitai bot commented Feb 20, 2025

Walkthrough

This pull request introduces updates to build configurations and refactors significant join functionality in the Spark framework. A new entry is added to .bazelignore, and several Thrift structs (e.g., SourceWithFilter, SourceJobArgs, etc.) are introduced. The bootstrap logic is extracted from legacy join methods into dedicated job classes (e.g., BootstrapJob, JoinPartJob, JoinDerivationJob, and MergeJob), while SourceJob now handles skew filtering during data ingestion. Numerous import statements are updated to consolidate dependencies (e.g., PartitionRange and TsUtils), and relevant tests and scripts are adjusted accordingly.

Changes

File(s) and change summary:

  • .bazelignore: Added .git entry to be ignored by Bazel.
  • api/thrift/orchestration.thrift, api/thrift/hub.thrift, api/thrift/common.thrift: Added new Thrift structs and updated dateRange fields to use common.DateRange; removed local DateRange and added a new common one.
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala, Join.scala, JoinBase.scala, JoinPartJob.scala, SourceJob.scala, JoinDerivationJob.scala, MergeJob.scala: Introduced new job classes for bootstrap, join parts, derivation, and merge operations; restructured join logic to delegate processing to these classes.
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala: Added methods for skew filtering and handling unfilled records; updated method signature for bloom filter generation.
  • Test files (e.g., ModularJoinTest.scala, DerivationTest.scala, LogBootstrapTest.scala, TableBootstrapTest.scala): Added and updated tests for modular join, derivation, and bootstrap logic; adjusted metadata names.
  • Aggregator files (e.g., HopsAggregator.scala, SawtoothAggregator.scala, NaiveAggregator.scala, etc.): Consolidated and updated import statements; removed unused imports for streamlined dependencies.
  • API files (e.g., Extensions.scala, DataRange.scala, TsUtils.scala): Updated package declarations and consolidated imports.
  • Online & orchestration files (e.g., OnlineDerivationUtil.scala, DependencyResolver.scala, various test files): Updated import statements to source partitioning utilities from ai.chronon.api.
  • Script files (build_and_upload_artifacts.sh, run_zipline_quickstart.sh): Updated Python version commands, branch names, and formatting adjustments.
  • Spark files (e.g., Analyzer.scala, GroupBy.scala, LabelJoin.scala, StagingQuery.scala, TableUtils.scala, etc.): Changed import paths from ai.chronon.online to ai.chronon.api for standardized dependency management.

Sequence Diagram(s)

sequenceDiagram
    participant J as Join
    participant BJ as BootstrapJob
    participant JP as JoinPartJob
    participant JD as JoinDerivationJob
    participant MJ as MergeJob
    participant TU as TableUtils

    J->>BJ: Initialize with BootstrapJobArgs
    BJ->>TU: Scan left DataFrame & compute bootstrap table
    BJ-->>J: Return bootstrap DataFrame
    J->>JP: For each join part, run job with context
    JP->>TU: Compute right table join details
    JP-->>J: Return join-part results
    alt Join Derivation
      J->>JD: Initiate derivation job with args
      JD->>TU: Retrieve base and true left DataFrames
      JD-->>J: Return derived result
    end
    J->>MJ: If merging required, run MergeJob with args
    MJ->>TU: Retrieve and join right parts
    MJ-->>J: Return merged DataFrame

Suggested reviewers

  • piyush-zlai
  • nikhil-zlai
  • david-zlai
  • chewy-zlai

Poem

In lines of code, a modular art,
New jobs emerge, each playing its part.
Joins are reformed with elegant precision,
Merging data with flawless vision.
Our framework sings a binary tune,
🚀 Code refined under a brilliant moon.
Happy merging and joyful run!

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (11)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)

14-55: Runtime exception might be harsh.
Consider more descriptive error info.

spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1)

32-49: Possible naming confusion.
Add logging for output table naming.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (2)

38-102: Run method is big.
Use smaller helpers for clarity.


205-315: Nested logic.
Extract repeated steps to simplify read.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)

185-186: Add error handling for bootstrap.
Consider try-catch or fallback to handle computeBootstrapTable failures.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4)

310-310: Parameter rename suggestion.
Rename leftDataModel more descriptively if desired.


444-467: Watch out for large OR clauses.
Big skew lists can cause inefficient queries.


469-489: Potential duplication with skewFilter.
Refactor to avoid repeating similar logic.


498-515: Large dataset note.
Filtering might be slow. Consider partition-based approaches.

spark/src/main/scala/ai/chronon/spark/Join.scala (1)

253-254: Bootstrap job usage.
Add error handling for reliability.

api/thrift/orchestration.thrift (1)

273-276: Add documentation for SourceWithFilter struct.

Document the purpose of the struct and its fields.

+/**
+ * Represents a source with optional key exclusion filters.
+ * source: The data source to filter
+ * excludeKeys: Map of column names to lists of values to exclude
+ */
 struct SourceWithFilter {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between c555d89 and cabcdc8.

📒 Files selected for processing (10)
  • .bazelignore (1 hunks)
  • api/thrift/orchestration.thrift (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap/scala.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/DerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • .bazelignore
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (16)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (3)

1-11: Imports look fine.


57-66: Potential injection risk.
Ensure inputs are safe before building filter strings.


68-73: Check skew filter usage.
If user-provided, sanitize to prevent unintended SQL injection.

spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (3)

1-27: Imports are simple.


141-174: Null padding logic.
Implementation seems correct.


176-187: Field padding.
No issues found.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (3)

1-28: Imports appear standard.


31-37: Context usage is neat.


110-203: Complex flow for unfilledRanges.
Add test coverage for edge cases.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5)

21-21: Import usage checks out.


336-336: Log statement looks fine.


491-497: Check for possible injection.
Ensure values in filter SQL are sanitized if external.


517-531: Small mode logic is clear.


533-537: parseSkewKeys is straightforward.

spark/src/main/scala/ai/chronon/spark/Join.scala (1)

332-355: Check concurrency.
Multiple joins in parallel may need careful coordination.

orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap/scala.scala (1)

29-31: Well-structured case class with clear documentation.

Comment on lines 51 to 137
def computeBootstrapTable(leftDf: DataFrame,
bootstrapInfo: BootstrapInfo,
tableProps: Map[String, String] = null): DataFrame = {

def validateReservedColumns(df: DataFrame, table: String, columns: Seq[String]): Unit = {
val reservedColumnsContained = columns.filter(df.schema.fieldNames.contains)
assert(
reservedColumnsContained.isEmpty,
s"Table $table contains columns ${reservedColumnsContained.prettyInline} which are reserved by Chronon."
)
}

val startMillis = System.currentTimeMillis()

// verify left table does not have reserved columns
validateReservedColumns(leftDf, join.left.table, Seq(Constants.BootstrapHash, Constants.MatchedHashes))

tableUtils
.unfilledRanges(bootstrapTable, range, skipFirstHole = skipFirstHole)
.getOrElse(Seq())
.foreach(unfilledRange => {
val parts = Option(join.bootstrapParts)
.map(_.toScala)
.getOrElse(Seq())

val initDf = leftDf
.prunePartition(unfilledRange)
// initialize an empty matched_hashes column for the purpose of later processing
.withColumn(Constants.MatchedHashes, typedLit[Array[String]](null))

val joinedDf = parts.foldLeft(initDf) { case (partialDf, part) =>
logger.info(s"\nProcessing Bootstrap from table ${part.table} for range $unfilledRange")

val bootstrapRange = if (part.isSetQuery) {
unfilledRange.intersect(PartitionRange(part.startPartition, part.endPartition))
} else {
unfilledRange
}
if (!bootstrapRange.valid) {
logger.info(s"partition range of bootstrap table ${part.table} is beyond unfilled range")
partialDf
} else {
var bootstrapDf =
tableUtils.scanDf(part.query,
part.table,
Some(Map(tableUtils.partitionColumn -> null)),
range = Some(bootstrapRange))

// attach semantic_hash for either log or regular table bootstrap
validateReservedColumns(bootstrapDf, part.table, Seq(Constants.BootstrapHash, Constants.MatchedHashes))
if (bootstrapDf.columns.contains(Constants.SchemaHash)) {
bootstrapDf = bootstrapDf.withColumn(Constants.BootstrapHash, col(Constants.SchemaHash))
} else {
bootstrapDf = bootstrapDf.withColumn(Constants.BootstrapHash, lit(part.semanticHash))
}

// include only necessary columns. in particular,
// this excludes columns that are NOT part of Join's output (either from GB or external source)
val includedColumns = bootstrapDf.columns
.filter(bootstrapInfo.fieldNames ++ part.keys(join, tableUtils.partitionColumn)
++ Seq(Constants.BootstrapHash, tableUtils.partitionColumn))
.sorted

bootstrapDf = bootstrapDf
.select(includedColumns.map(col): _*)
// TODO: allow customization of deduplication logic
.dropDuplicates(part.keys(join, tableUtils.partitionColumn).toArray)

coalescedJoin(partialDf, bootstrapDf, part.keys(join, tableUtils.partitionColumn))
// as part of the left outer join process, we update and maintain matched_hashes for each record
// that summarizes whether there is a join-match for each bootstrap source.
// later on we use this information to decide whether we still need to re-run the backfill logic
.withColumn(Constants.MatchedHashes, set_add(col(Constants.MatchedHashes), col(Constants.BootstrapHash)))
.drop(Constants.BootstrapHash)
}
}

// include all external fields if not already bootstrapped
val enrichedDf = padExternalFields(joinedDf, bootstrapInfo)

// set autoExpand = true since log table could be a bootstrap part
enrichedDf.save(bootstrapTable, tableProps, autoExpand = true)
})

val elapsedMins = (System.currentTimeMillis() - startMillis) / (60 * 1000)
logger.info(s"Finished computing bootstrap table ${join.metaData.bootstrapTable} in $elapsedMins minutes")

tableUtils.scanDf(query = null, table = bootstrapTable, range = Some(range))
}
Contributor

🛠️ Refactor suggestion

Method is large.
Break into smaller helpers for readability.
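
A minimal sketch of two helpers that could be pulled out of the loop body, reusing expressions from the excerpt above; the helper names are illustrative and assume the same imports and class members (tableUtils, Constants, typedLit) as the original file:

  private def initMatchedHashes(leftDf: DataFrame, unfilledRange: PartitionRange): DataFrame =
    leftDf
      .prunePartition(unfilledRange)
      // initialize an empty matched_hashes column for later processing, as in the original
      .withColumn(Constants.MatchedHashes, typedLit[Array[String]](null))

  private def bootstrapRangeFor(part: api.BootstrapPart, unfilledRange: PartitionRange): PartitionRange =
    if (part.isSetQuery) unfilledRange.intersect(PartitionRange(part.startPartition, part.endPartition))
    else unfilledRange

The foreach body would then read as a short sequence of named steps instead of one long block.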

Comment on lines 104 to 108
  def getCachedLeftDF(): DataFrame = {
    // TODO -- hardcoded ds below, namespacing on table, etc
    tableUtils.sql(f"SELECT * FROM ${left.getEvents.getTable}_${ThriftJsonCodec.md5Digest(
      left)} where ds BETWEEN ${dateRange.start} AND ${dateRange.end}")
  }
Contributor

🛠️ Refactor suggestion

Hardcoded query.
May cause confusion. Parameterize it.
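
A minimal sketch of a parameterized variant, assuming the enclosing class keeps tableUtils and dateRange in scope as in the original; cachedLeftTable and partitionColumn are hypothetical parameters added for illustration:

  def getCachedLeftDF(cachedLeftTable: String, partitionColumn: String = "ds"): DataFrame = {
    // the caller supplies the table name and partition column instead of rebuilding them inline
    tableUtils.sql(
      s"SELECT * FROM $cachedLeftTable WHERE $partitionColumn BETWEEN '${dateRange.start}' AND '${dateRange.end}'")
  }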

Comment on lines 20 to 25
  def fromGroupBy(groupBy: api.GroupBy, dateRange: PartitionRange): DerivationJob = {
    val baseOutputTable = "TODO" // Output of the base GroupBy pre-derivation
    val finalOutputTable = "TODO" // The actual output table
    val derivations = groupBy.derivations.asScala
    new DerivationJob(baseOutputTable, finalOutputTable, derivations, dateRange)
  }
Contributor

⚠️ Potential issue

Replace TODO placeholders with actual table names.

Table names need to be derived from groupBy configuration.

Comment on lines 27 to 32
  def fromJoin(join: api.Join, dateRange: PartitionRange): DerivationJob = {
    val baseOutputTable = "TODO" // Output of the base Join pre-derivation
    val finalOutputTable = "TODO" // The actual output table
    val derivations = join.derivations.asScala
    new DerivationJob(baseOutputTable, finalOutputTable, derivations, dateRange)
  }
Contributor

⚠️ Potential issue

Replace TODO placeholders with actual table names.

Table names need to be derived from join configuration.

Comment on lines 12 to 17
class DerivationJob(sourceTable: String,
                    outputTable: String,
                    derivations: Seq[api.Derivation],
                    dateRange: PartitionRange) {
  def run(): Unit = {}
}
Contributor

💡 Verification agent

⚠️ Potential issue

❓ Verification inconclusive

Implement run() method and add parameter documentation.

Empty implementation needs to be filled in.


🏁 Script executed:

#!/bin/bash
# Check if run() is implemented in other similar job classes
ast-grep --pattern 'class $_(.*) {
  $$$
  def run(): Unit = {
    $$$
  }
  $$$
}'

Length of output: 85


Action: Implement run() & add parameter docs

  • The run() method is still empty—please implement its functionality.
  • Add ScalaDoc/in-line docs for all parameters (sourceTable, outputTable, derivations, dateRange).
  • No non-empty implementations were found via AST grep; manually verify consistency with similar jobs.

Comment on lines 14 to 24
def getCoveringRanges(join: api.Join, fullRange: PartitionRange): Seq[BootstrapOrchestrationResult] = {
/*
0. Modify compile to put BootstrapPart directly onto a joinPart if it's schema-covering
1. For each joinPart that has a bootstrapPart
1. a. Get the intersecting range of the bootstrap part table and `fullRange` -- this is existingBootstrapRange
1. b. Get the computable range within the `fullRange` based on source table partitions -- this is computableRange
1. c. create a BootstrapResult with the above two ranges
1. d. for joinParts without an existing or computable range, do nothing
*/
Seq.empty
}
Contributor

⚠️ Potential issue

Implement getCoveringRanges method.

Empty implementation needs to handle bootstrap part intersection and range computation.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2)

101-128: Consider consolidating duplicate code with leftDf method.

The method shares significant code with leftDf. Consider extracting common logic to reduce duplication.


547-561: Extract magic numbers as constants.

Consider extracting the number 1 in the limit calculation as a constant for better maintainability.

+  private val THRESHOLD_BUFFER = 1
   if (tableUtils.smallModelEnabled) {
-    val thresholdCount = leftDf.limit(Some(tableUtils.smallModeNumRowsCutoff + 1).get).count()
+    val thresholdCount = leftDf.limit(Some(tableUtils.smallModeNumRowsCutoff + THRESHOLD_BUFFER).get).count()
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between cabcdc8 and 5da2eab.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4)

20-21: LGTM: Import changes align with new functionality.


337-342: LGTM: Method signature simplified.

Improved by requiring only the necessary data model parameter.


474-526: LGTM: Well-structured skew filtering implementation.

Good error handling and logging practices.


528-545: LGTM: Clean implementation of utility methods.

Good error handling and appropriate use of Option types.

Also applies to: 563-567

import ai.chronon.api
import ai.chronon.online.PartitionRange

class Bootstrap {
Collaborator

rename file?

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (7)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)

20-20: Consider using a more specific exception type.
Throwing a generic RuntimeException may obscure error origins. A custom exception can provide clarity.
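
A minimal sketch of a dedicated exception type, assuming the job has the source table and range at hand (as in the existing error message); the class name and wording are illustrative:

  class EmptySourceRangeException(table: String, range: String)
      extends RuntimeException(s"Source scan of table $table produced 0 rows in range $range")

  // hypothetical usage inside SourceJob.run(), replacing the generic RuntimeException:
  // if (df.isEmpty) throw new EmptySourceRangeException(source.table, range.toString)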

spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala (3)

23-46: Gracefully handle partial failures.
Currently, if any join part fails, the process escalates. Consider partial retries or skip logic.


67-71: Replace println with proper logging.
Direct console prints may get lost in production logs.

- println(s"RightDf")
+ logger.debug("RightDf")

166-179: Enhance error feedback.
Throwing joinedDfTry.failed.get loses context. Log or wrap the exception to preserve stack details.
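
A minimal sketch of wrapping the failure instead of rethrowing it bare, assuming joinedDfTry is a scala.util.Try[DataFrame] and an slf4j-style logger as elsewhere in the file; the message text is illustrative:

  import scala.util.{Failure, Success}

  joinedDfTry match {
    case Success(df) => df
    case Failure(e) =>
      // log with the original stack trace, then rethrow with added context
      logger.error("Final join failed while merging join parts", e)
      throw new RuntimeException("Final join failed while merging join parts", e)
  }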

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)

49-76: Ensure consistent reusability.
run reconstructs a context if none is supplied. Confirm no hidden side effects that could affect chaining multiple runs.

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

59-259: Consider splitting this test into smaller methods.
It's quite large. More granular tests improve readability.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

547-561: Extract magic number in logging message.

Replace hardcoded number with the referenced variable:

-          s"Counted greater than ${tableUtils.smallModeNumRowsCutoff} rows, proceeding with normal computation.")
+          s"Counted greater than $thresholdCount rows, proceeding with normal computation.")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 5da2eab and f64a67a.

📒 Files selected for processing (8)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (12)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)

26-28: Validate potential empty sequence.
If source.dataModel != Events, ensure no unintended behavior when timeProjection is empty.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (2)

30-36: Keep context minimal.
The case class holds multiple optional fields. Validate usage to avoid confusion or uninitialized states.


92-141: Double-check table reachability logic.
Only returning None if tableUtils.tableReachable(partTable) is false might skip needed merges. Ensure this path is correct.

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

253-258: Add a final assertion.
Currently, there's no check on the final joined table's correctness. Consider verifying its results.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)

185-186: Handle potential errors from bootstrap creation.
Wrap in a try/catch or verify table existence to avoid runtime failures.

spark/src/main/scala/ai/chronon/spark/Join.scala (3)

253-254: Validate bootstrap job usage.
Check concurrency or table readiness before calling computeBootstrapTable.


313-313: Revisit the small-mode decision logic.
Large datasets might suffer if forced into runSmallMode incorrectly.


332-357: Check skew-key usage.
Ensure skew handling doesn't discard needed records and the mapping is accurate.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4)

20-21: LGTM!

The new imports are well-organized and necessary for the added functionality.

Also applies to: 25-25, 28-29, 31-31, 39-39


337-342: LGTM!

Good refactoring to reduce coupling by accepting only the required leftDataModel parameter.


569-590: LGTM!

Well-documented time range adjustment logic with clear comments explaining different scenarios.


474-526: LGTM!

The new utility methods are well-implemented with appropriate error handling and logging.

Also applies to: 528-545, 563-567, 593-601

Comment on lines +60 to +74
  private def formatFilterString(keys: Option[Map[String, Seq[String]]] = None): Option[String] = {
    keys.map { keyMap =>
      keyMap
        .map { case (keyName, values) =>
          generateSkewFilterSql(keyName, values)
        }
        .filter(_.nonEmpty)
        .mkString(" OR ")
    }
  }
Contributor

⚠️ Potential issue

Watch for potential SQL injection if values are user-provided.
Concatenate filter strings carefully or consider parameterization.
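
A minimal sketch of quoting values before interpolation as one way to harden this; quoteValue and safeSkewFilterSql are hypothetical helpers (not the existing generateSkewFilterSql) and assume the values arrive as plain strings:

  private def quoteValue(v: String): String =
    "'" + v.replace("'", "''") + "'" // escape embedded single quotes

  private def safeSkewFilterSql(keyName: String, values: Seq[String]): String =
    if (values.isEmpty) "" else s"$keyName NOT IN (${values.map(quoteValue).mkString(", ")})"

Column names would still need to come from trusted configuration rather than raw user input.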

Comment on lines 49 to 77
  private def getRightPartsData(): Seq[(JoinPart, DataFrame)] = {
    joinPartsToTables.map { case (joinPart, partTable) =>
      val effectiveRange =
        if (join.left.dataModel != Entities && joinPart.groupBy.inferredAccuracy == Accuracy.SNAPSHOT) {
          range.shift(-1)
        } else {
          range
        }
      val wheres = effectiveRange.whereClauses("ds")
      val sql = QueryUtils.build(null, partTable, wheres)
      logger.info(s"Pulling data from joinPart table with: $sql")
      (joinPart, tableUtils.scanDfBase(null, partTable, List.empty, wheres, None))
    }.toSeq
  }
Contributor

🛠️ Refactor suggestion

Evaluate performance for multiple table scans.
Looping over joinPartsToTables can be expensive for large inputs. Consider caching or partition-wise reads.
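
A minimal sketch of persisting each scanned join-part DataFrame so downstream merge steps reuse it instead of rescanning; it assumes the same class members (joinPartsToTables, range, tableUtils) as the excerpt above and, for brevity, omits the snapshot-accuracy range shift:

  import org.apache.spark.storage.StorageLevel

  private def getRightPartsDataCached(): Seq[(JoinPart, DataFrame)] =
    joinPartsToTables.map { case (joinPart, partTable) =>
      val wheres = range.whereClauses("ds")
      val df = tableUtils
        .scanDfBase(null, partTable, List.empty, wheres, None)
        .persist(StorageLevel.MEMORY_AND_DISK) // scanned once, reused by every downstream join
      (joinPart, df)
    }.toSeq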

Comment on lines 143 to 250
private def computeJoinPart(leftDfWithStats: Option[DfWithStats],
joinPart: JoinPart,
joinLevelBloomMapOpt: Option[util.Map[String, BloomFilter]],
skipBloom: Boolean): Option[DataFrame] = {

if (leftDfWithStats.isEmpty) {
// happens when all rows are already filled by bootstrap tables
logger.info(s"\nBackfill is NOT required for ${joinPart.groupBy.metaData.name} since all rows are bootstrapped.")
return None
}

val statsDf = leftDfWithStats.get
val rowCount = statsDf.count
val unfilledRange = statsDf.partitionRange

logger.info(
s"\nBackfill is required for ${joinPart.groupBy.metaData.name} for $rowCount rows on range $unfilledRange")
val rightBloomMap = if (skipBloom) {
None
} else {
JoinUtils.genBloomFilterIfNeeded(joinPart, leftDataModel, rowCount, unfilledRange, joinLevelBloomMapOpt)
}

val rightSkewFilter = JoinUtils.partSkewFilter(joinPart, skewKeys)

def genGroupBy(partitionRange: PartitionRange) =
GroupBy.from(joinPart.groupBy,
partitionRange,
tableUtils,
computeDependency = true,
rightBloomMap,
rightSkewFilter,
showDf = showDf)

// all lazy vals - so evaluated only when needed by each case.
lazy val partitionRangeGroupBy = genGroupBy(unfilledRange)

lazy val unfilledTimeRange = {
val timeRange = statsDf.timeRange
logger.info(s"left unfilled time range: $timeRange")
timeRange
}

val leftSkewFilter =
JoinUtils.skewFilter(Option(joinPart.rightToLeft.values.toSeq), skewKeys, joinPart.rightToLeft.values.toSeq)
// this is the second time we apply skew filter - but this filters only on the keys
// relevant for this join part.
println("leftSkewFilter: " + leftSkewFilter)
lazy val skewFilteredLeft = leftSkewFilter
.map { sf =>
val filtered = statsDf.df.filter(sf)
logger.info(s"""Skew filtering left-df for
|GroupBy: ${joinPart.groupBy.metaData.name}
|filterClause: $sf
|""".stripMargin)
filtered
}
.getOrElse(statsDf.df)

/*
For the corner case when the values of the key mapping also exist in the keys, for example:
Map(user -> user_name, user_name -> user)
the below logic will first rename the conflicted column with some random suffix and update the rename map
*/
lazy val renamedLeftRawDf = {
val columns = skewFilteredLeft.columns.flatMap { column =>
if (joinPart.leftToRight.contains(column)) {
Some(col(column).as(joinPart.leftToRight(column)))
} else if (joinPart.rightToLeft.contains(column)) {
None
} else {
Some(col(column))
}
}
skewFilteredLeft.select(columns: _*)
}

lazy val shiftedPartitionRange = unfilledTimeRange.toPartitionRange.shift(-1)

val renamedLeftDf = renamedLeftRawDf.select(renamedLeftRawDf.columns.map {
case c if c == tableUtils.partitionColumn =>
date_format(renamedLeftRawDf.col(c), tableUtils.partitionFormat).as(c)
case c => renamedLeftRawDf.col(c)
}.toList: _*)
val rightDf = (leftDataModel, joinPart.groupBy.dataModel, joinPart.groupBy.inferredAccuracy) match {
case (Entities, Events, _) => partitionRangeGroupBy.snapshotEvents(unfilledRange)
case (Entities, Entities, _) => partitionRangeGroupBy.snapshotEntities
case (Events, Events, Accuracy.SNAPSHOT) =>
genGroupBy(shiftedPartitionRange).snapshotEvents(shiftedPartitionRange)
case (Events, Events, Accuracy.TEMPORAL) =>
genGroupBy(unfilledTimeRange.toPartitionRange).temporalEvents(renamedLeftDf, Some(unfilledTimeRange))

case (Events, Entities, Accuracy.SNAPSHOT) => genGroupBy(shiftedPartitionRange).snapshotEntities

case (Events, Entities, Accuracy.TEMPORAL) =>
// Snapshots and mutations are partitioned with ds holding data between <ds 00:00> and ds <23:59>.
genGroupBy(shiftedPartitionRange).temporalEntities(renamedLeftDf)
}
val rightDfWithDerivations = if (joinPart.groupBy.hasDerivations) {
val finalOutputColumns = joinPart.groupBy.derivationsScala.finalOutputColumn(rightDf.columns)
val result = rightDf.select(finalOutputColumns: _*)
result
} else {
rightDf
}
if (showDf) {
logger.info(s"printing results for joinPart: ${joinPart.groupBy.metaData.name}")
rightDfWithDerivations.prettyPrint()
}
Some(rightDfWithDerivations)
}
Contributor

🛠️ Refactor suggestion

Avoid repeated skew filter.
Both left and right paths apply skew logic. Merge steps if possible to reduce overhead.

Comment on lines 101 to 128
def leftDfFromSource(left: ai.chronon.api.Source,
range: PartitionRange,
tableUtils: TableUtils,
allowEmpty: Boolean = false,
limit: Option[Int] = None,
skewFilter: Option[String]): Option[DataFrame] = {
val timeProjection = if (left.dataModel == Events) {
Seq(Constants.TimeColumn -> Option(left.query).map(_.timeColumn).orNull)
} else {
Seq()
}
var df = tableUtils.scanDf(left.query,
left.table,
Some(Map(tableUtils.partitionColumn -> null) ++ timeProjection),
range = Some(range))
limit.foreach(l => df = df.limit(l))
val result = skewFilter
.map(sf => {
logger.info(s"left skew filter: $sf")
df.filter(sf)
})
.getOrElse(df)
if (!allowEmpty && result.isEmpty) {
logger.info(s"Left side query below produced 0 rows in range $range, and allowEmpty=false.")
return None
}
Some(result)
}
Contributor

🛠️ Refactor suggestion

Refactor to reduce code duplication.

This method duplicates logic from leftDf. Consider extracting common logic into a private helper method.
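
A minimal sketch of a shared private helper that both leftDf and leftDfFromSource could delegate the scan/limit/skew-filter steps to; scanAndFilterLeft is a hypothetical name and the body reuses the logic shown above, assuming the same imports as JoinUtils:

  private def scanAndFilterLeft(left: ai.chronon.api.Source,
                                range: PartitionRange,
                                tableUtils: TableUtils,
                                limit: Option[Int],
                                skewFilter: Option[String]): DataFrame = {
    val timeProjection =
      if (left.dataModel == Events) Seq(Constants.TimeColumn -> Option(left.query).map(_.timeColumn).orNull)
      else Seq()
    var df = tableUtils.scanDf(left.query,
                               left.table,
                               Some(Map(tableUtils.partitionColumn -> null) ++ timeProjection),
                               range = Some(range))
    limit.foreach(l => df = df.limit(l))
    skewFilter
      .map { sf =>
        logger.info(s"left skew filter: $sf")
        df.filter(sf)
      }
      .getOrElse(df)
  }

The two public methods would then only differ in how they handle an empty result.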

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

1-356: Nice test setup. Consider negative scenarios or edge cases.

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1)

1-91: Straightforward derivation logic. Check edge cases and null handling.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)

18-21: Constructor checks?
Consider validating sourceWithFilter and range to avoid NPEs later.

spark/src/main/scala/ai/chronon/spark/Join.scala (1)

253-254: Bootstrap failure handling.
If computeBootstrapTable fails, consider returning a Try or handling exceptions more gracefully.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

596-604: Possible duplication?
Similar padFields logic appears in Join. Consider extracting.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between f64a67a and a446430.

📒 Files selected for processing (9)
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala
  • spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
🔇 Additional comments (16)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (2)

87-88: Check new param usage. Confirm no impact on existing logic.


127-133: Conditional external parts skip. Ensure coverage for both paths.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (4)

1-13: Looks good.
All imports and basic setup appear fine.


23-61: Run method logic.
Implementation is straightforward. Ensure that throwing on empty DF suits all use cases.


63-72: Watch out for SQL injection.
This concatenates potentially user-provided keys. Past comment still applies.


74-79: Large NOT IN usage.
Big in-lists can degrade performance. Past suggestion to consider smaller subsets or partitioning is still valid.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (10)

20-39: Minor import and usage updates.
Changes look benign; no immediate concerns.


101-128: Duplicate logic.
This method resembles leftDf; consider unifying to avoid redundancy.


337-339: Signature change check.
Ensure all calls are updated after replacing joinConf with leftDataModel.


474-497: Skew filter performance.
Repeated note: very large filters degrade performance. Evaluate safer approaches.


499-519: Part skew filtering.
Same caution about large in-lists.


521-526: SQL injection risk.
Reiterating prior concern when building filter strings from raw input.


528-545: Unfilled records check.
Logic is clear. No issues detected.


547-561: Small mode approach.
Behavior is good; just confirm correct threshold.


563-567: Parsing skew keys.
Simple conversion. No problems noted.


569-594: Accurate shifting.
Time range shift logic is correct.

@varant-zlai varant-zlai force-pushed the vz--refactor_spark_modular branch from a446430 to db7abb7 on February 25, 2025 01:18
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/Join.scala (1)

334-339: Consider extracting job context creation to a factory method.

The context creation could be simplified and made more reusable.

-              val runContext = JoinPartJobContext(unfilledLeftDf,
-                                                  bloomFilterOpt,
-                                                  partTable,
-                                                  leftTimeRangeOpt,
-                                                  tableProps,
-                                                  runSmallMode)
+              val runContext = JoinPartJobContext.create(
+                unfilledLeftDf,
+                bloomFilterOpt,
+                partTable,
+                leftTimeRangeOpt,
+                tableProps,
+                runSmallMode
+              )
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

596-604: Optimize column padding performance.

Consider using select with a column list instead of multiple withColumn calls for better performance.

-    structType.foldLeft(df) { case (df, field) =>
-      if (df.columns.contains(field.name)) {
-        df
-      } else {
-        df.withColumn(field.name, lit(null).cast(field.dataType))
-      }
-    }
+    val existingColumns = df.columns.toSet
+    val allColumns = structType.map(field =>
+      if (existingColumns.contains(field.name))
+        col(field.name)
+      else
+        lit(null).cast(field.dataType).as(field.name)
+    )
+    df.select(allColumns: _*)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between a446430 and db7abb7.

📒 Files selected for processing (13)
  • .bazelignore (1 hunks)
  • api/thrift/orchestration.thrift (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
  • .bazelignore
  • api/thrift/orchestration.thrift
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
  • spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap.scala (1)

29-31: LGTM! Well-structured case class.

The case class effectively encapsulates the bootstrap orchestration result with clear field names and appropriate types.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)

74-79: Be mindful of large IN-lists.

Large sequences in NOT IN can degrade performance. Consider dynamic partitioning or alternative filtering for huge lists.


63-72: Watch for potential SQL injection if values are user-provided.

Concatenate filter strings carefully or consider parameterization.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)

185-186: LGTM! Good modularization of bootstrap logic.

Moving bootstrap computation to a dedicated class improves code organization and maintainability.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3)

101-128: Refactor to reduce code duplication.

This method duplicates logic from leftDf. Consider extracting common logic into a private helper method.


474-526: Add input validation for skew keys.

The skew filtering logic looks solid, but consider validating that values in skewKeys are non-empty strings to prevent SQL injection.
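
A minimal sketch of up-front validation, assuming skewKeys keeps the Option[Map[String, Seq[String]]] shape used in this file; requireValidSkewKeys is a hypothetical helper:

  private def requireValidSkewKeys(skewKeys: Option[Map[String, Seq[String]]]): Unit =
    skewKeys.foreach { keyMap =>
      keyMap.foreach { case (key, values) =>
        require(key.trim.nonEmpty, "skew key names must be non-empty")
        require(values.nonEmpty && values.forall(_.trim.nonEmpty),
                s"skew key '$key' must map to a non-empty list of non-empty values")
      }
    }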


569-594: Add error handling for time range shifts.

Consider handling edge cases where leftTimeRangeOpt and leftDf are both None.

Comment on lines 14 to 23
def getCoveringRanges(join: api.Join, fullRange: PartitionRange): Seq[BootstrapOrchestrationResult] = {
/*
0. Modify compile to put BootstrapPart directly onto a joinPart if it's schema-covering
1. For each joinPart that has a bootstrapPart
1. a. Get the intersecting range of the bootstrap part table and `fullRange` -- this is existingBootstrapRange
1. b. Get the computable range within the `fullRange` based on source table partitions -- this is computableRange
1. c. create a BootstrapResult with the above two ranges
1. d. for joinParts without an existing or computable range, do nothing
*/
Seq.empty
Contributor

⚠️ Potential issue

Implement the TODO logic in getCoveringRanges.

The method has detailed comments outlining the intended logic but currently returns an empty sequence.

Would you like me to help implement the logic based on the comments?

@varant-zlai varant-zlai force-pushed the vz--refactor_spark_modular branch 2 times, most recently from 67173ba to bdd2105 on February 25, 2025 22:49
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (6)
spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala (2)

53-59: Improve error handling.

Error caught is only printed but not logged at appropriate level. Consider structured logging with context.

-          e.printStackTrace()
+          logger.error(s"Failed to join with error: ${e.getMessage}", e)

180-182: Consider adding metrics.

Add execution metrics before calling explain() to track performance.

     finalDf.explain()
+    metrics.gauge("final_df_row_count", finalDf.count())
     finalDf
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)

50-52: Enhance error information.

RuntimeException is too generic. Include query details for troubleshooting.

-      throw new RuntimeException(s"Query produced 0 rows in range $range.")
+      throw new RuntimeException(s"Query produced 0 rows in range $range for source ${source.table}.")
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (2)

75-75: Address TODO.
There's a placeholder comment; let’s finalize the approach for left df/time range/bloom filter logic.


186-186: Replace println with logger.
Use consistent logging across the codebase.

-    println("leftSkewFilter: " + leftSkewFilter)
+    logger.info(s"leftSkewFilter: $leftSkewFilter")
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

521-526: Potentially large NOT IN query.
If 'values' grows large, consider safer or more efficient filtering strategies.
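
A minimal, self-contained sketch of one alternative: materialize the excluded values and use a broadcast left-anti join instead of a long NOT IN list; the function name and parameters are illustrative:

  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.functions.broadcast

  def excludeByAntiJoin(spark: SparkSession, df: DataFrame, keyName: String, excluded: Seq[String]): DataFrame = {
    import spark.implicits._
    val excludedDf = excluded.toDF(keyName) // one-column DataFrame of values to drop
    df.join(broadcast(excludedDf), Seq(keyName), "left_anti") // keeps rows whose key is not in the list
  }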

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 67173ba and bdd2105.

📒 Files selected for processing (16)
  • .bazelignore (1 hunks)
  • api/thrift/orchestration.thrift (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (10)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
  • .bazelignore
  • api/thrift/orchestration.thrift
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap.scala
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (10)
spark/src/main/scala/ai/chronon/spark/FinalJoinJob.scala (1)

64-77: Evaluate performance for multiple table scans.

Looping over joinPartsToTables can be expensive for large inputs. Consider caching or partition-wise reads.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)

63-72: Watch for potential SQL injection if values are user-provided.

Concatenate filter strings carefully or consider parameterization.


74-79: Be mindful of large IN-lists.

Large sequences in NOT IN can degrade performance. Consider dynamic partitioning or alternative filtering for huge lists.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2)

313-313: Good refactoring to utility method.

Moving the small mode logic to JoinUtils improves maintainability.


185-186: Clean modularization.

Good use of BootstrapJob abstraction instead of direct method calls.

spark/src/main/scala/ai/chronon/spark/Join.scala (2)

253-255: Appropriate encapsulation.

Good modularization using BootstrapJob.


334-362: Good job context abstraction.

Well-structured JoinPartJobContext and JoinPartJob implementation.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (2)

50-50: Hardcoded query is reappearing.
Parameterize this query to minimize confusion.


182-197: Repeated skew filter logic.
Consolidate or merge if possible, as suggested in a past review.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

101-128: Code duplication with leftDf.
Extract shared logic into a helper method.

@varant-zlai varant-zlai changed the title from "WIP -- Refactor spark to create modular jobs for orchestrator" to "Refactor spark to create modular jobs for orchestrator" on Feb 26, 2025
*/
class JoinDerivationJob(trueLeftTable: String,
baseTable: String,
derivationOutputTable: String,
Collaborator Author

standardize names to outputTable

* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Collaborator Author

Take this out

} else {
scanDf
}
dff.schema
Collaborator

are these strays?

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (2)

33-324: Test too large
Split into smaller tests for clarity.


78-90: Hardcoded exchange rate
Make the 70 parameterizable for clarity.

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1)

25-86: Possible parallelization
Consider parallel tasks if datasets are large.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)

139-249: Large data overhead
May need optimization for big joins.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 22819ae and df4ee4e.

📒 Files selected for processing (4)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: join_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)

63-72: Potential injection
Sanitize the dynamic string to prevent misuse.


74-79: Hardcoded filter construction
Parameterize to avoid injection.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)

46-86: Check error handling
Ensure failures are clear if no left data.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (3)

77-77: Remove commented debug code.

-    //println("Rupee Source start partition $month")

89-90: Remove duplicate UDF registration.

            "create temporary function temp_replace_right_b as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'",
-            "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'",
            "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'"

323-323: Fix variable reference in error message.

-      println(s"Diff count: ${diff.count()}")
+      println(s"Diff count: ${finalDiff.count()}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between df4ee4e and 25df68b.

📒 Files selected for processing (3)
  • api/thrift/orchestration.thrift (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (7)
api/thrift/orchestration.thrift (5)

273-276: New struct looks good

Defines a source with optional key exclusion filtering.


278-283: Well-structured job arguments

Encapsulates parameters for source processing job.


285-291: Good parameter encapsulation

Contains all necessary fields for bootstrap job execution.


293-300: Effective merge job configuration

Properly structures join part merging parameters.


302-309: Clean derivation job definition

Enables separation of derivation logic as mentioned in PR.

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (2)

242-252: Add validation for JoinPartJob output.

Missing validation for the first JoinPartJob similar to other jobs in the test.


253-263: Add validation for second JoinPartJob output.

Missing validation for the second JoinPartJob similar to other jobs in the test.
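
A rough sketch of the kind of check both comments ask for, written as a self-contained snippet; the table and column names are placeholders, not the ones ModularJoinTest actually uses.

import org.apache.spark.sql.SparkSession

object JoinPartOutputCheckSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for a JoinPartJob output table.
    Seq(("u1", "2025-02-01", 3L), ("u2", "2025-02-01", 5L))
      .toDF("user_id", "ds", "purchase_count")
      .createOrReplaceTempView("join_part_output")

    val out = spark.table("join_part_output")
    assert(out.count() > 0, "JoinPartJob produced an empty output")
    assert(out.filter($"purchase_count".isNull).count() == 0, "unexpected null feature values")
    spark.stop()
  }
}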

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (7)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1)

33-45: Method logic is straightforward.
Consider better error handling if scanDf fails.

spark/src/main/scala/ai/chronon/spark/MergeJob.scala (1)

79-139: Join logic is neat.
Time partition shift is clever. Watch off-by-one errors.

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (3)

85-85: Remove commented-out debug code.

This commented println statement should be removed.

-    //println("Rupee Source start partition $month")

96-98: Fix duplicate function registration.

The temp_replace_right_c function is registered twice.

            "create temporary function temp_replace_right_b as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'",
            "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'",
-           "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'"

27-367: Add table cleanup after test completion.

The test creates multiple temporary tables but doesn't clean them up. Consider adding table cleanup in a tearDown method or after test completion.

-class ModularJoinTest extends AnyFlatSpec {
+class ModularJoinTest extends AnyFlatSpec with BeforeAndAfterAll {
+  // List of tables created during test for cleanup
+  private val testTables = scala.collection.mutable.ListBuffer.empty[String]
+  
+  override def afterAll(): Unit = {
+    testTables.foreach(table => spark.sql(s"DROP TABLE IF EXISTS $table"))
+    super.afterAll()
+  }

  // Then add each created table to testTables after creation
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (2)

47-55: Remove commented code.

Commented parameters that should be removed.

-/*
-leftTable: String,
-                  leftDataModel: DataModel,
-                  joinPart: api.JoinPart,
-                  outputTable: String,
-                  dateRange: PartitionRange,
-                  tableUtils: TableUtils,
-                  skewKeys: Option[Map[String, Seq[String]]] = None,
-                  showDf: Boolean = false
-   */

200-200: Replace println with logger.

Use logger instead of println for consistency.

-println("leftSkewFilter: " + leftSkewFilter)
+logger.debug(s"leftSkewFilter: $leftSkewFilter")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 25df68b and 3bacd6d.

📒 Files selected for processing (12)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (2 hunks)
  • api/thrift/hub.thrift (3 hunks)
  • api/thrift/orchestration.thrift (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • spark/src/main/scala/ai/chronon/spark/GroupBy.scala
⏰ Context from checks skipped due to timeout of 90000ms (14)
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (31)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)

24-26: Imports look fine.


1246-1253: Check edge cases.
Consider null or empty dates before constructing the PartitionRange.

api/thrift/hub.thrift (3)

35-35: Type update is consistent.


63-63: Matches orchestration.


113-113: Same DateRange usage.

spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (5)

1-31: Imports and class signature look fine.


47-141: Method is long.
Splitting into smaller helpers would improve clarity.


142-176: Helper method is good.
Ensure contextual fields are appended only when necessary.


177-185: Simple filler logic.
No issues spotted.


187-189: Schema conversion is fine.

spark/src/main/scala/ai/chronon/spark/MergeJob.scala (6)

1-19: Imports are coherent.


22-63: Run method is clear.
Catches exceptions properly.


64-77: Queries look fine.
Log statements help debugging.


141-152: Field padding is straightforward.


154-170: Dropping unwanted contextual fields is good.


172-183: Method organizes columns effectively.

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (2)

18-24: Clean class structure with standardized naming.

Class definition follows standard pattern with clear field extraction from args object.


25-86: Well-structured transformations with comprehensive comments.

The run method's logic for handling various column types is clearly documented with detailed comments explaining the transformation process.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (4)

64-73: SQL injection risk persists.

Watch for potential SQL injection if values are user-provided.


75-80: Performance concern with large NOT IN lists.

Large sequences in NOT IN can degrade performance. Consider using alternative filtering for huge lists.


19-23: Clear initialization of job parameters.

The job cleanly extracts its parameters from the args object.


24-62: Well-structured source materialization logic.

The run method handles time projections, skew filtering, and dataframe saving effectively.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2)

187-196: Good modularization of bootstrap logic.

Effectively refactors bootstrap computation to use the dedicated BootstrapJob class, improving code organization.


323-323: Improved readability with utility method.

Extracting small mode logic to JoinUtils simplifies the code.

spark/src/main/scala/ai/chronon/spark/Join.scala (2)

257-266: Bootstrap computation now uses BootstrapJob.

Refactored to use BootstrapJob instead of direct computation, aligning with modular architecture.


346-385: JoinPartJob integration looks good.

Clean integration of the new JoinPartJob with appropriate construction of arguments and context.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (3)

89-89: Address TODO comment.

Fix the TODO about left df, left time range, etc.


111-112: Resolve metrics context TODO.

Need to determine whether to pass through metrics context from monolith join.


198-200: Duplicate skew filter application.

Code applies skew filter twice - first in genBloomFilterIfNeeded and then here.

api/thrift/orchestration.thrift (2)

14-17: New DateRange structure looks good.

Simple and reusable structure for specifying date ranges.


312-319: Well-structured JoinPartJobArgs.

Contains all necessary fields for JoinPartJob with optional markers.

@varant-zlai varant-zlai force-pushed the vz--refactor_spark_modular branch from 3bacd6d to 92aa64d Compare February 28, 2025 21:32
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🔭 Outside diff range comments (2)
api/src/main/scala/ai/chronon/api/DataRange.scala (1)

89-97: ⚠️ Potential issue

Method implementation appears incomplete

The partitions method returns a Stream without collecting to Seq as its signature suggests.

  def partitions: Seq[String] = {
    assert(wellDefined, s"Invalid partition range $this")
-    val startx = start
-    val endx = end
-    val pss = partitionSpec
    Stream
      .iterate(start)(partitionSpec.after)
      .takeWhile(_ <= end)
+      .toSeq
  }
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

27-369: 🛠️ Refactor suggestion

Test is too monolithic

This single test verifies multiple components in one go, making failures hard to diagnose.

Break into multiple focused tests:

  • testSourceJob()
  • testBootstrapJob()
  • testJoinPartJob()
  • testMergeJob()
  • testJoinDerivationJob()

This improves isolation of issues and makes the test suite more maintainable.
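
A minimal sketch of that split, assuming the existing AnyFlatSpec style; the fixture and assertions are placeholders standing in for the real table setup and validations.

import org.scalatest.flatspec.AnyFlatSpec

class ModularJoinStagesSketch extends AnyFlatSpec {

  // Shared, lazily-built fixture so each focused test can run on its own.
  private lazy val testNamespace: String = "modular_join_sketch" // placeholder for real table setup

  "SourceJob" should "materialize the filtered left source" in {
    assert(testNamespace.nonEmpty) // placeholder; real test scans the source output table
  }

  "BootstrapJob" should "enrich the left table with bootstrap parts" in {
    assert(testNamespace.nonEmpty)
  }

  "JoinPartJob" should "produce one table per (left, group_by)" in {
    assert(testNamespace.nonEmpty)
  }

  "MergeJob" should "combine join part tables without derivations" in {
    assert(testNamespace.nonEmpty)
  }
}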

♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1)

48-141: 🛠️ Refactor suggestion

Method is too large

This method handles multiple responsibilities and is quite lengthy.

Break down into smaller methods for better readability:

  • validateInputs()
  • processBootstrapParts()
  • enrichBootstrapTable()
  • saveResults()
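
A rough sketch of that decomposition, using the reviewer's placeholder method names rather than the actual BootstrapJob API:

import org.apache.spark.sql.DataFrame

trait BootstrapRunSketch {
  // Each step returns its result explicitly so run() reads as a short pipeline.
  def validateInputs(): Unit
  def processBootstrapParts(leftDf: DataFrame): DataFrame
  def enrichBootstrapTable(bootstrapped: DataFrame): DataFrame
  def saveResults(enriched: DataFrame): Unit

  def run(leftDf: DataFrame): Unit = {
    validateInputs()
    val bootstrapped = processBootstrapParts(leftDf)
    val enriched = enrichBootstrapTable(bootstrapped)
    saveResults(enriched)
  }
}
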
🧹 Nitpick comments (7)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1)

1-47: Class structure looks good but needs initialization validation

The BootstrapJob class is well-documented and well-structured, capturing the job's purpose and requirements clearly.

Consider adding validation for all required input parameters in the constructor or early in run().
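
A minimal sketch of such validation, assuming hypothetical accessor names; the real BootstrapJob argument fields may differ.

case class BootstrapArgsSketch(leftTable: Option[String],
                               outputTable: Option[String],
                               startDate: Option[String],
                               endDate: Option[String]) {
  // Fail fast with a clear message before any Spark work starts.
  def validate(): Unit = {
    require(leftTable.exists(_.nonEmpty), "leftTable must be set")
    require(outputTable.exists(_.nonEmpty), "outputTable must be set")
    require(startDate.isDefined && endDate.isDefined, "dateRange must have both start and end")
    require(startDate.get <= endDate.get, s"invalid range: ${startDate.get} > ${endDate.get}")
  }
}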

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

41-199: Test setup is too large

The test setup contains a lot of boilerplate that obscures the test intent.

Extract repeated setup code into helper methods to improve readability.
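
For example, a small helper along these lines could replace the repeated view-creation blocks; the schema and naming are invented for illustration.

import org.apache.spark.sql.{DataFrame, SparkSession}

object TestSetupHelpersSketch {
  // Build a tiny source table once and register it under a namespaced view name.
  def createSourceTable(spark: SparkSession,
                        namespace: String,
                        table: String,
                        rows: Seq[(String, String)]): DataFrame = {
    import spark.implicits._
    val df = rows.toDF("user_id", "ds")
    df.createOrReplaceTempView(s"${namespace}_$table")
    df
  }
}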

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)

64-73: Watch large OR chains.
Many OR clauses can slow queries. Consider partitioned filters for efficiency.

api/thrift/orchestration.thrift (1)

14-17: DateRange structure looks fine.
Short usage docs might help newbies.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3)

521-526: Add missing return type.

Method is well-implemented but lacks explicit return type annotation.

-  private def generateSkewFilterSql(key: String, values: Seq[String]) = {
+  private def generateSkewFilterSql(key: String, values: Seq[String]): String = {

569-594: Add missing return type.

Method lacks explicit return type annotation which should be PartitionRange.

-  def shiftDays(leftDataModel: DataModel,
+  def shiftDays(leftDataModel: DataModel,
                joinPart: JoinPart,
                leftTimeRangeOpt: Option[PartitionRange],
                leftDf: Option[DfWithStats],
-               leftRange: PartitionRange) = {
+               leftRange: PartitionRange): PartitionRange = {

596-603: Potential performance issue with frequent DataFrame modifications.

Consider collecting all missing columns first and adding them in one operation to avoid multiple DataFrame transformations.

def padFields(df: DataFrame, structType: sql.types.StructType): DataFrame = {
-  structType.foldLeft(df) { case (df, field) =>
-    if (df.columns.contains(field.name)) {
-      df
-    } else {
-      df.withColumn(field.name, lit(null).cast(field.dataType))
-    }
-  }
+  val missingFields = structType.filter(field => !df.columns.contains(field.name))
+  if (missingFields.isEmpty) {
+    df
+  } else {
+    missingFields.foldLeft(df) { case (currentDf, field) =>
+      currentDf.withColumn(field.name, lit(null).cast(field.dataType))
+    }
+  }
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 3bacd6d and 92aa64d.

📒 Files selected for processing (45)
  • .bazelignore (1 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala (1 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala (0 hunks)
  • api/src/main/scala/ai/chronon/api/DataRange.scala (2 hunks)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (2 hunks)
  • api/src/main/scala/ai/chronon/api/TsUtils.scala (1 hunks)
  • api/thrift/hub.thrift (3 hunks)
  • api/thrift/orchestration.thrift (2 hunks)
  • online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/DataRangeTest.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/GroupBy.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/LabelJoin.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/StagingQuery.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/PartitionRunner.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/DataRangeTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (0 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (0 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/JoinUtilsTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala (1 hunks)
💤 Files with no reviewable changes (3)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala
✅ Files skipped from review due to trivial changes (22)
  • orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala
  • spark/src/main/scala/ai/chronon/spark/StagingQuery.scala
  • spark/src/test/scala/ai/chronon/spark/test/join/JoinUtilsTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala
  • online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
  • spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
  • spark/src/test/scala/ai/chronon/spark/test/DataRangeTest.scala
  • spark/src/main/scala/ai/chronon/spark/utils/PartitionRunner.scala
  • spark/src/main/scala/ai/chronon/spark/Analyzer.scala
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala
  • api/src/main/scala/ai/chronon/api/TsUtils.scala
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala
  • spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala
  • spark/src/main/scala/ai/chronon/spark/Extensions.scala
  • online/src/test/scala/ai/chronon/online/test/DataRangeTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala
  • spark/src/main/scala/ai/chronon/spark/LabelJoin.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala
🚧 Files skipped from review as they are similar to previous changes (11)
  • .bazelignore
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
  • api/thrift/hub.thrift
  • orchestration/src/main/scala/ai/chronon/orchestration/Bootstrap.scala
  • spark/src/main/scala/ai/chronon/spark/GroupBy.scala
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala
  • spark/src/main/scala/ai/chronon/spark/Join.scala
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (18)
api/src/main/scala/ai/chronon/api/DataRange.scala (1)

17-17:

❓ Verification inconclusive

Package relocation is good but verify dependencies

Package move from ai.chronon.online to ai.chronon.api aligns with refactoring goals.


🏁 Script executed:

#!/bin/bash
# Check for any remaining references to the old package path
rg "ai\.chronon\.online\.PartitionRange" --type scala

Length of output: 55


Dependency Check: Manual Verification Recommended

  • Package relocation is correctly executed.
  • No references found for ai.chronon.online.PartitionRange in Scala files.
  • Please run a broader check (e.g., rg "ai\.chronon\.online" --type scala) to confirm no residual references remain.
api/src/main/scala/ai/chronon/api/Extensions.scala (1)

1244-1251: Clean extension method implementation

This implicit class neatly extends DateRange with a conversion method to PartitionRange, maintaining consistent API design patterns.

spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1)

148-176: Clean utility method with good comments

The padExternalFields method is well-documented and handles its task efficiently.

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (2)

18-25: Good class structure

The class is properly initialized with clear parameter assignments.


39-63: Good comments explaining complex logic

The extensive comments help understand the column selection logic.

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

363-363: ⚠️ Potential issue

Incorrect variable in debug output

The variable used for debugging is inconsistent with the one used in the condition.

-      println(s"Diff count: ${diff.count()}")
+      println(s"Diff count: ${finalDiff.count()}")

Likely an incorrect or invalid review comment.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)

24-62: Validate empty DataFrame condition.
Throwing an exception is good for failing fast, but consider logging the query params for easier debugging.
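
A sketch of what that could look like; the logger and parameter names are assumptions rather than the actual SourceJob members.

import org.apache.spark.sql.DataFrame
import org.slf4j.LoggerFactory

object EmptyResultCheckSketch {
  private val logger = LoggerFactory.getLogger(getClass)

  // Log the scan context before failing fast so empty ranges are easy to debug.
  def failIfEmpty(df: DataFrame, table: String, range: String): DataFrame = {
    if (df.isEmpty) {
      logger.error(s"Source scan returned 0 rows for table=$table range=$range")
      throw new RuntimeException(s"Empty source data for $table in range $range")
    }
    df
  }
}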


75-80: Risk of SQL injection and large IN-lists.
If key or values come from external sources, sanitize input or parameterize the query. Large NOT IN sets can hurt performance.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2)

187-196: Check DateRange validity.
Ensure start/end partitions exist or handle invalid scenarios for smoother bootstrap runs.


322-323: Confirm non-empty DF usage.
.get could fail if leftDf is None. Ensure no empty edge case remains unhandled.
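
A small sketch of replacing the bare .get with an explicit failure message; leftDf here is a stand-in for whatever Option the join code holds, not the real field.

import org.apache.spark.sql.DataFrame

object LeftDfHandlingSketch {
  def requireLeft(leftDf: Option[DataFrame], range: String): DataFrame =
    leftDf.getOrElse(
      throw new IllegalStateException(s"No left data found for range $range; aborting join"))
}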

api/thrift/orchestration.thrift (1)

278-319: New job args are clearly defined.
Optional fields are flexible. Ensure callers check for None to avoid runtime errors.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (7)

101-128: Code duplication with existing leftDf method.

Duplicates logic from the existing leftDf method. Consider extracting common logic into a private helper method.


337-340: Good refactoring of method parameter.

Changed from joinConf to the more specific leftDataModel, improving clarity.


474-497: Well-implemented skew filter logic.

Good error checking with appropriate assertion message.


499-519: Looks good and follows best practices.

Properly handles null cases with clear logging.


528-545: Clear implementation with proper early returns.

Good handling of edge cases.


547-561: Well structured with helpful logging.

Clearly handles the decision logic with informative logs.


563-567: Concise utility method.

Simple and effective Java-to-Scala conversion.

Comment on lines 91 to 93
val startx = start
val endx = end
val pss = partitionSpec
Contributor

🛠️ Refactor suggestion

Remove unused variables

Variables startx, endx, and pss are defined but never used.

-    val startx = start
-    val endx = end
-    val pss = partitionSpec
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val startx = start
val endx = end
val pss = partitionSpec

Comment on lines 71 to 77
println(s"LeftDF schema: ${leftDf.schema}")

val initDf = leftDf
// initialize an empty matched_hashes column for the purpose of later processing
.withColumn(Constants.MatchedHashes, typedLit[Array[String]](null))

println(s"InitDF schema: ${initDf.schema}")
Contributor

🛠️ Refactor suggestion

Replace println with proper logging

Using println for debugging is not ideal for production code.

Replace println statements with logger.debug() calls:

-    println(s"LeftDF schema: ${leftDf.schema}")
+    logger.debug(s"LeftDF schema: ${leftDf.schema}")

-    println(s"InitDF schema: ${initDf.schema}")
+    logger.debug(s"InitDF schema: ${initDf.schema}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
println(s"LeftDF schema: ${leftDf.schema}")
val initDf = leftDf
// initialize an empty matched_hashes column for the purpose of later processing
.withColumn(Constants.MatchedHashes, typedLit[Array[String]](null))
println(s"InitDF schema: ${initDf.schema}")
logger.debug(s"LeftDF schema: ${leftDf.schema}")
val initDf = leftDf
// initialize an empty matched_hashes column for the purpose of later processing
.withColumn(Constants.MatchedHashes, typedLit[Array[String]](null))
logger.debug(s"InitDF schema: ${initDf.schema}")

Comment on lines +127 to +128
println(s"JoinedDF schema: ${joinedDf.schema}")

// include all external fields if not already bootstrapped
val enrichedDf = padExternalFields(joinedDf, bootstrapInfo)

println(s"EnrichedDF schema: ${enrichedDf.schema}")
Contributor

🛠️ Refactor suggestion

More println statements to replace

Additional println statements should be replaced with proper logging.

Replace with logger calls:

-    println(s"JoinedDF schema: ${joinedDf.schema}")
+    logger.debug(s"JoinedDF schema: ${joinedDf.schema}")

-    println(s"EnrichedDF schema: ${enrichedDf.schema}")
+    logger.debug(s"EnrichedDF schema: ${enrichedDf.schema}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
println(s"JoinedDF schema: ${joinedDf.schema}")
// include all external fields if not already bootstrapped
val enrichedDf = padExternalFields(joinedDf, bootstrapInfo)
println(s"EnrichedDF schema: ${enrichedDf.schema}")
logger.debug(s"JoinedDF schema: ${joinedDf.schema}")
// include all external fields if not already bootstrapped
val enrichedDf = padExternalFields(joinedDf, bootstrapInfo)
logger.debug(s"EnrichedDF schema: ${enrichedDf.schema}")

Comment on lines +26 to +87
def run(): Unit = {

val leftDf = tableUtils.scanDf(query = null, table = trueLeftTable, range = Some(dateRange))
val trueLeftCols = leftDf.columns

val baseDf = tableUtils.scanDf(query = null, table = baseTable, range = Some(dateRange))
val valueCols = baseDf.columns.diff(trueLeftCols)

val baseOutputColumns = baseDf.columns.toSet

val projections = derivations.derivationProjection(baseOutputColumns.toSeq)
val projectionsMap = projections.toMap

val finalOutputColumns =
/*
* Loop through all columns in the base join output:
* 1. If it is one of the value columns, then skip it here, and it will be handled later as we loop through
* derived columns again - derivation is a projection from all value columns to desired derived columns
* 2. (see case 2 below) If it is matching one of the projected output columns, then there are 2 subcases
* a. matching with a left column, then we handle the "coalesce" here to make sure left columns show on top
* b. a bootstrapped derivation case, the skip it here, and it will be handled later as
* loop through derivations to perform coalescing
* 3. Else, we keep it in the final output - cases falling here are either (1) key columns, or (2)
* arbitrary columns selected from left.
*/
baseDf.columns.flatMap { c =>
if (valueCols.contains(c)) {
None
} else if (projectionsMap.contains(c)) {
if (trueLeftCols.contains(c)) {
Some(coalesce(col(c), expr(projectionsMap(c))).as(c))
} else {
None
}
} else {
Some(col(c))
}
} ++
/*
* Loop through all clauses in derivation projections:
* 1. (see case 2 above) If it is matching one of the projected output columns, then there are 2 sub-cases
* a. matching with a left column, then we skip since it is handled above
* b. a bootstrapped derivation case (see case 2 below), then we do the coalescing to achieve the bootstrap
* behavior.
* 2. Else, we do the standard projection.
*/
projections
.flatMap { case (name, expression) =>
if (baseOutputColumns.contains(name)) {
if (trueLeftCols.contains(name)) {
None
} else {
Some(coalesce(col(name), expr(expression)).as(name))
}
} else {
Some(expr(expression).as(name))
}
}

baseDf.select(finalOutputColumns: _*).save(outputTable)

}
Contributor

🛠️ Refactor suggestion

Method is large and complex

The run method handles many responsibilities in one place.

Break into smaller methods:

  • scanInputTables()
  • identifyColumns()
  • generateOutputColumns()
  • applyDerivations()

This will improve readability and maintainability.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

101-128: 🛠️ Refactor suggestion

Code duplication with leftDf method

This method duplicates logic from leftDf. Extract common functionality into a helper method.

+private def scanSourceDf(query: api.Query, 
+                        table: String,
+                        timeProjection: Seq[(String, String)],
+                        range: PartitionRange,
+                        tableUtils: TableUtils,
+                        limit: Option[Int] = None,
+                        skewFilter: Option[String] = None,
+                        allowEmpty: Boolean = false): Option[DataFrame] = {
+  var df = tableUtils.scanDf(query,
+                           table,
+                           Some((Map(tableUtils.partitionColumn -> null) ++ timeProjection).toMap),
+                           range = Some(range))
+  limit.foreach(l => df = df.limit(l))
+  val result = skewFilter
+    .map(sf => {
+      logger.info(s"skew filter: $sf")
+      df.filter(sf)
+    })
+    .getOrElse(df)
+  if (!allowEmpty && result.isEmpty) {
+    logger.info(s"Query produced 0 rows in range $range, and allowEmpty=false.")
+    return None
+  }
+  Some(result)
+}
🧹 Nitpick comments (5)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)

76-81: Consider refactoring null handling logic

Special handling for nulls creates complex conditions. Extract into a dedicated method.

-def generateSkewFilterSql(key: String, values: Seq[String]): String = {
-  val nulls = Seq("null", "Null", "NULL")
-  val nonNullFilters = Some(s"$key NOT IN (${values.filterNot(nulls.contains).mkString(", ")})")
-  val nullFilters = if (values.exists(nulls.contains)) Some(s"$key IS NOT NULL") else None
-  (nonNullFilters ++ nullFilters).mkString(" AND ")
-}
+def generateSkewFilterSql(key: String, values: Seq[String]): String = {
+  val (nulls, nonNulls) = values.partition(v => Seq("null", "Null", "NULL").contains(v))
+  val filters = Seq(
+    if (nonNulls.nonEmpty) Some(s"$key NOT IN (${nonNulls.mkString(", ")})") else None,
+    if (nulls.nonEmpty) Some(s"$key IS NOT NULL") else None
+  ).flatten
+  filters.mkString(" AND ")
+}

52-54: Consider graceful handling for empty results

RuntimeException for empty results seems harsh. Consider returning empty DataFrame instead.
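
If the empty-DataFrame route is taken, a sketch of the fallback could look like this; the schema below is illustrative, not the real source schema.

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object EmptyFallbackSketch {
  // Return an empty frame with the expected columns so downstream jobs can proceed.
  def emptyWithSchema(spark: SparkSession): DataFrame = {
    val schema = StructType(Seq(
      StructField("user_id", StringType, nullable = true),
      StructField("ds", StringType, nullable = true)))
    spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
  }
}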

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (2)

183-197: Remove debug print statement

Replace println with logger.debug for production code.

-    println("leftSkewFilter: " + leftSkewFilter)
+    logger.debug("leftSkewFilter: {}", leftSkewFilter)

89-138: Simplify method parameters

Method has 8 parameters. Consider encapsulating related parameters into a dedicated class.
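
A sketch of such a grouping; the field names are guesses at what the JoinPartJob signature covers, not the real ones.

import org.apache.spark.sql.DataFrame

// Bundle the stable, per-job inputs so individual methods only take what varies per call.
case class JoinPartComputeContextSketch(leftDf: Option[DataFrame],
                                        leftTable: String,
                                        outputTable: String,
                                        startDate: String,
                                        endDate: String,
                                        skewKeys: Option[Map[String, Seq[String]]],
                                        runSmallMode: Boolean,
                                        showDf: Boolean)

// e.g. def computeJoinPart(ctx: JoinPartComputeContextSketch, skipBloom: Boolean): Option[DataFrame]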

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

569-594: Complex conditional logic

Multiple nested conditions make this method hard to follow. Consider table-driven approach.
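
A table-driven sketch of the idea, with invented keys and shift values standing in for the real shiftDays conditions:

object ShiftDaysTableSketch {
  sealed trait LeftModel
  case object Events extends LeftModel
  case object Entities extends LeftModel

  // (left data model, whether a left time range is available) -> days to shift
  private val shiftTable: Map[(LeftModel, Boolean), Int] = Map(
    (Events, true)    -> -1,
    (Events, false)   -> 0,
    (Entities, true)  -> 0,
    (Entities, false) -> 0)

  def shiftDays(model: LeftModel, hasLeftTimeRange: Boolean): Int =
    shiftTable.getOrElse((model, hasLeftTimeRange), 0)
}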

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 92aa64d and e8bca6a.

📒 Files selected for processing (4)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (9 hunks)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)

65-74: Potential SQL injection risk

Concatenating filter strings directly. SQL injection possible if values are user-provided.


76-81: Performance concern with large NOT IN lists

Large value sets in NOT IN clauses can degrade query performance.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (2)

76-76: Resolve TODO comment

Implementation should address this TODO about left df, time range, bloom filter, and small mode args.


183-197: Duplicate skew filter logic

Both left and right paths apply skew logic independently, causing overhead.

Comment on lines +521 to +526
private def generateSkewFilterSql(key: String, values: Seq[String]): String = {
val nulls = Seq("null", "Null", "NULL")
val nonNullFilters = Some(s"$key NOT IN (${values.filterNot(nulls.contains).mkString(", ")})")
val nullFilters = if (values.exists(nulls.contains)) Some(s"$key IS NOT NULL") else None
(nonNullFilters ++ nullFilters).mkString(" AND ")
}
Contributor

🛠️ Refactor suggestion

Duplicated code from SourceJob

This method duplicates logic from SourceJob.generateSkewFilterSql. Extract to a common utility.

Contributor

@nikhil-zlai nikhil-zlai left a comment

few comments

Comment on lines 91 to 93
val startx = start
val endx = end
val pss = partitionSpec
Contributor

remove

3: optional string trackerUrl
4: optional TaskArgs taskArgs
5: optional DateRange dateRange // specific to batch nodes
5: optional orchestration.DateRange dateRange // specific to batch nodes
Contributor

actually move this to common.thrift

// SCD2 = 4,
}

struct DateRange {
Contributor

same as above - common.thrift

@@ -0,0 +1,31 @@
package ai.chronon.orchestration
Contributor

nit: could come in another pr

}
}

println(s"JoinedDF schema: ${joinedDf.schema}")
Contributor

same as above

if (df.columns.contains(field.name)) {
df
} else {
df.withColumn(field.name, lit(null).cast(field.dataType))
Contributor

didn't you cherry pick this at some point?

where df.foldLeft(... df.withColumn) -> df.select(... col ...)

Collaborator Author

Hmm idk, but this is how it currently is in Main. That cherry pick might not have gone thru.

newDf = newDf.withColumn(field.name, lit(null).cast(field.dataType))

def createBootstrapJoin(baseJoin: Join): Join = {
val join = baseJoin.deepCopy()
join.getMetaData.setName("test.user_transaction_features.bootstrap")
join.getMetaData.setName("test.user_transaction_features.bootstrap_copy")
Contributor

i see the exact same string "test.user_transaction_features.bootstrap_copy" below - won't it still interfere?

Suggested change
join.getMetaData.setName("test.user_transaction_features.bootstrap_copy")
join.getMetaData.setName("test.user_transaction_features.bootstrap_log")

Collaborator Author

That's a different test with a different namespace

// Create bootstrap join using base join as template
val bootstrapJoin = baseJoin.deepCopy()
bootstrapJoin.getMetaData.setName("test.user_transaction_features.bootstrap")
bootstrapJoin.getMetaData.setName("test.user_transaction_features.bootstrap_copy")
Contributor

Suggested change
bootstrapJoin.getMetaData.setName("test.user_transaction_features.bootstrap_copy")
bootstrapJoin.getMetaData.setName("test.user_transaction_features.bootstrap_table")

Collaborator Author

Well, this is the join metadata name, it might be confusing for the output of the join to be bootstrap_table.

diff.show()
}
assertEquals(0, diff.count())
}
Contributor

good test!

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
api/thrift/orchestration.thrift (1)

278-281: Add documentation for SourceWithFilter.

Add a brief comment explaining its purpose.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between a924d26 and 874d237.

📒 Files selected for processing (1)
  • api/thrift/orchestration.thrift (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
api/thrift/orchestration.thrift (2)

14-17: Potential duplication with common.thrift.

Consider using common DateRange if available.


283-288: Consistent field numbering pattern is good.

Common fields use 100+ identifiers across all job structures.

Also applies to: 290-296, 298-306, 308-315, 317-326

@varant-zlai varant-zlai force-pushed the vz--refactor_spark_modular branch from 874d237 to 7b6b0e6 Compare March 3, 2025 01:50
Contributor

@nikhil-zlai nikhil-zlai left a comment

nicely done

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (4)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1)

47-137: 🛠️ Refactor suggestion

Replace println with logger.

Use logger instead of println.

-    println(s"JoinedDF schema: ${joinedDf.schema}")
+    logger.debug(s"JoinedDF schema: ${joinedDf.schema}")

-    println(s"EnrichedDF schema: ${enrichedDf.schema}")
+    logger.debug(s"EnrichedDF schema: ${enrichedDf.schema}")
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3)

101-128: 🛠️ Refactor suggestion

Refactor duplicate logic.

This method duplicates logic from the leftDf method - extract common code.

Create a private helper method that handles the common functionality and have both public methods call it.


521-526: 🛠️ Refactor suggestion

Duplicated logic from SourceJob.

This method duplicates code from SourceJob.generateSkewFilterSql.

Move this function to a common utility class accessible by both classes.


101-128: 🛠️ Refactor suggestion

Reduce code duplication with leftDf method.

This method duplicates leftDf logic. Extract common code.

+  private def scanSourceDf(source: ai.chronon.api.Source,
+                         range: PartitionRange,
+                         tableUtils: TableUtils,
+                         limit: Option[Int] = None): DataFrame = {
+    val timeProjection = if (source.dataModel == Events) {
+      Seq(Constants.TimeColumn -> Option(source.query).map(_.timeColumn).orNull)
+    } else {
+      Seq()
+    }
+    var df = tableUtils.scanDf(source.query,
+                             source.table,
+                             Some((Map(tableUtils.partitionColumn -> null) ++ timeProjection).toMap),
+                             range = Some(range))
+    limit.foreach(l => df = df.limit(l))
+    df
+  }
+
   def leftDfFromSource(left: ai.chronon.api.Source,
                        range: PartitionRange,
                        tableUtils: TableUtils,
                        allowEmpty: Boolean = false,
                        limit: Option[Int] = None,
                        skewFilter: Option[String]): Option[DataFrame] = {
-    val timeProjection = if (left.dataModel == Events) {
-      Seq(Constants.TimeColumn -> Option(left.query).map(_.timeColumn).orNull)
-    } else {
-      Seq()
-    }
-    var df = tableUtils.scanDf(left.query,
-                               left.table,
-                               Some((Map(tableUtils.partitionColumn -> null) ++ timeProjection).toMap),
-                               range = Some(range))
-    limit.foreach(l => df = df.limit(l))
+    val df = scanSourceDf(left, range, tableUtils, limit)
     val result = skewFilter
       .map(sf => {
         logger.info(s"left skew filter: $sf")
🧹 Nitpick comments (11)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (2)

18-24: Typo in comment.

Fix the trailing ".s" typo on line 20.

-`SourceJob` output is shared across all joins.s
+`SourceJob` output is shared across all joins.

174-182: Consider using DataFrame.select for better performance.

Replace foldLeft withColumn with select for better performance.

-  private def padFields(df: DataFrame, structType: sql.types.StructType): DataFrame = {
-    structType.foldLeft(df) { case (df, field) =>
-      if (df.columns.contains(field.name)) {
-        df
-      } else {
-        df.withColumn(field.name, lit(null).cast(field.dataType))
-      }
-    }
-  }
+  private def padFields(df: DataFrame, structType: sql.types.StructType): DataFrame = {
+    val existingColumns = df.columns.toSet
+    val paddingColumns = structType.fields
+      .filterNot(field => existingColumns.contains(field.name))
+      .map(field => lit(null).cast(field.dataType).as(field.name))
+    df.select(df.columns.map(col) ++ paddingColumns: _*)
+  }
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (7)

98-98: Remove debug comment.

Comment about metrics context is a TODO that needs resolution.

- // val partMetrics = Metrics.Context(metrics, joinPart) -- TODO is this metrics context sufficient, or should we pass thru for monolith join?
+ val partMetrics = Metrics.Context(Metrics.Environment.JoinOffline, joinPart.groupBy)

224-237: Add pattern matching explanation.

Complex pattern matching logic needs a multiline comment explaining each case.

Add a comment block explaining each case combination and its purpose before this pattern match.


47-74: Address the hardcoded null query parameter.

The null query parameter passed to scanDf could be problematic.

-      val cachedLeftDf = tableUtils.scanDf(query = null, leftTable, range = Some(dateRange))
+      val cachedLeftDf = tableUtils.scanDf(query = Option.empty, leftTable, range = Some(dateRange))

187-187: Replace println with logger.

Use of println rather than structured logging.

-    println("leftSkewFilter: " + leftSkewFilter)
+    logger.info("leftSkewFilter: " + leftSkewFilter)

185-187: Remove redundant comment or explain better.

The comment about applying skew filter twice is confusing without context.

-    // this is the second time we apply skew filter - but this filters only on the keys
-    // relevant for this join part.
-    println("leftSkewFilter: " + leftSkewFilter)
+    // Apply join part-specific skew filter that targets only relevant keys
+    logger.info("leftSkewFilter: " + leftSkewFilter)

140-143: Concise method parameter grouping.

Method signature with multiple parameters could be easier to read with parameter groups.

-  private def computeJoinPart(leftDfWithStats: Option[DfWithStats],
-                              joinPart: JoinPart,
-                              joinLevelBloomMapOpt: Option[util.Map[String, BloomFilter]],
-                              skipBloom: Boolean): Option[DataFrame] = {
+  private def computeJoinPart(leftDfWithStats: Option[DfWithStats], joinPart: JoinPart)
+                             (joinLevelBloomMapOpt: Option[util.Map[String, BloomFilter]], 
+                              skipBloom: Boolean): Option[DataFrame] = {

224-237: Complex pattern matching logic.

This complex pattern match handles many cases - consider extracting it.

Extract this pattern matching logic into a separate method like getJoinStrategy that returns the appropriate DataFrame based on data models and accuracy.
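
A hedged sketch of that extraction; the enum values and strategy names are placeholders rather than Chronon's actual types.

object JoinStrategySketch {
  sealed trait DataModel
  case object Events extends DataModel
  case object Entities extends DataModel

  sealed trait Accuracy
  case object Snapshot extends Accuracy
  case object Temporal extends Accuracy

  sealed trait JoinStrategy
  case object EventsTemporal extends JoinStrategy
  case object EventsSnapshot extends JoinStrategy
  case object EntitiesSnapshot extends JoinStrategy

  // Centralize the (left model, right model, accuracy) decision in one place,
  // so the join code only asks for a strategy and dispatches on the result.
  def getJoinStrategy(left: DataModel, right: DataModel, accuracy: Accuracy): JoinStrategy =
    (left, right, accuracy) match {
      case (Events, Events, Temporal) => EventsTemporal
      case (Events, _, Snapshot)      => EventsSnapshot
      case (Entities, Entities, _)    => EntitiesSnapshot
      case other                      => throw new IllegalArgumentException(s"Unsupported combination: $other")
    }
}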

api/thrift/orchestration.thrift (1)

307-314: Consider field reorganization for consistency.

JoinPartJobArgs has inconsistent field ordering compared to other structs.

 struct JoinPartJobArgs {
-    4: optional string outputTable
-    101: optional map<string, list<string>> skewKeys
+    101: optional string outputTable
+    102: optional map<string, list<string>> skewKeys
 }
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

547-561: Hardcoded logging logic.

Consider extracting log message string to constants.

       if (result) {
-        logger.info(s"Counted $thresholdCount rows, running join in small mode.")
+        logger.info(s"Counted $thresholdCount rows, running join in small mode (limit: ${tableUtils.smallModeNumRowsCutoff}).")
       } else {
-        logger.info(
-          s"Counted greater than ${tableUtils.smallModeNumRowsCutoff} rows, proceeding with normal computation.")
+        logger.info(s"Counted > ${tableUtils.smallModeNumRowsCutoff} rows, proceeding with normal computation.")
       }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 874d237 and 7b6b0e6.

📒 Files selected for processing (47)
  • .bazelignore (1 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala (1 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala (0 hunks)
  • api/src/main/scala/ai/chronon/api/DataRange.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (2 hunks)
  • api/src/main/scala/ai/chronon/api/TsUtils.scala (1 hunks)
  • api/thrift/common.thrift (1 hunks)
  • api/thrift/hub.thrift (3 hunks)
  • api/thrift/orchestration.thrift (1 hunks)
  • online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/DataRangeTest.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala (1 hunks)
  • scripts/distribution/build_and_upload_artifacts.sh (2 hunks)
  • scripts/distribution/run_zipline_quickstart.sh (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/GroupBy.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (9 hunks)
  • spark/src/main/scala/ai/chronon/spark/LabelJoin.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/StagingQuery.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/PartitionRunner.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/DataRangeTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (0 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (0 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/JoinUtilsTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala (1 hunks)
💤 Files with no reviewable changes (3)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala
🚧 Files skipped from review as they are similar to previous changes (34)
  • .bazelignore
  • api/src/main/scala/ai/chronon/api/DataRange.scala
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/CompareTest.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/utils/DependencyResolver.scala
  • spark/src/test/scala/ai/chronon/spark/test/join/JoinUtilsTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/DataRangeTest.scala
  • spark/src/main/scala/ai/chronon/spark/StagingQuery.scala
  • spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala
  • scripts/distribution/build_and_upload_artifacts.sh
  • spark/src/main/scala/ai/chronon/spark/LabelJoin.scala
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
  • online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala
  • scripts/distribution/run_zipline_quickstart.sh
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala
  • spark/src/main/scala/ai/chronon/spark/Analyzer.scala
  • online/src/test/scala/ai/chronon/online/test/DataRangeTest.scala
  • spark/src/main/scala/ai/chronon/spark/Extensions.scala
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala
  • spark/src/main/scala/ai/chronon/spark/GroupBy.scala
  • api/src/main/scala/ai/chronon/api/TsUtils.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala
  • api/thrift/hub.thrift
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala
  • spark/src/main/scala/ai/chronon/spark/Join.scala
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala
  • spark/src/main/scala/ai/chronon/spark/utils/PartitionRunner.scala
  • spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: non_spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (22)
api/thrift/common.thrift (1)

21-26: DateRange struct looks good.

Clean, simple structure for date ranges.

api/src/main/scala/ai/chronon/api/Extensions.scala (1)

1244-1251: DateRangeOps implementation is clean.

Simple converter to PartitionRange.

spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (1)

79-129: Conditional external parts handling looks good.

Clean implementation of externalPartsAlreadyIncluded flag.

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1)

26-87: Break into smaller methods.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)

65-74: Potential SQL injection.


76-81: Avoid large IN-lists.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2)

186-196: Looks fine.


322-323: Check .get usage.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (6)

51-51: Use parameterized query over hardcoded null.

The hardcoded null for query parameter could be confusing and limit flexibility.

- val cachedLeftDf = tableUtils.scanDf(query = null, leftTable, range = Some(dateRange))
+ val cachedLeftDf = tableUtils.scanDf(query = null, leftTable, projections = None, range = Some(dateRange))

183-197: Avoid duplicate skew filtering.

This code applies a skew filter but comment indicates it's the second time - optimize by merging filter operations.

Consider extracting a common skew filter function that can be applied once upstream rather than twice in different places.
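
A hedged sketch of what such a shared helper could look like, applied once to the left side before the per-part jobs run; the signature and names are assumptions, not the PR's API:

// Sketch: one shared skew filter applied upstream instead of once per join part.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object SharedSkewFilterSketch {
  def dropSkewedKeys(df: DataFrame, skewKeys: Map[String, Seq[String]]): DataFrame =
    skewKeys.foldLeft(df) { case (acc, (column, values)) =>
      // Only filter on columns that actually exist in this DataFrame.
      if (acc.columns.contains(column)) acc.filter(!col(column).isin(values: _*)) else acc
    }
}

Each JoinPartJob could then skip its own skew pass, since the left input it reads would already be filtered.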


24-29: Clean context case class.

Well-structured context class with clear parameters.


31-45: Effective argument extraction.

Cleanly extracts job parameters from args with appropriate type conversions.


76-76: Fix TODO comment.

Unresolved TODO comment should be addressed before finalizing.

Please ensure that the TODO comment is addressed:

// TODO: fix left df and left time range, bloom filter, small mode args

98-99: Fix metrics TODO comment.

Incomplete metrics initialization with unresolved TODO.

-    // val partMetrics = Metrics.Context(metrics, joinPart) -- TODO is this metrics context sufficient, or should we pass thru for monolith join?
-    val partMetrics = Metrics.Context(Metrics.Environment.JoinOffline, joinPart.groupBy)
+    val partMetrics = Metrics.Context(Metrics.Environment.JoinOffline, joinPart.groupBy)
api/thrift/orchestration.thrift (1)

273-276: Structure looks good.

Clear definition with optional fields that follow expected pattern.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (7)

338-342: Parameter updated correctly.

Signature change from joinConf to leftDataModel improves function specificity.


474-497: Well-structured skew filter logic.

Clean implementation with good error handling and logging.


569-594: Good documentation.

Thorough comments explaining different scenarios make complex logic clear.


474-497: Well-implemented skew filter.

Cleanly handles skew filtering with good error checking.


521-526: SQL generation handles edge cases.

SQL generation correctly handles both null and non-null values.


596-603: Efficient field padding implementation.

Concise implementation for adding missing columns.


569-594: Comprehensive shiftDays comments.

Excellent explanation of shift scenarios with clear comments.

JoinUtils.skewFilter(Option(joinPart.rightToLeft.values.toSeq), skewKeys, joinPart.rightToLeft.values.toSeq)
// this is the second time we apply skew filter - but this filters only on the keys
// relevant for this join part.
println("leftSkewFilter: " + leftSkewFilter)

🛠️ Refactor suggestion

Replace println with logger.

Use proper logging instead of println for consistency.

- println("leftSkewFilter: " + leftSkewFilter)
+ logger.info("leftSkewFilter: " + leftSkewFilter)

runSmallMode)
}

// TODO: fix left df and left time range, bloom filter, small mode args

💡 Verification agent

🧩 Analysis chain

TODO needs resolution.

Unresolved TODO requires attention before merge.


🏁 Script executed:

#!/bin/bash
# Check if there are other resolved TODOs in the same file
rg -A 2 -B 2 "^    // TODO" --no-filename spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala | grep -v "fix left df" || echo "No other TODOs found"

Length of output: 178


Action Needed: Resolve the Unresolved TODO in JoinPartJob.scala

  • The file still contains an unresolved TODO at line 76:
    // TODO: fix left df and left time range, bloom filter, small mode args
  • Verification shows no other unresolved TODOs in this file.

Please address or remove this TODO before merging.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (4)
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (4)

84-84: Remove commented out debug code.

-    //println("Rupee Source start partition $month")

261-262: Improve variable naming for clarity.

The variable describes the output table for the join part that doesn't use bootstrap, but this isn't clear from the name.

-    val joinPartOutputTable = joinConf.partOutputTable(jp1) + "_suffix1"
+    val joinPartWithoutBootstrapOutputTable = joinConf.partOutputTable(jp1) + "_suffix1"

279-280: Correct misleading comment.

The comment is duplicated and misleading: it describes a join part without a bootstrap, which was already covered in the previous section.

-    // Now run the join part job that *does not* have a bootstrap
+    // Now run the second join part job

40-366: Add test cleanup for created tables.

The test creates multiple tables but doesn't clean them up after execution.

Add this to the end of your test:

// Cleanup
spark.sql(s"DROP TABLE IF EXISTS $dollarTable")
spark.sql(s"DROP TABLE IF EXISTS $rupeeTable")
spark.sql(s"DROP TABLE IF EXISTS $queryTable")
spark.sql(s"DROP TABLE IF EXISTS $bootstrapSourceTable")
spark.sql(s"DROP TABLE IF EXISTS $sourceOutputTable")
spark.sql(s"DROP TABLE IF EXISTS $bootstrapOutputTable")
spark.sql(s"DROP TABLE IF EXISTS $joinPartOutputTable")
spark.sql(s"DROP TABLE IF EXISTS $joinPart2OutputTable")
spark.sql(s"DROP TABLE IF EXISTS $finalJoinOutputTable")
spark.sql(s"DROP TABLE IF EXISTS $derivationOutputTable")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 9e55007 and 04efbc7.

📒 Files selected for processing (1)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
🔇 Additional comments (1)
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

366-366: Good test!

Comprehensive test covering the entire modular join workflow.

Comment on lines +67 to +69
setups =
Seq("create temporary function temp_replace_right_a as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'")
),

💡 Verification agent

🧩 Analysis chain

Verify UDF function exists.

The test creates a temporary UDF but doesn't verify it exists in the test environment.


🏁 Script executed:

#!/bin/bash
# Verify that UDFRegExpReplace exists in the codebase or is available in Hive
rg -A 10 "UDFRegExpReplace"

Length of output: 12090


Action: Add UDF verification.
The test only creates the UDF without asserting its proper registration. Please add an assertion (e.g., by executing a test query) to confirm that temp_replace_right_a—and hence UDFRegExpReplace—is registered and callable in the test environment.
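
A hedged example of such an assertion; the SparkSession variable name and the literals are placeholders:

// Sketch: confirm the temporary UDF is registered and behaves like regexp_replace.
val cleaned = spark
  .sql("SELECT temp_replace_right_a('a_1', '_1', '') AS cleaned")
  .collect()
  .head
  .getString(0)
assert(cleaned == "a", s"temp_replace_right_a is not registered or misbehaves: got '$cleaned'")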

Comment on lines +358 to +364
if (finalDiff.count() > 0) {
println(s"Actual count: ${computed.count()}")
println(s"Expected count: ${expected.count()}")
println(s"Diff count: ${diff.count()}")
println("diff result rows")
diff.show()
}

⚠️ Potential issue

Fix variable reference in error reporting.

The error reporting uses the wrong variable: diff instead of finalDiff.

    if (finalDiff.count() > 0) {
      println(s"Actual count: ${computed.count()}")
      println(s"Expected count: ${expected.count()}")
-      println(s"Diff count: ${diff.count()}")
+      println(s"Diff count: ${finalDiff.count()}")
      println("diff result rows")
-      diff.show()
+      finalDiff.show()
    }

Comment on lines +96 to +97
"create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'"
)

⚠️ Potential issue

Fix duplicate temporary function declaration.

The function temp_replace_right_c is declared twice.

            "create temporary function temp_replace_right_b as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'",
-            "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'",
            "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'"

@varant-zlai varant-zlai force-pushed the vz--refactor_spark_modular branch from 04efbc7 to c449039 on March 3, 2025 04:11
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (4)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (2)

123-123: 🛠️ Refactor suggestion

Replace println with proper logging.

-println(s"JoinedDF schema: ${joinedDf.schema}")
+logger.debug(s"JoinedDF schema: ${joinedDf.schema}")

128-128: 🛠️ Refactor suggestion

Replace println with proper logging.

-println(s"EnrichedDF schema: ${enrichedDf.schema}")
+logger.debug(s"EnrichedDF schema: ${enrichedDf.schema}")
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (2)

96-97: ⚠️ Potential issue

Fix duplicate temporary function declaration.

-            "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'",
            "create temporary function temp_replace_right_c as 'org.apache.hadoop.hive.ql.udf.UDFRegExpReplace'"

358-364: ⚠️ Potential issue

Fix variable reference in error reporting.

    if (finalDiff.count() > 0) {
      println(s"Actual count: ${computed.count()}")
      println(s"Expected count: ${expected.count()}")
-      println(s"Diff count: ${diff.count()}")
+      println(s"Diff count: ${finalDiff.count()}")
      println("diff result rows")
-      diff.show()
+      finalDiff.show()
    }
🧹 Nitpick comments (6)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1)

179-180: Consider using select instead of foldLeft with withColumn.

-    structType.foldLeft(df) { case (df, field) =>
-      if (df.columns.contains(field.name)) {
-        df
-      } else {
-        df.withColumn(field.name, lit(null).cast(field.dataType))
-      }
-    }
+    val existingColumns = df.columns.toSet
+    val columnsToAdd = structType.fields.filterNot(f => existingColumns.contains(f.name))
+    
+    if (columnsToAdd.isEmpty) {
+      df
+    } else {
+      df.select(
+        df.columns.map(col) ++
+        columnsToAdd.map(f => lit(null).cast(f.dataType).as(f.name)): _*
+      )
+    }
api/thrift/orchestration.thrift (5)

273-276: Add documentation for this struct.

Add a descriptive comment explaining the purpose and usage of SourceWithFilter.


278-282: Add documentation for SourceJobArgs.

Missing explanatory comment above struct declaration.


284-289: Fix field numbering inconsistency.

Range field is numbered 2 here but 100 in other structs. Standardize field IDs.

struct BootstrapJobArgs {
-    2: optional common.DateRange range
+    100: optional common.DateRange range
}

307-314: Fix field numbering inconsistencies.

outputTable should be 101 and skewKeys should use a non-100-series id, matching the pattern in the other structs.

struct JoinPartJobArgs {
-    4: optional string outputTable
+    5: optional map<string, list<string>> skewKeys
-    101: optional map<string, list<string>> skewKeys
+    101: optional string outputTable
}

273-314: Add overall documentation for job structures.

Add a high-level comment describing the relationships between these job structs and how they fit into the orchestration phases described at the end of the file.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 04efbc7 and c449039.

📒 Files selected for processing (6)
  • api/src/main/scala/ai/chronon/api/Extensions.scala (2 hunks)
  • api/thrift/orchestration.thrift (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/Join.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (9)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)

24-25: New DateRange import added.

Import added to support the new DateRangeOps implicit class.


1246-1252: LGTM! New DateRangeOps extension adds date range conversion.

Simple extension method to convert DateRange to PartitionRange.

spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (2)

19-24: Good documentation!

Clear explanation of the job's purpose and orchestrator requirements.


48-60: Method is large.

Consider breaking into smaller helpers for readability.

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)

366-366: Good comprehensive test.

Test covers all modular job components thoroughly.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (4)

20-20: Updated imports to include DateRange.

Import to support the new bootstrap job args.


26-26: Added import for bootstrap job orchestration.

Required for new modular bootstrap job implementation.


182-193: Improved modularity with BootstrapJob.

Replaced inline bootstrap logic with dedicated job class.


319-319: Moved runSmallMode logic to utility class.

Better code organization.

@varant-zlai varant-zlai merged commit 522546b into main Mar 3, 2025
19 checks passed
@varant-zlai varant-zlai deleted the vz--refactor_spark_modular branch March 3, 2025 04:24
@coderabbitai coderabbitai bot mentioned this pull request Jul 3, 2025
@coderabbitai coderabbitai bot mentioned this pull request Oct 31, 2025