Skip to content

Commit f76da89

Browse files
committed
Use AdminClient for creating partitions
1 parent 0a22686 commit f76da89

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ import kafka.api.Request
3232
import kafka.server.{KafkaConfig, KafkaServer}
3333
import kafka.server.checkpoints.OffsetCheckpointFile
3434
import kafka.utils.ZkUtils
35-
import kafka.zk.{AdminZkClient, KafkaZkClient}
35+
import kafka.zk.KafkaZkClient
36+
import org.apache.kafka.clients.CommonClientConfigs;
37+
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
3638
import org.apache.kafka.clients.consumer.KafkaConsumer
3739
import org.apache.kafka.clients.producer._
3840
import org.apache.kafka.common.TopicPartition
@@ -65,7 +67,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
6567

6668
private var zkUtils: ZkUtils = _
6769
private var zkClient: KafkaZkClient = null
68-
private var adminZkClient: AdminZkClient = null
70+
private var adminClient: AdminClient = null
6971

7072
// Kafka broker related configurations
7173
private val brokerHost = "localhost"
@@ -107,7 +109,6 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
107109
zkPort = zookeeper.actualPort
108110
zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
109111
zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM)
110-
adminZkClient = new AdminZkClient(zkClient)
111112
zkReady = true
112113
}
113114

@@ -126,6 +127,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
126127
}, new SparkConf(), "KafkaBroker")
127128

128129
brokerReady = true
130+
val props = new Properties()
131+
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "$brokerHost:$brokerPort")
132+
adminClient = AdminClient.create(props)
129133
}
130134

131135
/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
@@ -215,8 +219,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
215219
collection.immutable.Set(topic)).map {
216220
case (topicPartition, replicas) => topicPartition.partition -> replicas
217221
}
218-
adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(),
219-
partitions)
222+
val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
223+
adminClient.createPartitions(Map(topic ->
224+
NewPartitions.increaseTo(partitions)).asJava, actuallyDoIt)
220225
// wait until metadata is propagated
221226
(0 until partitions).foreach { p =>
222227
waitUntilMetadataIsPropagated(topic, p)

0 commit comments

Comments
 (0)