Switch GBU job to write out data in a partitioned manner #993
Walkthrough
This change updates partition specification handling in the group-by upload planning and execution flow. It explicitly passes a daily partition spec in the planner logic, adjusts tests to verify this, ensures the upload job saves its output partitioned by "ds", and refactors partition column handling in the Iceberg catalog utilities.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Planner
    participant MetaDataUtils
    participant UploadJob
    participant IcebergCatalog
    Planner->>MetaDataUtils: layer(..., PartitionSpec.daily, ...)
    MetaDataUtils-->>Planner: Metadata Layered
    Planner->>UploadJob: Plan upload node with daily partition spec
    UploadJob->>IcebergCatalog: getIcebergPartitions(table, "ds")
    IcebergCatalog-->>UploadJob: List of partitions by "ds"
    UploadJob->>UploadJob: Save DataFrame partitioned by "ds"
    UploadJob->>UploadJob: Log job duration in seconds
```
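The "Save DataFrame partitioned by 'ds'" step boils down to a Spark write that physically partitions the upload output on the fixed "ds" column. The following is a minimal sketch under that assumption; the method name, table name, and writer options are illustrative, not the exact Chronon code:

```scala
import org.apache.spark.sql.DataFrame

// Minimal sketch (not the actual Chronon implementation): persist the GBU
// upload output partitioned by the fixed "ds" column. These tables are
// internal to Chronon, so "ds" is used directly rather than the
// user-configured partition column.
def saveUploadOutput(df: DataFrame, outputTable: String): Unit = {
  df.write
    .mode("overwrite")   // upload tables are fully rewritten per run
    .partitionBy("ds")   // physical partitioning on the date column
    .saveAsTable(outputTable)
}
```

The downstream upload-to-KV job can then list partitions by "ds" directly, which is what the getIcebergPartitions(table, "ds") call in the diagram above relies on.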
Summary
We recently switched to querying partitions to persist table partition info during batch job runs. This causes the GBU jobs to fail because they don't write out partitioned data, and they partition by the 'ds' field rather than the user-configured tableUtils partition column. This PR extends the GBU jobs to write out data partitioned by 'ds', and fixes the Iceberg table format to pass through the partition column supplied when retrieving partitions (currently it is discarded and tableUtils.partitionColumn is used instead).
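As a rough illustration of the Iceberg-side change, the partition-listing utility would use whichever column the caller passes instead of always falling back to tableUtils.partitionColumn. This is a sketch only, assuming partitions are read from the Iceberg <table>.partitions metadata table via Spark SQL; the helper's signature and query shape are assumptions, not the exact Chronon code:

```scala
import org.apache.spark.sql.SparkSession

// Sketch (assumed shape, not the exact Chronon utility): list partition values
// for the caller-supplied partition column, e.g. "ds" for internal GBU tables.
def getIcebergPartitions(spark: SparkSession,
                         tableName: String,
                         partitionColumn: String): Seq[String] = {
  spark
    .sql(s"SELECT partition.$partitionColumn FROM $tableName.partitions")
    .collect()
    .map(row => Option(row.get(0)).map(_.toString).orNull)
    .toSeq
}
```

With this shape, the GBU upload path can ask for partitions by "ds" while user-facing tables keep their configured partition column.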
An alternative approach would be to switch the GBU and upload-to-KV jobs to use the user-configured partition column for their writes. I chose the current approach as it seemed simpler, and since these GBU tables are internal to Chronon we don't need to adhere to a user partitioning scheme.
I was able to get a successful set of runs with this change in our canary (GBU -> upload to KV -> streaming).
Summary by CodeRabbit
Bug Fixes: the GBU upload job now writes its output partitioned by "ds".
Tests: planner tests verify the daily partition spec.
Refactor: Iceberg catalog partition utilities now use the partition column passed to them.