Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Oct 19, 2022

What changes were proposed in this pull request?

This PR proposes to flip the default value of Kafka offset fetching config (spark.sql.streaming.kafka.useDeprecatedOffsetFetching) from true to false, which enables AdminClient based offset fetching by default.

Why are the changes needed?

We had been encountered several production issues with old offset fetching (e.g. hang, issue with Kafka consumer group rebalance) which could be mitigated with new offset fetching. Despite the breaking change on the ACL, there is no need for moderate users to suffer from the old way.

The discussion went through the dev. mailing list: https://lists.apache.org/thread/spkco94gw33sj8355mhlxz1vl7gl1g5c

Does this PR introduce any user-facing change?

Yes, especially users who relies on Kafka ACL based on consumer group. They need to either adjust the ACL to topic based one, or set the value to true for spark.sql.streaming.kafka.useDeprecatedOffsetFetching to use the old approach.

How was this patch tested?

Existing UTs.

@HeartSaVioR
Copy link
Contributor Author

cc. @gaborgsomogyi @dongjoon-hyun

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I would mention one minor thing somewhere. There will be time when Kafka is not going to provide the old way of offset fetching so migration highly is advised. When the consumer is not going to be able to fetch offsets then the community has basically 2 options:

  • Drop the old feature
  • Not upgrading Kafka version

Temporarily not upgrading Kafka can be done but the megatrend is that the old feature will be dropped. Maybe we can deprecate the old fetching feature now?!

@HeartSaVioR
Copy link
Contributor Author

Kafka community deprecated the method poll(long) in Kafka 2.0 and couldn't remove the method in Kafka 3.0. They may seem to see lots of existing usage and be hesitant to remove it. I'd guess the earliest version they can reconsider is Kafka 4.0 which is not something we need to worry for now. Maybe Spark 4.0?

@gaborgsomogyi
Copy link
Contributor

Sounds fair, ship it then :)

@dongjoon-hyun
Copy link
Member

cc @viirya , too

@HeartSaVioR
Copy link
Contributor Author

https://github.com/HeartSaVioR/spark/actions/runs/3279380674/jobs/5400649338

Tests are failing from below suites

  • CastWithAnsiOffSuite
  • TryCastSuite
  • RebaseDateTimeSuite
  • TimestampFormatterSuite
  • python/pyspark/sql/tests/connect/test_connect_column_expressions.py

which none of them are related to this change.

@HeartSaVioR
Copy link
Contributor Author

Thanks! Merging to master.

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
### What changes were proposed in this pull request?

This PR proposes to flip the default value of Kafka offset fetching config (`spark.sql.streaming.kafka.useDeprecatedOffsetFetching`) from `true` to `false`, which enables AdminClient based offset fetching by default.

### Why are the changes needed?

We had been encountered several production issues with old offset fetching (e.g. hang, issue with Kafka consumer group rebalance) which could be mitigated with new offset fetching. Despite the breaking change on the ACL, there is no need for moderate users to suffer from the old way.

The discussion went through the dev. mailing list: https://lists.apache.org/thread/spkco94gw33sj8355mhlxz1vl7gl1g5c

### Does this PR introduce _any_ user-facing change?

Yes, especially users who relies on Kafka ACL based on consumer group. They need to either adjust the ACL to topic based one, or set the value to `true` for `spark.sql.streaming.kafka.useDeprecatedOffsetFetching` to use the old approach.

### How was this patch tested?

Existing UTs.

Closes apache#38306 from HeartSaVioR/SPARK-40844.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR pushed a commit that referenced this pull request Dec 16, 2024
…afkaOffsetReaderConsumer`

### What changes were proposed in this pull request?

This was missed when the default value of configuration is changed via the following.
- #38306

KafkaMicroBatchSourceSuite consists of set of different suites, where KafkaMicroBatchSourceSuiteBase based suite is defined. There are 4 implementations of this abstract class for now:
  1. `KafkaMicroBatchV1SourceSuite` - V1 source that supposes to use `KafkaOffsetReaderConsumer` as `KafkaOffsetReader`.
  2. `KafkaMicroBatchV2SourceSuite` - V2 source that supposes to use `KafkaOffsetReaderConsumer` as `KafkaOffsetReader`.
  3. `KafkaMicroBatchV1SourceWithAdminSuite` - V1 source that uses `KafkaOffsetReaderAdmin` as `KafkaOffsetReader`.
  4. `KafkaMicroBatchV2SourceWithAdminSuite` - V2 source that uses `KafkaOffsetReaderAdmin` as `KafkaOffsetReader`.
But `KafkaMicroBatchV1SourceSuite` and `KafkaMicroBatchV2SourceSuite` are still running based on `KafkaOffsetReaderAdmin`, as `USE_DEPRECATED_KAFKA_OFFSET_FETCHING` is `false` be default. By switching it to `true` in `beforeAll`, we make sure that corresponding `KafkaOffsetReader` is in use.

### Why are the changes needed?

To improve unit tests coverage for `KafkaOffsetReaderConsumer`

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit Tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49164 from ostronaut/hotifx/KafkaMicroBatchSourceSuite-cover-KafkaOffsetReaderConsumer.

Authored-by: Dima <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants