Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sdf kafka poll latencies #34275

Merged
merged 4 commits into from
Mar 20, 2025

Conversation

Naireen
Copy link
Contributor

@Naireen Naireen commented Mar 13, 2025

Adds sdf poll metrics to kafka for runner v2

Should be merged after #34244


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@Naireen
Copy link
Contributor Author

Naireen commented Mar 13, 2025

R: @scwhittle

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@Naireen Naireen force-pushed the add_sdf_kafka_poll_latencies branch from 6f9eb14 to 31585e7 Compare March 13, 2025 16:59
@Naireen
Copy link
Contributor Author

Naireen commented Mar 14, 2025

Run Java PreCommit

@Naireen Naireen requested a review from scwhittle March 14, 2025 08:01
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
kafkaMetrics.flushBufferedMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about not flushing here and just falling through?

would it be ok to just flush once outside the while loop with a try { } finally { kafkaMetrics.flushBufferedMetrics(); } block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't do it because of the returns, but apparently the finally block will run and then return, so that will work. I have modified the code.

@Naireen
Copy link
Contributor Author

Naireen commented Mar 14, 2025

Run Java PreCommit

@Naireen Naireen requested a review from scwhittle March 14, 2025 23:58
@Naireen
Copy link
Contributor Author

Naireen commented Mar 14, 2025

Run Java PreCommit

} else {
Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we instead have the metrics lifetime longer? Seems like avoiding allocation/flush per pull will be better for performance.

KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
try {
while (true) {

}
} finally {
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the way the container is implemented is that once its flushed, we dont reuse it. I'm less usre of the context behind this decision, I dont see why we cant change it, as I agree it would make it more efficient, I'll leave that for a subsequent PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think it would only be flushed once. it would just live the whole time and be flushed at the end once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@Naireen Naireen force-pushed the add_sdf_kafka_poll_latencies branch from 5ad81c7 to 4ac4b06 Compare March 20, 2025 18:29
@Naireen Naireen requested a review from scwhittle March 20, 2025 19:06
@liferoad liferoad added this to the 2.64.0 Release milestone Mar 20, 2025
@Naireen Naireen force-pushed the add_sdf_kafka_poll_latencies branch from 4ac4b06 to a900a43 Compare March 20, 2025 19:15
@Naireen Naireen force-pushed the add_sdf_kafka_poll_latencies branch from a900a43 to ad9471a Compare March 20, 2025 19:32
} else {
Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think it would only be flushed once. it would just live the whole time and be flushed at the end once.

@Naireen Naireen force-pushed the add_sdf_kafka_poll_latencies branch from ad9471a to e72dc1e Compare March 20, 2025 20:00
@Naireen Naireen requested a review from scwhittle March 20, 2025 20:31
Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@Naireen
Copy link
Contributor Author

Naireen commented Mar 20, 2025

Run Java PreCommit)

@Naireen
Copy link
Contributor Author

Naireen commented Mar 20, 2025

Run Java PreCommit

@liferoad
Copy link
Contributor

@liferoad
Copy link
Contributor

liferoad commented Mar 20, 2025

testAtLeastOnceWithAutoSchemaUpdate[1] (org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT) failed

sdks/java/io/google-cloud-platform/build/test-results/integrationTest/TEST-org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT.xml [took 5m 58s]

@liferoad
Copy link
Contributor

liferoad commented Mar 20, 2025

:runners:google-cloud-dataflow-java:worker:test FAILED |  
-- | --
  |   |  
  | org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarnessTest > testOnNewWorkerMetadata_redistributesBudget FAILED |  
  | java.lang.AssertionError at GrpcCleanupRule.java:201 |  
  |   |  
  | 1159 tests completed, 1 failed, 3 skipped

@liferoad liferoad merged commit 4450031 into apache:master Mar 20, 2025
22 of 23 checks passed
prodriguezdefino pushed a commit to prodriguezdefino/beam-pabs that referenced this pull request Mar 26, 2025
* add kafka sdf metrics

* address comments

* address more comments

* address comments

---------

Co-authored-by: Naireen <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants