Add Hudi format #496
Conversation
Walkthrough
The pull request introduces non-functional changes across various modules. It adds a comment noting a potential partition issue in the Python API, updates a JSON customer value from "canary" to "dev", and expands AWS integration mappings for "dev". The changes also reorganize imports and formatting for readability, add a utility method for Hive partition parsing, remove a redundant local method, update Hudi-Spark configurations, extend Kryo registrations, and introduce a new test class for Hudi table operations.
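As a rough illustration of the partition-parsing piece mentioned above, here is a minimal Scala sketch of a Hive-style partition parser; the object and method names are hypothetical and not necessarily the PR's actual API.

```scala
// Minimal sketch, assuming Hive-style partition strings such as "ds=2025-03-12/hr=00".
// `HivePartitionParser.parseHivePartition` is a hypothetical name, not the PR's method.
object HivePartitionParser {
  def parseHivePartition(partitionSpec: String): Map[String, String] =
    partitionSpec
      .split("/")
      .filter(_.nonEmpty)
      .map { kv =>
        val Array(key, value) = kv.split("=", 2)
        key -> value
      }
      .toMap
}

// Example: HivePartitionParser.parseHivePartition("ds=2025-03-12/hr=00")
//   => Map("ds" -> "2025-03-12", "hr" -> "00")
```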
Sequence Diagram(s)
sequenceDiagram
participant T as HudiTableUtilsTest
participant S as SparkSession
participant TU as TableUtils
participant H as Hudi Framework
T->>S: Initialize Spark session with Hudi configs
T->>TU: Create Hudi table with DataFrame
TU->>H: Write data in Hudi format
H-->>TU: Confirm table creation
TU->>T: Verify table exists in Spark catalog
case Success(isHudi) =>
  logger.info(s"Hudi check: Successfully read the format of table: $tableName as Hudi")
  isHudi
Worked here:
2025/03/12 19:17:14 INFO DefaultFormatProvider.scala:40 - Hudi check: Successfully read the format of table: data.plaid_raw as Hudi
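For context on the snippet above, a minimal sketch of how such a Hudi check could be implemented; the helper name `isHudiTable` and the DESCRIBE-based lookup are assumptions, not the PR's exact DefaultFormatProvider code.

```scala
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.SparkSession

// Sketch only: reads the table's provider from DESCRIBE FORMATTED output and
// checks whether it is "hudi". `isHudiTable` is a hypothetical helper.
def isHudiTable(spark: SparkSession, tableName: String): Boolean =
  Try {
    spark
      .sql(s"DESCRIBE FORMATTED $tableName")
      .where("col_name = 'Provider'")
      .select("data_type")
      .head()
      .getString(0)
      .equalsIgnoreCase("hudi")
  } match {
    case Success(isHudi) => isHudi
    case Failure(_)      => false
  }
```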
// Customer specific infra configurations
private val CustomerToSubnetIdMap = Map(
-   "canary" -> "subnet-085b2af531b50db44"
+   "canary" -> "subnet-085b2af531b50db44",
Are we going to need one for Plaid?
Mmm, yes we will. @chewy-zlai, we don't know these because it's on their account, right?
Is this something we can thread through from teams.json? Or, in general, can the cluster configuration be passed in from teams.json?
It can be threaded through.
Oh, yeah. The subnet is going to be something we have to get from Daniel, as they aren't using the VPC we wanted to set up.
    main_class=main_class,
)
- + f" --additional-conf-path=additional-confs.yaml --files={s3_file_args}"
+ + f" --additional-conf-path={EMR_MOUNT_FILE_PREFIX}additional-confs.yaml --files={s3_file_args}"
Doing this for now as a fix. cc @chewy-zlai, this is what you ran into earlier.
In a follow-up PR, I'm going to move this over to spark.files (sketched below).
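As a hedged sketch of that follow-up idea (shipping the additional confs via the spark.files configuration instead of the --files CLI flag), something like the following could work; the app name and S3 path are placeholders, not the job's real values.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: spark.files distributes the listed files to each executor's
// working directory. The path and app name below are placeholders.
val spark = SparkSession
  .builder()
  .appName("chronon-emr-job") // placeholder app name
  .config("spark.files", "s3://<bucket>/confs/additional-confs.yaml") // placeholder path
  .getOrCreate()
```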
| "mode": "backfill", | ||
| "dataproc": False, | ||
| "ds": today, | ||
| "ds": today, # TODO: this breaks if the partition column is not the same as yyyy-MM-dd. |
You mean if the format is not yyyy-MM-dd?
Yeah. Plaid's date format was yyyyMMdd.
When I set the backfill start date to something like 20250216, the ds here was set to today's date as 2025-02-12, so the formats were inconsistent.
Oh, is this something that's controlled with the Spark config?
sparkSession.conf.get("spark.chronon.partition.format", "yyyy-MM-dd")
Or does this happen even earlier, during compilation?
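To illustrate the conf-based approach being discussed, a minimal sketch of rendering ds from that setting; the helper name `renderDs` is hypothetical.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.SparkSession

// Sketch only: reads the partition format from the Spark conf shown above and
// renders a date in that format. `renderDs` is a hypothetical helper.
def renderDs(spark: SparkSession, date: LocalDate = LocalDate.now()): String = {
  val pattern = spark.conf.get("spark.chronon.partition.format", "yyyy-MM-dd")
  date.format(DateTimeFormatter.ofPattern(pattern))
}

// e.g. with spark.chronon.partition.format=yyyyMMdd, renderDs would yield "20250312"
```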
import org.apache.spark.sql.SparkSession

case object Hudi extends Format {
I'm actually not sure if we need a new format for Hudi at all; I think we can just use Hive.
  .getString(1)
assertEquals("hudi", provider)

tableUtils.insertPartitions(sourceDF, tableName)
Fails for me here.
Actionable comments posted: 2
🧹 Nitpick comments (1)
cloud_aws/src/main/resources/hudi_spark_confs.yaml (1)
1-3: Add newline at end of file. Missing newline character at end of file per YAML linting rules.
spark.sql.catalog.spark_catalog: "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
spark.sql.extensions: "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
spark.chronon.table_write.format: "hudi"
🧰 Tools
🪛 YAMLlint (1.35.1)
[error] 3-3: no new line character at the end of file
(new-line-at-end-of-file)
📜 Review details
Configuration used: CodeRabbit UI
📒 Files selected for processing (4)
- cloud_aws/src/main/resources/hudi_spark_confs.yaml (1 hunks)
- cloud_aws/src/test/scala/ai/chronon/integrations/aws/HudiTableUtilsTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2025-03-12T15:28:06.350Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
🪛 YAMLlint (1.35.1)
cloud_aws/src/main/resources/hudi_spark_confs.yaml
[error] 3-3: no new line character at the end of file
(new-line-at-end-of-file)
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (2)
93-93: Appropriate addition for Kryo serialization. EmptyList registration helps shuffle operations.
215-216: Good Hudi integration enhancement. Required for proper serialization of Hudi payload classes.
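As a hedged illustration of what registering Hudi payload classes with Kryo can look like; the exact class list registered by the PR is an assumption here, not taken from the diff.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Sketch only: registers a couple of common Hudi model classes with Kryo.
// The class names below are assumptions about what the PR registers.
class HudiKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    Seq(
      "org.apache.hudi.common.model.HoodieKey",
      "org.apache.hudi.common.model.DefaultHoodieRecordPayload"
    ).foreach(className => kryo.register(Class.forName(className)))
  }
}
```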
val back = tableUtils.loadTable(tableName)
val backSet = back.select("id").as[Int].collect().toSet
// assertEquals(input, backSet)
Fix type casting issue in test.
String values ("a"-"d") can't be cast to Int.
val back = tableUtils.loadTable(tableName)
- val backSet = back.select("id").as[Int].collect().toSet
+ val backSet = back.select("id").as[String].collect().toSet
  // assertEquals(input, backSet)
val backSet = back.select("id").as[Int].collect().toSet
// assertEquals(input, backSet)
🛠️ Refactor suggestion
Uncomment or remove assertion.
Incomplete test - assertion is commented out.
val backSet = back.select("id").as[String].collect().toSet
- // assertEquals(input, backSet)
+ val expectedSet = Set("a", "b", "c", "d")
+ assertEquals(expectedSet, backSet)
Actionable comments posted: 0
🧹 Nitpick comments (2)
cloud_aws/src/test/scala/ai/chronon/integrations/aws/HudiTableUtilsTest.scala (2)
33-37: Consider diversifying test partition data. All records use the same date; add multiple dates to properly test partitioning.
val source = Seq(
  ("a", "2025-03-12"),
  ("b", "2025-03-12"),
  ("c", "2025-03-12"),
  ("d", "2025-03-12")
)
26-58: Consider adding update/delete tests. The current test only covers basic create/read, while Hudi's key features include update and delete operations (see the sketch below).
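If update/delete coverage is added, a minimal sketch of a Hudi upsert followed by a delete could look like this; the table name, key/precombine fields, and base path are placeholders, not the test's actual values.

```scala
import org.apache.spark.sql.SaveMode

// Sketch only: assumes an existing SparkSession named `spark` configured with the
// Hudi catalog and extensions. Names and paths below are placeholders.
val hudiOpts = Map(
  "hoodie.table.name"                           -> "test_hudi_table",
  "hoodie.datasource.write.recordkey.field"     -> "id",
  "hoodie.datasource.write.partitionpath.field" -> "ds",
  "hoodie.datasource.write.precombine.field"    -> "ds"
)

// Upsert: rewrite the row with id = "a" into a new partition.
val updated = spark.createDataFrame(Seq(("a", "2025-03-13"))).toDF("id", "ds")
updated.write.format("hudi")
  .options(hudiOpts + ("hoodie.datasource.write.operation" -> "upsert"))
  .mode(SaveMode.Append)
  .save("/tmp/test_hudi_table") // placeholder base path

// Delete: remove the row with id = "b".
val toDelete = spark.createDataFrame(Seq(("b", "2025-03-12"))).toDF("id", "ds")
toDelete.write.format("hudi")
  .options(hudiOpts + ("hoodie.datasource.write.operation" -> "delete"))
  .mode(SaveMode.Append)
  .save("/tmp/test_hudi_table")
```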
📜 Review details
Configuration used: CodeRabbit UI
📒 Files selected for processing (2)
- cloud_aws/src/test/scala/ai/chronon/integrations/aws/HudiTableUtilsTest.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala
🔇 Additional comments (6)
cloud_aws/src/test/scala/ai/chronon/integrations/aws/HudiTableUtilsTest.scala (6)
11-24: Solid test setup. Properly configures the Hudi catalog, extensions, and Kryo registrator.
40-40: Good partitioning setup. Correctly creates a Hudi table with PARQUET format and date-based partitioning (see the DDL sketch after this list).
41-48: Good provider verification. Properly verifies that the table exists and uses the Hudi provider.
50-50: Previously failing section now fixed. This line previously had issues per david-zlai's comment and is now properly implemented.
53-54: Fixed type issue and uncommented assertion. Correctly collects both columns as a (String, String) tuple and verifies against the source data.
55-57: Good cleanup practice. Properly drops the test table in a finally block.
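For illustration, creating a partitioned Hudi table of this shape via Spark SQL could look like the following sketch; the database, table, and column names are placeholders rather than the test's exact DDL.

```scala
// Sketch only: placeholder names; assumes `spark` has the Hudi catalog and
// extensions configured so that `USING hudi` resolves.
spark.sql(
  """CREATE TABLE IF NOT EXISTS test_db.test_hudi_table (
    |  id STRING,
    |  ds STRING
    |) USING hudi
    |PARTITIONED BY (ds)
    |""".stripMargin)
```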
local = true,
additionalConfig = Some(
  Map(
    "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
Let's change this to a different catalog, default_hudi, and set that as the default catalog:
"spark.sql.defaultCatalog" -> "default_hudi",
"spark.sql.catalog.default_hudi" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
OK, but what's the reason?
Going to merge and put up a new PR.
## Summary
^^^
Tested on AWS. see below:

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **New Features**
  - Introduced a utility for parsing Hive partition strings, streamlining partition management.
  - Enhanced Apache Hudi integration in Spark SQL with updated configurations for catalog support and table write format.
  - Expanded AWS integration settings to support additional development environments, enabling broader network configuration options.
- **Refactor**
  - Improved underlying serialization handling to bolster data processing reliability.