Skip to content

Commit b8a287e

Browse files
committed
[#9673] Add integration tests
1 parent 6d97609 commit b8a287e

18 files changed

+873
-1
lines changed

plugins-it/kafka-it/kafka-3-it/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
<groupId>org.apache.kafka</groupId>
3636
<artifactId>kafka-clients</artifactId>
3737
</dependency>
38+
<dependency>
39+
<groupId>org.apache.kafka</groupId>
40+
<artifactId>kafka-streams</artifactId>
41+
</dependency>
3842
<dependency>
3943
<groupId>commons-io</groupId>
4044
<artifactId>commons-io</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.navercorp.pinpoint.plugin.kafka;
2+
3+
import com.navercorp.pinpoint.test.plugin.shared.SharedTestBeforeAllResult;
4+
import org.apache.logging.log4j.LogManager;
5+
import org.apache.logging.log4j.Logger;
6+
7+
import java.util.Properties;
8+
9+
public abstract class KafkaStreamsIT {
10+
11+
protected static final Logger logger = LogManager.getLogger(KafkaStreamsIT.class);
12+
13+
static String brokerUrl;
14+
static int PORT;
15+
16+
17+
@SharedTestBeforeAllResult
18+
public static void setBeforeAllResult(Properties beforeAllResult) {
19+
logger.info("Properties=" + beforeAllResult);
20+
PORT = Integer.parseInt(beforeAllResult.getProperty("PORT"));
21+
brokerUrl = "localhost:" + PORT;
22+
}
23+
24+
public static int getPort() {
25+
return PORT;
26+
}
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.navercorp.pinpoint.plugin.kafka;
2+
3+
import com.navercorp.pinpoint.bootstrap.plugin.test.*;
4+
import org.apache.kafka.clients.producer.Callback;
5+
import org.apache.kafka.clients.producer.KafkaProducer;
6+
import org.apache.kafka.clients.producer.ProducerRecord;
7+
import org.apache.kafka.common.TopicPartition;
8+
import org.apache.kafka.streams.processor.internals.StreamTask;
9+
10+
import java.lang.reflect.Method;
11+
12+
import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.annotation;
13+
import static test.pinpoint.plugin.kafka.KafkaITConstants.*;
14+
15+
public class KafkaStreamsITBase {
16+
17+
public static void verifyProducerSend(String brokerUrl, int messageCount) throws NoSuchMethodException {
18+
19+
int consumerInvocationCount = messageCount * 3 + 2;
20+
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
21+
verifier.awaitTraceCount(messageCount + consumerInvocationCount, 100, MAX_TRACE_WAIT_TIME);
22+
verifier.printCache();
23+
24+
Method sendMethod = KafkaProducer.class.getDeclaredMethod("send", ProducerRecord.class, Callback.class);
25+
ExpectedTrace.Builder eventBuilder = ExpectedTrace.createEventBuilder(KAFKA_CLIENT_SERVICE_TYPE);
26+
eventBuilder.setMethod(sendMethod);
27+
eventBuilder.setEndPoint(brokerUrl);
28+
eventBuilder.setDestinationId(brokerUrl);
29+
eventBuilder.setAnnotations(annotation("kafka.topic", OUTPUT_TOPIC));
30+
ExpectedTrace producerSendExpected = eventBuilder.build();
31+
32+
for (int i = 0; i < messageCount; i++) {
33+
verifier.verifyDiscreteTrace(producerSendExpected);
34+
}
35+
}
36+
37+
public static void verifyMultiConsumerEntryPoint(String brokerUrl) throws NoSuchMethodException {
38+
39+
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
40+
verifier.awaitTraceCount(3, 100, MAX_TRACE_WAIT_TIME);
41+
42+
String expectedRpc = "kafka-streams://topic=" + INPUT_TOPIC + "?batch=1";
43+
verifyConsumerEntryPoint(verifier, brokerUrl, INPUT_TOPIC, expectedRpc, annotation("kafka.topic", INPUT_TOPIC), annotation("kafka.batch", 1)
44+
);
45+
}
46+
47+
private static void verifyConsumerEntryPoint(PluginTestVerifier verifier, String brokerUrl, String topic, String expectedRpc, ExpectedAnnotation... expectedAnnotations)
48+
throws NoSuchMethodException {
49+
50+
Method sendMethod = KafkaProducer.class.getDeclaredMethod("send", ProducerRecord.class, Callback.class);
51+
ExpectedTrace.Builder eventBuilder = ExpectedTrace.createEventBuilder(KAFKA_CLIENT_SERVICE_TYPE);
52+
eventBuilder.setMethod(sendMethod);
53+
eventBuilder.setEndPoint(brokerUrl);
54+
eventBuilder.setDestinationId(brokerUrl);
55+
eventBuilder.setAnnotations(annotation("kafka.topic", topic));
56+
ExpectedTrace producerSendExpected = eventBuilder.build();
57+
58+
ExpectedTrace.Builder rootBuilder = ExpectedTrace.createRootBuilder(KAFKA_STREAMS_SERVICE_TYPE);
59+
rootBuilder.setMethodSignature("Kafka Streams Invocation");
60+
rootBuilder.setRpc(expectedRpc);
61+
rootBuilder.setRemoteAddr(brokerUrl);
62+
rootBuilder.setAnnotations(expectedAnnotations);
63+
ExpectedTrace consumerEntryPointInvocationExpected = rootBuilder.build();
64+
65+
Method consumeRecordsMethod = StreamTask.class.getDeclaredMethod("addRecords", TopicPartition.class, Iterable.class);
66+
ExpectedTrace messageArrivedExpected = Expectations.event(KAFKA_STREAMS_SERVICE_TYPE, consumeRecordsMethod);
67+
68+
verifier.printCache();
69+
verifier.verifyDiscreteTrace(producerSendExpected, consumerEntryPointInvocationExpected, messageArrivedExpected);
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2023 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.plugin.kafka;
18+
19+
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
20+
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
21+
import com.navercorp.pinpoint.test.plugin.*;
22+
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
26+
import test.pinpoint.plugin.kafka.TestProducer;
27+
28+
import java.util.Random;
29+
30+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
31+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;
32+
33+
@RunWith(PinpointPluginTestSuite.class)
34+
@PinpointAgent(AgentPath.PATH)
35+
@PinpointConfig("pinpoint-kafka-client.config")
36+
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
37+
@Dependency({
38+
"org.apache.kafka:kafka_2.12:[2.5.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
39+
"org.apache.kafka:kafka-clients:[2.5.0]", "org.apache.kafka:kafka-streams:[2.5.0,2.5.max]",
40+
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
41+
})
42+
@JvmVersion(8)
43+
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
44+
public class KafkaStreams_2_5_x_IT extends KafkaStreamsIT {
45+
@Test
46+
public void streamsProducerSendTest() throws NoSuchMethodException {
47+
int messageCount = new Random().nextInt(5) + 1;
48+
final TestProducer producer = new TestProducer();
49+
50+
producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
51+
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
52+
}
53+
54+
@Test
55+
public void streamsConsumeTest() throws NoSuchMethodException {
56+
final TestProducer producer = new TestProducer();
57+
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
58+
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2023 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.plugin.kafka;
18+
19+
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
20+
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
21+
import com.navercorp.pinpoint.test.plugin.*;
22+
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
26+
import test.pinpoint.plugin.kafka.TestProducer;
27+
28+
import java.util.Random;
29+
30+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
31+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;
32+
33+
@RunWith(PinpointPluginTestSuite.class)
34+
@PinpointAgent(AgentPath.PATH)
35+
@PinpointConfig("pinpoint-kafka-client.config")
36+
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
37+
@Dependency({
38+
"org.apache.kafka:kafka_2.12:[2.6.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
39+
"org.apache.kafka:kafka-clients:[2.6.2]", "org.apache.kafka:kafka-streams:[2.6.2,2.6.max]",
40+
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
41+
})
42+
@JvmVersion(8)
43+
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
44+
public class KafkaStreams_2_6_2_x_IT extends KafkaStreamsIT {
45+
@Test
46+
public void streamsProducerSendTest() throws NoSuchMethodException {
47+
int messageCount = new Random().nextInt(5) + 1;
48+
final TestProducer producer = new TestProducer();
49+
50+
producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
51+
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
52+
}
53+
54+
@Test
55+
public void streamsConsumeTest() throws NoSuchMethodException {
56+
final TestProducer producer = new TestProducer();
57+
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
58+
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2023 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.plugin.kafka;
18+
19+
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
20+
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
21+
import com.navercorp.pinpoint.test.plugin.*;
22+
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
26+
import test.pinpoint.plugin.kafka.TestProducer;
27+
28+
import java.util.Random;
29+
30+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
31+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;
32+
33+
@RunWith(PinpointPluginTestSuite.class)
34+
@PinpointAgent(AgentPath.PATH)
35+
@PinpointConfig("pinpoint-kafka-client.config")
36+
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
37+
@Dependency({
38+
"org.apache.kafka:kafka_2.12:[2.6.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
39+
"org.apache.kafka:kafka-clients:[2.6.0]", "org.apache.kafka:kafka-streams:[2.6.0,2.6.1]",
40+
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
41+
})
42+
@JvmVersion(8)
43+
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
44+
public class KafkaStreams_2_6_x_IT extends KafkaStreamsIT {
45+
@Test
46+
public void streamsProducerSendTest() throws NoSuchMethodException {
47+
int messageCount = new Random().nextInt(5) + 1;
48+
final TestProducer producer = new TestProducer();
49+
50+
producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
51+
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
52+
}
53+
54+
@Test
55+
public void streamsConsumeTest() throws NoSuchMethodException {
56+
final TestProducer producer = new TestProducer();
57+
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
58+
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2023 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.plugin.kafka;
18+
19+
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
20+
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
21+
import com.navercorp.pinpoint.test.plugin.*;
22+
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
26+
import test.pinpoint.plugin.kafka.TestProducer;
27+
28+
import java.util.Random;
29+
30+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
31+
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;
32+
33+
@RunWith(PinpointPluginTestSuite.class)
34+
@PinpointAgent(AgentPath.PATH)
35+
@PinpointConfig("pinpoint-kafka-client.config")
36+
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
37+
@Dependency({
38+
"org.apache.kafka:kafka_2.12:[2.7.2]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
39+
"org.apache.kafka:kafka-clients:[2.7.2]", "org.apache.kafka:kafka-streams:[2.7.2]",
40+
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
41+
})
42+
@JvmVersion(8)
43+
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
44+
public class KafkaStreams_2_7_x_IT extends KafkaStreamsIT {
45+
@Test
46+
public void streamsProducerSendTest() throws NoSuchMethodException {
47+
int messageCount = new Random().nextInt(5) + 1;
48+
final TestProducer producer = new TestProducer();
49+
50+
producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
51+
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
52+
}
53+
54+
@Test
55+
public void streamsConsumeTest() throws NoSuchMethodException {
56+
final TestProducer producer = new TestProducer();
57+
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
58+
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
59+
}
60+
}

0 commit comments

Comments
 (0)