feat: use normal spark rails to parse arguments #273
Conversation
Basic service scaffolding using Scala Play framework
# Conflicts:
#	build.sbt
#	hub/app/controllers/FrontendController.scala
#	hub/app/views/index.scala.html
#	hub/public/images/favicon.png
Adds a README and keeps the Docker container active so the parquet table can be accessed.
Create svelte project and build it using play for deployment
Actionable comments posted: 0
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)
96-100: Consider edge cases in parsing.
Short inputs or unusual separators might need special handling.
102-112: Spark job builder logic is solid.
If any required fields in `sparkArgs` are missing, consider throwing an error.
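One way to surface that early, sketched under the assumption that the builder has access to the parsed `sparkArgs` (field names follow Spark's `SparkSubmitArguments`; the exact guard is illustrative):

```scala
// Hypothetical guard, not the PR's code: fail fast if the parsed
// spark-submit arguments are missing fields the Dataproc job needs.
require(sparkArgs.mainClass != null && sparkArgs.mainClass.nonEmpty,
  "spark submission requires --class to be set")
require(sparkArgs.primaryResource != null && sparkArgs.primaryResource.nonEmpty,
  "spark submission requires a primary resource (application jar)")
```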
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
- spark/src/main/scala/org/apache/spark/deploy/VisibleSparkSubmitArguments.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)
16-16: Import looks correct.
65-65: Straightforward spark job call.
92-92: Driver env naming is clean.
94-94: Executor env naming is consistent.
spark/src/main/scala/org/apache/spark/deploy/VisibleSparkSubmitArguments.scala (1)
1-3: Inheritance approach is good.
Implementation is succinct, no issues found.
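For readers unfamiliar with the trick: `SparkSubmitArguments` is `private[deploy]`, so a subclass placed in the same package makes its parsed fields reachable from application code. A minimal sketch of that pattern, which may differ slightly from the actual file:

```scala
package org.apache.spark.deploy

// SparkSubmitArguments is private[deploy]; subclassing it from inside the
// org.apache.spark.deploy package exposes its parsed fields (mainClass,
// jars, files, childArgs, ...) to the rest of the codebase.
class VisibleSparkSubmitArguments(args: Seq[String]) extends SparkSubmitArguments(args)
```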
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
92-96: Remove commented code.
Dead code should be removed rather than commented out.
```diff
- /*
- private def toSparkDriverEnv(propertyKey: String) = s"spark.yarn.appMasterEnv.$propertyKey"
-
- private def toSparkExecEnv(propertyKey: String) = s"spark.executorEnv.$propertyKey"
- */
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)
97-101: LGTM! Clean implementation with proper attribution to Spark source.
103-113: Verify args format matches Spark's expectations
✅ Verification successful
✓ Args format correctly matches Spark's expectations
The implementation extends Spark's official SparkSubmitArguments class, ensuring standard argument parsing.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for examples of SparkSubmit argument usage
rg -A 5 "SparkSubmit.*main"

# Search for tests using VisibleSparkSubmitArguments
fd -e scala -e java . | xargs rg "VisibleSparkSubmitArguments.*test"
```
Length of output: 100
Script:
```bash
#!/bin/bash
# Find SparkSubmit usage
rg "SparkSubmit" -A 5

# Find test files with Spark args
fd ".*Test.*scala" | xargs rg "Spark.*args"

# Find VisibleSparkSubmitArguments implementation
fd -e scala | xargs rg "class VisibleSparkSubmitArguments"
```
Length of output: 1898
12-12: Verify VisibleSparkSubmitArguments availability
✅ Verification successful
Import statement is valid and class is available
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Check if the class exists in dependencies
fd -e scala -e jar . | xargs rg "class VisibleSparkSubmitArguments"
```
Length of output: 255
Script:
```bash
#!/bin/bash
# Check implementation details
cat spark/src/main/scala/org/apache/spark/deploy/VisibleSparkSubmitArguments.scala
```
Length of output: 224
Actionable comments posted: 1
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
97-101: LGTM: Well-documented utility method. Good practice documenting the source. Consider upgrading to a newer Spark version to use the official utility.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (5 hunks)
✅ Files skipped from review due to trivial changes (1)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: fetcher_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)
12-12: LGTM: Using standard Spark argument parsing.
103-113: LGTM: Clean integration with Spark's argument parsing. Verify that all existing job submissions provide arguments in the expected format.
✅ Verification successful
✓ All job submissions provide arguments in the expected format
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for submit method calls to ensure args are properly formatted
rg -A 5 "submit\(" --type scala
```
Length of output: 6105
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
242-251: Track TODO for migration to run.py. Consider creating a ticket to track this migration.
Would you like me to create an issue for tracking the migration of argument construction to run.py?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)
13-13: LGTM: Import for new argument parsing.
93-97: Remove commented-out code. Dead code should be removed rather than commented out.
98-102: Add unit tests for `stringToSeq`. While the implementation is correct, add tests to verify edge cases (null, empty string, single value, multiple values).
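A hedged sketch of what those tests could look like; it assumes `stringToSeq` is reachable from the test (e.g. made package-private or exposed on a companion), and the suite name is hypothetical:

```scala
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

// Hypothetical test cases for the edge cases listed above.
class StringToSeqSpec extends AnyFlatSpec with Matchers {
  "stringToSeq" should "return an empty Seq for null and empty input" in {
    DataprocSubmitter.stringToSeq(null) shouldBe empty
    DataprocSubmitter.stringToSeq("") shouldBe empty
  }

  it should "handle single and multiple comma-separated values, trimming whitespace" in {
    DataprocSubmitter.stringToSeq("a.jar") shouldBe Seq("a.jar")
    DataprocSubmitter.stringToSeq(" a.jar , b.jar ,") shouldBe Seq("a.jar", "b.jar")
  }
}
```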
104-115: Verify argument parsing behavior. Test with various argument combinations to ensure consistent parsing (a rough sketch follows the list below):
- Mixed options and values
- Quoted arguments
- Special characters
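Here is that sketch, illustrative only, with hypothetical jar paths and job arguments. Field names follow Spark's `SparkSubmitArguments`; depending on the Spark version, additional settings (e.g. a master) may be needed before validation passes.

```scala
// Mixed options, a primary resource, and trailing application arguments.
val parsed = new VisibleSparkSubmitArguments(
  Seq(
    "--class", "ai.chronon.spark.Driver",
    "--jars", "gs://bucket/dep1.jar,gs://bucket/dep2.jar",
    "gs://bucket/chronon-spark.jar",                 // primary resource
    "join", "--conf-path=production/joins/my_join"   // passed through to the app
  ))

assert(parsed.mainClass == "ai.chronon.spark.Driver")
assert(parsed.jars.split(",").length == 2)
assert(parsed.childArgs.toList == List("join", "--conf-path=production/joins/my_join"))
```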
## Summary
- the way tables are created can be different depending on the
underlying storage catalog that we are using. In the case of BigQuery,
we cannot issue a Spark SQL statement to do this operation. Let's
abstract that out into the `Format` layer for now, but eventually we
will need a `Catalog` abstraction that supports this.
- Try to remove the dependency on a SparkSession in `Format`; invert the
dependency by passing in a higher-order function (HOF) instead (see the sketch below)
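A rough sketch of that inversion, with illustrative names that do not match the repository's actual `Format` API: instead of `Format` holding a `SparkSession`, callers hand it a function that can run SQL on demand.

```scala
// Illustrative only; the real Format trait in this repo differs.
trait Format {
  // Callers supply a function that runs SQL and returns result rows as
  // strings, so Format never needs to capture a SparkSession itself.
  def partitions(tableName: String)(sqlRunner: String => Seq[String]): Seq[String] =
    sqlRunner(s"SHOW PARTITIONS $tableName")
}

object BigQueryFormat extends Format {
  // BigQuery cannot answer SHOW PARTITIONS via Spark SQL, so it would
  // consult the BigQuery client instead (elided here).
  override def partitions(tableName: String)(sqlRunner: String => Seq[String]): Seq[String] =
    Seq.empty // placeholder: fetch partition info from the BigQuery API
}
```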
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
- **New Features**
- Enhanced BigQuery integration with new client parameter
- Added support for more flexible table creation methods
- Improved logging capabilities for table operations
- **Bug Fixes**
- Standardized table creation process across different formats
- Removed unsupported BigQuery table creation operations
- **Refactor**
- Simplified table creation and partition insertion logic
- Updated method signatures to support more comprehensive table
management
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->
---------
Co-authored-by: Thomas Chow <[email protected]>
## Summary
- https://app.asana.com/0/1208949807589885/1209206040434612/f
- Support explicit bigquery table creation.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

---
- To see the specific tasks where the Asana app for GitHub is being used, see below:
- https://app.asana.com/0/0/1209206040434612

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
- **New Features**
- Enhanced BigQuery table creation functionality with improved schema and partitioning support.
- Streamlined table creation process in Spark's TableUtils.
- **Refactor**
- Simplified table existence checking logic.
- Consolidated import statements for better readability.
- Removed unused import in BigQuery catalog test.
- Updated import statement in GcpFormatProviderTest for better integration with Spark BigQuery connector.
- **Bug Fixes**
- Improved error handling for table creation scenarios.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->
---------
Co-authored-by: Thomas Chow <[email protected]>
…obs (#274)

## Summary
Changes to avoid runtime errors while running Spark and Flink jobs on the cluster using Dataproc submit for the newly built bazel assembly jars. Successfully ran the DataprocSubmitterTest jobs using the newly built bazel jars for testing.

Testing Steps
1. Build the assembly jars
```
bazel build //cloud_gcp:lib_deploy.jar
bazel build //flink:assembly_deploy.jar
bazel build //flink:kafka-assembly_deploy.jar
```
2. Copy the jars to the gcp account used by our jobs
```
gsutil cp bazel-bin/cloud_gcp/lib_deploy.jar gs://zipline-jars/bazel-cloud-gcp.jar
gsutil cp bazel-bin/flink/assembly_deploy.jar gs://zipline-jars/bazel-flink.jar
gsutil cp bazel-bin/flink/kafka-assembly_deploy.jar gs://zipline-jars/bazel-flink-kafka.jar
```
3. Modify the jar paths in the DataprocSubmitterTest file and run the tests

## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
## Release Notes
- **New Features**
- Added Kafka and Avro support for Flink.
- Introduced new Flink assembly and Kafka assembly binaries.
- **Dependency Management**
- Updated Maven artifact dependencies, including Kafka and Avro.
- Excluded specific Hadoop and Spark-related artifacts to prevent runtime conflicts.
- **Build Configuration**
- Enhanced build rules for Flink and Spark environments.
- Improved dependency management to prevent class compatibility issues.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
The string formatting here was breaking zipline commands when I built a
new wheel:
```
File "/Users/davidhan/zipline/chronon/dev_chronon/lib/python3.11/site-packages/ai/chronon/repo/hub_uploader.py", line 73
print(f"\n\nUploading:\n {"\n".join(diffed_entities.keys())}")
^
SyntaxError: unexpected character after line continuation character
```
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
- **Style**
- Improved string formatting and error message construction for better
code readability
- Enhanced log message clarity in upload process
Note: These changes are internal improvements that do not impact
end-user functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
- With #263 we control table creation ourselves. We don't need to rely on indirect writes to then do the table creation (and partitioning) for us, we simply use the storage API to write directly into the table we created. This should be much more performant and preferred over indirect writes because we don't need to stage data, then load as a temp BQ table, and it uses the BigQuery storage API directly.
- Remove configs that are used only for indirect writes

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
## Release Notes
- **Improvements**
- Enhanced BigQuery data writing process with more precise configuration options.
- Simplified table creation and partition insertion logic.
- Improved handling of DataFrame column arrangements during data operations.
- **Changes**
- Updated BigQuery write method to use a direct writing approach.
- Introduced a new option to prevent table creation if it does not exist.
- Modified table creation process to be more format-aware.
- Streamlined partition insertion mechanism.

These updates improve data management and writing efficiency in cloud data processing workflows.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->
---------
Co-authored-by: Thomas Chow <[email protected]>
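For context on the direct-write approach described in the summary above, a hedged sketch of what the write could look like with the spark-bigquery-connector, given an existing DataFrame `df` and a table we created up front. Option names follow the connector's documentation and should be verified against the pinned connector version.

```scala
// Sketch of a direct write with no table creation; not the PR's exact code.
df.write
  .format("bigquery")
  .option("writeMethod", "direct")              // BigQuery Storage Write API, no GCS staging
  .option("createDisposition", "CREATE_NEVER")  // the table was created explicitly beforehand
  .mode("append")
  .save("project.dataset.table")
```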
## Summary
- Pull alter table properties functionality into `Format`
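A hedged sketch of what that could look like; trait and method names here are illustrative, not the repository's actual API.

```scala
// Illustrative only: formats that support it issue an ALTER TABLE; others
// (e.g. BigQuery for now) can override this with a no-op placeholder.
trait Format {
  def alterTableProperties(tableName: String, properties: Map[String, String])(
      sqlRunner: String => Unit): Unit = {
    val props = properties.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
    sqlRunner(s"ALTER TABLE $tableName SET TBLPROPERTIES ($props)")
  }
}
```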
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
- **New Features**
- Added a placeholder method for altering table properties in BigQuery
format
- Introduced a new method to modify table properties across different
Spark formats
- Enhanced table creation utility to use format-specific property
alteration methods
- **Refactor**
- Improved table creation process by abstracting table property
modifications
- Standardized approach to handling table property changes across
different formats
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
---------
Co-authored-by: Thomas Chow <[email protected]>
## Summary
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
- **New Features**
- Added configurable repartitioning option for DataFrame writes.
- Introduced a new configuration setting to control repartitioning
behavior.
- Enhanced test suite with functionality to handle empty DataFrames.
- **Chores**
- Improved code formatting and logging for DataFrame writing process.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->
---------
Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
)
## Summary
we cannot represent absent data in time series with null values because the thrift serializer doesn't allow nulls in list<double> - so we create a magic double value (based on string "chronon") to represent nulls

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
## Release Notes
- **New Features**
- Introduced a new constant `magicNullDouble` for handling null value representations
- **Bug Fixes**
- Improved null value handling in serialization and data processing workflows
- **Tests**
- Added new test case for `TileDriftSeries` serialization to validate null value management

The changes enhance data processing consistency and provide a standardized approach to managing null values across the system.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
This fixes an issue where Infinity/NaN values in drift calculations were causing JSON parse errors in the frontend. These special values are now converted to our standard magic null value (-2.7980863399423856E16) before serialization.

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
- **Bug Fixes**
- Enhanced error handling for special double values (infinity and NaN) in data processing
- Improved serialization test case to handle null and special values more robustly
- **Tests**
- Updated test case for `TileDriftSeries` serialization to cover edge cases with special double values
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
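Taken together, the two changes above amount to a sanitization step like this hedged sketch. The constant is the one quoted in the PR description; the object and method names are hypothetical.

```scala
// Hypothetical names; the constant value is the one quoted above.
object SeriesSanitizer {
  val MagicNullDouble: java.lang.Double = -2.7980863399423856e16

  // Thrift's list<double> cannot hold nulls, and Infinity/NaN break JSON
  // parsing downstream, so all three collapse to the sentinel value.
  def sanitize(value: java.lang.Double): java.lang.Double =
    if (value == null || value.isNaN || value.isInfinite) MagicNullDouble else value
}
```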
## Summary
Changed return type to seq (backed by ArrayBuffer under the hood). Added unit tests.

## Checklist
- [x] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
- **Refactor**
- Updated SQL utility methods to return sequences of results instead of optional single results.
- Modified result handling in SQL-related utility classes to support multiple result rows.
- Enhanced logic to handle cases with no valid inputs in data processing methods.
- **Tests**
- Updated test cases to accommodate new result handling approach.
- Added new test for handling "explode" invocations in SQL select clauses.
- Refined handling of null values and special floating-point values in tests.
- **Bug Fixes**
- Improved error handling and result processing in SQL transformation methods.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
fixed null handling in drift test code.

## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
- **Bug Fixes**
- Improved null handling in drift series processing to prevent potential runtime exceptions
- Enhanced error resilience by safely checking for null values during data aggregation
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
)
## Summary
- branch protection requires explicitly adding github actions that need to succeed before a user can merge a PR. Some github workflows do not qualify to run for a PR based on path filtering etc. Skipped workflows will still be required to "succeed" for a branch protection rule. This PR adds a blanket workflow that will poll for explicitly triggered workflows to succeed instead.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
- **Chores**
- Added a new GitHub Actions workflow to enforce status checks on pull requests.
- Reformatted test file for improved code readability without changing functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->
---------
Co-authored-by: Thomas Chow <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
1fddc1b to b9a7c91
Actionable comments posted: 0
🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)
98-102: Add input validation. Consider handling edge cases like null or malformed input.
```diff
 private def stringToSeq(commaDelimited: String) = {
-  Option(commaDelimited).getOrElse("").split(",").map(_.trim()).filter(_.nonEmpty)
+  Option(commaDelimited).map(_.trim).getOrElse("").split(",").map(_.trim()).filter(_.nonEmpty)
 }
```
242-242: Track the TODO as a separate issue. Moving code to run.py should be tracked and addressed.
Would you like me to create an issue for tracking this TODO?
244-250: Extract configuration to a separate file. Hard-coded arguments should be moved to configuration.
Consider creating a configuration class:
```scala
case class SparkJobConfig(
  mainClass: String = Driver.getClass.getName,
  requiredArgs: List[String] = List(
    "--class",
    "--jars",
    "--files"
  )
)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2 hunks)
- spark/src/main/scala/org/apache/spark/deploy/VisibleSparkSubmitArguments.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- spark/src/main/scala/org/apache/spark/deploy/VisibleSparkSubmitArguments.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)
13-13: LGTM: Import aligns with PR objective. Using Spark's internal argument parser.
93-97: Remove commented code. Dead code should be removed rather than commented out. Git history preserves it if needed later.
104-115: LGTM: Robust argument parsing implementation. Effectively leverages Spark's internal argument parser for standardized handling.
Summary
Use Apache Spark's internal libraries to interpret args and map them to a Dataproc job configuration. This ensures the submission interface stays the same regardless of which infrastructure distribution we use.
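A hedged sketch of that flow: parse spark-submit style arguments with Spark's own parser, then copy the parsed fields onto the Dataproc SparkJob builder. Field names come from SparkSubmitArguments and the builder methods from the Dataproc v1 client, but the glue shown here is illustrative rather than the PR's exact code.

```scala
import com.google.cloud.dataproc.v1.SparkJob
import org.apache.spark.deploy.VisibleSparkSubmitArguments
import scala.jdk.CollectionConverters._

// Illustrative glue code, not the PR's exact implementation.
def toSparkJob(args: Seq[String]): SparkJob = {
  val parsed = new VisibleSparkSubmitArguments(args)

  // Split comma-delimited option values (e.g. --jars) defensively.
  def csv(s: String): java.util.List[String] =
    Option(s).toSeq.flatMap(_.split(",").map(_.trim).filter(_.nonEmpty)).asJava

  SparkJob
    .newBuilder()
    .setMainClass(parsed.mainClass)
    .addAllJarFileUris(csv(parsed.jars))
    .addAllFileUris(csv(parsed.files))
    .addAllArgs(parsed.childArgs.asJava) // args after the primary resource go to the app
    .build()
}
```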
Checklist
Summary by CodeRabbit
Refactor
New Features
Bug Fixes