HIVE-29238: Upgrade Kafka version to fix CVE-2024-31141 and CVE-2021-3… #6110

base: master
```diff
@@ -36,7 +36,7 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.test.version>2.5.0</kafka.test.version>
+    <kafka.test.version>3.9.1</kafka.test.version>
     <druid.guice.version>4.1.0</druid.guice.version>
     <slf4j.version>1.7.30</slf4j.version>
   </properties>
```
```diff
@@ -226,6 +226,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.test.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka.test.version}</version>
+    </dependency>
```
Member (comment on lines +229 to +233): Why do we need this new dependency? I don't see this defined in the kafka-handler module and there I see that the …
```diff
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
```
```diff
@@ -33,6 +33,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.Uuid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;

 /**
```
```diff
@@ -67,6 +69,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
     kafkaProducer = new KafkaProducer<>(properties);
   }

+  @Override
+  public Uuid clientInstanceId(Duration timeout) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override public void initTransactions() {
     kafkaProducer.initTransactions();
   }
```
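The new override exists because the Producer interface in kafka-clients 3.x declares clientInstanceId(Duration) (added for KIP-714 client metrics), so a delegating wrapper like HiveKafkaProducer must now implement it. Throwing UnsupportedOperationException is the minimal change; as a hedged alternative sketch (not what this PR does), a wrapper could instead forward the call to the wrapped producer, assuming the underlying client actually supports client telemetry:

```java
import java.time.Duration;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Uuid;

// Sketch only: a partial delegating wrapper that forwards the new Producer
// method instead of throwing. Illustration, not the Hive code.
abstract class ForwardingProducerSketch<K, V> implements Producer<K, V> {

  private final Producer<K, V> delegate;

  protected ForwardingProducerSketch(Producer<K, V> delegate) {
    this.delegate = delegate;
  }

  // clientInstanceId(Duration) is part of the 3.x Producer interface (KIP-714);
  // forwarding works when the wrapped producer supports client telemetry.
  @Override
  public Uuid clientInstanceId(Duration timeout) {
    return delegate.clientInstanceId(timeout);
  }
}
```

Which choice is right mostly depends on whether anything in Hive ever calls this method on the wrapper.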
```diff
@@ -138,11 +145,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
     Object transactionManager = getValue(kafkaProducer, "transactionManager");

-    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
+    Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
     invoke(transactionManager,
         "transitionTo",
         getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(topicPartitionBookkeeper, "reset");
+    invoke(txnPartitionMap, "reset");
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
```
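resumeTransaction() works by reflecting into the private TransactionManager inside KafkaProducer, which is why the upgrade has to follow internal renames such as topicPartitionBookkeeper apparently becoming txnPartitionMap in the newer client. The getValue/setValue/invoke helpers are not shown in this diff; a minimal sketch of what such reflection helpers commonly look like (names kept, bodies assumed rather than copied from the Hive source):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Minimal sketch of the kind of reflection helpers resumeTransaction() relies on.
// The real helpers live in HiveKafkaProducer; these bodies are assumptions.
final class ReflectionHelpersSketch {

  // Read a (possibly private) field, walking up the class hierarchy.
  static Object getValue(Object target, String fieldName) throws ReflectiveOperationException {
    for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
      try {
        Field field = c.getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(target);
      } catch (NoSuchFieldException ignore) {
        // keep searching in the superclass
      }
    }
    throw new NoSuchFieldException(fieldName);
  }

  // Overwrite a (possibly private) field.
  static void setValue(Object target, String fieldName, Object value) throws ReflectiveOperationException {
    Field field = target.getClass().getDeclaredField(fieldName);
    field.setAccessible(true);
    field.set(target, value);
  }

  // Invoke a no-argument (possibly private) method such as "reset" or "wakeup".
  static Object invoke(Object target, String methodName) throws ReflectiveOperationException {
    Method method = target.getClass().getDeclaredMethod(methodName);
    method.setAccessible(true);
    return method.invoke(target);
  }
}
```

Because all of this goes through reflection against kafka-clients internals, it is exactly the kind of code that needs re-verification against the 3.9.x TransactionManager, as the review below points out.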
```diff
@@ -181,10 +188,15 @@ short getEpoch() {
    */
   private void flushNewPartitions() {
     LOG.info("Flushing new partitions");
-    TransactionalRequestResult result = enqueueNewPartitions();
-    Object sender = getValue(kafkaProducer, "sender");
-    invoke(sender, "wakeup");
-    result.await();
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Set<TopicPartition> newPartitionsInTransaction =
+        (Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
+    if (!newPartitionsInTransaction.isEmpty()) {
```
Member (comment on lines +191 to +194): This class originated from Apache Flink and, looking at the latest version, I see that the handling of new partitions is inside …
```diff
+      TransactionalRequestResult result = enqueueNewPartitions();
+      Object sender = getValue(kafkaProducer, "sender");
+      invoke(sender, "wakeup");
+      result.await();
+    }
   }

   private synchronized TransactionalRequestResult enqueueNewPartitions() {
```
```diff
@@ -37,6 +37,7 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Properties;
```
```diff
@@ -150,7 +151,7 @@ private void cleanRowBoat() {
     LOG.trace("total read bytes [{}]", readBytes);
     if (consumer != null) {
       consumer.wakeup();
-      consumer.close();
+      consumer.close(Duration.ZERO);
```
Member: Why are we now passing a zero timeout? The default is not sufficient?
```diff
     }
   }
```
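For context on the reviewer's question: KafkaConsumer.close() with no argument blocks up to a default close timeout (30 seconds in current clients) while the consumer tries to finish outstanding work such as committing offsets and leaving its group, whereas close(Duration.ZERO) gives up on that and returns immediately. A self-contained sketch of the two variants, with a placeholder broker address that is not taken from the Hive code:

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Illustration only; connection settings are placeholders.
public class ConsumerCloseSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    try {
      // ... assign/seek/poll as the record iterator would ...
    } finally {
      consumer.wakeup();              // interrupts any in-flight poll()
      // consumer.close() would wait up to the default timeout (30s) for a clean
      // shutdown; Duration.ZERO skips that wait entirely.
      consumer.close(Duration.ZERO);
    }
  }
}
```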
```diff
@@ -173,7 +173,7 @@
     <junit.version>4.13.2</junit.version>
     <junit.jupiter.version>5.13.3</junit.jupiter.version>
     <junit.vintage.version>5.13.3</junit.vintage.version>
-    <kafka.version>2.5.0</kafka.version>
+    <kafka.version>3.9.1</kafka.version>
```
Member: If I recall well, there were issues with previous upgrade attempts. Please check the (git) history and related PRs for more information to ensure that the code remains functional.

Contributor (author): Yes, I am aware of that; it's WIP right now. I am planning to address those issues if I get a green label in the current state.
```diff
     <kryo.version>5.5.0</kryo.version>
     <reflectasm.version>1.11.9</reflectasm.version>
     <kudu.version>1.17.0</kudu.version>
```
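Given the reviewer's note about earlier upgrade attempts, one quick way to confirm which kafka-clients version actually wins on a module's classpath is to print it at runtime; AppInfoParser ships with kafka-clients and exposes the client version. A small, assumed sketch (class name and usage are illustrative, not part of the PR):

```java
import org.apache.kafka.common.utils.AppInfoParser;

// Prints the kafka-clients version found on the classpath; after this PR it
// should report 3.9.1 once the <kafka.version>/<kafka.test.version> bumps apply.
public class KafkaClientsVersionCheck {
  public static void main(String[] args) {
    System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
  }
}
```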
```diff
@@ -1,5 +1,3 @@
---! qt:disabled:HIVE-23985
-
 SET hive.vectorized.execution.enabled=true;

 CREATE EXTERNAL TABLE kafka_table
```
Review comment: We can drop this version definition here and keep only the one in the root pom.xml file.