
Conversation

@tchow-zlai (Collaborator) commented May 23, 2025

Summary

  • factor out a compute function to work on an input PartitionRange.
  • this is done for two reasons: in preparation for orchestrator-based execution, and to enable DataImports for non-Spark engines.

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • Style
    • Reorganized and consolidated import statements in several files for improved code clarity.
    • Refactored staging query execution logic into a reusable method for streamlined processing.

@coderabbitai bot (Contributor) commented May 23, 2025

Warning

Rate limit exceeded

@tchow-zlai has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 1 minute and 9 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 82b7d0c and 8f14475.

📒 Files selected for processing (3)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (1 hunks)

Walkthrough

This change refactors the staging query execution in the Spark batch workflow by extracting the core logic into a new compute method in StagingQuery.scala. Related method calls in BatchNodeRunner.scala are updated accordingly. Across several files, import statements are reordered and consolidated for clarity, with no impact on logic.
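
For orientation, a rough sketch of the extracted entry point is shown below. It is inferred only from this walkthrough and the sequence diagram that follows; parameter names, defaults, and helpers (e.g. renderQuery) are assumptions rather than the literal StagingQuery.scala code.

// Hypothetical sketch, not the actual implementation in this PR.
def compute(range: PartitionRange,
            setups: Option[Seq[String]] = None,
            enableAutoExpand: Option[Boolean] = Some(true)): Unit = {
  setups.toSeq.flatten.foreach(tableUtils.sql)   // execute setup SQL, if any
  val renderedQuery = renderQuery(range)         // hypothetical helper: substitute partition params
  val df = tableUtils.sql(renderedQuery)         // run the staging query
  df.save(outputTable)                           // write the result to the output table
  logger.info(s"Wrote staging query output for range ${range}")
}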

Changes

  • Spark Staging Query Refactor (spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala, spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala): Extracts staging query execution logic into a new compute method; updates BatchNodeRunner to use the new method and parameters.
  • Spark Import Reordering (spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala, spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala): Reorders and consolidates import statements for clarity; no logic or functional changes.
  • GCP Import Cleanup (cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala): Removes an unused import and reorders remaining imports; no logic or functional changes.

Sequence Diagram(s)

sequenceDiagram
  participant BatchNodeRunner
  participant StagingQuery
  participant Spark

  BatchNodeRunner->>StagingQuery: compute(range, setups, enableAutoExpand)
  StagingQuery->>Spark: Execute setup SQL (if any)
  StagingQuery->>Spark: Render staging query with partition params
  StagingQuery->>Spark: Run query, save DataFrame to output table
  StagingQuery->>BatchNodeRunner: Log completion

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

Possibly related PRs

Suggested reviewers

  • nikhil-zlai
  • piyush-zlai

Poem

✨ Code refined, imports aligned,
Staging queries now combined.
Compute method leads the way,
Cleaner paths for work and play.
Reviewers, your time is prime,
Let’s make this change sublime! 🚀



@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (1)

111-114: Consider handling empty predicates sequence.

Method works correctly but doesn't handle empty input. Consider returning empty string for empty sequences.

 def andPredicates(predicates: Seq[String]): String = {
+  if (predicates.isEmpty) return ""
   val whereStr = predicates.map(p => s"($p)").mkString(" AND ")
   whereStr
 }
spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (1)

5-9: Consider declaring as a trait
A trait gives call-site mix-in flexibility and reads more idiomatically for an interface with no state.
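
A minimal sketch of the trait form, assuming the sync signature implied by the runExports call site quoted later in this review; the real interface may differ.

import ai.chronon.api.PartitionRange
import org.apache.spark.sql.SparkSession

// Sketch only: parameter names are inferred from call sites, not copied from DataImport.scala.
trait DataImport {
  def sync(sourceTable: String, destinationTable: String, partitionRange: PartitionRange)(implicit
      spark: SparkSession): Unit
}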

spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1)

131-132: Name mismatch could confuse
rangeWheres is now a String, but the plural name hints at a Seq. Rename?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3472f74 and e42900c.

📒 Files selected for processing (8)
  • cloud_gcp/BUILD.bazel (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1 hunks)
  • spark/BUILD.bazel (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/LabelJoinV2.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (3)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1)
  • sql (289-317)
api/src/main/scala/ai/chronon/api/ParametricMacro.scala (1)
  • api (66-66)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1)
  • sync (86-143)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (2)
spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (3)
  • table (30-45)
  • Format (88-116)
  • andPredicates (111-114)
api/src/main/scala/ai/chronon/api/DataRange.scala (1)
  • whereClauses (78-81)
⏰ Context from checks skipped due to timeout of 90000ms (30)
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: online_tests
  • GitHub Check: service_tests
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: online_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: api_tests
  • GitHub Check: api_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: flink_tests
  • GitHub Check: flink_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: batch_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: batch_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/main/scala/ai/chronon/spark/batch/LabelJoinV2.scala (1)

3-3: Import reorganization looks good.

Clean consolidation of ai.chronon.api imports and logical reordering.

Also applies to: 19-19

spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (1)

5-5: Import consolidation aligns with codebase standards.

Consistent with similar changes across the project.

Also applies to: 10-11, 16-16, 22-22

cloud_gcp/BUILD.bazel (1)

3-3: New dep looks sane
//spark:ingest_lib correctly wires the new ingest code.

spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (2)

100-106: No action – just a call-site swap
Replacing the local helper with Format.andPredicates keeps behaviour unchanged.


556-558: Good centralisation
Delegating to Format.andPredicates removes duplicate logic.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1)

104-113: Double-check export destination
destPrefix relies on spark.sql.catalog.<dataset>.warehouse. Ensure this value is a GCS URI (gs://…); otherwise BigQuery EXPORT DATA will fail.
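
An illustrative guard for that check (the config key shape comes from the comment above; catalogName and the other variable names are assumptions):

// Illustrative only: fail fast if the configured warehouse is not a GCS URI
// before issuing EXPORT DATA.
val warehouseKey = s"spark.sql.catalog.${catalogName}.warehouse"
val warehouseUri = sparkSession.conf.get(warehouseKey)
require(warehouseUri.startsWith("gs://"),
        s"BigQuery EXPORT DATA requires a GCS destination, got '${warehouseUri}' from ${warehouseKey}")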

Comment on lines 95 to 110
val pColOption = getPartitionColumn(providedProject, bqTableId)
val spec = partitionRange.partitionSpec
val whereClauses = Format.andPredicates(partitionRange.whereClauses)
val partitionWheres = if (whereClauses.nonEmpty) s"WHERE ${whereClauses}" else whereClauses
val select = pColOption match {
  case Some(nativeCol) if nativeCol.isSystemDefined =>
    s"SELECT ${nativeCol.colName} as ${internalBQPartitionCol}, * FROM ${bqFriendlyName} ${partitionWheres}"
  case _ => s"SELECT * FROM ${bqFriendlyName} ${partitionWheres}"
}

🛠️ Refactor suggestion

Partition filter may miss native column
partitionRange.whereClauses are built using spec.column; if the BigQuery table partitions on a different (system) column, the filter is ineffective. Consider translating the clauses to the native column when nativeCol.isSystemDefined.

🤖 Prompt for AI Agents
In
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala
around lines 95 to 103, the partition filter uses whereClauses built from
spec.column, which may not match the native system-defined partition column,
causing ineffective filtering. To fix this, detect when
nativeCol.isSystemDefined is true and translate or rewrite the whereClauses to
reference the native partition column name instead of the spec.column before
constructing the WHERE clause string. This ensures the filter applies correctly
to the actual partition column used by BigQuery.
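
A minimal sketch of that translation, assuming the where clauses are plain string predicates that mention spec.column by name; variable names are illustrative.

// Illustrative only: rewrite references to spec.column so the filter hits the
// native (system-defined) partition column.
val effectiveClauses: Seq[String] = pColOption match {
  case Some(nativeCol) if nativeCol.isSystemDefined =>
    partitionRange.whereClauses.map(_.replace(spec.column, nativeCol.colName))
  case _ => partitionRange.whereClauses
}
val whereClauses = Format.andPredicates(effectiveClauses)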

Comment on lines 134 to 153
val dfToWrite = pColOption
  .map { case (nativeColumn) => // as long as we have a native partition column we'll attempt to rename it.
    internalLoad
      .withColumnRenamed(internalBQPartitionCol, nativeColumn.colName)
  }
  .getOrElse(internalLoad)
tableUtils
  .insertPartitions(dfToWrite, destinationTableName, partitionColumns = List(spec.column))
}

⚠️ Potential issue

Potential missing partition column in write
insertPartitions is called with partitionColumns = List(spec.column). If nativeCol.colName differs from spec.column, the write will produce an unpartitioned table. Pass the renamed column instead.

-          .insertPartitions(dfToWrite, destinationTableName, partitionColumns = List(spec.column))
+          .insertPartitions(
+            dfToWrite,
+            destinationTableName,
+            partitionColumns = List(dfToWrite.columns.find(_ == spec.column).getOrElse(nativeColumn.colName))
+          )

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala
around lines 134 to 142, the insertPartitions call uses partitionColumns =
List(spec.column), but if the native partition column name differs from
spec.column, the table will not be partitioned correctly. To fix this, pass the
renamed native column name (nativeColumn.colName) as the partition column to
insertPartitions instead of spec.column, ensuring the partitioning matches the
actual column in the DataFrame.

@nikhil-zlai (Contributor) commented:

i was expecting a "partitionSpec.translate" somewhere.

@tchow-zlai (Collaborator, Author) replied:

> i was expecting a "partitionSpec.translate" somewhere.

I'm imagining that this is already translated from the caller. Because we don't have access to a Source object here.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (2)

95-103: Partition filter may miss native column.

partitionRange.whereClauses are built using spec.column; if the BigQuery table partitions on a different (system) column, the filter is ineffective. Consider translating the clauses to the native column when nativeCol.isSystemDefined.


142-150: Potential missing partition column in write.

insertPartitions is called with partitionColumns = List(spec.column). If nativeCol.colName differs from spec.column, the write will produce an unpartitioned table. Pass the renamed column instead.

🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (3)

25-36: Consider making overwrite configurable.

overwrite = false will fail on subsequent exports to the same path.

-    overwrite = false
+    overwrite = true

58-67: Simplify boolean conversion.

Pattern matching can be replaced with direct boolean conversion.

-        isSystemDefined = isSystemDefined match {
-          case "YES" => true
-          case "NO"  => false
-          case _ =>
-            throw new IllegalArgumentException(s"Unknown partition column system definition: ${isSystemDefined}")
-        }
+        isSystemDefined = isSystemDefined == "YES"

86-151: Consider breaking down the sync method.

This method is quite long and handles multiple responsibilities. Consider extracting helper methods for export job execution and data loading.

Example refactor:

override def sync(...): Unit = {
  val (bqTableId, projectId, bqFriendlyName) = prepareTableInfo(sourceTableName)
  val pColOption = getPartitionColumn(projectId, bqTableId)
  val exportPath = executeExport(bqFriendlyName, pColOption, partitionRange, destinationTableName)
  loadAndInsertData(exportPath, pColOption, destinationTableName, partitionRange.partitionSpec)
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e7c8abd and a14f91c.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (28)
  • GitHub Check: service_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: streaming_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: join_tests
  • GitHub Check: online_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: flink_tests
  • GitHub Check: api_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: spark_tests
  • GitHub Check: batch_tests
  • GitHub Check: service_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: join_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: api_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: online_tests
  • GitHub Check: flink_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: batch_tests
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (2)

1-24: Clean setup with proper lazy initialization.

Good use of lazy vals for expensive resources like BigQuery client.


74-84: Clean path generation logic.

Good use of UUID for uniqueness and proper path sanitization.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (1)

38-42: Improve error message clarity.

The error message mentions "format provider" which doesn't match the DataImport context.

-        throw new IllegalArgumentException(
-          s"Failed to instantiate format provider. Please ensure the class is available in the classpath. Error: ${e.getMessage}",
-          e
-        )
+        throw new IllegalArgumentException(
+          s"Failed to instantiate DataImport for engine type ${engineType}. Please ensure the implementation class is available in the classpath. Error: ${e.getMessage}",
+          e
+        )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 94de242 and b564f4e.

📒 Files selected for processing (11)
  • api/python/ai/chronon/staging_query.py (2 hunks)
  • api/thrift/api.thrift (1 hunks)
  • cloud_gcp/BUILD.bazel (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1 hunks)
  • spark/BUILD.bazel (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/LabelJoinV2.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • cloud_gcp/BUILD.bazel
🚧 Files skipped from review as they are similar to previous changes (6)
  • spark/src/main/scala/ai/chronon/spark/catalog/Format.scala
  • spark/src/main/scala/ai/chronon/spark/batch/LabelJoinV2.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
  • spark/BUILD.bazel
  • spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala
🧰 Additional context used
🧬 Code Graph Analysis (2)
spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (4)
api/python/ai/chronon/staging_query.py (2)
  • EngineType (14-16)
  • StagingQuery (25-152)
api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (1)
  • ScalaJavaConversions (6-97)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3)
  • TableUtils (42-598)
  • TableUtils (600-602)
  • sql (289-317)
spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (3)
  • DataImport (6-11)
  • DataImport (13-45)
  • from (15-45)
spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (2)
api/python/ai/chronon/staging_query.py (1)
  • EngineType (14-16)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1)
  • sync (86-152)
⏰ Context from checks skipped due to timeout of 90000ms (30)
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: online_tests
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: service_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: groupby_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: spark_tests
  • GitHub Check: online_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: api_tests
  • GitHub Check: join_tests
  • GitHub Check: api_tests
  • GitHub Check: spark_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: flink_tests
  • GitHub Check: flink_tests
  • GitHub Check: batch_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: python_tests
  • GitHub Check: batch_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (6)
api/python/ai/chronon/staging_query.py (1)

35-35: LGTM! Clean API extension.

The exports parameter is correctly added to both the function signature and passed through to the Thrift object constructor.

Also applies to: 147-147

api/thrift/api.thrift (2)

94-95: LGTM! Proper Thrift schema extension.

The exports field addition follows Thrift conventions with appropriate field numbering.


97-101: LGTM! Well-designed Export struct.

Clean struct definition with appropriate optional fields for flexibility.

spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (2)

3-3: LGTM! Import updates are appropriate.

The new imports correctly support the DataImport functionality and local SparkSession usage.

Also applies to: 12-12, 19-19


79-79: Verify export timing in workflow.

Exports run before SQL execution. Ensure this order is correct for your use case - typically data synchronization might happen after processing.

Please confirm that running exports before the staging query execution is the intended workflow. In some cases, you might want to export the results after processing.

spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (1)

6-11: LGTM! Clean abstraction design.

The abstract class provides a clear contract for data import implementations with appropriate parameters.

Comment on lines 33 to 47
def runExports(partitionRange: PartitionRange)(implicit sparkSession: SparkSession): Unit = {
  val exports = Option(stagingQueryConf.getExports).map(_.toScala).getOrElse(Seq.empty)
  exports.foreach((export) =>
    DataImport.from(export.getEngineType).sync(export.getSource, export.getDestination, partitionRange))
}

🛠️ Refactor suggestion

Consider error handling for export failures.

The method correctly iterates over exports and calls the DataImport abstraction, but export failures could disrupt the entire staging query process.

Consider wrapping individual export operations in try-catch blocks:

  def runExports(partitionRange: PartitionRange)(implicit sparkSession: SparkSession): Unit = {
    val exports = Option(stagingQueryConf.getExports).map(_.toScala).getOrElse(Seq.empty)
-   exports.foreach((export) =>
-     DataImport.from(export.getEngineType).sync(export.getSource, export.getDestination, partitionRange))
+   exports.foreach { export =>
+     try {
+       DataImport.from(export.getEngineType).sync(export.getSource, export.getDestination, partitionRange)
+       logger.info(s"Successfully completed export from ${export.getSource} to ${export.getDestination}")
+     } catch {
+       case e: Exception =>
+         logger.error(s"Export failed from ${export.getSource} to ${export.getDestination}: ${e.getMessage}", e)
+         throw e
+     }
+   }
  }
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala around lines
33 to 37, the runExports method currently calls DataImport.sync for each export
without handling potential failures. To fix this, wrap the call to
DataImport.from(...).sync(...) inside a try-catch block to catch exceptions for
each export individually. Log or handle the exceptions appropriately to prevent
one export failure from stopping the entire process.

Comment on lines 26 to 37
val mirror = runtimeMirror(getClass.getClassLoader)
val classSymbol = mirror.staticClass(clazzName)
val classMirror = mirror.reflectClass(classSymbol)

val constructor = classSymbol.primaryConstructor.asMethod
val constructorMirror = classMirror.reflectConstructor(constructor)

val reflected = constructorMirror()
reflected.asInstanceOf[DataImport]


⚠️ Potential issue

Reflection approach has several risks.

The current reflection implementation makes assumptions about the target class structure and could fail at runtime.

Issues with the current approach:

  1. Assumes parameterless constructor exists
  2. No validation that class implements DataImport
  3. ClassLoader issues in different deployment scenarios
  4. Runtime failures instead of compile-time safety

Consider the registry pattern suggested above or use a more explicit factory approach.

🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala around lines 26
to 35, the current reflection-based instantiation assumes a parameterless
constructor, lacks validation that the class implements DataImport, and risks
runtime failures and classloader issues. Replace this reflection approach with a
registry or explicit factory pattern that maps class names to constructors or
instances, ensuring compile-time type safety and validation that the created
object implements DataImport, thereby avoiding runtime reflection errors and
improving reliability.

Comment on lines 15 to 26
def from(engineType: EngineType): DataImport =
  try {

    val clazzName = engineType match {
      case EngineType.BIGQUERY => "ai.chronon.integrations.cloud_gcp.BigQueryImport"
      case _ =>
        throw new UnsupportedOperationException(
          s"Engine type ${engineType} is not supported for Staging Query export"
        )
    }

🛠️ Refactor suggestion

Consider using a registry pattern instead of hardcoded class names.

The current approach with hardcoded class names is fragile and doesn't scale well.

Consider a registry-based approach:

+object DataImport {
+  private val importProviders = mutable.Map[EngineType, () => DataImport]()
+  
+  def register(engineType: EngineType, provider: () => DataImport): Unit = {
+    importProviders(engineType) = provider
+  }
+  
+  def from(engineType: EngineType): DataImport = {
+    importProviders.get(engineType) match {
+      case Some(provider) => provider()
+      case None => throw new UnsupportedOperationException(
+        s"Engine type ${engineType} is not supported for Staging Query export"
+      )
+    }
+  }
+}

This allows implementations to self-register and avoids reflection entirely.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala around lines 15
to 24, replace the hardcoded class name matching for engine types with a
registry pattern. Create a registry map that associates EngineType values with
their corresponding DataImport implementations, allowing new implementations to
self-register. Modify the from method to look up the implementation in this
registry instead of using pattern matching and reflection, improving scalability
and maintainability.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1)

144-152: Partition column mismatch in DataFrame write.

After renaming the internal column to nativeColumn.colName (line 146), the code still uses spec.column for partitioning (line 151). This creates inconsistency.

-        dfToWrite.save(
-          destinationTableName,
-          partitionColumns = List(spec.column)
-        )
+        dfToWrite.save(
+          destinationTableName,
+          partitionColumns = List(pColOption.map(_.colName).getOrElse(spec.column))
+        )
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (2)

130-130: Synchronous job execution blocks thread.

waitFor() blocks indefinitely until job completion, which could cause timeouts for large exports.

Consider adding timeout parameter or async handling for better resource management.
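
For example, the google-cloud-bigquery client's Job.waitFor accepts RetryOption values that bound the wait; the sketch below is illustrative and the exportJob variable name is an assumption.

import com.google.cloud.RetryOption
import org.threeten.bp.Duration

// Illustrative: cap the export wait at 30 minutes instead of blocking indefinitely.
val completedJob = exportJob.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(30)))
require(completedJob != null && completedJob.getStatus.getError == null,
        s"BigQuery export did not complete cleanly: ${Option(completedJob).map(_.getStatus).orNull}")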


81-82: UUID collision risk in concurrent executions.

Multiple concurrent syncs could theoretically generate the same UUID, causing file conflicts.

Consider using timestamp prefix or other collision-resistant identifier.
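
A small illustration of a collision-resistant identifier (names are illustrative):

import java.time.Instant
import java.util.UUID

// Illustrative: a timestamp prefix keeps concurrent runs ordered and, combined with
// the UUID, further reduces the already tiny chance of a path collision.
val runId = s"${Instant.now().toEpochMilli}-${UUID.randomUUID()}"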

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8d84fa0 and 7d9ddf4.

📒 Files selected for processing (5)
  • api/thrift/api.thrift (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1 hunks)
  • spark/BUILD.bazel (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • api/thrift/api.thrift
  • spark/BUILD.bazel
  • spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala
  • spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala
⏰ Context from checks skipped due to timeout of 90000ms (31)
  • GitHub Check: service_tests
  • GitHub Check: streaming_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: streaming_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: spark_tests
  • GitHub Check: service_tests
  • GitHub Check: spark_tests
  • GitHub Check: online_tests
  • GitHub Check: join_tests
  • GitHub Check: online_tests
  • GitHub Check: flink_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: flink_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: batch_tests
  • GitHub Check: api_tests
  • GitHub Check: api_tests
  • GitHub Check: batch_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: python_tests
  • GitHub Check: scala_compile_fmt_fix

Comment on lines 43 to 46
|SELECT column_name, is_system_defined FROM `${projectId}.${bqTableId.getDataset}.INFORMATION_SCHEMA.COLUMNS`
|WHERE table_name = '${bqTableId.getTable}' AND is_partitioning_column = 'YES'
|
|""".stripMargin

🛠️ Refactor suggestion

SQL injection risk in table name interpolation.

Direct string interpolation of bqTableId.getTable could allow injection if table names contain special characters.

Consider parameterized queries or proper escaping of table identifiers.

🤖 Prompt for AI Agents
In
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala
around lines 43 to 46, the SQL query directly interpolates the table name using
bqTableId.getTable, which poses a SQL injection risk. To fix this, avoid direct
string interpolation for the table name and instead use parameterized queries if
supported, or properly escape the table identifier to ensure special characters
do not lead to injection vulnerabilities. Implement a safe method to handle the
table name in the query string.
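
A simple mitigation sketch (the allowed-character pattern below is an assumption, not BigQuery's full identifier grammar):

// Illustrative only: reject unexpected characters before interpolating the identifier.
private def safeTableName(name: String): String = {
  require(name.matches("[A-Za-z0-9_\\-$]+"), s"Suspicious characters in table name: ${name}")
  name
}
// hypothetical usage: s"WHERE table_name = '${safeTableName(bqTableId.getTable)}' AND is_partitioning_column = 'YES'"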

Comment on lines 102 to 105
require(
  nativeCol.colName.equals(spec.column),
  s"Configured column in the pipeline definition ${spec.column} does not match what's defined on the BigQuery table: ${nativeCol.colName}. "
)

⚠️ Potential issue

Contradictory logic around partition column names.

The require statement enforces that nativeCol.colName equals spec.column, but past comments suggest these can differ for system-defined columns. This may prevent valid use cases.

🤖 Prompt for AI Agents
In
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala
around lines 102 to 105, the require statement currently enforces that
nativeCol.colName must equal spec.column, which contradicts previous comments
allowing differences for system-defined partition columns. Modify the logic to
allow nativeCol.colName and spec.column to differ when nativeCol.colName
corresponds to a recognized system-defined partition column, ensuring valid use
cases are not blocked while still validating other columns strictly.
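
A sketch of the relaxed check described above (illustrative; adapt the message as needed):

// Illustrative: only enforce name equality for user-defined partition columns;
// system-defined partition columns are allowed to differ from spec.column.
if (!nativeCol.isSystemDefined) {
  require(
    nativeCol.colName.equals(spec.column),
    s"Configured column ${spec.column} does not match the BigQuery table's partition column ${nativeCol.colName}."
  )
}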

query=query,
startPartition=start_partition,
setups=setups,
exports=exports,
Contributor commented:

in the thrift we seem to be calling them dataImports - either name is cool with me - fewer syllables the better

Collaborator (Author) replied:

I wanted to name them imports but it's a bit of an overloaded term (and reserved in code) so left it as dataImports

@tchow-zlai force-pushed the tchow/sq-elt branch 2 times, most recently from 13ea53b to 4db46de on June 3, 2025 18:00
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1c20dc7 and 4db46de.

📒 Files selected for processing (11)
  • api/python/ai/chronon/staging_query.py (2 hunks)
  • api/thrift/api.thrift (1 hunks)
  • cloud_gcp/BUILD.bazel (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala (1 hunks)
  • spark/BUILD.bazel (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/LabelJoinV2.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • spark/src/main/scala/ai/chronon/spark/batch/LabelJoinV2.scala
🚧 Files skipped from review as they are similar to previous changes (9)
  • spark/src/main/scala/ai/chronon/spark/catalog/Format.scala
  • cloud_gcp/BUILD.bazel
  • spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
  • spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
  • spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala
  • api/thrift/api.thrift
  • spark/src/main/scala/ai/chronon/spark/ingest/DataImport.scala
  • spark/BUILD.bazel
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryImport.scala
🧰 Additional context used
🪛 Pylint (3.3.7)
api/python/ai/chronon/staging_query.py

[convention] 35-35: Argument name "dataImports" doesn't conform to snake_case naming style

(C0103)

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: cloud_aws_tests
  • GitHub Check: flink_tests
  • GitHub Check: groupby_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: batch_tests
🔇 Additional comments (1)
api/python/ai/chronon/staging_query.py (1)

151-151: LGTM!

Correctly passes the parameter to the constructor.

Note: Update to use snake_case variable name once the parameter is renamed.

partition_column: Optional[str] = None,
engine_type: Optional[EngineType] = None,
dependencies: Optional[List[Union[TableDependency, Dict]]] = None,
dataImports: Optional[List[ttypes.DataImport]] = None,

⚠️ Potential issue

Fix parameter naming and add documentation.

Parameter should use snake_case (data_imports) per Python conventions, and needs documentation in the docstring.

-    dataImports: Optional[List[ttypes.DataImport]] = None,
+    data_imports: Optional[List[ttypes.DataImport]] = None,

Also add parameter documentation around line 95:

+    :param data_imports:
+        List of data import configurations for extracting data from data warehouses.
+    :type data_imports: Optional[List[ttypes.DataImport]]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dataImports: Optional[List[ttypes.DataImport]] = None,
--- a/api/python/ai/chronon/staging_query.py
+++ b/api/python/ai/chronon/staging_query.py
@@ -35,7 +35,7 @@ def staging_query( # …other params…
- dataImports: Optional[List[ttypes.DataImport]] = None,
+ data_imports: Optional[List[ttypes.DataImport]] = None,
@@ -95,6 +96,9 @@ def staging_query( # docstring start
:param some_other_param: …
+ :param data_imports:
+ List of data import configurations for extracting data from data warehouses.
+ :type data_imports: Optional[List[ttypes.DataImport]]
:return: …
🧰 Tools
🪛 Pylint (3.3.7)

[convention] 35-35: Argument name "dataImports" doesn't conform to snake_case naming style

(C0103)

🤖 Prompt for AI Agents
In api/python/ai/chronon/staging_query.py at line 35, rename the parameter from
dataImports to data_imports to follow Python's snake_case naming convention.
Additionally, update the function's docstring around line 95 to include a clear
description of the data_imports parameter, explaining its type and purpose.

@tchow-zlai changed the title from "feat: dataimports abstraction" to "feat: Refactor StagingQuery to operate on a range." on Aug 4, 2025
@tchow-zlai merged commit bc65447 into main on Aug 5, 2025 (21 checks passed)
@tchow-zlai deleted the tchow/sq-elt branch on August 5, 2025 06:11
