Skip to content

Conversation

@piyush-zlai
Copy link
Contributor

@piyush-zlai piyush-zlai commented Jun 20, 2025

Summary

While letting the PubSub canary job run for a prolonged period of time with the timeout updates in #877 I noticed that they were set too low. PubSub seems to refresh connections / credentials every hour or so which results in a bit of a latency spike while pulling messages. At 1s we hit this timeout and as the underlying PubSub source throws rather than swallows the error we see the Flink job restart (which results in a couple of mins of ensuing lag). I have bumped this timeout further to 5s and let the job run for a few hours and we don't see the restarts kicking in. We still see the periodic spikes (digging into that async) that result in lag going up to 2-2.5s every hour ish but that's a lot better than the full job restart.

I made this and some of the other settings configurable (similar to what we have for Kafka sources) so bumping timeouts / retries etc won't require a core chronon engine change and deploy going forward.
Screenshot 2025-06-20 at 10 49 31 AM

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Added support for customizing message pull behavior in PubSub sources using properties, including messages per pull, maximum retries, and pull timeout.
  • Chores

    • Updated default values for message pulling and made them configurable via user-defined properties.

@piyush-zlai piyush-zlai requested a review from david-zlai June 20, 2025 14:50
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 20, 2025

Walkthrough

The PubSubFlinkSource now supports user-configurable message pulling parameters via properties, replacing previously hardcoded values for messages per pull, pull timeout, and max retries. New property keys and default values are introduced, and the builder is updated to use these configurable options.

Changes

File(s) Change Summary
flink/.../PubSubFlinkSource.scala Replaced fixed message pulling constants with configurable properties; added public keys and updated builder.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant PubSubFlinkSource
    participant Builder

    User->>PubSubFlinkSource: Provide properties map
    PubSubFlinkSource->>PubSubFlinkSource: Read messages_per_pull, max_retries, pull_timeout_ms (with defaults)
    PubSubFlinkSource->>Builder: Pass parameters to builder
    Builder-->>PubSubFlinkSource: Build source with user or default configs
Loading

Possibly related PRs

Suggested reviewers

  • david-zlai
  • tchow-zlai

Poem

Configs unlocked, the code now sings,
No more constants, just flexible things.
Pulls and retries, set as you please,
PubSub flows onward, with elegant ease.
🎛️✨


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ad32e77 and 98ce344.

📒 Files selected for processing (1)
  • flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (14)
  • GitHub Check: flink_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: service_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: online_tests
  • GitHub Check: groupby_tests
  • GitHub Check: api_tests
  • GitHub Check: spark_tests
  • GitHub Check: batch_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (5)
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (5)

37-42: LGTM: Clean configuration pattern.

Follows existing codebase conventions with proper fallback to sensible default.


43-47: LGTM: Consistent implementation.

Matches the pattern established for other configurable parameters.


48-52: LGTM: Addresses the core timeout issue.

The 5-second default resolves the credential refresh timeouts mentioned in the PR objectives.


66-66: LGTM: Proper use of configurable parameters.

Correctly replaces hardcoded values with the new configurable options.


83-85: LGTM: Clear property naming.

Property keys are descriptive and follow consistent naming conventions.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@piyush-zlai piyush-zlai requested a review from tchow-zlai June 20, 2025 14:50
@piyush-zlai piyush-zlai merged commit d92f76d into main Jun 20, 2025
20 checks passed
@piyush-zlai piyush-zlai deleted the piyush/tweak_pubsub_settings branch June 20, 2025 18:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants