-
Notifications
You must be signed in to change notification settings - Fork 8
Add Driver verb to bulk load GBU data to KV store #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe pull request introduces a new Changes
Possibly related PRs
Suggested reviewers
Poem
Warning Review ran into problems🔥 ProblemsGitHub Actions: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings. 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (6)
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
730-746: Add better error recovery.
Currently, one exception disrupts the entire bulk load. Consider partial or iterative handling to avoid data loss.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
spark/src/main/scala/ai/chronon/spark/Driver.scala(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/Driver.scala (4)
717-718: Good introduction of a new subcommand.
The naming is consistent with other subcommands.
719-729: Validate the partition format.
Ensure the provided partition string matches “yyyy-MM-dd” to reduce runtime errors.
948-949: Subcommand integration looks good.
Everything aligns with the established pattern.
995-997: Consistent with existing structure.
No issues spotted, subcommand is properly invoked.
| opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for") | ||
|
|
||
| val partitionString: ScallopOption[String] = | ||
| opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this string supposed to match whatever's configured at:
chronon/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
Lines 71 to 72 in 3aa7369
| private val partitionFormat: String = | |
| sparkSession.conf.get("spark.chronon.partition.format", "yyyy-MM-dd") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this needs to match what we're writing out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay I think there's some form of this where we can DRY things up but wouldn't block the PR. Thanks for clarifying it!
aca549b to
078d378
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala(2 hunks)spark/src/main/scala/ai/chronon/spark/Driver.scala(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (2)
336-338: LGTM: Job ID uniqueness improvement.Adding timestamp to job ID prevents conflicts for same-partition exports.
315-315: Verify column family configuration matches TableUtils.The empty field name and "cf" alias should align with the column family configuration in TableUtils.
Also applies to: 323-323
✅ Verification successful
Column family configuration verified ✓
The empty field name and "cf" alias in the export configuration correctly align with BigTableKVStoreImpl's constants:
- ColumnFamilyString = "cf"
- ColumnFamilyQualifierString = "value"
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for column family configuration in TableUtils rg -A 5 'column.*family'Length of output: 26
Script:
#!/bin/bash # Find TableUtils file fd TableUtils # Search for cf or columnFamily patterns rg -i "columnfamily|cf\s*=" # Search for BigTable configuration patterns rg -i "bigtable.*config"Length of output: 2607
spark/src/main/scala/ai/chronon/spark/Driver.scala (2)
717-747: LGTM: Well-structured bulk load implementation.Good error handling and timing metrics. Clear logging of operations.
948-949: LGTM: Clean CLI integration.New command properly integrated into the CLI framework.
Also applies to: 995-997
| val srcOfflineTable: ScallopOption[String] = | ||
| opt[String](required = true, descr = "Name of the source GroupBy Upload table") | ||
|
|
||
| val groupbyName: ScallopOption[String] = | ||
| opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for") | ||
|
|
||
| val partitionString: ScallopOption[String] = | ||
| opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add validation for partition string format.
Ensure the partition string matches 'yyyy-MM-dd' format to prevent runtime errors.
val partitionString: ScallopOption[String] =
opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading")
+ validate(s => try {
+ java.time.LocalDate.parse(s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+ true
+ } catch {
+ case _: Exception => false
+ })📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| val srcOfflineTable: ScallopOption[String] = | |
| opt[String](required = true, descr = "Name of the source GroupBy Upload table") | |
| val groupbyName: ScallopOption[String] = | |
| opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for") | |
| val partitionString: ScallopOption[String] = | |
| opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading") | |
| val srcOfflineTable: ScallopOption[String] = | |
| opt[String](required = true, descr = "Name of the source GroupBy Upload table") | |
| val groupbyName: ScallopOption[String] = | |
| opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for") | |
| val partitionString: ScallopOption[String] = | |
| opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading") | |
| validate(s => try { | |
| java.time.LocalDate.parse(s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd")) | |
| true | |
| } catch { | |
| case _: Exception => false | |
| }) |
|
Actually @piyush-zlai we were chatting earlier today and @david-zlai brought up the topic of how this job is meant to be kicked off. It seems like this will be run straight from the user's laptop, the way it's implemented now. Currently, the way things work is that |
|
Let's chat about this offline - I'll ping you and David |
|
(Discussed offline - run.py will call the submitter which in turn calls Driver with the right verb + params. We can attempt to use the same rails for this bulk load invocation as well) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2)
24-26: LGTM! Consider enhancing error message.The fallback mechanism is well implemented.
- .getOrElse(throw new IllegalArgumentException("GCP_PROJECT_ID environment variable not set")) + .getOrElse(throw new IllegalArgumentException("GCP_PROJECT_ID not found in environment or configuration"))
29-31: Consider extracting the credential retrieval pattern.Duplicate pattern with GCP_PROJECT_ID retrieval.
private def getGcpConfig(key: String): String = { sys.env.get(key) .orElse(conf.get(key)) .getOrElse(throw new IllegalArgumentException(s"$key not found in environment or configuration")) }cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1)
61-76: Consider adding parameter validation.The test would be more robust with validation of the parameters passed to
submit.Add assertions to verify:
- Source table format
- GroupBy name format
- Partition string format
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala(1 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1)
65-65: Verify empty jar list.The empty jar list (
List.empty) might be unintentional.
| println(submittedJobId) | ||
| assertEquals(submittedJobId, "mock-job-id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix incorrect assertion.
The test creates a real DataprocSubmitter but expects a mock job ID.
Either:
- Use a mocked submitter like in the first test case, or
- Remove the assertion and keep it as a local-only test
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
Outdated
Show resolved
Hide resolved
|
Was able to test this via a submitter test that invokes the driver with the right verb + params. Merging this. |
## Summary Add a verb to the Driver to allow us to bulk load GBU data to the KV store of choice. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Tested manually using the dummy gbu table (on bq) I used while testing the BigTable kv store code (data.test_gbu) ``` $ export GCP_INSTANCE_ID="zipline-canary-instance" $ export GCP_PROJECT_ID="canary-443022" $ java -cp spark/target/scala-2.12/spark-assembly-0.1.0-SNAPSHOT.jar:/opt/homebrew/Cellar/apache-spark/3.5.4/libexec/jars/* ai.chronon.spark.Driver groupby-upload-bulk-load --online-jar=cloud_gcp/target/scala-2.12/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar --online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl --src-offline-table=data.test_gbu --groupby-name=quickstart.purchases.v1 --partition-string=2023-11-30 ... Triggering bulk load for GroupBy: quickstart.purchases.v1 for partition: 2023-11-30 from table: data.test_gbu Uploaded GroupByUpload data to KV store for GroupBy: quickstart.purchases.v1; partition: 2023-11-30 in 2 seconds ``` Was also able to test via triggering the submitter test. The upload kicks off this [Spark job](https://console.cloud.google.com/dataproc/jobs/0af24968-51b2-45e7-95da-8a890b094837?region=us-central1&hl=en&inv=1&invt=AbmQ-g&project=canary-443022) and the bulk load succeeds. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new command-line subcommand for bulk loading GroupBy data to a key-value store. - Introduced a new option to upload offline GroupBy tables with specified parameters. - **Improvements** - Enhanced the export process for data to BigTable with updated query parameters. - Improved job ID generation for uniqueness during data uploads. - Updated error handling for environment variable retrieval to allow fallback options. - **Tests** - Added a new test case for the `DataprocSubmitter` class related to GBU bulk loading. <!-- end of auto-generated comment: release notes by coderabbit.ai --> Co-authored-by: Thomas Chow <[email protected]>
## Summary Add a verb to the Driver to allow us to bulk load GBU data to the KV store of choice. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Tested manually using the dummy gbu table (on bq) I used while testing the BigTable kv store code (data.test_gbu) ``` $ export GCP_INSTANCE_ID="zipline-canary-instance" $ export GCP_PROJECT_ID="canary-443022" $ java -cp spark/target/scala-2.12/spark-assembly-0.1.0-SNAPSHOT.jar:/opt/homebrew/Cellar/apache-spark/3.5.4/libexec/jars/* ai.chronon.spark.Driver groupby-upload-bulk-load --online-jar=cloud_gcp/target/scala-2.12/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar --online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl --src-offline-table=data.test_gbu --groupby-name=quickstart.purchases.v1 --partition-string=2023-11-30 ... Triggering bulk load for GroupBy: quickstart.purchases.v1 for partition: 2023-11-30 from table: data.test_gbu Uploaded GroupByUpload data to KV store for GroupBy: quickstart.purchases.v1; partition: 2023-11-30 in 2 seconds ``` Was also able to test via triggering the submitter test. The upload kicks off this [Spark job](https://console.cloud.google.com/dataproc/jobs/0af24968-51b2-45e7-95da-8a890b094837?region=us-central1&hl=en&inv=1&invt=AbmQ-g&project=canary-443022) and the bulk load succeeds. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new command-line subcommand for bulk loading GroupBy data to a key-value store. - Introduced a new option to upload offline GroupBy tables with specified parameters. - **Improvements** - Enhanced the export process for data to BigTable with updated query parameters. - Improved job ID generation for uniqueness during data uploads. - Updated error handling for environment variable retrieval to allow fallback options. - **Tests** - Added a new test case for the `DataprocSubmitter` class related to GBU bulk loading. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary Add a verb to the Driver to allow us to bulk load GBU data to the KV store of choice. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Tested manually using the dummy gbu table (on bq) I used while testing the BigTable kv store code (data.test_gbu) ``` $ export GCP_INSTANCE_ID="zipline-canary-instance" $ export GCP_PROJECT_ID="canary-443022" $ java -cp spark/target/scala-2.12/spark-assembly-0.1.0-SNAPSHOT.jar:/opt/homebrew/Cellar/apache-spark/3.5.4/libexec/jars/* ai.chronon.spark.Driver groupby-upload-bulk-load --online-jar=cloud_gcp/target/scala-2.12/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar --online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl --src-offline-table=data.test_gbu --groupby-name=quickstart.purchases.v1 --partition-string=2023-11-30 ... Triggering bulk load for GroupBy: quickstart.purchases.v1 for partition: 2023-11-30 from table: data.test_gbu Uploaded GroupByUpload data to KV store for GroupBy: quickstart.purchases.v1; partition: 2023-11-30 in 2 seconds ``` Was also able to test via triggering the submitter test. The upload kicks off this [Spark job](https://console.cloud.google.com/dataproc/jobs/0af24968-51b2-45e7-95da-8a890b094837?region=us-central1&hl=en&inv=1&invt=AbmQ-g&project=canary-443022) and the bulk load succeeds. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new command-line subcommand for bulk loading GroupBy data to a key-value store. - Introduced a new option to upload offline GroupBy tables with specified parameters. - **Improvements** - Enhanced the export process for data to BigTable with updated query parameters. - Improved job ID generation for uniqueness during data uploads. - Updated error handling for environment variable retrieval to allow fallback options. - **Tests** - Added a new test case for the `DataprocSubmitter` class related to GBU bulk loading. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary Add a verb to the Driver to allow us to bulk load GBU data to the KV store of choice. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Tested manually using the dummy gbu table (on bq) I used while testing the BigTable kv store code (data.test_gbu) ``` $ export GCP_INSTANCE_ID="zipline-canary-instance" $ export GCP_PROJECT_ID="canary-443022" $ java -cp spark/target/scala-2.12/spark-assembly-0.1.0-SNAPSHOT.jar:/opt/homebrew/Cellar/apache-spark/3.5.4/libexec/jars/* ai.chronon.spark.Driver groupby-upload-bulk-load --online-jar=cloud_gcp/target/scala-2.12/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar --online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl --src-offline-table=data.test_gbu --groupby-name=quickstart.purchases.v1 --partition-string=2023-11-30 ... Triggering bulk load for GroupBy: quickstart.purchases.v1 for partition: 2023-11-30 from table: data.test_gbu Uploaded GroupByUpload data to KV store for GroupBy: quickstart.purchases.v1; partition: 2023-11-30 in 2 seconds ``` Was also able to test via triggering the submitter test. The upload kicks off this [Spark job](https://console.cloud.google.com/dataproc/jobs/0af24968-51b2-45e7-95da-8a890b094837?region=us-central1&hl=en&inv=1&invt=AbmQ-g&project=canary-443022) and the bulk load succeeds. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new command-line subcommand for bulk loading GroupBy data to a key-value store. - Introduced a new option to upload offline GroupBy tables with specified parameters. - **Improvements** - Enhanced the export process for data to BigTable with updated query parameters. - Improved job ID generation for uniqueness during data uploads. - Updated error handling for environment variable retrieval to allow fallback options. - **Tests** - Added a new test case for the `DataprocSubmitter` class related to GBU bulk loading. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary Add a verb to the Driver to allow us to bulk load GBU data to the KV store of choice. ## Cheour clientslist - [ ] Added Unit Tests - [ ] Covered by existing CI - [X] Integration tested - [ ] Documentation update Tested manually using the dummy gbu table (on bq) I used while testing the BigTable kv store code (data.test_gbu) ``` $ export GCP_INSTANCE_ID="zipline-canary-instance" $ export GCP_PROJECT_ID="canary-443022" $ java -cp spark/target/scala-2.12/spark-assembly-0.1.0-SNAPSHOT.jar:/opt/homebrew/Cellar/apache-spark/3.5.4/libexec/jars/* ai.chronon.spark.Driver groupby-upload-bulk-load --online-jar=cloud_gcp/target/scala-2.12/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar --online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl --src-offline-table=data.test_gbu --groupby-name=quiour clientsstart.purchases.v1 --partition-string=2023-11-30 ... Triggering bulk load for GroupBy: quiour clientsstart.purchases.v1 for partition: 2023-11-30 from table: data.test_gbu Uploaded GroupByUpload data to KV store for GroupBy: quiour clientsstart.purchases.v1; partition: 2023-11-30 in 2 seconds ``` Was also able to test via triggering the submitter test. The upload kiour clientss off this [Spark job](https://console.cloud.google.com/dataproc/jobs/0af24968-51b2-45e7-95da-8a890b094837?region=us-central1&hl=en&inv=1&invt=AbmQ-g&project=canary-443022) and the bulk load succeeds. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new command-line subcommand for bulk loading GroupBy data to a key-value store. - Introduced a new option to upload offline GroupBy tables with specified parameters. - **Improvements** - Enhanced the export process for data to BigTable with updated query parameters. - Improved job ID generation for uniqueness during data uploads. - Updated error handling for environment variable retrieval to allow fallbaour clients options. - **Tests** - Added a new test case for the `DataprocSubmitter` class related to GBU bulk loading. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
Summary
Add a verb to the Driver to allow us to bulk load GBU data to the KV store of choice.
Checklist
Tested manually using the dummy gbu table (on bq) I used while testing the BigTable kv store code (data.test_gbu)
Was also able to test via triggering the submitter test. The upload kicks off this Spark job and the bulk load succeeds.
Summary by CodeRabbit
New Features
Improvements
Tests
DataprocSubmitterclass related to GBU bulk loading.