|
| 1 | +package com.navercorp.pinpoint.plugin.kafka; |
| 2 | + |
| 3 | +import com.navercorp.pinpoint.bootstrap.plugin.test.*; |
| 4 | +import org.apache.kafka.clients.producer.Callback; |
| 5 | +import org.apache.kafka.clients.producer.KafkaProducer; |
| 6 | +import org.apache.kafka.clients.producer.ProducerRecord; |
| 7 | +import org.apache.kafka.common.TopicPartition; |
| 8 | +import org.apache.kafka.streams.processor.internals.StreamTask; |
| 9 | + |
| 10 | +import java.lang.reflect.Method; |
| 11 | + |
| 12 | +import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.annotation; |
| 13 | +import static test.pinpoint.plugin.kafka.KafkaITConstants.*; |
| 14 | + |
| 15 | +public class KafkaStreamsITBase { |
| 16 | + |
| 17 | + public static void verifyProducerSend(String brokerUrl, int messageCount) throws NoSuchMethodException { |
| 18 | + |
| 19 | + int consumerInvocationCount = messageCount * 3 + 2; |
| 20 | + PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance(); |
| 21 | + verifier.awaitTraceCount(messageCount + consumerInvocationCount, 100, MAX_TRACE_WAIT_TIME); |
| 22 | + verifier.printCache(); |
| 23 | + |
| 24 | + Method sendMethod = KafkaProducer.class.getDeclaredMethod("send", ProducerRecord.class, Callback.class); |
| 25 | + ExpectedTrace.Builder eventBuilder = ExpectedTrace.createEventBuilder(KAFKA_CLIENT_SERVICE_TYPE); |
| 26 | + eventBuilder.setMethod(sendMethod); |
| 27 | + eventBuilder.setEndPoint(brokerUrl); |
| 28 | + eventBuilder.setDestinationId(brokerUrl); |
| 29 | + eventBuilder.setAnnotations(annotation("kafka.topic", OUTPUT_TOPIC)); |
| 30 | + ExpectedTrace producerSendExpected = eventBuilder.build(); |
| 31 | + |
| 32 | + for (int i = 0; i < messageCount; i++) { |
| 33 | + verifier.verifyDiscreteTrace(producerSendExpected); |
| 34 | + } |
| 35 | + } |
| 36 | + |
| 37 | + public static void verifyMultiConsumerEntryPoint(String brokerUrl) throws NoSuchMethodException { |
| 38 | + |
| 39 | + PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance(); |
| 40 | + verifier.awaitTraceCount(3, 100, MAX_TRACE_WAIT_TIME); |
| 41 | + |
| 42 | + String expectedRpc = "kafka-streams://topic=" + INPUT_TOPIC + "?batch=1"; |
| 43 | + verifyConsumerEntryPoint(verifier, brokerUrl, INPUT_TOPIC, expectedRpc, annotation("kafka.topic", INPUT_TOPIC), annotation("kafka.batch", 1) |
| 44 | + ); |
| 45 | + } |
| 46 | + |
| 47 | + private static void verifyConsumerEntryPoint(PluginTestVerifier verifier, String brokerUrl, String topic, String expectedRpc, ExpectedAnnotation... expectedAnnotations) |
| 48 | + throws NoSuchMethodException { |
| 49 | + |
| 50 | + Method sendMethod = KafkaProducer.class.getDeclaredMethod("send", ProducerRecord.class, Callback.class); |
| 51 | + ExpectedTrace.Builder eventBuilder = ExpectedTrace.createEventBuilder(KAFKA_CLIENT_SERVICE_TYPE); |
| 52 | + eventBuilder.setMethod(sendMethod); |
| 53 | + eventBuilder.setEndPoint(brokerUrl); |
| 54 | + eventBuilder.setDestinationId(brokerUrl); |
| 55 | + eventBuilder.setAnnotations(annotation("kafka.topic", topic)); |
| 56 | + ExpectedTrace producerSendExpected = eventBuilder.build(); |
| 57 | + |
| 58 | + ExpectedTrace.Builder rootBuilder = ExpectedTrace.createRootBuilder(KAFKA_STREAMS_SERVICE_TYPE); |
| 59 | + rootBuilder.setMethodSignature("Kafka Streams Invocation"); |
| 60 | + rootBuilder.setRpc(expectedRpc); |
| 61 | + rootBuilder.setRemoteAddr(brokerUrl); |
| 62 | + rootBuilder.setAnnotations(expectedAnnotations); |
| 63 | + ExpectedTrace consumerEntryPointInvocationExpected = rootBuilder.build(); |
| 64 | + |
| 65 | + Method consumeRecordsMethod = StreamTask.class.getDeclaredMethod("addRecords", TopicPartition.class, Iterable.class); |
| 66 | + ExpectedTrace messageArrivedExpected = Expectations.event(KAFKA_STREAMS_SERVICE_TYPE, consumeRecordsMethod); |
| 67 | + |
| 68 | + verifier.printCache(); |
| 69 | + verifier.verifyDiscreteTrace(producerSendExpected, consumerEntryPointInvocationExpected, messageArrivedExpected); |
| 70 | + } |
| 71 | +} |
0 commit comments