diff --git a/wavefront-spring-boot-parent/pom.xml b/wavefront-spring-boot-parent/pom.xml index d9143d2..d535387 100644 --- a/wavefront-spring-boot-parent/pom.xml +++ b/wavefront-spring-boot-parent/pom.xml @@ -20,7 +20,7 @@ 1.8 2.4.2 - 2020.0.0 + 2020.0.2-SNAPSHOT diff --git a/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontSleuthBraveSpanHandler.java b/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontSleuthBraveSpanHandler.java new file mode 100644 index 0000000..a843bca --- /dev/null +++ b/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontSleuthBraveSpanHandler.java @@ -0,0 +1,35 @@ +package com.wavefront.spring.autoconfigure; + +import java.io.Closeable; +import java.io.IOException; + +import brave.handler.MutableSpan; +import brave.handler.SpanHandler; +import brave.propagation.TraceContext; + +import org.springframework.cloud.sleuth.brave.bridge.BraveFinishedSpan; +import org.springframework.cloud.sleuth.brave.bridge.BraveTraceContext; + +class WavefrontSleuthBraveSpanHandler extends SpanHandler implements Runnable, Closeable { + + final WavefrontSleuthSpanHandler spanHandler; + + WavefrontSleuthBraveSpanHandler(WavefrontSleuthSpanHandler spanHandler) { + this.spanHandler = spanHandler; + } + + @Override + public boolean end(TraceContext context, MutableSpan span, Cause cause) { + return spanHandler.end(BraveTraceContext.fromBrave(context), BraveFinishedSpan.fromBrave(span)); + } + + @Override + public void close() throws IOException { + this.spanHandler.close(); + } + + @Override + public void run() { + this.spanHandler.run(); + } +} diff --git a/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontSleuthSpanHandler.java b/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontSleuthSpanHandler.java index 284e6a2..2fc9ed4 100644 --- a/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontSleuthSpanHandler.java +++ b/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontSleuthSpanHandler.java @@ -3,12 +3,12 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.Executors; @@ -17,10 +17,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import brave.handler.MutableSpan; -import brave.handler.SpanHandler; -import brave.propagation.TraceContext; import com.wavefront.internal.reporter.WavefrontInternalReporter; +import com.wavefront.java_sdk.com.google.common.collect.Iterators; import com.wavefront.java_sdk.com.google.common.collect.Sets; import com.wavefront.sdk.common.NamedThreadFactory; import com.wavefront.sdk.common.Pair; @@ -32,6 +30,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.cloud.sleuth.TraceContext; +import org.springframework.cloud.sleuth.exporter.FinishedSpan; +import org.springframework.util.StringUtils; + import static com.wavefront.internal.SpanDerivedMetricsUtils.TRACING_DERIVED_PREFIX; import static com.wavefront.internal.SpanDerivedMetricsUtils.reportHeartbeats; import static com.wavefront.internal.SpanDerivedMetricsUtils.reportWavefrontGeneratedData; @@ -66,7 +68,7 @@ * {@link UUID#timestamp()} on UUIDs converted here, or in other Wavefront code, as it might * throw. */ -final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable, Closeable { +public final class WavefrontSleuthSpanHandler implements Runnable, Closeable { private static final Log LOG = LogFactory.getLog(WavefrontSleuthSpanHandler.class); // https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L114-L114 @@ -75,7 +77,21 @@ final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable, private final static String DEFAULT_SOURCE = "wavefront-spring-boot"; private final static String WAVEFRONT_GENERATED_COMPONENT = "wavefront-generated"; - final LinkedBlockingQueue> spanBuffer; + private static final int LONG_BYTES = Long.SIZE / Byte.SIZE; + + private static final int BYTE_BASE16 = 2; + + private static final int LONG_BASE16 = BYTE_BASE16 * LONG_BYTES; + + private static final int TRACE_ID_HEX_SIZE = 2 * LONG_BASE16; + + private static final String ALPHABET = "0123456789abcdef"; + + private static final int ASCII_CHARACTERS = 128; + + private static final byte[] DECODING = buildDecodingArray(); + + final LinkedBlockingQueue> spanBuffer; final WavefrontSender wavefrontSender; final WavefrontInternalReporter wfInternalReporter; final Set traceDerivedCustomTagKeys; @@ -144,7 +160,7 @@ final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable, // Exact same behavior as WavefrontSpanReporter // https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L163-L179 - @Override public boolean end(TraceContext context, MutableSpan span, Cause cause) { + public boolean end(TraceContext context, FinishedSpan span) { spansReceived.increment(); if (!spanBuffer.offer(Pair.of(context, span))) { spansDropped.increment(); @@ -160,30 +176,48 @@ List> getDefaultTags() { return Collections.unmodifiableList(this.defaultTags); } - private void send(TraceContext context, MutableSpan span) { - UUID traceId = new UUID(context.traceIdHigh(), context.traceId()); - UUID spanId = new UUID(0L, context.spanId()); + private String padLeftWithZeros(String string, int length) { + if (string.length() >= length) { + return string; + } + else { + StringBuilder sb = new StringBuilder(length); + for (int i = string.length(); i < length; i++) { + sb.append('0'); + } + + return sb.append(string).toString(); + } + } + + private void send(TraceContext context, FinishedSpan span) { + String traceIdString = padLeftWithZeros(context.traceId(), TRACE_ID_HEX_SIZE); + String traceIdHigh = traceIdString.substring(0, traceIdString.length() / 2); + String traceIdLow = traceIdString.substring(traceIdString.length() / 2); + UUID traceId = new UUID(longFromBase16String(traceIdHigh), longFromBase16String(traceIdLow)); + UUID spanId = new UUID(0L, longFromBase16String(context.spanId())); // NOTE: wavefront-opentracing-sdk-java and wavefront-proxy differ, but we prefer the former. // https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L187-L190 // https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L248-L252 List parents = null; - if (context.parentIdAsLong() != 0L) { - parents = Collections.singletonList(new UUID(0L, context.parentIdAsLong())); + String parentId = context.parentId(); + if (StringUtils.hasText(parentId) && longFromBase16String(parentId) != 0L) { + parents = Collections.singletonList(new UUID(0L, longFromBase16String(parentId))); } List followsFrom = null; // https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L344-L345 - String name = span.name(); + String name = span.getName(); if (name == null) name = DEFAULT_SPAN_NAME; // Start and duration become 0L if unset. Any positive duration rounds up to 1 millis. - long startMillis = span.startTimestamp() / 1000L, finishMillis = span.finishTimestamp() / 1000L; - long durationMicros = span.finishTimestamp() - span.startTimestamp(); + long startMillis = span.getStartTimestamp() / 1000L, finishMillis = span.getEndTimestamp() / 1000L; + long durationMicros = span.getEndTimestamp() - span.getStartTimestamp(); long durationMillis = startMillis != 0 && finishMillis != 0L ? Math.max(finishMillis - startMillis, 1L) : 0L; List spanLogs = convertAnnotationsToSpanLogs(span); - TagList tags = new TagList(defaultTagKeys, defaultTags, context, span); + TagList tags = new TagList(defaultTagKeys, defaultTags, span); try { wavefrontSender.sendSpan(name, startMillis, durationMillis, source, traceId, spanId, @@ -212,6 +246,38 @@ private void send(TraceContext context, MutableSpan span) { } } + private static byte[] buildDecodingArray() { + byte[] decoding = new byte[ASCII_CHARACTERS]; + Arrays.fill(decoding, (byte) -1); + for (int i = 0; i < ALPHABET.length(); i++) { + char c = ALPHABET.charAt(i); + decoding[c] = (byte) i; + } + return decoding; + } + + /** + * Returns the {@code long} value whose base16 representation is stored in the first + * 16 chars of {@code chars} starting from the {@code offset}. + * @param chars the base16 representation of the {@code long}. + */ + private static long longFromBase16String(CharSequence chars) { + int offset = 0; + return (decodeByte(chars.charAt(offset), chars.charAt(offset + 1)) & 0xFFL) << 56 + | (decodeByte(chars.charAt(offset + 2), chars.charAt(offset + 3)) & 0xFFL) << 48 + | (decodeByte(chars.charAt(offset + 4), chars.charAt(offset + 5)) & 0xFFL) << 40 + | (decodeByte(chars.charAt(offset + 6), chars.charAt(offset + 7)) & 0xFFL) << 32 + | (decodeByte(chars.charAt(offset + 8), chars.charAt(offset + 9)) & 0xFFL) << 24 + | (decodeByte(chars.charAt(offset + 10), chars.charAt(offset + 11)) & 0xFFL) << 16 + | (decodeByte(chars.charAt(offset + 12), chars.charAt(offset + 13)) & 0xFFL) << 8 + | (decodeByte(chars.charAt(offset + 14), chars.charAt(offset + 15)) & 0xFFL); + } + + private static byte decodeByte(char hi, char lo) { + int decoded = DECODING[hi] << 4 | DECODING[lo]; + return (byte) decoded; + } + /** * Extracted for test isolation and as parsing otherwise implies multiple-returns or scanning * later. @@ -225,18 +291,20 @@ static final class TagList extends ArrayList> { TagList( Set defaultTagKeys, - List> defaultTags, - TraceContext context, - MutableSpan span + List> defaultTags,FinishedSpan span ){ - super(defaultTags.size() + span.tagCount()); - boolean debug = context.debug(), hasAnnotations = span.annotationCount() > 0; - isError = span.error() != null; + super(defaultTags.size() + span.getTags().size()); + // TODO: OTel doesn't have a notion of debug + boolean debug = false; + boolean hasAnnotations = span.getEvents().size() > 0; + isError = span.getError() != null; - int tagCount = span.tagCount(); + int tagCount = span.getTags().size(); addAll(defaultTags); for (int i = 0; i < tagCount; i++) { - String key = span.tagKeyAt(i), value = span.tagValueAt(i); + String tagKey = Iterators.get(span.getTags().keySet().iterator(), i); + String tagValue = Iterators.get(span.getTags().values().iterator(), i); + String key = tagKey, value = tagValue; String lcKey = key.toLowerCase(Locale.ROOT); if (lcKey.equals(ERROR_TAG_KEY)) { isError = true; @@ -261,8 +329,8 @@ static final class TagList extends ArrayList> { if (debug) add(Pair.of(DEBUG_TAG_KEY, "true")); // https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L254-L266 - if (span.kind() != null) { - String kind = span.kind().toString().toLowerCase(); + if (span.getKind() != null) { + String kind = span.getKind().toString().toLowerCase(); add(Pair.of("span.kind", kind)); if (hasAnnotations) { add(Pair.of("_spanSecondaryId", kind)); @@ -273,20 +341,21 @@ static final class TagList extends ArrayList> { if (hasAnnotations) add(Pair.of(SPAN_LOG_KEY, "true")); // https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L324-L327 - if (span.localIp() != null) { - add(Pair.of("ipv4", span.localIp())); // NOTE: this could be IPv6!! + if (span.getLocalIp() != null) { + add(Pair.of("ipv4", span.getLocalIp())); // NOTE: this could be IPv6!! } } } // https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L397-L402 - static List convertAnnotationsToSpanLogs(MutableSpan span) { - int annotationCount = span.annotationCount(); + static List convertAnnotationsToSpanLogs(FinishedSpan span) { + int annotationCount = span.getEvents().size(); if (annotationCount == 0) return Collections.emptyList(); List spanLogs = new ArrayList<>(annotationCount); for (int i = 0; i < annotationCount; i++) { - long epochMicros = span.annotationTimestampAt(i); - String value = span.annotationValueAt(i); + Map.Entry entry = Iterators.get(span.getEvents().iterator(), i); + long epochMicros = entry.getKey(); + String value = entry.getValue(); spanLogs.add(new SpanLog(epochMicros, Collections.singletonMap("annotation", value))); } return spanLogs; @@ -295,7 +364,7 @@ static List convertAnnotationsToSpanLogs(MutableSpan span) { @Override public void run() { while (!stop) { try { - Pair contextAndSpan = spanBuffer.take(); + Pair contextAndSpan = spanBuffer.take(); send(contextAndSpan._1, contextAndSpan._2); } catch (InterruptedException ex) { if (LOG.isInfoEnabled()) { diff --git a/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontTracingSleuthConfiguration.java b/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontTracingSleuthConfiguration.java index 7e814e0..0232748 100644 --- a/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontTracingSleuthConfiguration.java +++ b/wavefront-spring-boot/src/main/java/com/wavefront/spring/autoconfigure/WavefrontTracingSleuthConfiguration.java @@ -1,5 +1,6 @@ package com.wavefront.spring.autoconfigure; +import brave.Tracer; import brave.TracingCustomizer; import brave.handler.SpanHandler; import com.wavefront.sdk.common.WavefrontSender; @@ -23,30 +24,37 @@ * @author Stephane Nicoll */ @Configuration(proxyBeanMethods = false) -@ConditionalOnClass({ SpanNamer.class, TracingCustomizer.class, SpanHandler.class }) +@ConditionalOnClass({ SpanNamer.class, MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class }) @AutoConfigureBefore(BraveAutoConfiguration.class) class WavefrontTracingSleuthConfiguration { static final String BEAN_NAME = "wavefrontTracingCustomizer"; - @Bean(BEAN_NAME) - @ConditionalOnMissingBean(name = BEAN_NAME) + @Bean @ConditionalOnBean({ MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class }) - TracingCustomizer wavefrontTracingCustomizer(MeterRegistry meterRegistry, - WavefrontSender wavefrontSender, - ApplicationTags applicationTags, - WavefrontConfig wavefrontConfig, - WavefrontProperties wavefrontProperties) { - WavefrontSleuthSpanHandler spanHandler = new WavefrontSleuthSpanHandler( - // https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L54 - 50000, // TODO: maxQueueSize should be a property, ya? - wavefrontSender, - meterRegistry, - wavefrontConfig.source(), - applicationTags, - wavefrontProperties); - - return t -> t.traceId128Bit(true).supportsJoin(false).addSpanHandler(spanHandler); + WavefrontSleuthSpanHandler wavefrontSleuthSpanHandler(MeterRegistry meterRegistry, + WavefrontSender wavefrontSender, + ApplicationTags applicationTags, + WavefrontConfig wavefrontConfig, + WavefrontProperties wavefrontProperties) { + return new WavefrontSleuthSpanHandler( + // https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L54 + 50000, // TODO: maxQueueSize should be a property, ya? + wavefrontSender, + meterRegistry, + wavefrontConfig.source(), + applicationTags, + wavefrontProperties); } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass({Tracer.class, TracingCustomizer.class, SpanHandler.class }) + static class BraveCustomizerConfiguration { + @Bean(BEAN_NAME) + @ConditionalOnMissingBean(name = BEAN_NAME) + @ConditionalOnBean({ MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class }) + TracingCustomizer wavefrontTracingCustomizer(WavefrontSleuthSpanHandler spanHandler) { + return t -> t.traceId128Bit(true).supportsJoin(false).addSpanHandler(new WavefrontSleuthBraveSpanHandler(spanHandler)); + } + } } diff --git a/wavefront-spring-boot/src/test/java/com/wavefront/spring/autoconfigure/WavefrontAutoConfigurationTests.java b/wavefront-spring-boot/src/test/java/com/wavefront/spring/autoconfigure/WavefrontAutoConfigurationTests.java index 42a17f5..05f7ef1 100644 --- a/wavefront-spring-boot/src/test/java/com/wavefront/spring/autoconfigure/WavefrontAutoConfigurationTests.java +++ b/wavefront-spring-boot/src/test/java/com/wavefront/spring/autoconfigure/WavefrontAutoConfigurationTests.java @@ -182,8 +182,8 @@ void tracingWithSleuthIsConfiguredWithWavefrontSender() { .with(sleuth()) .run((context) -> { assertThat(context).hasSingleBean(TracingCustomizer.class); - WavefrontSleuthSpanHandler spanHandler = extractSpanHandler(context.getBean(Tracer.class)); - assertThat(spanHandler).hasFieldOrPropertyWithValue("wavefrontSender", sender); + WavefrontSleuthBraveSpanHandler braveSpanHandler = extractSpanHandler(context.getBean(Tracer.class)); + assertThat(braveSpanHandler.spanHandler).hasFieldOrPropertyWithValue("wavefrontSender", sender); }); } @@ -247,8 +247,8 @@ private ContextConsumer assertSleuthSpanDefaultTag String serviceName, String cluster, String shard) { return (context) -> { assertThat(context).hasSingleBean(TracingCustomizer.class); - WavefrontSleuthSpanHandler spanHandler = extractSpanHandler(context.getBean(Tracer.class)); - assertThat(spanHandler.getDefaultTags()).contains( + WavefrontSleuthBraveSpanHandler braveSpanHandler = extractSpanHandler(context.getBean(Tracer.class)); + assertThat(braveSpanHandler.spanHandler.getDefaultTags()).contains( new Pair<>("application", applicationName), new Pair<>("service", serviceName), new Pair<>("cluster", cluster), @@ -266,7 +266,8 @@ void tracingWithSleuthCanBeConfigured() { .with(sleuth()) .run((context) -> { assertThat(context).hasSingleBean(TracingCustomizer.class); - WavefrontSleuthSpanHandler spanHandler = extractSpanHandler(context.getBean(Tracer.class)); + WavefrontSleuthBraveSpanHandler braveSpanHandler = extractSpanHandler(context.getBean(Tracer.class)); + WavefrontSleuthSpanHandler spanHandler = braveSpanHandler.spanHandler; Set traceDerivedCustomTagKeys = (Set) ReflectionTestUtils.getField( spanHandler, "traceDerivedCustomTagKeys"); assertThat(traceDerivedCustomTagKeys).containsExactlyInAnyOrder("region", "test"); @@ -392,12 +393,12 @@ void tracingIsNotConfiguredWithNonWavefrontRegistry() { } @SuppressWarnings("ConstantConditions") - private WavefrontSleuthSpanHandler extractSpanHandler(Tracer tracer) { + private WavefrontSleuthBraveSpanHandler extractSpanHandler(Tracer tracer) { SpanHandler[] handlers = (SpanHandler[]) ReflectionTestUtils.getField( ReflectionTestUtils.getField( ReflectionTestUtils.getField(tracer, "spanHandler"), "delegate"), "handlers"); - return (WavefrontSleuthSpanHandler) handlers[1]; + return (WavefrontSleuthBraveSpanHandler) handlers[1]; } @SuppressWarnings("unchecked")