diff --git a/pom.xml b/pom.xml
index 02554a7d6077e..ff7b32da33711 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
2.12.7
- 1.53
+ 1.54
7.5
8.10.0
3.8.0
@@ -67,7 +67,7 @@
1.9.17
313
1.7.32
- 2.3.1
+ 3.9.0
0.11.0
30.0.1
2.3.1
@@ -207,6 +207,13 @@
presto-native-sidecar-plugin
+
+
+ jitpack.io
+ https://jitpack.io
+
+
+
@@ -1294,7 +1301,7 @@
org.pcollections
pcollections
- 2.1.2
+ 4.0.1
@@ -1811,6 +1818,18 @@
org.apache.zookeeper
zookeeper
+
+ commons-logging
+ commons-logging
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-csv
+
+
+ commons-io
+ commons-io
+
@@ -1935,6 +1954,14 @@
org.slf4j
slf4j-log4j12
+
+ ch.qos.logback
+ *
+
+
+ io.netty
+ *
+
@@ -1963,9 +1990,9 @@
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-core
- ${dep.tempto.version}
+ upgrade_kafka_client_version_cve_2022_34917-SNAPSHOT
com.datastax.cassandra
@@ -1979,9 +2006,9 @@
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-ldap
- ${dep.tempto.version}
+ upgrade_kafka_client_version_cve_2022_34917-SNAPSHOT
com.datastax.cassandra
@@ -1991,9 +2018,9 @@
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-kafka
- ${dep.tempto.version}
+ upgrade_kafka_client_version_cve_2022_34917-SNAPSHOT
com.datastax.cassandra
@@ -2003,13 +2030,17 @@
org.slf4j
slf4j-log4j12
+
+ org.apache.zookeeper
+ zookeeper
+
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-runner
- ${dep.tempto.version}
+ upgrade_kafka_client_version_cve_2022_34917-SNAPSHOT
com.datastax.cassandra
diff --git a/presto-kafka/pom.xml b/presto-kafka/pom.xml
index 9327e2646510a..d71e14f0d0477 100644
--- a/presto-kafka/pom.xml
+++ b/presto-kafka/pom.xml
@@ -18,6 +18,11 @@
+
+ org.slf4j
+ slf4j-api
+
+
com.facebook.airlift
bootstrap
@@ -234,6 +239,19 @@
${scala.version}
test
+
+
+ org.apache.curator
+ curator-test
+ 5.6.0
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+
+
+
@@ -248,6 +266,16 @@
+
+ org.basepom.maven
+ duplicate-finder-maven-plugin
+
+
+ kafka/kafka-version.properties
+ about.html
+
+
+
diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java
index 8b0adfe9662f4..7dad586803826 100644
--- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java
+++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java
@@ -16,19 +16,22 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import kafka.utils.ZkUtils;
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.utils.Time;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.facebook.presto.kafka.util.TestUtils.findUnusedPort;
@@ -45,27 +48,28 @@
public class EmbeddedKafka
implements Closeable
{
- private final EmbeddedZookeeper zookeeper;
+ private final ZooKeeperEmbedded zookeeper;
private final int port;
private final File kafkaDataDir;
- private final KafkaServerStartable kafka;
+ private final KafkaServer kafka;
+ private final AdminClient adminClient;
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();
public static EmbeddedKafka createEmbeddedKafka()
- throws IOException
+ throws Exception
{
- return new EmbeddedKafka(new EmbeddedZookeeper(), new Properties());
+ return new EmbeddedKafka(new ZooKeeperEmbedded(findUnusedPort()), new Properties());
}
public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
- throws IOException
+ throws Exception
{
- return new EmbeddedKafka(new EmbeddedZookeeper(), overrideProperties);
+ return new EmbeddedKafka(new ZooKeeperEmbedded(findUnusedPort()), overrideProperties);
}
- EmbeddedKafka(EmbeddedZookeeper zookeeper, Properties overrideProperties)
+ EmbeddedKafka(ZooKeeperEmbedded zookeeper, Properties overrideProperties)
throws IOException
{
this.zookeeper = requireNonNull(zookeeper, "zookeeper is null");
@@ -77,6 +81,7 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
Map properties = ImmutableMap.builder()
.put("broker.id", "0")
.put("host.name", "localhost")
+ .put("listeners", "PLAINTEXT://localhost:" + getPort())
.put("num.partitions", "2")
.put("log.flush.interval.messages", "10000")
.put("log.flush.interval.ms", "1000")
@@ -85,20 +90,24 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
.put("zookeeper.connection.timeout.ms", "1000000")
.put("port", Integer.toString(port))
.put("log.dirs", kafkaDataDir.getAbsolutePath())
- .put("zookeeper.connect", zookeeper.getConnectString())
+ .put("zookeeper.connect", zookeeper.connectString())
.put("offsets.topic.replication.factor", "1")
.putAll(Maps.fromProperties(overrideProperties))
.build();
KafkaConfig config = new KafkaConfig(toProperties(properties));
- this.kafka = new KafkaServerStartable(config);
+ Time time = Time.SYSTEM;
+ this.kafka = new KafkaServer(config, time, scala.Option.empty(), false);
+ Properties adminProps = new Properties();
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getConnectString());
+ adminProps.put("log.level", "DEBUG");
+ this.adminClient = AdminClient.create(adminProps);
}
public void start()
throws InterruptedException, IOException
{
if (!started.getAndSet(true)) {
- zookeeper.start();
kafka.startup();
}
}
@@ -110,8 +119,9 @@ public void close()
if (started.get() && !stopped.getAndSet(true)) {
kafka.shutdown();
kafka.awaitShutdown();
- zookeeper.close();
+ zookeeper.stop();
deleteRecursively(kafkaDataDir.toPath(), ALLOW_INSECURE);
+ adminClient.close();
}
}
@@ -123,15 +133,15 @@ public void createTopics(String... topics)
public void createTopics(int partitions, int replication, Properties topicProperties, String... topics)
{
checkState(started.get() && !stopped.get(), "not started!");
-
- ZkUtils zkUtils = ZkUtils.apply(getZookeeperConnectString(), 30_000, 30_000, false);
try {
for (String topic : topics) {
- AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicProperties, RackAwareMode.Disabled$.MODULE$);
+ NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
+ newTopic.configs(Maps.fromProperties(topicProperties));
+ adminClient.createTopics(Collections.singleton(newTopic)).all().get();
}
}
- finally {
- zkUtils.close();
+ catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Failed to create topics", e);
}
}
@@ -146,11 +156,6 @@ public KafkaProducer createProducer()
return new KafkaProducer<>(properties);
}
- public int getZookeeperPort()
- {
- return zookeeper.getPort();
- }
-
public int getPort()
{
return port;
@@ -163,6 +168,6 @@ public String getConnectString()
public String getZookeeperConnectString()
{
- return zookeeper.getConnectString();
+ return zookeeper.connectString();
}
}
diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/ZooKeeperEmbedded.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/ZooKeeperEmbedded.java
new file mode 100644
index 0000000000000..9bf19ad21a851
--- /dev/null
+++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/ZooKeeperEmbedded.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka.util;
+
+import org.apache.curator.test.TestingServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ZooKeeperEmbedded
+{
+ private static final Logger log = LoggerFactory.getLogger(ZooKeeperEmbedded.class);
+
+ private final TestingServer server;
+
+ /**
+ * Starts a ZooKeeper instance that listens at the defined port.
+ *
+ * @param port The port (aka `clientPort`) to listen to. Default: 2181.
+ * @throws Exception
+ */
+ public ZooKeeperEmbedded(int port) throws Exception
+ {
+ log.debug("Starting embedded ZooKeeper server on port {} ...", port);
+ this.server = new TestingServer(port);
+ }
+
+ public void stop() throws IOException
+ {
+ log.debug("Shutting down embedded ZooKeeper server at {} ...", server.getConnectString());
+ server.close();
+ log.debug("Shutdown of embedded ZooKeeper server at {} completed", server.getConnectString());
+ }
+
+ /**
+ * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
+ * Example: `127.0.0.1:2181`.
+ *
+ * You can use this to e.g. tell Kafka brokers how to connect to this instance.
+ */
+ public String connectString()
+ {
+ return server.getConnectString();
+ }
+
+ /**
+ * The hostname of the ZooKeeper instance. Example: `127.0.0.1`
+ */
+ public String hostname()
+ {
+ // "server:1:2:3" -> "server:1:2"
+ return connectString().substring(0, connectString().lastIndexOf(':'));
+ }
+}
diff --git a/presto-product-tests/pom.xml b/presto-product-tests/pom.xml
index 49c2bf79a304c..51fcfa2d2a1e7 100644
--- a/presto-product-tests/pom.xml
+++ b/presto-product-tests/pom.xml
@@ -18,6 +18,13 @@
2.12.2
+
+
+ jitpack.io
+ https://jitpack.io
+
+
+
@@ -55,19 +62,25 @@
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-core
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-ldap
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-kafka
+
+
+ commons-cli
+ commons-cli
+
+
- io.prestodb.tempto
+ com.github.adkharat.tempto
tempto-runner