Switch BigTableKV store implementation to leverage bulk read rows + stream-line threadpools #599
Conversation
Walkthrough

This PR refactors how BigTable queries are built and executed. In the key-value store implementation, filtering has shifted from using a single Query to a modular ChainFilter, with a new method for building row keys and a bulk read batching mechanism. The GCP API implementation now includes methods for configuring bulk read settings and thread pools, with an added custom thread factory and executor provider. Test mocks and the online execution context have been updated to reflect these changes.
Actionable comments posted: 0
🧹 Nitpick comments (6)
online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (1)
37-45: Check for graceful shutdown.
Consider providing a mechanism to shut down the executor cleanly.

```diff
 lazy val buildExecutor: ThreadPoolExecutor = {
   val cores = Runtime.getRuntime.availableProcessors()
   new ThreadPoolExecutor(cores, cores * 2, 600, TimeUnit.SECONDS, new ArrayBlockingQueue[Runnable](1000), threadFactory)
+  // Optionally implement a shutdown function to invoke buildExecutor.shutdown()
 }
```
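If the team wants to act on this, a minimal sketch of one option - registering a JVM shutdown hook - is below; the helper name and the 30-second grace period are illustrative, not part of this PR:

```scala
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}

// Hypothetical helper: call once after buildExecutor is first materialized.
def registerExecutorShutdownHook(executor: ThreadPoolExecutor): Unit =
  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    executor.shutdown() // stop accepting new tasks
    if (!executor.awaitTermination(30, TimeUnit.SECONDS))
      executor.shutdownNow() // force-cancel whatever is still running
  }))
```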
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (2)

146-146: Efficient row key building.

Collecting row keys for timeseries is clear. Consider boundary checks if startTs > endTs.

Also applies to: 155-159
225-252: Chain filter with timestamp range.
Code is succinct. Large day-range requests may cause big scans, so consider chunking if needed.
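If chunking ever becomes necessary, a rough sketch in plain Scala (names and the one-day chunk size are placeholders) could look like this:

```scala
// Hypothetical helper: split a [startTs, endTs) millisecond range into day-sized sub-ranges,
// so each sub-range can be scanned (or filtered) separately instead of in one large request.
val DayMillis: Long = 24L * 60 * 60 * 1000

def dayChunks(startTs: Long, endTs: Long): Seq[(Long, Long)] = {
  require(startTs <= endTs, s"startTs ($startTs) must not exceed endTs ($endTs)")
  (startTs until endTs by DayMillis).map { chunkStart =>
    chunkStart -> math.min(chunkStart + DayMillis, endTs)
  }
}
```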
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (3)

3-3: Organize import statements.

Consider organizing imports alphabetically for better readability.
103-120: Consider making batch settings configurable.

Hardcoded values may not be optimal for all use cases.
```diff
-  private def setBigTableBulkReadRowsSettings(dataSettingsBuilderWithProfileId: BigtableDataSettings.Builder): Unit = {
+  private def setBigTableBulkReadRowsSettings(
+      dataSettingsBuilderWithProfileId: BigtableDataSettings.Builder,
+      elementCountThreshold: Long = 1,
+      delayThresholdDuration: org.threeten.bp.Duration = null
+  ): Unit = {
     // Get the bulkReadRowsSettings builder
     val bulkReadRowsSettingsBuilder = dataSettingsBuilderWithProfileId
       .stubSettings()
       .bulkReadRowsSettings()

     // Update the batching settings directly on the builder
     bulkReadRowsSettingsBuilder
       .setBatchingSettings(
         bulkReadRowsSettingsBuilder.getBatchingSettings.toBuilder
-          .setElementCountThreshold(1)
-          .setDelayThresholdDuration(null)
+          .setElementCountThreshold(elementCountThreshold)
+          .setDelayThresholdDuration(delayThresholdDuration)
           .build()
       )
   }
```
140-157: Consider making thread count configurable.

Thread count is hardcoded to available processors, which may not be optimal for all environments.
```diff
 object GcpApiImpl {
   // Create a thread factory so that we can name the threads for easier debugging
   val threadFactory: ThreadFactory = new ThreadFactory {
     private val counter = new AtomicInteger(0)
     override def newThread(r: Runnable): Thread = {
       val t = new Thread(r)
       t.setName(s"chronon-bt-gax-${counter.incrementAndGet()}")
       t
     }
   }

   // create one of these as BT creates very large threadpools (cores * 4) and does them once per admin and data client
   lazy val executorProvider: InstantiatingExecutorProvider = InstantiatingExecutorProvider
     .newBuilder()
-    .setExecutorThreadCount(Runtime.getRuntime.availableProcessors())
+    .setExecutorThreadCount(
+      sys.env.get("CHRONON_BT_THREAD_COUNT")
+        .map(_.toInt)
+        .getOrElse(Runtime.getRuntime.availableProcessors())
+    )
     .setThreadFactory(threadFactory)
     .build()
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (8 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (6 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (1 hunk)
🧰 Additional context used
🧬 Code Definitions (3)
online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (1)

- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
  - newThread (144-148)

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (2)

- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ApiFutureUtils.scala (2)
  - ApiFutureUtils (12-40)
  - toCompletableFuture (14-39)
- online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (4)
  - distribution (208-211)
  - distribution (211-214)
  - increment (189-189)
  - increment (191-191)

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (1)

- online/src/main/scala/ai/chronon/online/Api.scala (1)
  - GetRequest (44-47)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
🔇 Additional comments (8)
online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (1)
27-35: Well-structured thread factory.
Naming threads with instance ID and a counter is helpful for debugging. No major issues.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (2)
275-275: Mocking Batcher is appropriate.
No issues; the approach is concise for unit testing.
284-288: Well-handled failure scenario.
Using immediateFailedFuture and verifying both calls is clear.
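For readers who haven't seen the test, the failure stub looks roughly like this (a sketch, not the actual test code; the mocked data client that hands out this batcher is omitted):

```scala
import com.google.api.core.ApiFutures
import com.google.api.gax.batching.Batcher
import com.google.cloud.bigtable.data.v2.models.Row
import com.google.protobuf.ByteString
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}

// Every bulk-read add() resolves to a failed future, exercising the KV store's error path.
val batcher: Batcher[ByteString, Row] = mock(classOf[Batcher[ByteString, Row]])

when(batcher.add(any[ByteString]()))
  .thenReturn(ApiFutures.immediateFailedFuture[Row](new RuntimeException("BigTable unavailable")))

// After running the fetch under test, verify(batcher, times(2)).add(any[ByteString]())
// is one way to assert that both requested keys were attempted.
```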
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (3)

129-135: Good dataset-target mapping and chain filter usage.
Helps unify requests by dataset.
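For context, a filter of this shape can be assembled with the java-bigtable Filters DSL; a hedged sketch (the column family name and millisecond bounds are placeholders, and the exact filters in the PR may differ):

```scala
import com.google.cloud.bigtable.data.v2.models.Filters.FILTERS

// Limit reads to one column family and a [startTs, endTs) window; BigTable cell
// timestamps are expressed in microseconds, hence the * 1000 on millisecond inputs.
def timeRangeChainFilter(columnFamily: String, startTsMillis: Long, endTsMillis: Long) =
  FILTERS
    .chain()
    .filter(FILTERS.family().exactMatch(columnFamily))
    .filter(
      FILTERS
        .timestamp()
        .range()
        .startClosed(startTsMillis * 1000)
        .endOpen(endTsMillis * 1000)
    )
```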
174-183: Efficient bulk read approach.
Closing the batcher after usage prevents further additions. Looks correct.
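For reference, the batcher pattern with the Bigtable Java client typically looks like the sketch below. It assumes the project's ApiFutureUtils.toCompletableFuture helper (referenced in the code definitions above) converts an ApiFuture into a CompletableFuture, and it uses the String table-id overload for brevity; the real implementation may differ in both respects.

```scala
import ai.chronon.integrations.cloud_gcp.ApiFutureUtils
import com.google.cloud.bigtable.data.v2.BigtableDataClient
import com.google.cloud.bigtable.data.v2.models.{Filters, Row}
import com.google.protobuf.ByteString
import java.util.concurrent.CompletableFuture

// One batcher per (table, filter); one add() per row key. The batcher coalesces the
// individual adds into bulk ReadRows RPCs behind the scenes.
def bulkRead(dataClient: BigtableDataClient,
             tableId: String,
             filter: Filters.Filter,
             rowKeys: Seq[ByteString]): Seq[CompletableFuture[Row]] = {
  val batcher = dataClient.newBulkReadRowsBatcher(tableId, filter)
  try {
    rowKeys.map(key => ApiFutureUtils.toCompletableFuture(batcher.add(key)))
  } finally {
    // close() flushes outstanding reads and rejects any further add() calls.
    batcher.close()
  }
}
```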
191-204: Null-row filtering is good.
Ensures robust handling of partial results.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2)
88-93: LGTM - Good separation of concerns.

Clean separation of configuration logic for bulk reads and thread pools.
121-129: Make sure the shared thread pool works for all use cases.

Sharing thread pools between the admin and data clients could cause issues if they have different workload patterns.
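Presumably the helper under discussion wires the shared provider into both settings builders along these lines; this is a sketch, and setBackgroundExecutorProvider is an assumption about the gax API in use rather than something taken from the diff:

```scala
import com.google.api.gax.core.InstantiatingExecutorProvider
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings
import com.google.cloud.bigtable.data.v2.BigtableDataSettings

// Both the data and admin clients draw from the same (smaller, named) executor provider,
// instead of each creating its own cores * 4 pool.
def setClientThreadPools(dataBuilder: BigtableDataSettings.Builder,
                         adminBuilder: BigtableTableAdminSettings.Builder,
                         executorProvider: InstantiatingExecutorProvider): Unit = {
  dataBuilder.stubSettings().setBackgroundExecutorProvider(executorProvider)
  adminBuilder.stubSettings().setBackgroundExecutorProvider(executorProvider)
}
```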
Force-pushed from beffa96 to b75885c.
```scala
// override thread pools
setClientThreadPools(dataSettingsBuilderWithProfileId, adminSettingsBuilder)

val dataSettings = dataSettingsBuilderWithProfileId.setProjectId(projectId).setInstanceId(instanceId).build()
```
does this project ID setting take effect then if we're calling build earlier?
could the builder calls be merged into one method chain?
I did try to streamline this a bit, but unfortunately to override the bulk read rows settings we need to reach a couple of layers deep into the settings / builders, and they don't have setters available at those levels (e.g. stubSettings() - you can get the builder but you can't set it back after updating).
Coming back to this - the project ID (and the settings below) do take effect, as the builders follow the Java mutation-based approach.
We could merge the dataSettingsBuilderWithProfileId into the case match above; I thought it was easier to follow separated out.
how about just refactoring so that we invoke .build() only once at the very end? Maybe just get the functions to return the builder and continue on setting properties?
ok updated this a bit so that we have one build. Let me know if this is more in line with what you had in mind.
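For illustration, the shape this converges to looks roughly like the following (condensed; it assumes the projectId, instanceId and dataSettingsBuilderWithProfileId values from the diff are in scope, and the helper simply mutates the nested stub settings and hands the builder back):

```scala
import com.google.cloud.bigtable.data.v2.BigtableDataSettings

// Helpers mutate the passed-in builder (Java style) and return it so the calls chain,
// leaving a single .build() at the very end.
def withBulkReadRowsSettings(builder: BigtableDataSettings.Builder): BigtableDataSettings.Builder = {
  setBigTableBulkReadRowsSettings(builder) // tweaks stubSettings().bulkReadRowsSettings() in place
  builder
}

val dataSettings: BigtableDataSettings =
  withBulkReadRowsSettings(dataSettingsBuilderWithProfileId)
    .setProjectId(projectId)
    .setInstanceId(instanceId)
    .build()
```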
```scala
// BigTable's client creates a thread pool with a size of cores * 4. This ends up being a lot larger than we'd like
// so we scale these down and we also use the same in both clients
private def setClientThreadPools(
```
curious if there's maybe a simpler API to tap into to just adjust the thread pool size, rather than bringing your own executor provider.
Unfortunately they don't expose this on the BigTable data / admin clients or on the stub settings. We also want to override the thread pool so that we can provide a custom thread name. This is useful in the MMX context, where they already have other Gax-XX thread pools (they have other BT-shaped code there), so it's nice to be able to tell which threads are Zipline BT threads when we're debugging a service with a lot of threads.
Force-pushed from b75885c to ad30da9.
Actionable comments posted: 0
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
122-130: Added thread pool configuration.

Addresses excessive thread creation by sharing a single executor provider between the clients.
🧹 Nitpick comments (2)
online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (1)
27-35: Thread factory creation.
Good naming approach. It might help to log thread creation for advanced debugging.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)
225-230: buildRowKeysForTimeranges signature changed.
Better chaining approach. Consider splitting the logic into a separate helper for clarity.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (8 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (6 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (1 hunk)
🧰 Additional context used
🧬 Code Definitions (2)
online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (1)

- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
  - newThread (145-149)

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (1)

- online/src/main/scala/ai/chronon/online/Api.scala (1)
  - GetRequest (44-47)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (25)
online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala (4)
19-20: Imports added for concurrency.
Looks fine.
22-22: ExecutionContextExecutor import.
No concerns.
25-26: Unique instanceId.
Used for naming; helps debugging.
37-45: Lazy executor with dynamic pool sizing.
Better resource usage.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala (2)
10-10: New imports for Batcher, ByteString, etc.
All standard. Also applies to: 16-16, 18-18, 20-20
275-275: Mocking bulk read.
Ensures the new batcher approach is tested. Also applies to: 284-285, 286-288
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (12)
17-17: New imports for bulk read, filters, and utils.
No issues. Also applies to: 30-30, 39-39
129-133: TargetId and filter construction.
Essential for grouping row lookups.
146-147: buildRowKeysForTimeranges for TileSummaries.
Refined for time-series queries.
157-158: Streaming tile row keys.
Consistent with tile-based approach.
173-177: Batcher usage.
Leverages bulk read for efficiency.
182-183: Close batcher after usage.
Prevents inadvertent new calls.
191-192: Null check on rows.
Avoids NPE. Good practice.
198-204: Map row cells to TimedValue.
Time in micros is correct.
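For the unit handling specifically, the conversion boils down to something like this (the column family name is a placeholder; TimedValue itself is elided in favor of a plain tuple):

```scala
import com.google.cloud.bigtable.data.v2.models.Row
import scala.jdk.CollectionConverters._ // on Scala 2.12, use scala.collection.JavaConverters

// RowCell.getTimestamp() reports microseconds; downstream TimedValue-style results carry
// milliseconds, so divide by 1000 when mapping cells out of a Row.
def cellValuesWithMillis(row: Row, columnFamily: String): Seq[(Array[Byte], Long)] =
  row
    .getCells(columnFamily)
    .asScala
    .map(cell => (cell.getValue.toByteArray, cell.getTimestamp / 1000))
    .toSeq
```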
248-249: Timestamp filter in microseconds.
Matches BT usage.
481-482: Day-based tile row keys.
Reduces row bloat.
499-505: mapDatasetToTable.
Dynamically maps BATCH/STREAMING.
508-514: getTableType logic.
Categorizes the dataset suffix.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (7)

3-3: Added import for executorProvider.

Clean import from the companion object.

13-13: Updated GAX imports.

Necessary additions for thread pool management.

22-23: Added thread-related imports.

Required for the custom thread factory implementation.

80-86: Refactored app profile handling.

More explicit variable naming with proper scoping.

89-95: Added BigTable configuration methods.

Properly extracts configuration logic into dedicated methods. A single build call at the end matches reviewer feedback.

104-120: Added bulk read rows settings configuration.

Optimizes latency by eliminating the batching delay and setting the element count threshold to 1. This is the core of the PR's performance improvements.

141-158: Added companion object with thread pool configuration.

Good implementation with:
- Custom named threads for easier debugging
- Scaled thread count based on available processors
- Shared executor provider to reduce thread proliferation
This directly addresses the thread pool optimization goals.
```scala
      .bulkReadRowsSettings()

    // Update the batching settings directly on the builder
    bulkReadRowsSettingsBuilder
```
weird API pattern but ok.
Summary

We've had some recommendations from the Google folks on using the bulkReadRows API. As-is it doesn't perform as we'd like, and we have to override a few settings (element count threshold and delay threshold). Also, as part of perf testing, we noticed that Zipline creates a lot of threads - our default KV store thread pool can go up to 1K, and the BigTable client creates a thread pool of cores * 4 per client instantiation (so one for the admin client and one for the data client). We streamline and tune these a bit. We currently have rails to allow users to configure the Fetcher thread pool (based on airbnb/chronon#909).

We can also add a knob to allow folks to configure the KV store thread pool (those are separate if folks use the execution context override).

Checklist

- [ ] Added Unit Tests
- [X] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update

Summary by CodeRabbit

New Features

- Enhanced data retrieval using flexible filtering and efficient batch processing for improved performance.
- Introduced configurable bulk read settings and dynamic thread pool management to optimize resource utilization and reduce latency.
- Added custom thread naming for improved debugging in thread management.

Refactor

- Streamlined internal processing flows to ensure robust error handling and increased overall system efficiency.