Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][Connector-V2][kafka]Kafka consumer group automatically commits offset logic error fix #6961

Merged

Conversation

zhangshenghang
Copy link
Member

@zhangshenghang zhangshenghang commented Jun 7, 2024

Purpose of this pull request

Kafka consumer group automatically commits offset logic error fix.

When Kafka Source Options sets commit_on_checkpoint to true, Checkpoint is enabled, and Kafka Offset is manually committed in KafkaSourceReader. At this time, the Kafka consumer group should be set to manually commit Offset, but now the initConsumer method in the KafkaConsumerThread class is automatically committed props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));

Similarly, when commit_on_checkpoint is set to false, props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit)); will also be set to false. At this time, Offset is neither manually committed nor automatically committed, This will cause the consumer group offset to never be committed.

I think this is a Bug. I have fixed it.

KafkaSourceReader.java

public void notifyCheckpointComplete(long checkpointId) {
        if (!checkpointOffsetMap.containsKey(checkpointId)) {
            log.warn("checkpoint {} do not exist or have already been committed.", checkpointId);
        } else {
            checkpointOffsetMap
                    .remove(checkpointId)
                    .forEach(
                            (topicPartition, offset) -> {
                                try {
                                    consumerThreadMap
                                            .get(topicPartition)
                                            .getTasks()
                                            .put(
                                                    consumer -> {
                                                        if (kafkaSourceConfig
                                                                .isCommitOnCheckpoint()) {
                                                            Map<TopicPartition, OffsetAndMetadata>
                                                                    offsets = new HashMap<>();
                                                            if (offset >= 0) {
                                                                offsets.put(
                                                                        topicPartition,
                                                                        new OffsetAndMetadata(
                                                                                offset));
                                                                consumer.commitSync(offsets);
                                                            }
                                                        }
                                                    });
                                } catch (InterruptedException e) {
                                    log.error("commit offset to kafka failed", e);
                                }
                            });
        }
    }

KafkaConsumerThread.java

...
    public KafkaConsumerThread(KafkaSourceConfig kafkaSourceConfig, ConsumerMetadata metadata) {
        this.metadata = metadata;
        this.tasks = new LinkedBlockingQueue<>();
        this.consumer =
                initConsumer(
                        kafkaSourceConfig.getBootstrap(),
                        metadata.getConsumerGroup(),
                        kafkaSourceConfig.getProperties(),
                        kafkaSourceConfig.isCommitOnCheckpoint());
    }
...
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));
...

Check list

@zhangshenghang zhangshenghang force-pushed the hotfix-connector-v2-kafka-auto-commit branch from 9fb56fa to 6dc6f12 Compare June 7, 2024 07:20
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @zhangshenghang ! It's broken by #5992. cc @hailin0 @EricJoy2048 @zhilinli123

@hailin0 hailin0 merged commit 181f01e into apache:dev Jun 7, 2024
4 checks passed
chaorongzhi pushed a commit to chaorongzhi/seatunnel that referenced this pull request Aug 21, 2024
@zhangshenghang zhangshenghang deleted the hotfix-connector-v2-kafka-auto-commit branch September 6, 2024 02:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants