Skip to content
This repository was archived by the owner on Aug 6, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion wavefront-spring-boot-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<properties>
<java.version>1.8</java.version>
<spring-boot.version>2.4.2</spring-boot.version>
<spring-cloud.version>2020.0.0</spring-cloud.version>
<spring-cloud.version>2020.0.2-SNAPSHOT</spring-cloud.version>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.wavefront.spring.autoconfigure;

import java.io.Closeable;
import java.io.IOException;

import brave.handler.MutableSpan;
import brave.handler.SpanHandler;
import brave.propagation.TraceContext;

import org.springframework.cloud.sleuth.brave.bridge.BraveFinishedSpan;
import org.springframework.cloud.sleuth.brave.bridge.BraveTraceContext;

class WavefrontSleuthBraveSpanHandler extends SpanHandler implements Runnable, Closeable {

final WavefrontSleuthSpanHandler spanHandler;

WavefrontSleuthBraveSpanHandler(WavefrontSleuthSpanHandler spanHandler) {
this.spanHandler = spanHandler;
}

@Override
public boolean end(TraceContext context, MutableSpan span, Cause cause) {
return spanHandler.end(BraveTraceContext.fromBrave(context), BraveFinishedSpan.fromBrave(span));
}

@Override
public void close() throws IOException {
this.spanHandler.close();
}

@Override
public void run() {
this.spanHandler.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
Expand All @@ -17,10 +17,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import brave.handler.MutableSpan;
import brave.handler.SpanHandler;
import brave.propagation.TraceContext;
import com.wavefront.internal.reporter.WavefrontInternalReporter;
import com.wavefront.java_sdk.com.google.common.collect.Iterators;
import com.wavefront.java_sdk.com.google.common.collect.Sets;
import com.wavefront.sdk.common.NamedThreadFactory;
import com.wavefront.sdk.common.Pair;
Expand All @@ -32,6 +30,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.exporter.FinishedSpan;
import org.springframework.util.StringUtils;

import static com.wavefront.internal.SpanDerivedMetricsUtils.TRACING_DERIVED_PREFIX;
import static com.wavefront.internal.SpanDerivedMetricsUtils.reportHeartbeats;
import static com.wavefront.internal.SpanDerivedMetricsUtils.reportWavefrontGeneratedData;
Expand Down Expand Up @@ -66,7 +68,7 @@
* {@link UUID#timestamp()} on UUIDs converted here, or in other Wavefront code, as it might
* throw.
*/
final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable, Closeable {
public final class WavefrontSleuthSpanHandler implements Runnable, Closeable {
private static final Log LOG = LogFactory.getLog(WavefrontSleuthSpanHandler.class);

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L114-L114
Expand All @@ -75,7 +77,21 @@ final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable,
private final static String DEFAULT_SOURCE = "wavefront-spring-boot";
private final static String WAVEFRONT_GENERATED_COMPONENT = "wavefront-generated";

final LinkedBlockingQueue<Pair<TraceContext, MutableSpan>> spanBuffer;
private static final int LONG_BYTES = Long.SIZE / Byte.SIZE;

private static final int BYTE_BASE16 = 2;

private static final int LONG_BASE16 = BYTE_BASE16 * LONG_BYTES;

private static final int TRACE_ID_HEX_SIZE = 2 * LONG_BASE16;

private static final String ALPHABET = "0123456789abcdef";

private static final int ASCII_CHARACTERS = 128;

private static final byte[] DECODING = buildDecodingArray();

final LinkedBlockingQueue<Pair<TraceContext, FinishedSpan>> spanBuffer;
final WavefrontSender wavefrontSender;
final WavefrontInternalReporter wfInternalReporter;
final Set<String> traceDerivedCustomTagKeys;
Expand Down Expand Up @@ -144,7 +160,7 @@ final class WavefrontSleuthSpanHandler extends SpanHandler implements Runnable,

// Exact same behavior as WavefrontSpanReporter
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L163-L179
@Override public boolean end(TraceContext context, MutableSpan span, Cause cause) {
public boolean end(TraceContext context, FinishedSpan span) {
spansReceived.increment();
if (!spanBuffer.offer(Pair.of(context, span))) {
spansDropped.increment();
Expand All @@ -160,30 +176,48 @@ List<Pair<String, String>> getDefaultTags() {
return Collections.unmodifiableList(this.defaultTags);
}

private void send(TraceContext context, MutableSpan span) {
UUID traceId = new UUID(context.traceIdHigh(), context.traceId());
UUID spanId = new UUID(0L, context.spanId());
private String padLeftWithZeros(String string, int length) {
if (string.length() >= length) {
return string;
}
else {
StringBuilder sb = new StringBuilder(length);
for (int i = string.length(); i < length; i++) {
sb.append('0');
}

return sb.append(string).toString();
}
}

private void send(TraceContext context, FinishedSpan span) {
String traceIdString = padLeftWithZeros(context.traceId(), TRACE_ID_HEX_SIZE);
String traceIdHigh = traceIdString.substring(0, traceIdString.length() / 2);
String traceIdLow = traceIdString.substring(traceIdString.length() / 2);
UUID traceId = new UUID(longFromBase16String(traceIdHigh), longFromBase16String(traceIdLow));
UUID spanId = new UUID(0L, longFromBase16String(context.spanId()));

// NOTE: wavefront-opentracing-sdk-java and wavefront-proxy differ, but we prefer the former.
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L187-L190
// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L248-L252
List<UUID> parents = null;
if (context.parentIdAsLong() != 0L) {
parents = Collections.singletonList(new UUID(0L, context.parentIdAsLong()));
String parentId = context.parentId();
if (StringUtils.hasText(parentId) && longFromBase16String(parentId) != 0L) {
parents = Collections.singletonList(new UUID(0L, longFromBase16String(parentId)));
}
List<UUID> followsFrom = null;

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L344-L345
String name = span.name();
String name = span.getName();
if (name == null) name = DEFAULT_SPAN_NAME;

// Start and duration become 0L if unset. Any positive duration rounds up to 1 millis.
long startMillis = span.startTimestamp() / 1000L, finishMillis = span.finishTimestamp() / 1000L;
long durationMicros = span.finishTimestamp() - span.startTimestamp();
long startMillis = span.getStartTimestamp() / 1000L, finishMillis = span.getEndTimestamp() / 1000L;
long durationMicros = span.getEndTimestamp() - span.getStartTimestamp();
long durationMillis = startMillis != 0 && finishMillis != 0L ? Math.max(finishMillis - startMillis, 1L) : 0L;

List<SpanLog> spanLogs = convertAnnotationsToSpanLogs(span);
TagList tags = new TagList(defaultTagKeys, defaultTags, context, span);
TagList tags = new TagList(defaultTagKeys, defaultTags, span);

try {
wavefrontSender.sendSpan(name, startMillis, durationMillis, source, traceId, spanId,
Expand Down Expand Up @@ -212,6 +246,38 @@ private void send(TraceContext context, MutableSpan span) {
}
}

private static byte[] buildDecodingArray() {
byte[] decoding = new byte[ASCII_CHARACTERS];
Arrays.fill(decoding, (byte) -1);
for (int i = 0; i < ALPHABET.length(); i++) {
char c = ALPHABET.charAt(i);
decoding[c] = (byte) i;
}
return decoding;
}

/**
* Returns the {@code long} value whose base16 representation is stored in the first
* 16 chars of {@code chars} starting from the {@code offset}.
* @param chars the base16 representation of the {@code long}.
*/
private static long longFromBase16String(CharSequence chars) {
int offset = 0;
return (decodeByte(chars.charAt(offset), chars.charAt(offset + 1)) & 0xFFL) << 56
| (decodeByte(chars.charAt(offset + 2), chars.charAt(offset + 3)) & 0xFFL) << 48
| (decodeByte(chars.charAt(offset + 4), chars.charAt(offset + 5)) & 0xFFL) << 40
| (decodeByte(chars.charAt(offset + 6), chars.charAt(offset + 7)) & 0xFFL) << 32
| (decodeByte(chars.charAt(offset + 8), chars.charAt(offset + 9)) & 0xFFL) << 24
| (decodeByte(chars.charAt(offset + 10), chars.charAt(offset + 11)) & 0xFFL) << 16
| (decodeByte(chars.charAt(offset + 12), chars.charAt(offset + 13)) & 0xFFL) << 8
| (decodeByte(chars.charAt(offset + 14), chars.charAt(offset + 15)) & 0xFFL);
}

private static byte decodeByte(char hi, char lo) {
int decoded = DECODING[hi] << 4 | DECODING[lo];
return (byte) decoded;
}

/**
* Extracted for test isolation and as parsing otherwise implies multiple-returns or scanning
* later.
Expand All @@ -225,18 +291,20 @@ static final class TagList extends ArrayList<Pair<String, String>> {

TagList(
Set<String> defaultTagKeys,
List<Pair<String, String>> defaultTags,
TraceContext context,
MutableSpan span
List<Pair<String, String>> defaultTags,FinishedSpan span
){
super(defaultTags.size() + span.tagCount());
boolean debug = context.debug(), hasAnnotations = span.annotationCount() > 0;
isError = span.error() != null;
super(defaultTags.size() + span.getTags().size());
// TODO: OTel doesn't have a notion of debug
boolean debug = false;
boolean hasAnnotations = span.getEvents().size() > 0;
isError = span.getError() != null;

int tagCount = span.tagCount();
int tagCount = span.getTags().size();
addAll(defaultTags);
for (int i = 0; i < tagCount; i++) {
String key = span.tagKeyAt(i), value = span.tagValueAt(i);
String tagKey = Iterators.get(span.getTags().keySet().iterator(), i);
String tagValue = Iterators.get(span.getTags().values().iterator(), i);
String key = tagKey, value = tagValue;
String lcKey = key.toLowerCase(Locale.ROOT);
if (lcKey.equals(ERROR_TAG_KEY)) {
isError = true;
Expand All @@ -261,8 +329,8 @@ static final class TagList extends ArrayList<Pair<String, String>> {
if (debug) add(Pair.of(DEBUG_TAG_KEY, "true"));

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L254-L266
if (span.kind() != null) {
String kind = span.kind().toString().toLowerCase();
if (span.getKind() != null) {
String kind = span.getKind().toString().toLowerCase();
add(Pair.of("span.kind", kind));
if (hasAnnotations) {
add(Pair.of("_spanSecondaryId", kind));
Expand All @@ -273,20 +341,21 @@ static final class TagList extends ArrayList<Pair<String, String>> {
if (hasAnnotations) add(Pair.of(SPAN_LOG_KEY, "true"));

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L324-L327
if (span.localIp() != null) {
add(Pair.of("ipv4", span.localIp())); // NOTE: this could be IPv6!!
if (span.getLocalIp() != null) {
add(Pair.of("ipv4", span.getLocalIp())); // NOTE: this could be IPv6!!
}
}
}

// https://github.com/wavefrontHQ/wavefront-proxy/blob/3dd1fa11711a04de2d9d418e2269f0f9fb464f36/proxy/src/main/java/com/wavefront/agent/listeners/tracing/ZipkinPortUnificationHandler.java#L397-L402
static List<SpanLog> convertAnnotationsToSpanLogs(MutableSpan span) {
int annotationCount = span.annotationCount();
static List<SpanLog> convertAnnotationsToSpanLogs(FinishedSpan span) {
int annotationCount = span.getEvents().size();
if (annotationCount == 0) return Collections.emptyList();
List<SpanLog> spanLogs = new ArrayList<>(annotationCount);
for (int i = 0; i < annotationCount; i++) {
long epochMicros = span.annotationTimestampAt(i);
String value = span.annotationValueAt(i);
Map.Entry<Long, String> entry = Iterators.get(span.getEvents().iterator(), i);
long epochMicros = entry.getKey();
String value = entry.getValue();
spanLogs.add(new SpanLog(epochMicros, Collections.singletonMap("annotation", value)));
}
return spanLogs;
Expand All @@ -295,7 +364,7 @@ static List<SpanLog> convertAnnotationsToSpanLogs(MutableSpan span) {
@Override public void run() {
while (!stop) {
try {
Pair<TraceContext, MutableSpan> contextAndSpan = spanBuffer.take();
Pair<TraceContext, FinishedSpan> contextAndSpan = spanBuffer.take();
send(contextAndSpan._1, contextAndSpan._2);
} catch (InterruptedException ex) {
if (LOG.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.wavefront.spring.autoconfigure;

import brave.Tracer;
import brave.TracingCustomizer;
import brave.handler.SpanHandler;
import com.wavefront.sdk.common.WavefrontSender;
Expand All @@ -23,30 +24,37 @@
* @author Stephane Nicoll
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ SpanNamer.class, TracingCustomizer.class, SpanHandler.class })
@ConditionalOnClass({ SpanNamer.class, MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class })
@AutoConfigureBefore(BraveAutoConfiguration.class)
class WavefrontTracingSleuthConfiguration {

static final String BEAN_NAME = "wavefrontTracingCustomizer";

@Bean(BEAN_NAME)
@ConditionalOnMissingBean(name = BEAN_NAME)
@Bean
@ConditionalOnBean({ MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class })
TracingCustomizer wavefrontTracingCustomizer(MeterRegistry meterRegistry,
WavefrontSender wavefrontSender,
ApplicationTags applicationTags,
WavefrontConfig wavefrontConfig,
WavefrontProperties wavefrontProperties) {
WavefrontSleuthSpanHandler spanHandler = new WavefrontSleuthSpanHandler(
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L54
50000, // TODO: maxQueueSize should be a property, ya?
wavefrontSender,
meterRegistry,
wavefrontConfig.source(),
applicationTags,
wavefrontProperties);

return t -> t.traceId128Bit(true).supportsJoin(false).addSpanHandler(spanHandler);
WavefrontSleuthSpanHandler wavefrontSleuthSpanHandler(MeterRegistry meterRegistry,
WavefrontSender wavefrontSender,
ApplicationTags applicationTags,
WavefrontConfig wavefrontConfig,
WavefrontProperties wavefrontProperties) {
return new WavefrontSleuthSpanHandler(
// https://github.com/wavefrontHQ/wavefront-opentracing-sdk-java/blob/f1f08d8daf7b692b9b61dcd5bc24ca6befa8e710/src/main/java/com/wavefront/opentracing/reporting/WavefrontSpanReporter.java#L54
50000, // TODO: maxQueueSize should be a property, ya?
wavefrontSender,
meterRegistry,
wavefrontConfig.source(),
applicationTags,
wavefrontProperties);
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({Tracer.class, TracingCustomizer.class, SpanHandler.class })
static class BraveCustomizerConfiguration {
@Bean(BEAN_NAME)
@ConditionalOnMissingBean(name = BEAN_NAME)
@ConditionalOnBean({ MeterRegistry.class, WavefrontConfig.class, WavefrontSender.class })
TracingCustomizer wavefrontTracingCustomizer(WavefrontSleuthSpanHandler spanHandler) {
return t -> t.traceId128Bit(true).supportsJoin(false).addSpanHandler(new WavefrontSleuthBraveSpanHandler(spanHandler));
}
}
}
Loading