Skip to content

Commit

Permalink
[#9673] Add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ga-ram committed Feb 8, 2023
1 parent c58835e commit c719bcc
Show file tree
Hide file tree
Showing 18 changed files with 873 additions and 1 deletion.
4 changes: 4 additions & 0 deletions plugins-it/kafka-it/kafka-3-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.test.plugin.shared.SharedTestBeforeAllResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public abstract class KafkaStreamsIT {

protected static final Logger logger = LogManager.getLogger(KafkaStreamsIT.class);

static String brokerUrl;
static int PORT;


@SharedTestBeforeAllResult
public static void setBeforeAllResult(Properties beforeAllResult) {
logger.info("Properties=" + beforeAllResult);
PORT = Integer.parseInt(beforeAllResult.getProperty("PORT"));
brokerUrl = "localhost:" + PORT;
}

public static int getPort() {
return PORT;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.bootstrap.plugin.test.*;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.internals.StreamTask;

import java.lang.reflect.Method;

import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.annotation;
import static test.pinpoint.plugin.kafka.KafkaITConstants.*;

public class KafkaStreamsITBase {

public static void verifyProducerSend(String brokerUrl, int messageCount) throws NoSuchMethodException {

int consumerInvocationCount = messageCount * 3 + 2;
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.awaitTraceCount(messageCount + consumerInvocationCount, 100, MAX_TRACE_WAIT_TIME);
verifier.printCache();

Method sendMethod = KafkaProducer.class.getDeclaredMethod("send", ProducerRecord.class, Callback.class);
ExpectedTrace.Builder eventBuilder = ExpectedTrace.createEventBuilder(KAFKA_CLIENT_SERVICE_TYPE);
eventBuilder.setMethod(sendMethod);
eventBuilder.setEndPoint(brokerUrl);
eventBuilder.setDestinationId(brokerUrl);
eventBuilder.setAnnotations(annotation("kafka.topic", OUTPUT_TOPIC));
ExpectedTrace producerSendExpected = eventBuilder.build();

for (int i = 0; i < messageCount; i++) {
verifier.verifyDiscreteTrace(producerSendExpected);
}
}

public static void verifyMultiConsumerEntryPoint(String brokerUrl) throws NoSuchMethodException {

PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.awaitTraceCount(3, 100, MAX_TRACE_WAIT_TIME);

String expectedRpc = "kafka-streams://topic=" + INPUT_TOPIC + "?batch=1";
verifyConsumerEntryPoint(verifier, brokerUrl, INPUT_TOPIC, expectedRpc, annotation("kafka.topic", INPUT_TOPIC), annotation("kafka.batch", 1)
);
}

private static void verifyConsumerEntryPoint(PluginTestVerifier verifier, String brokerUrl, String topic, String expectedRpc, ExpectedAnnotation... expectedAnnotations)
throws NoSuchMethodException {

Method sendMethod = KafkaProducer.class.getDeclaredMethod("send", ProducerRecord.class, Callback.class);
ExpectedTrace.Builder eventBuilder = ExpectedTrace.createEventBuilder(KAFKA_CLIENT_SERVICE_TYPE);
eventBuilder.setMethod(sendMethod);
eventBuilder.setEndPoint(brokerUrl);
eventBuilder.setDestinationId(brokerUrl);
eventBuilder.setAnnotations(annotation("kafka.topic", topic));
ExpectedTrace producerSendExpected = eventBuilder.build();

ExpectedTrace.Builder rootBuilder = ExpectedTrace.createRootBuilder(KAFKA_STREAMS_SERVICE_TYPE);
rootBuilder.setMethodSignature("Kafka Streams Invocation");
rootBuilder.setRpc(expectedRpc);
rootBuilder.setRemoteAddr(brokerUrl);
rootBuilder.setAnnotations(expectedAnnotations);
ExpectedTrace consumerEntryPointInvocationExpected = rootBuilder.build();

Method consumeRecordsMethod = StreamTask.class.getDeclaredMethod("addRecords", TopicPartition.class, Iterable.class);
ExpectedTrace messageArrivedExpected = Expectations.event(KAFKA_STREAMS_SERVICE_TYPE, consumeRecordsMethod);

verifier.printCache();
verifier.verifyDiscreteTrace(producerSendExpected, consumerEntryPointInvocationExpected, messageArrivedExpected);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
import com.navercorp.pinpoint.test.plugin.*;
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
import test.pinpoint.plugin.kafka.TestProducer;

import java.util.Random;

import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;

@RunWith(PinpointPluginTestSuite.class)
@PinpointAgent(AgentPath.PATH)
@PinpointConfig("pinpoint-kafka-client.config")
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.5.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.5.0]", "org.apache.kafka:kafka-streams:[2.5.0,2.5.max]",
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
})
@JvmVersion(8)
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
public class KafkaStreams_2_5_x_IT extends KafkaStreamsIT {
@Test
public void streamsProducerSendTest() throws NoSuchMethodException {
int messageCount = new Random().nextInt(5) + 1;
final TestProducer producer = new TestProducer();

producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Test
public void streamsConsumeTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
import com.navercorp.pinpoint.test.plugin.*;
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
import test.pinpoint.plugin.kafka.TestProducer;

import java.util.Random;

import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;

@RunWith(PinpointPluginTestSuite.class)
@PinpointAgent(AgentPath.PATH)
@PinpointConfig("pinpoint-kafka-client.config")
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.6.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.6.2]", "org.apache.kafka:kafka-streams:[2.6.2,2.6.max]",
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
})
@JvmVersion(8)
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
public class KafkaStreams_2_6_2_x_IT extends KafkaStreamsIT {
@Test
public void streamsProducerSendTest() throws NoSuchMethodException {
int messageCount = new Random().nextInt(5) + 1;
final TestProducer producer = new TestProducer();

producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Test
public void streamsConsumeTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
import com.navercorp.pinpoint.test.plugin.*;
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
import test.pinpoint.plugin.kafka.TestProducer;

import java.util.Random;

import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;

@RunWith(PinpointPluginTestSuite.class)
@PinpointAgent(AgentPath.PATH)
@PinpointConfig("pinpoint-kafka-client.config")
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.6.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.6.0]", "org.apache.kafka:kafka-streams:[2.6.0,2.6.1]",
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
})
@JvmVersion(8)
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
public class KafkaStreams_2_6_x_IT extends KafkaStreamsIT {
@Test
public void streamsProducerSendTest() throws NoSuchMethodException {
int messageCount = new Random().nextInt(5) + 1;
final TestProducer producer = new TestProducer();

producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Test
public void streamsConsumeTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.pluginit.utils.TestcontainersOption;
import com.navercorp.pinpoint.test.plugin.*;
import com.navercorp.pinpoint.test.plugin.shared.SharedTestLifeCycleClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import test.pinpoint.plugin.kafka.KafkaStreamsUnitServer;
import test.pinpoint.plugin.kafka.TestProducer;

import java.util.Random;

import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_MULTI_RECORDS;
import static test.pinpoint.plugin.kafka.KafkaITConstants.TRACE_TYPE_RECORD;

@RunWith(PinpointPluginTestSuite.class)
@PinpointAgent(AgentPath.PATH)
@PinpointConfig("pinpoint-kafka-client.config")
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.7.2]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.7.2]", "org.apache.kafka:kafka-streams:[2.7.2]",
TestcontainersOption.TEST_CONTAINER, TestcontainersOption.KAFKA
})
@JvmVersion(8)
@SharedTestLifeCycleClass(KafkaStreamsUnitServer.class)
public class KafkaStreams_2_7_x_IT extends KafkaStreamsIT {
@Test
public void streamsProducerSendTest() throws NoSuchMethodException {
int messageCount = new Random().nextInt(5) + 1;
final TestProducer producer = new TestProducer();

producer.sendMessageForStream(brokerUrl, messageCount, TRACE_TYPE_RECORD);
KafkaStreamsITBase.verifyProducerSend(brokerUrl, messageCount);
}

@Test
public void streamsConsumeTest() throws NoSuchMethodException {
final TestProducer producer = new TestProducer();
producer.sendMessageForStream(brokerUrl, 1, TRACE_TYPE_MULTI_RECORDS);
KafkaStreamsITBase.verifyMultiConsumerEntryPoint(brokerUrl);
}
}
Loading

0 comments on commit c719bcc

Please sign in to comment.