Cherrypick OSS fetcher failure handling PRs - #932 and #964 #706
Conversation
Walkthrough

This set of changes introduces partial failure awareness and enhanced error handling across the key-value store (KVStore), fetchers, and metadata management in both the online and Spark components. The in-memory KVStore now supports an option to throw on invalid datasets, with this behavior propagated through its builders and test utilities. Fetchers and the metadata store are updated to refresh caches and track partial failures, reflected in new Boolean flags and error metrics. The Spark runner and tests are updated to utilize and validate these new behaviors, including a dedicated test for partial failure handling.
Sequence Diagram(s)

sequenceDiagram
participant Client
participant Fetcher
participant MetadataStore
participant KVStore
Client->>Fetcher: fetchJoinSchema(request)
Fetcher->>MetadataStore: getJoinConf(name)
alt Success
MetadataStore-->>Fetcher: JoinConf
Fetcher->>MetadataStore: buildJoinCodec(JoinConf, refreshOnFail=true)
alt Codec Success
MetadataStore-->>Fetcher: (JoinCodec, false)
else Partial Failure
MetadataStore-->>Fetcher: (JoinCodec, true)
Fetcher->>MetadataStore: refreshJoinConf(name)
end
Fetcher-->>Client: JoinSchema/Codec
else Failure
MetadataStore-->>Fetcher: Exception
Fetcher->>MetadataStore: refreshJoinConf(name)
Fetcher-->>Client: Error
end
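In code terms, the diagrammed flow looks roughly like the following sketch. The trait, method names, and return types are simplified stand-ins that mirror the diagram, not the exact production fetcher API.

    import scala.util.{Failure, Success, Try}

    // Minimal sketch of the flow in the diagram; types and names are illustrative only.
    trait MetadataStoreLike {
      def getJoinConf(name: String): Try[AnyRef]
      def buildJoinCodec(conf: AnyRef, refreshOnFail: Boolean): (AnyRef, Boolean) // (codec, hasPartialFailure)
      def refreshJoinConf(name: String): Unit
    }

    def fetchJoinSchema(joinName: String, store: MetadataStoreLike): Try[AnyRef] =
      store.getJoinConf(joinName) match {
        case Success(joinConf) =>
          val (codec, hasPartialFailure) = store.buildJoinCodec(joinConf, refreshOnFail = true)
          if (hasPartialFailure) {
            // some underlying serving info failed to load: refresh so a later call can rebuild a full codec
            store.refreshJoinConf(joinName)
          }
          Success(codec)
        case Failure(ex) =>
          // the conf lookup itself failed: refresh the cached entry and surface the error to the client
          store.refreshJoinConf(joinName)
          Failure(ex)
      }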
Actionable comments posted: 4
🔭 Outside diff range comments (1)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)

428-437: ⚠️ Potential issue

Possible crash on second getJoinConf call.

If refresh still fails, the subsequent .get will throw NoSuchElementException, bypassing intended graceful handling. Guard before .get or propagate the Failure.

    -    val parts =
    -      metadataStore
    -        .getJoinConf(joinRequest.name)
    -        .get
    +    val partsTry = metadataStore.getJoinConf(joinRequest.name)
    +    if (partsTry.isFailure) {
    +      return ExternalToJoinRequest(Right(KeyMissingException("join_conf_exception",
    +        Seq.empty, Map.empty)), joinRequest, null) // or other safe fallback
    +    }
    +    val parts =
    +      partsTry.get
🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1)
767-791: Solid partial-failure test; minor robustness tweak.

head will throw if the query table is unexpectedly empty. Use .headOption.getOrElse(fail("no data")) for a clearer failure reason.

online/src/main/scala/ai/chronon/online/Api.scala (1)

85-88: Name nit: responseFutureOpt isn't an Option.

Misleading in logs & debuggers. Rename to responseFut.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (10)
- hub/src/main/java/ai/chronon/hub/HubVerticle.java (1 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (8 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/utils/InMemoryKvStore.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (34)
- GitHub Check: service_tests
- GitHub Check: api_tests
- GitHub Check: hub_tests
- GitHub Check: streaming_tests
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: service_commons_tests
- GitHub Check: spark_tests
- GitHub Check: online_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: join_tests
- GitHub Check: hub_tests
- GitHub Check: join_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: groupby_tests
- GitHub Check: flink_tests
- GitHub Check: groupby_tests
- GitHub Check: online_tests
- GitHub Check: fetcher_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: fetcher_tests
- GitHub Check: aggregator_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: analyzer_tests
- GitHub Check: flink_tests
- GitHub Check: analyzer_tests
- GitHub Check: orchestration_tests
- GitHub Check: orchestration_tests
- GitHub Check: batch_tests
- GitHub Check: aggregator_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: api_tests
🔇 Additional comments (9)
spark/src/main/scala/ai/chronon/spark/utils/InMemoryKvStore.scala (4)

35-37: Added option to control error behavior on invalid datasets. New parameter allows configuring whether to throw when accessing non-existent datasets (a rough sketch of such a flag appears just below).

56-58: Implemented validation logic for invalid datasets. Throws exception when the requested dataset doesn't exist and hard failure is enabled.

159-162: Updated builder method to support configurable failure handling. Factory method now accepts and propagates the hardFailureOnInvalidDataset flag.

167-168: Correctly propagated failure flag to constructor.

hub/src/main/java/ai/chronon/hub/HubVerticle.java (1)

70-70: Maintained existing behavior with explicit failure flag. Added explicit false parameter to maintain backward compatibility with the new failure handling option.
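As a rough illustration of that flag, here is a hypothetical, stripped-down store; the real InMemoryKvStore API differs, so treat this purely as a sketch of the strict-mode behavior.

    import scala.collection.concurrent.TrieMap

    // Hypothetical sketch: a flag that makes reads of an unknown dataset throw instead of returning nothing.
    class TinyKvStore(hardFailureOnInvalidDataset: Boolean = false) {
      private val datasets = TrieMap.empty[String, TrieMap[String, Array[Byte]]]

      def create(dataset: String): Unit =
        datasets.putIfAbsent(dataset, TrieMap.empty[String, Array[Byte]])

      def put(dataset: String, key: String, value: Array[Byte]): Unit =
        datasets.getOrElseUpdate(dataset, TrieMap.empty[String, Array[Byte]]).update(key, value)

      def get(dataset: String, key: String): Option[Array[Byte]] =
        datasets.get(dataset) match {
          case Some(table) => table.get(key)
          case None if hardFailureOnInvalidDataset =>
            // strict mode (useful in tests): fail loudly rather than masking a missing dataset
            throw new IllegalArgumentException(s"Invalid dataset: $dataset")
          case None => None
        }
    }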
online/src/main/scala/ai/chronon/online/fetcher/GroupByFetcher.scala (1)

46-51: Added cache refresh on error handling. Improved error recovery by refreshing the metadata cache on failure, logging errors, and tracking metrics (the pattern is sketched below).
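The refresh-on-error pattern, sketched with hypothetical helper signatures; the real code goes through the metadata store's TTL caches and a Metrics context rather than plain function parameters.

    import scala.util.{Failure, Success, Try}

    // Sketch: on a failed lookup, refresh the cached entry and count the error before returning the failure.
    def lookupWithRecovery[T](name: String,
                              cachedLookup: String => Try[T],   // e.g. a TTLCache apply
                              refresh: String => Unit,          // e.g. TTLCache.refresh
                              recordException: Throwable => Unit): Try[T] =
      cachedLookup(name) match {
        case ok @ Success(_) => ok
        case failed @ Failure(ex) =>
          refresh(name)          // invalidate so the next call retries instead of reusing the failure
          recordException(ex)    // e.g. context.incrementException(ex)
          failed
      }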
spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1)

196-203: Modified codec building to fail fast with invalid data. Now explicitly disables refresh-on-fail behavior to prevent using stale codecs when failures occur (see the sketch below).
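The fail-fast counterpart can be sketched like this; buildCodec is a hypothetical stand-in for the real buildJoinCodec(joinConf, refreshOnFail = false) call.

    // Sketch: fail fast instead of refreshing, so the streaming job never runs with a stale or partial codec.
    def codecOrFail[C](buildCodec: () => (C, Boolean)): C = {
      val (codec, hasPartialFailure) = buildCodec()
      require(!hasPartialFailure, "Partial failure while building join codec; aborting instead of running with a stale codec")
      codec
    }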
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1)

198-202: Adds useful toggle for stricter tests – looks good. Parameter is passed through correctly; no further action.

online/src/main/scala/ai/chronon/online/fetcher/JoinPartFetcher.scala (1)

168-173: Prefix-based error key change is fine. Matches the new convention; no issues spotted.
    val joinTry: Try[JoinOps] = if (joinConf.isEmpty) {
      val joinConfTry = metadataStore.getJoinConf(request.name)
      if (joinConfTry.isFailure) {
        metadataStore.getJoinConf.refresh(request.name)
      }
      joinConfTry
    } else {
      logger.debug(s"Using passed in join configuration: ${joinConf.get.metaData.getName}")
      Success(JoinOps(joinConf.get))
🛠️ Refactor suggestion

Refresh without retry can leak failures.

After refresh, the code still uses the original failed joinConfTry. Consider re-fetching or returning the refreshed value to avoid needless exceptions.

    -      if (joinConfTry.isFailure) {
    -        metadataStore.getJoinConf.refresh(request.name)
    -      }
    -      joinConfTry
    +      val freshTry = if (joinConfTry.isFailure) {
    +        metadataStore.getJoinConf.refresh(request.name)
    +        metadataStore.getJoinConf(request.name)
    +      } else joinConfTry
    +      freshTry

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested code after applying the change:

    val joinTry: Try[JoinOps] = if (joinConf.isEmpty) {
      val joinConfTry = metadataStore.getJoinConf(request.name)
      val freshTry = if (joinConfTry.isFailure) {
        metadataStore.getJoinConf.refresh(request.name)
        metadataStore.getJoinConf(request.name)
      } else joinConfTry
      freshTry
    } else {
      logger.debug(s"Using passed in join configuration: ${joinConf.get.metaData.getName}")
      Success(JoinOps(joinConf.get))
    }
    Try(Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))) match {
      case Failure(e) =>
        Failure(buildException(e))
      case Success(resp) =>
        if (resp.values.isFailure) {
          Failure(buildException(resp.values.failed.get))
        } else {
          Success(resp.latest.get.bytes)
        }
resp.latest.get may blow up on empty data.

If values is Success(Seq()), latest becomes Failure(NoSuchElementException), and .get re-throws outside the Try, bypassing your error path.

    -          Success(resp.latest.get.bytes)
    +          resp.latest.map(_.bytes)

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested code after applying the change:

    Try(Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))) match {
      case Failure(e) =>
        Failure(buildException(e))
      case Success(resp) =>
        if (resp.values.isFailure) {
          Failure(buildException(resp.values.failed.get))
        } else {
          resp.latest.map(_.bytes)
        }
    }
    onCreateFunc: Option[Try[(JoinCodec, Boolean)] => Unit]): TTLCache[String, Try[(JoinCodec, Boolean)]] = {

      val codecBuilder = { joinName: String =>
    -    getJoinConf(joinName)
    -      .map(_.join)
    -      .map(buildJoinCodec)
    -      .recoverWith { case th: Throwable =>
    -        Failure(
    -          new RuntimeException(
    -            s"Couldn't fetch joinName = ${joinName} or build join codec due to ${th.traceString}",
    -            th
    -          ))
    +    val startTimeMs = System.currentTimeMillis()
    +    val result: Try[(JoinCodec, Boolean)] =
    +      try {
    +        getJoinConf(joinName)
    +          .map(_.join)
    +          .map(join => buildJoinCodec(join, refreshOnFail = true))
    +      } catch {
    +        case th: Throwable =>
    +          getJoinConf.refresh(joinName)
    +          Failure(
    +            new RuntimeException(
    +              s"Couldn't fetch joinName = ${joinName} or build join codec due to ${th.traceString}",
    +              th
    +            ))
    +      }
    +    val context = Metrics.Context(Metrics.Environment.MetaDataFetching, join = joinName).withSuffix("join_codec")
    +    if (result.isFailure) {
    +      context.incrementException(result.failed.get)
    +    } else {
    +      context.distribution(Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs)
    +    }
    +    result
🛠️ Refactor suggestion

Partial-failure results cached forever.

When hasPartialFailure = true the builder still returns Success((codec, true)), which TTL-caches an incomplete codec and masks later fixes. Consider returning a Failure, or using a much shorter TTL in that path (a sketch follows).
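One way to realize the shorter-TTL option, as a sketch; it assumes the cache can consult a per-entry TTL function, which the existing TTLCache may not support directly.

    import scala.util.{Success, Try}

    // Sketch: entries carrying a partial failure get a short TTL so they are rebuilt soon,
    // while complete codecs keep the normal, long TTL.
    def ttlForEntry[C](entry: Try[(C, Boolean)],
                       normalTtlMs: Long = 2L * 60 * 60 * 1000,   // e.g. 2 hours
                       degradedTtlMs: Long = 30L * 1000): Long =   // e.g. 30 seconds
      entry match {
        case Success((_, hasPartialFailure)) if hasPartialFailure => degradedTtlMs
        case _ => normalTtlMs
      }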
        .map { case (leftKey, rightKey) =>
          StructField(leftKey, keySchema.fields.find(_.name == rightKey).get.fieldType)
        }
Unsafe .get on missing right-key.

keySchema.fields.find(...).get will throw if the join mapping is out of sync with the group-by schema. Prefer safe lookup with explicit failure.

    -          StructField(leftKey, keySchema.fields.find(_.name == rightKey).get.fieldType)
    +          val fType = keySchema.fields.find(_.name == rightKey)
    +            .getOrElse(throw new IllegalStateException(
    +              s"Key $rightKey absent in schema for groupBy ${servingInfo.groupBy.metaData.getName}"))
    +          StructField(leftKey, fType.fieldType)

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested code after applying the change:

        .map { case (leftKey, rightKey) =>
          val fType = keySchema.fields.find(_.name == rightKey)
            .getOrElse(throw new IllegalStateException(
              s"Key $rightKey absent in schema for groupBy ${servingInfo.groupBy.metaData.getName}"))
          StructField(leftKey, fType.fieldType)
        }
55ef5b8 to 1a61b96 (Compare)
Actionable comments posted: 0
♻️ Duplicate comments (1)
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
247-249: Unsafe .get remains.

Still crashes if the key is missing: keySchema.fields.find(_.name == rightKey).get. Use getOrElse with an explicit error, as in the earlier comment.
🧹 Nitpick comments (5)
online/src/main/scala/ai/chronon/online/JoinCodec.scala (1)
38-40: Make sure the flag is exercised in tests.

hasPartialFailure is write-only here. Add/extend tests to assert the cache refresh logic that relies on this flag.

online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (3)
244-249: Possible refresh flood.

joinCodecCache.refresh fires on every request when any existing codec carries the flag. If the upstream KV store is slow, this could create unnecessary concurrent refreshes. Consider throttling (e.g. only refresh if the last refresh was more than TTL/2 ago); a sketch follows.
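A throttle along those lines might look like this; it is a sketch that tracks last-refresh timestamps itself rather than relying on TTLCache internals.

    import scala.collection.concurrent.TrieMap

    // Sketch: only allow a refresh for a given join name if the previous one is older than ttlMs / 2,
    // so many concurrent requests seeing the partial-failure flag don't stampede the KV store.
    class RefreshThrottle(ttlMs: Long) {
      private val lastRefreshMs = TrieMap.empty[String, Long]

      def maybeRefresh(joinName: String)(doRefresh: => Unit): Unit = {
        val now = System.currentTimeMillis()
        val last = lastRefreshMs.getOrElse(joinName, 0L)
        if (now - last > ttlMs / 2) {
          lastRefreshMs.update(joinName, now)
          doRefresh
        }
      }
    }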
300-323: Duplicate refresh path.

logResponse repeats the same refresh check done in applyDerivations. One central place would reduce code and race conditions.
517-526: Metric after early return?

distribution("response.latency.millis", …) is inside the Success branch; failures skip latency logging. Emit the metric in both branches.
308-321: var for shared state.

hasPartialFailure is mutated inside loops; not thread-safe if buildJoinCodec ever becomes parallel. Prefer a local val flag = failures.nonEmpty.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
- online/src/main/scala/ai/chronon/online/JoinCodec.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (6 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
🧰 Additional context used
🧬 Code Graph Analysis (1)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (3)
- online/src/main/scala/ai/chronon/online/metrics/TTLCache.scala (1): refresh (97-97)
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (1): keyCodec (66-66)
- online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (2): distribution (208-211), distribution (211-214)
⏰ Context from checks skipped due to timeout of 90000ms (35)
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: hub_tests
- GitHub Check: streaming_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: groupby_tests
- GitHub Check: streaming_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: api_tests
- GitHub Check: hub_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: online_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: analyzer_tests
- GitHub Check: flink_tests
- GitHub Check: spark_tests
- GitHub Check: api_tests
- GitHub Check: fetcher_tests
- GitHub Check: orchestration_tests
- GitHub Check: orchestration_tests
- GitHub Check: aggregator_tests
- GitHub Check: batch_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
Summary

Pull in PRs - airbnb/chronon#964 and airbnb/chronon#932. We hit issues related to 964 in some of our tests at Etsy - groupByServingInfo lookups against BT timed out and we end up caching the failure response. 964 addresses this and it depends on 932 so pulling that in as well.

Checklist

- [ ] Added Unit Tests
- [X] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

Summary by CodeRabbit

New Features

- Improved error handling and reporting for partial failures in join operations and key-value store lookups.
- Enhanced cache refresh mechanisms for join configurations and metadata, improving system robustness during failures.
- Added a configurable option to control strictness on invalid dataset references in the in-memory key-value store.

Bug Fixes

- Exceptions and partial failures are now more accurately surfaced in fetch responses, ensuring clearer diagnostics for end-users.
- Updated error key naming for consistency in response maps.

Tests

- Added a new test to verify correct handling and reporting of partial failures in key-value store operations.