Rework BigTableKVStore multiget to issue a bulkGet request rather than n query calls #562
Conversation
Walkthrough
This PR refactors the BigTableKVStore multiGet path to group incoming requests by dataset and issue a single bulk query per dataset, rather than one query per request.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant C as Client
    participant B as BigTableKVStoreImpl
    participant DB as BigTable
    C->>B: Call multiGet(requests)
    B->>B: Group requests by dataset
    B->>B: Generate query & row key map via setQueryTimeSeriesFilters
    B->>DB: Execute single query per dataset
    DB-->>B: Return aggregated results
    B->>B: Lookup results by row keys
    B-->>C: Return responses for each request
```
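The diagram corresponds roughly to the sketch below. This is a minimal illustration only: `BigtableDataClient`, `Query`, and `readRowsCallable` are real Bigtable Java client APIs, but the request/response case classes, the single-table assumption, and the `rowKeyFor` derivation are simplified stand-ins; the actual `BigTableKVStoreImpl` generates multiple day-spanning row keys per request via `setQueryTimeSeriesFilters`.

```scala
import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures}
import com.google.cloud.bigtable.data.v2.BigtableDataClient
import com.google.cloud.bigtable.data.v2.models.{Query, Row}
import com.google.common.util.concurrent.MoreExecutors
import com.google.protobuf.ByteString

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

// Hypothetical, simplified stand-ins for the KV store request/response types.
case class GetRequest(keyBytes: Array[Byte], dataset: String)
case class GetResponse(request: GetRequest, values: Try[Seq[Row]])

object BulkGetSketch {

  // Adapt the Bigtable client's ApiFuture to a Scala Future.
  private def toScala[T](f: ApiFuture[T]): Future[T] = {
    val p = Promise[T]()
    ApiFutures.addCallback(
      f,
      new ApiFutureCallback[T] {
        override def onSuccess(result: T): Unit = p.success(result)
        override def onFailure(t: Throwable): Unit = p.failure(t)
      },
      MoreExecutors.directExecutor()
    )
    p.future
  }

  // Placeholder row-key derivation; the real code derives several day-spanning
  // row keys per request via setQueryTimeSeriesFilters.
  private def rowKeyFor(req: GetRequest): ByteString =
    ByteString.copyFromUtf8(req.dataset + "#").concat(ByteString.copyFrom(req.keyBytes))

  def multiGet(client: BigtableDataClient, table: String, requests: Seq[GetRequest])(
      implicit ec: ExecutionContext): Future[Seq[GetResponse]] = {
    // One Query (and one Future) per dataset instead of one read per request.
    val perDataset = requests.groupBy(_.dataset).toSeq.map { case (_, dsRequests) =>
      val keysByRequest = dsRequests.map(r => r -> rowKeyFor(r)).toMap
      val query = keysByRequest.values.foldLeft(Query.create(table))((q, k) => q.rowKey(k))

      toScala(client.readRowsCallable().all().futureCall(query))
        .map { rows =>
          // Map returned rows back to the originating requests via their row keys.
          val rowsByKey = rows.asScala.map(r => r.getKey -> r).toMap
          dsRequests.map(r => GetResponse(r, Success(rowsByKey.get(keysByRequest(r)).toSeq)))
        }
        .recover { case e =>
          // A failed dataset query fails all of that dataset's requests consistently.
          dsRequests.map(r => GetResponse(r, Failure(e)))
        }
    }
    Future.sequence(perDataset).map(_.flatten)
  }
}
```

Grouping first means each dataset contributes exactly one read and one Future, regardless of how many requests it contains.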
Force-pushed the branch from 6163585 to 7de3f74.
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)
172-213: Watch the concurrency if a single call spans many datasets. Would you like help adding a safety check or concurrency-limiting mechanism?
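One possible shape for such a mechanism, purely as a hedged sketch (not part of this PR): a hypothetical helper that caps how many per-dataset queries are in flight at once by processing them in sequential batches.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: apply f to items with at most `parallelism` futures in
// flight, by running the items in sequential batches of that size.
def traverseBounded[A, B](items: Seq[A], parallelism: Int)(f: A => Future[B])(
    implicit ec: ExecutionContext): Future[Seq[B]] = {
  require(parallelism > 0, "parallelism must be positive")
  items.grouped(parallelism).foldLeft(Future.successful(Seq.empty[B])) { (accF, batch) =>
    // Only start the next batch once the previous one has completed.
    accF.flatMap(acc => Future.sequence(batch.map(f)).map(acc ++ _))
  }
}
```

In practice the number of distinct datasets per multiGet call is usually small, so this may never be needed; it simply bounds the fan-out if a call ever spans many datasets.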
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (2 hunks)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (10)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (3)
47-47: No concerns.
124-168: Good grouping approach.
221-242: Returning row keys is neat (a sketch of this pattern follows the review comments below).
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (7)
31-31: No issue importing NumericRange.
51-51: Accessor looks fine.
58-58: EmulatorWrapper usage is okay.
387-412: Test coverage for multiple keys is good.
476-509: Excellent streaming tile coverage.
511-553: Well tested for separate batch times.
665-674: Helper method is concise.
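As a companion to the comments on lines 124-168 and 221-242, here is a hedged sketch of the pattern they describe: register day-spanning row keys and a timestamp-range filter on the per-dataset query, and hand the generated row keys back so responses can later be matched to requests. The request shape, key layout, and day bucketing are assumptions for illustration; `Query.rowKey`, `Query.filter`, and `Filters.FILTERS.timestamp().range()` are real Bigtable Java client APIs, but the PR's actual `setQueryTimeSeriesFilters` may look different.

```scala
import com.google.cloud.bigtable.data.v2.models.{Filters, Query}
import com.google.protobuf.ByteString

// Hypothetical request shape and row-key scheme, for illustration only.
case class TimeSeriesRequest(keyBytes: Array[Byte], dataset: String,
                             startTsMillis: Long, endTsMillis: Long)

object QueryFilterSketch {
  private val DayMillis = 24 * 60 * 60 * 1000L

  // Add row keys (one per day the request spans) and a timestamp-range filter to
  // the query, and return the row keys per request for the later result lookup.
  def setQueryTimeSeriesFilters(query: Query,
                                requests: Seq[TimeSeriesRequest]): Map[TimeSeriesRequest, Seq[ByteString]] = {
    require(requests.nonEmpty, "expected at least one request per dataset")

    val keysByRequest = requests.map { req =>
      val days = (req.startTsMillis / DayMillis) to (req.endTsMillis / DayMillis)
      val rowKeys = days.map { day =>
        // Assumed key layout: <entity keyBytes>#<dataset>#<dayStamp>
        ByteString.copyFrom(req.keyBytes)
          .concat(ByteString.copyFromUtf8(s"#${req.dataset}#$day"))
      }
      rowKeys.foreach(k => query.rowKey(k))
      req -> (rowKeys: Seq[ByteString])
    }.toMap

    // One server-side cell-timestamp range covering all requests in the batch
    // (Bigtable cell timestamps are in microseconds).
    val startMicros = requests.map(_.startTsMillis).min * 1000L
    val endMicros = requests.map(_.endTsMillis).max * 1000L
    query.filter(Filters.FILTERS.timestamp().range().startClosed(startMicros).endOpen(endMicros))

    keysByRequest
  }
}
```

Returning the keys from the same helper that registers them on the query keeps the later row-to-request lookup consistent by construction.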
Summary
In the old implementation we see a decent amount of RejectedExecutionExceptions. This seems to be due to us making n individual query / get requests (each result transform backed by a Scala Future) and then sequencing them together into one Future to return upstream. As the fetchJoin/groupBy batch size increases along with higher rps, we end up with many of these Futures in flight at a time and hit the limits of the KV store thread pool. This PR reworks things to issue one bulkGet request in which we look up all the relevant RowKeys and the appropriate range within them.
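At the Future level the change amounts to collapsing that fan-out, roughly as in this schematic sketch (hypothetical simplified types, with `bigtableRead` as a dummy stand-in for the actual store read):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, simplified request/response shapes for illustration.
case class Request(keyBytes: Array[Byte], dataset: String)
case class Response(request: Request, payload: Seq[Array[Byte]])

// Dummy stand-in for one read against the store.
def bigtableRead(batch: Seq[Request])(implicit ec: ExecutionContext): Future[Seq[Response]] =
  Future.successful(batch.map(r => Response(r, Seq.empty)))

// Old shape: one read and one Future per request, all sequenced together.
def multiGetOld(requests: Seq[Request])(implicit ec: ExecutionContext): Future[Seq[Response]] =
  Future.sequence(requests.map(r => bigtableRead(Seq(r)))).map(_.flatten)

// New shape: group by dataset, one bulk read (and one Future) per dataset.
def multiGetNew(requests: Seq[Request])(implicit ec: ExecutionContext): Future[Seq[Response]] =
  Future.sequence(requests.groupBy(_.dataset).values.toSeq.map(g => bigtableRead(g))).map(_.flatten)
```

With the old shape the number of outstanding Futures per multiGet call equals the number of requests; with the new one it is bounded by the number of distinct datasets.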
Item to call out:
We currently request the n RowKeys (depending on whether streaming data spans just the current day, or multiple days if batch was delayed) as well as the time range we're interested in (per dataset). We could instead issue just the n RowKeys we're interested in and then filter on the client side (where we pick the relevant tiles per row) - this means a simpler query to BT but more data over the wire. We could test this out in a subsequent pass / check with Google folks on what they recommend.
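If that alternative were pursued, the client-side pruning step might look like the following hedged sketch, which assumes the tile time is carried in the Bigtable cell timestamp (microseconds); the actual tile encoding may differ.

```scala
import com.google.cloud.bigtable.data.v2.models.{Row, RowCell}

import scala.jdk.CollectionConverters._

// Keep only the cells (tiles) whose cell timestamp falls inside [startMillis, endMillis).
// Assumes the tile time is stored as the Bigtable cell timestamp, in microseconds.
def filterTiles(row: Row, startMillis: Long, endMillis: Long): Seq[RowCell] =
  row.getCells.asScala.toSeq.filter { cell =>
    val tsMillis = cell.getTimestamp / 1000L
    tsMillis >= startMillis && tsMillis < endMillis
  }
```

Dropping the server-side timestamp filter simplifies the query but, as noted, moves the extra tiles over the wire; which wins likely depends on tile sizes and how far batch typically lags.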
Checklist
- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update

Summary by CodeRabbit

New Features
- Optimized time series data queries by grouping requests per dataset to reduce query overhead and provide consolidated error responses.

Tests
- Expanded test coverage with new cases for handling multiple keys and datasets, ensuring consistent data retrieval across various time ranges.