diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml index 9ffff7dbc0c..dceb19f7d28 100644 --- a/seatunnel-connectors-v2/connector-kafka/pom.xml +++ b/seatunnel-connectors-v2/connector-kafka/pom.xml @@ -30,7 +30,7 @@ SeaTunnel : Connectors V2 : Kafka - 3.2.0 + 3.4.0 1.9.8.Final diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java index 33d2caeb933..949a40a7706 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java @@ -124,15 +124,15 @@ public void resumeTransaction(long producerId, short epoch, boolean txnStarted) Object transactionManager = getTransactionManager(); synchronized (transactionManager) { - Object topicPartitionBookkeeper = + Object txnPartitionMap = ReflectionUtils.getField( transactionManager, transactionManager.getClass(), - "topicPartitionBookkeeper") + "txnPartitionMap") .get(); transitionTransactionManagerStateTo(transactionManager, "INITIALIZING"); - ReflectionUtils.invoke(topicPartitionBookkeeper, "reset"); + ReflectionUtils.invoke(txnPartitionMap, "reset"); ReflectionUtils.setField( transactionManager, diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml index 9b4ea874cfd..3b30715ddd6 100644 --- a/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml +++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml @@ -46,14 +46,14 @@ org.apache.kafka kafka-clients - 3.2.0 + 3.4.0 provided org.apache.kafka connect-json - 3.2.0 + 3.4.0 provided