17
17
package test .pinpoint .plugin .kafka ;
18
18
19
19
import com .navercorp .pinpoint .test .plugin .shared .SharedTestLifeCycle ;
20
+ import org .apache .kafka .clients .admin .AdminClient ;
21
+ import org .apache .kafka .clients .admin .NewTopic ;
20
22
import org .apache .kafka .streams .errors .StreamsException ;
21
- import org .apache .logging .log4j .LogManager ;
22
- import org .apache .logging .log4j .Logger ;
23
23
import org .junit .Assume ;
24
24
import org .testcontainers .DockerClientFactory ;
25
25
import org .testcontainers .containers .KafkaContainer ;
26
26
import org .testcontainers .utility .DockerImageName ;
27
27
28
28
import java .util .Properties ;
29
+ import java .util .List ;
30
+ import java .util .ArrayList ;
31
+ import java .util .Map ;
32
+ import java .util .HashMap ;
33
+
34
+ import static test .pinpoint .plugin .kafka .KafkaITConstants .INPUT_TOPIC ;
35
+ import static test .pinpoint .plugin .kafka .KafkaITConstants .OUTPUT_TOPIC ;
29
36
30
37
/**
31
38
* Copy of https://github.com/chbatey/kafka-unit/blob/master/src/main/java/info/batey/kafka/unit/KafkaUnit.java
32
39
* Some codes have been modified for testing from the copied code.
33
40
*/
34
41
public class KafkaStreamsUnitServer implements SharedTestLifeCycle {
35
- private static final Logger logger = LogManager .getLogger (KafkaStreamsUnitServer .class );
36
-
37
42
38
43
private KafkaContainer container ;
39
44
private TestStream TEST_STREAM ;
@@ -48,6 +53,8 @@ public Properties beforeAll() {
48
53
49
54
String brokerUrl = "localhost:" + port ;
50
55
56
+ createTopics (brokerUrl );
57
+
51
58
TEST_STREAM = new TestStream (brokerUrl );
52
59
TEST_STREAM .start ();
53
60
System .out .println ();
@@ -57,6 +64,17 @@ public Properties beforeAll() {
57
64
return properties ;
58
65
}
59
66
67
+ private static void createTopics (String brokerUrl ) {
68
+ List <NewTopic > toCreate = new ArrayList <>();
69
+ toCreate .add (new NewTopic (INPUT_TOPIC , 1 , (short ) 1 ));
70
+ toCreate .add (new NewTopic (OUTPUT_TOPIC , 1 , (short ) 1 ));
71
+
72
+ Map <String , Object > config = new HashMap <>();
73
+ config .put ("bootstrap.servers" , brokerUrl );
74
+
75
+ AdminClient .create (config ).createTopics (toCreate );
76
+ }
77
+
60
78
@ Override
61
79
public void afterAll () {
62
80
if (TEST_STREAM != null ) {
0 commit comments