
Conversation

@piyush-zlai (Contributor) commented Jul 2, 2025

Summary

Our current Pub/Sub source tracks message ack IDs and postpones issuing the ACK until checkpoint time (~10 s). This requires close coordination to keep the Pub/Sub ack deadlines loose enough, and it adds risk if we miss a checkpoint or two (or if we adjust the checkpoint frequency in the future). Additionally, under load it backlogs a large number of IDs to be acked, and the current OSS source tries to ACK all of them in one shot (split across 2 requests), so very large event loads might run into issues.

We switch things on two fronts in this PR:

  • We ack optimistically right after pulling. In the event of job crashes, we lean on our batch correction mechanism (in the form of GBUs) to fix inaccuracies in the feature counts.
  • We use Pub/Sub's streaming pull API rather than batch pulling; this is Pub/Sub's recommendation for low latency and high throughput. A minimal sketch of the combined receive-then-ack flow follows this list.
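
For illustration, a minimal sketch of the receive-then-ack flow using the Google Cloud Pub/Sub client's streaming-pull Subscriber (the project/subscription names and the process function are hypothetical; the actual PR wires this into a Flink source):

import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber}
import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}

object FastAckSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical names, for illustration only
    val subscription = ProjectSubscriptionName.of("my-project", "my-subscription")

    val receiver = new MessageReceiver {
      override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
        process(message)
        // fast-ack: acknowledge right away instead of waiting for a checkpoint;
        // batch corrections (GBUs) repair feature counts if the job crashes mid-flight
        consumer.ack()
      }
    }

    // Subscriber uses Pub/Sub's streaming pull API under the hood
    val subscriber = Subscriber.newBuilder(subscription, receiver).build()
    subscriber.startAsync().awaitRunning()
  }

  private def process(message: PubsubMessage): Unit =
    println(s"received: ${message.getData.toStringUtf8}")
}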

Tested in canary - Cloud Monitoring Dashboard

The screenshots below show the side-by-side freshness improvement we see with the switch to the streaming pull approach.

Prior (just fast ack, with batch pulls):
[Screenshot 2025-07-07 at 6:37:44 PM]

Fast ack with streaming pulls:
[Screenshot 2025-07-07 at 6:37:54 PM]

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features
    • Introduced a new Flink Pub/Sub source connector with immediate message acknowledgment for improved streaming efficiency.
    • Added support for wrapping Flink deserialization schemas for use with Pub/Sub messages.
    • Implemented a custom message receiver to ensure safe, synchronized message processing.
  • Bug Fixes
    • Simplified and updated the Pub/Sub source integration, removing unused configuration options.
  • Tests
    • Added tests to validate the deserialization schema wrapper functionality.
  • Chores
    • Updated build dependencies to support enhanced Pub/Sub and gRPC integration.

@coderabbitai bot commented Jul 2, 2025

Walkthrough

A new Flink Pub/Sub connector implementation was introduced using a "fastack" approach, which immediately acknowledges messages upon receipt. Supporting classes for deserialization and message reception were added, and related build dependencies were updated. The previous configuration properties for message pulling were removed, and corresponding tests for the new deserialization wrapper were created.

Changes

File(s) and change summary:
  • flink/BUILD.bazel: Added gRPC, Google credentials, and Dropwizard metrics dependencies for the Pub/Sub connector/tests.
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala: Switched to fastack PubSubSource; removed message pull config; simplified data stream construction.
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala: Added new PubSubSource class for fastack Pub/Sub streaming with immediate ack and builder method.
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala: Added wrapper to adapt Flink DeserializationSchema to PubSubDeserializationSchema.
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala: Added thread-safe message receiver for Pub/Sub messages and Flink collector integration.
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala: Added tests for DeserializationSchemaWrapper behavior and delegation.
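
For context on the wrapper listed above: it adapts Flink's byte-oriented DeserializationSchema to the Pub/Sub connector's PubSubDeserializationSchema. A minimal sketch of the shape such an adapter takes (assuming the Flink GCP Pub/Sub connector interfaces; the PR's actual method set may differ):

import com.google.pubsub.v1.PubsubMessage
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema
import org.apache.flink.util.Collector

class DeserializationSchemaWrapperSketch[T](schema: DeserializationSchema[T])
    extends PubSubDeserializationSchema[T] {

  override def isEndOfStream(nextElement: T): Boolean = schema.isEndOfStream(nextElement)

  // Collector-based path: hand the raw payload bytes to the wrapped schema
  def deserialize(message: PubsubMessage, out: Collector[T]): Unit =
    schema.deserialize(message.getData.toByteArray, out)

  // Single-arg variant is unsupported; callers must use the Collector-based variant
  override def deserialize(message: PubsubMessage): T =
    throw new UnsupportedOperationException("use deserialize(message, collector)")

  override def getProducedType: TypeInformation[T] = schema.getProducedType
}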

Sequence Diagram(s)

sequenceDiagram
    participant PubSub as Pub/Sub Service
    participant Receiver as PubSubMessageReceiver
    participant Source as PubSubSource
    participant Flink as Flink Collector

    PubSub->>Receiver: Push PubsubMessage
    Receiver->>Receiver: Deserialize message (sync)
    Receiver->>Flink: Collect output
    Receiver->>PubSub: Ack message
    Note right of Flink: Processing continues in Flink job

Possibly related PRs

Suggested reviewers

  • david-zlai
  • nikhil-zlai
  • tchow-zlai

Poem

Flink and Pub/Sub now dance in sync,
Fastack acks before you blink!
New wrappers and receivers join the song,
With tests to prove they all belong.
Dependencies set, the build is tight—
Streaming onward, bytes take flight! 🚀


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d6d45a5 and 14d3580.

📒 Files selected for processing (6)
  • flink/BUILD.bazel (2 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala
  • flink/BUILD.bazel
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: batch_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: api_tests
  • GitHub Check: service_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: flink_tests
  • GitHub Check: online_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows


@coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (3)
tools/build_rules/dependencies/maven_repository.bzl (1)

86-88: grpc-api looks redundant—consider dropping it
grpc-core already brings grpc-api; an explicit entry risks future version skew & duplicate classes.

-        "io.grpc:grpc-api:1.69.0",
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala (2)

86-92: Remove redundant null check.

toByteArray won't return null for non-null getData.

-      .filter(pubSubMsg => pubSubMsg.getData != null && pubSubMsg.getData.toByteArray != null)
+      .filter(pubSubMsg => pubSubMsg.getData != null)

64-74: Consider backpressure handling.

Tight loop could overwhelm system under high load.
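
If backpressure does become a concern, one hedged option (illustrative only, assuming the Google Pub/Sub client's Subscriber is in play; the limits shown are made up) is the client's built-in flow control:

import com.google.api.gax.batching.FlowControlSettings

object FlowControlSketch {
  // illustrative limits; real values would be tuned per workload
  val flowControl: FlowControlSettings = FlowControlSettings.newBuilder()
    .setMaxOutstandingElementCount(1000L)              // cap unprocessed messages
    .setMaxOutstandingRequestBytes(100L * 1024 * 1024) // cap buffered bytes (100 MiB)
    .build()

  // applied when constructing the streaming-pull Subscriber, e.g.:
  // Subscriber.newBuilder(subscription, receiver).setFlowControlSettings(flowControl).build()
}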

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 36c2dff and 2e6bc7b.

📒 Files selected for processing (10)
  • flink/BUILD.bazel (2 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriber.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriberTest.scala (1 hunks)
  • maven_install.json (4 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
maven_install.json (3)


<retrieved_learning>
Learnt from: tchow-zlai
PR: #393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
</retrieved_learning>

<retrieved_learning>
Learnt from: chewy-zlai
PR: #30
File: api/py/test/sample/production/joins/risk/user_transactions.txn_join:217-218
Timestamp: 2024-10-03T17:15:03.325Z
Learning: The JSON files in this project are automatically generated and should not be manually modified or refactored.
</retrieved_learning>

<retrieved_learning>
Learnt from: chewy-zlai
PR: #30
File: api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user:37-38
Timestamp: 2024-10-03T17:12:58.693Z
Learning: In this project, certain JSON files are automatically generated, so suggestions to modify them manually may not be applicable.
</retrieved_learning>

tools/build_rules/dependencies/maven_repository.bzl (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
flink/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala (3)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within `DynamoDBKVStoreTest.scala` is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to `MockDataService` in `hub/test/controllers/SearchControllerSpec.scala` and `hub/test/controllers/ModelControllerSpec.scala` are needed for tests and should not be removed.
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: service_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: api_tests
  • GitHub Check: online_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: flink_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: groupby_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: batch_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (17)
maven_install.json (3)

2-5: Autogen file—skip manual edits.
maven_install.json should only change via mvn_install rule; confirm the diff came from bazel run //:sync_deps (or equivalent).


1605-1609: Netty-shaded hash bump OK?
Ensure the new SHA matches the official 1.69.0 artifact; mismatches break fetch.


6280-6286: Removed animal-sniffer dep.
Confirm nothing still requires it; otherwise linkage errors surface at runtime.

flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1)

7-16: LGTM - clean histogram utility.

Good choice using ExponentiallyDecayingReservoir for time-weighted metric sampling.
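
For reference, a minimal sketch of the Dropwizard metrics pattern this refers to (not the PR's exact helper):

import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}

object HistogramSketch {
  // the exponentially decaying reservoir biases samples toward recent values,
  // so reported percentiles track current behavior rather than all-time history
  val latencyHistogram = new Histogram(new ExponentiallyDecayingReservoir())

  def record(latencyMs: Long): Unit = latencyHistogram.update(latencyMs)

  def p99: Double = latencyHistogram.getSnapshot.get99thPercentile()
}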

flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (3)

25-26: Documentation accurately reflects behavioral change.

Clear explanation of the switch to immediate acknowledgment.


63-69: Parameter mapping looks correct for builder pattern.

The build method parameters cover all necessary configuration for Pub/Sub connectivity.


4-4: Fast-ack PubSubSource build verified
The PubSubSource.scala exists and its build(groupByName, deserializationSchema, projectName, subscriptionName…) signature matches the invocation in PubSubFlinkSource.scala. All set.

flink/BUILD.bazel (2)

104-118: Necessary gRPC and metrics dependencies added.

Dependencies correctly support the new fast-ack Pub/Sub implementation.


150-160: Test dependencies mirror library dependencies.

Consistent dependency management between lib and test targets.

flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriberTest.scala (3)

46-71: Solid test coverage for happy path.

Good verification of pull and acknowledgment flow.


73-86: Retry logic properly tested.

Correctly verifies exhausted retries result in empty response.


117-131: Edge case handling verified.

Good test ensuring no acknowledgment when no messages received.

flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala (4)

28-36: Proper delegation testing.

Correctly verifies wrapper delegates to underlying schema.


64-74: Important unsupported operation test.

Ensures single-arg deserialize throws exception as expected.


76-88: Data extraction properly tested.

Verifies correct byte array extraction from PubsubMessage.


90-99: Null data handling verified.

Good test for edge case with empty message data.

flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala (1)

10-30: LGTM!

Clean adapter implementation.

@piyush-zlai force-pushed the piyush/flink_pubsub_v2 branch from 2e6bc7b to 7e7b00e on July 2, 2025 at 15:24
@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriberTest.scala (2)

61-61: Consider using proper Empty instance instead of null cast.

-when(stubWithDeadline.acknowledge(any[AcknowledgeRequest])).thenReturn(null.asInstanceOf[com.google.protobuf.Empty])
+when(stubWithDeadline.acknowledge(any[AcknowledgeRequest])).thenReturn(com.google.protobuf.Empty.getDefaultInstance)

Also applies to: 105-105


23-28: Metrics mocks are created but never verified.

Consider adding assertions to verify metrics are recorded correctly, or remove unused mock setup.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2e6bc7b and 7e7b00e.

📒 Files selected for processing (8)
  • flink/BUILD.bazel (2 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriber.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriberTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriber.scala
🚧 Files skipped from review as they are similar to previous changes (6)
  • flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala
  • flink/BUILD.bazel
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: piyush-zlai
PR: zipline-ai/chronon#924
File: flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriber.scala:52-61
Timestamp: 2025-07-02T14:54:50.588Z
Learning: In FastAckGrpcPubSubSubscriber class in Flink connectors, exceptions are intentionally swallowed after retries to prevent Flink application restarts. The error handling strategy returns empty sequences instead of propagating exceptions to maintain application stability.
flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriberTest.scala (1)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#924
File: flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriber.scala:52-61
Timestamp: 2025-07-02T14:54:50.588Z
Learning: In FastAckGrpcPubSubSubscriber class in Flink connectors, exceptions are intentionally swallowed after retries to prevent Flink application restarts. The error handling strategy returns empty sequences instead of propagating exceptions to maintain application stability.
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: service_tests
  • GitHub Check: api_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: online_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: flink_tests
  • GitHub Check: streaming_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: batch_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriberTest.scala (5)

1-14: LGTM - Clean imports and package structure.


17-44: Mock factory is well-structured.

Clean pattern for creating test instances with configurable parameters.


73-86: Excellent retry exhaustion test.

Properly validates the error handling strategy mentioned in retrieved learnings where exceptions are swallowed after retries.


88-115: Good retry recovery test coverage.

The thenThrow().thenReturn() pattern effectively tests retry success scenarios.


117-131: Edge case handling is solid.

Verifies no unnecessary acknowledgments occur for empty responses.

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala (1)

22-22: Simplify null check - toByteArray never returns null.

The second null check is redundant since ByteString.toByteArray never returns null.

-    if (message.getData != null && message.getData.toByteArray != null) {
+    if (message.getData != null) {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 09e6d11 and 1c41900.

📒 Files selected for processing (7)
  • flink/BUILD.bazel (2 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala (1 hunks)
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala
  • flink/BUILD.bazel
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapper.scala
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala
  • flink/src/test/scala/ai/chronon/flink_connectors/pubsub/fastack/DeserializationSchemaWrapperTest.scala
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: piyush-zlai
PR: zipline-ai/chronon#924
File: flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriber.scala:52-61
Timestamp: 2025-07-02T14:54:50.615Z
Learning: In FastAckGrpcPubSubSubscriber class in Flink connectors, exceptions are intentionally swallowed after retries to prevent Flink application restarts. The error handling strategy returns empty sequences instead of propagating exceptions to maintain application stability.
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala (1)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#924
File: flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/FastAckGrpcPubSubSubscriber.scala:52-61
Timestamp: 2025-07-02T14:54:50.615Z
Learning: In FastAckGrpcPubSubSubscriber class in Flink connectors, exceptions are intentionally swallowed after retries to prevent Flink application restarts. The error handling strategy returns empty sequences instead of propagating exceptions to maintain application stability.
🧬 Code Graph Analysis (1)
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala (2)
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubSource.scala (1)
  • open (49-63)
api/python/ai/chronon/cli/logger.py (1)
  • require (54-57)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
  • GitHub Check: service_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: online_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: api_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: flink_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: batch_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/fastack/PubSubMessageReceiver.scala (4)

9-9: LGTM - Clean generic class design.

Good use of generic type parameter for flexible message output types.


11-14: Thread safety approach is correct.

The synchronization strategy properly handles concurrent access from multiple Pub/Sub threads while protecting the single-threaded Flink collector.


20-28: Consider error handling for deserialization failures.

If deserializationSchema.deserialize throws an exception, the message is still acknowledged, potentially causing message loss. Based on the fast-ack pattern, this may be intentional for stability.

From the retrieved learnings, I understand that exceptions are intentionally swallowed in the FastAckGrpcPubSubSubscriber to prevent application restarts. Is this the same pattern intended here, or should deserialization failures prevent acknowledgment?


26-26: Fast acknowledgment aligns with PR objectives.

The immediate acknowledgment after deserialization correctly implements the fast-ack behavior described in the PR objectives.

@piyush-zlai force-pushed the piyush/flink_pubsub_v2 branch from ead7646 to d6d45a5 on July 8, 2025 at 16:11
Comment on lines +24 to +26
lockObject.synchronized {
  deserializationSchema.deserialize(message, collector)
}
Contributor

why does this have to be synchronized?

Contributor

if it's not, do we just have concurrent messages overwriting each other in deser?

Contributor Author

yeah so pubsub under the hood creates multiple threads per Flink task (default is 5). Each of these threads can invoke the receive-message callback as and when it has events. The deserialize is performing Spark eval for us (as we do projection at the src level) - this breaks when there are multiple calls at the same time, as some of the underlying Spark CU code seemingly isn't thread safe.
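
For illustration, a minimal sketch of the guard being discussed (hypothetical names; the deserialize step is passed in as a function to keep the sketch self-contained):

import com.google.pubsub.v1.PubsubMessage
import org.apache.flink.util.Collector

// hypothetical shape, for illustration only
class SynchronizedReceiverSketch[T](deserialize: (PubsubMessage, Collector[T]) => Unit) {
  private val lockObject = new Object

  def onMessage(message: PubsubMessage, out: Collector[T], ack: () => Unit): Unit = {
    // several Pub/Sub client threads per Flink task (default 5) can fire this callback;
    // the Spark eval inside deserialize isn't thread safe, so calls are serialized
    lockObject.synchronized {
      deserialize(message, out)
    }
    ack() // fast-ack once the record has been handed off
  }
}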

Contributor

oh do we use cu directly? we should use it via the pooled catalyst util i think

Contributor Author

yeah, thus far we didn't need to use the pooled CU, as the typical behavior in Flink is that tasks get their own threads. Can look into swapping it in and seeing if that works in these pubsub thread callbacks

* Allows us to stay clear of tight ACK deadlines and drowning under Pub/Sub retransmits if the message volume is high / checkpoints
* fail a couple of times in a row, etc.
*
* This comes at the cost of accuracy (e.g. lost messages if the job fails after pulling but before processing), but we do
Contributor

wanted to double check my understanding - does this cost us accuracy because, by ack'ing the pubsub event as soon as we get it, if we fail to process it or the job goes down and comes back up, the subscription has already advanced past that event since we already ack'd?

Contributor Author

yeah exactly - the alternative is to hold off on ACKing till the checkpoint is taken. For the pubsub src, waiting till checkpoint also has an issue if you try rolling back to a checkpoint that is not the latest: we can't roll back to that 'offset'. We roll back the Flink state, but the Pub/Sub acks stay where they were as of the latest checkpoint.


private class PubSubCollector(ctx: SourceFunction.SourceContext[OUT]) extends Collector[OUT] {
  override def collect(record: OUT): Unit = {
    // sync on checkpoint lock as we might have multiple PubSub threads trying to collect records
Contributor

if we didn't have this, would the issue be multiple pubsub threads could be collecting the same records?

Contributor Author

yeah Flink's source contract is to have only one thread trigger the ctx.collect call at a time
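
A minimal sketch completing the quoted snippet under that contract (assuming the legacy SourceFunction API the connector builds on; the checkpoint lock serializes emission against state snapshots):

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.util.Collector

class PubSubCollectorSketch[OUT](ctx: SourceFunction.SourceContext[OUT]) extends Collector[OUT] {

  override def collect(record: OUT): Unit = {
    // only one thread may emit (or snapshot state) at a time; the checkpoint
    // lock is how a multi-threaded source honors Flink's single-threaded contract
    ctx.getCheckpointLock.synchronized {
      ctx.collect(record)
    }
  }

  override def close(): Unit = {}
}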

@david-zlai left a comment

couple of questions

@piyush-zlai force-pushed the piyush/flink_pubsub_v2 branch from d6d45a5 to 14d3580 on July 9, 2025 at 17:20
@piyush-zlai merged commit 7a76d71 into main on July 9, 2025
20 checks passed
@piyush-zlai deleted the piyush/flink_pubsub_v2 branch on July 9, 2025 at 17:59