Skip to content

Commit e6a52b2

Browse files
committed
[#noissue] Explicitly create topics for kafka-streams integration tests
1 parent b1af639 commit e6a52b2

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

plugins-it/kafka-it/kafka-3-it/src/test/java/test/pinpoint/plugin/kafka/KafkaStreamsUnitServer.java

+22-4
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,28 @@
1717
package test.pinpoint.plugin.kafka;
1818

1919
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycle;
20+
import org.apache.kafka.clients.admin.AdminClient;
21+
import org.apache.kafka.clients.admin.NewTopic;
2022
import org.apache.kafka.streams.errors.StreamsException;
21-
import org.apache.logging.log4j.LogManager;
22-
import org.apache.logging.log4j.Logger;
2323
import org.junit.Assume;
2424
import org.testcontainers.DockerClientFactory;
2525
import org.testcontainers.containers.KafkaContainer;
2626
import org.testcontainers.utility.DockerImageName;
2727

2828
import java.util.Properties;
29+
import java.util.List;
30+
import java.util.ArrayList;
31+
import java.util.Map;
32+
import java.util.HashMap;
33+
34+
import static test.pinpoint.plugin.kafka.KafkaITConstants.INPUT_TOPIC;
35+
import static test.pinpoint.plugin.kafka.KafkaITConstants.OUTPUT_TOPIC;
2936

3037
/**
3138
* Copy of https://github.com/chbatey/kafka-unit/blob/master/src/main/java/info/batey/kafka/unit/KafkaUnit.java
3239
* Some codes have been modified for testing from the copied code.
3340
*/
3441
public class KafkaStreamsUnitServer implements SharedTestLifeCycle {
35-
private static final Logger logger = LogManager.getLogger(KafkaStreamsUnitServer.class);
36-
3742

3843
private KafkaContainer container;
3944
private TestStream TEST_STREAM;
@@ -48,6 +53,8 @@ public Properties beforeAll() {
4853

4954
String brokerUrl = "localhost:" + port;
5055

56+
createTopics(brokerUrl);
57+
5158
TEST_STREAM = new TestStream(brokerUrl);
5259
TEST_STREAM.start();
5360
System.out.println();
@@ -57,6 +64,17 @@ public Properties beforeAll() {
5764
return properties;
5865
}
5966

67+
private static void createTopics(String brokerUrl) {
68+
List<NewTopic> toCreate = new ArrayList<>();
69+
toCreate.add(new NewTopic(INPUT_TOPIC, 1, (short) 1));
70+
toCreate.add(new NewTopic(OUTPUT_TOPIC, 1, (short) 1));
71+
72+
Map<String, Object> config = new HashMap<>();
73+
config.put("bootstrap.servers", brokerUrl);
74+
75+
AdminClient.create(config).createTopics(toCreate);
76+
}
77+
6078
@Override
6179
public void afterAll() {
6280
if (TEST_STREAM != null) {

0 commit comments

Comments
 (0)