diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index aad8be72b5f..a1858eab9cd 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -9,6 +9,7 @@ import com.datadoghq.sketch.ddsketch.encoding.VarEncodingHelper; import datadog.context.propagation.CarrierVisitor; import datadog.trace.api.Config; +import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.PathwayContext; @@ -363,6 +364,10 @@ public static long getBaseHash(WellKnownTags wellKnownTags) { if (primaryTag != null) { builder.append(primaryTag); } + CharSequence processTags = ProcessTags.getTagsForSerialization(); + if (processTags != null) { + builder.append(processTags); + } return FNV64Hash.generateHash(builder.toString(), FNV64Hash.Version.v1); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 2baa8943de0..6dbf342b27b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -7,7 +7,9 @@ import datadog.communication.serialization.WritableFormatter; import datadog.communication.serialization.msgpack.MsgPackWriter; import datadog.trace.api.Config; +import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.Sink; import java.util.Collection; import java.util.List; @@ -33,6 +35,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] BACKLOG_VALUE = "Value".getBytes(ISO_8859_1); private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1); private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1); + private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1); private static final int INITIAL_CAPACITY = 512 * 1024; @@ -80,7 +83,9 @@ public long getProductsMask() { @Override public void writePayload(Collection data, String serviceNameOverride) { - writer.startMap(8); + final List processTags = ProcessTags.getTagsAsUTF8ByteStringList(); + final boolean hasProcessTags = processTags != null; + writer.startMap(8 + (hasProcessTags ? 1 : 0)); /* 1 */ writer.writeUTF8(ENV); writer.writeUTF8(wellKnownTags.getEnv()); @@ -139,6 +144,13 @@ public void writePayload(Collection data, String serviceNameOverrid writer.writeUTF8(PRODUCTS_MASK); writer.writeLong(getProductsMask()); + /* 9 */ + if (hasProcessTags) { + writer.writeUTF8(PROCESS_TAGS); + writer.startArray(processTags.size()); + processTags.forEach(writer::writeUTF8); + } + buffer.mark(); sink.accept(buffer.messageCount(), buffer.slice()); buffer.reset(); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index f3a435cec1e..e9bf3803e1e 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -4,6 +4,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.communication.ddagent.SharedCommunicationObjects import datadog.communication.http.OkHttpUtils import datadog.trace.api.Config +import datadog.trace.api.ProcessTags import datadog.trace.api.TraceConfig import datadog.trace.api.WellKnownTags import datadog.trace.api.time.ControllableTimeSource @@ -21,6 +22,7 @@ import spock.lang.Shared import spock.util.concurrent.PollingConditions import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer +import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED import static java.util.concurrent.TimeUnit.SECONDS /** @@ -103,8 +105,11 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == serviceNameOverride } - def "Write bucket to mock server"() { - given: + def "Write bucket to mock server with process tags enabled #processTagsEnabled"() { + setup: + injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "$processTagsEnabled") + ProcessTags.reset() + def conditions = new PollingConditions(timeout: 2) def testOkhttpClient = OkHttpUtils.buildHttpClient(HttpUrl.get(server.address), 5000L) @@ -152,16 +157,23 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert requestBodies.size() == 1 } - validateMessage(requestBodies[0]) + validateMessage(requestBodies[0], processTagsEnabled) + + cleanup: + injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false") + ProcessTags.reset() + + where: + processTagsEnabled << [true, false] } - def validateMessage(byte[] message) { + def validateMessage(byte[] message, boolean processTagsEnabled) { GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(message))) BufferedSource bufferedSource = Okio.buffer(gzipSource) MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream()) - assert unpacker.unpackMapHeader() == 8 + assert unpacker.unpackMapHeader() == 8 + (processTagsEnabled ? 1 : 0) assert unpacker.unpackString() == "Env" assert unpacker.unpackString() == "test" assert unpacker.unpackString() == "Service" @@ -265,6 +277,16 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == "ProductMask" assert unpacker.unpackLong() == 1 + def processTags = ProcessTags.getTagsAsStringList() + assert unpacker.hasNext() == (processTags != null) + if (processTags != null) { + assert unpacker.unpackString() == "ProcessTags" + assert unpacker.unpackArrayHeader() == processTags.size() + processTags.each { + assert unpacker.unpackString() == it + } + } + return true } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index 3761944b6ba..9ee7a366849 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -3,6 +3,7 @@ package datadog.trace.core.datastreams import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.trace.api.Config import datadog.trace.api.DDTraceId +import datadog.trace.api.ProcessTags import datadog.trace.api.TraceConfig import datadog.trace.api.WellKnownTags import datadog.trace.api.datastreams.StatsPoint @@ -17,6 +18,7 @@ import java.util.function.Consumer import static datadog.context.Context.root import static datadog.trace.api.TracePropagationStyle.DATADOG +import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED import static datadog.trace.api.config.GeneralConfig.PRIMARY_TAG import static datadog.trace.api.datastreams.DataStreamsContext.create import static datadog.trace.api.datastreams.DataStreamsContext.fromTags @@ -428,6 +430,23 @@ class DefaultPathwayContextTest extends DDCoreSpecification { firstBaseHash != secondBaseHash } + def "Process Tags used in hash calculation"() { + when: + def firstBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags) + + injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "true") + ProcessTags.reset() + ProcessTags.addTag("000", "first") + def secondBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags) + + then: + firstBaseHash != secondBaseHash + assert ProcessTags.getTagsForSerialization().startsWithAny("000:first,") + cleanup: + injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false") + ProcessTags.reset() + } + def "Check context extractor decorator behavior"() { given: def sink = Mock(Sink) diff --git a/internal-api/src/main/java/datadog/trace/api/ProcessTags.java b/internal-api/src/main/java/datadog/trace/api/ProcessTags.java index 184930b26c1..881fc706cd9 100644 --- a/internal-api/src/main/java/datadog/trace/api/ProcessTags.java +++ b/internal-api/src/main/java/datadog/trace/api/ProcessTags.java @@ -6,9 +6,10 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -19,12 +20,14 @@ public class ProcessTags { private static boolean enabled = Config.get().isExperimentalPropagateProcessTagsEnabled(); private static class Lazy { - static final Map TAGS = loadTags(); + // the tags are used to compute a hash for dsm hence that map must be sorted. + static final SortedMap TAGS = loadTags(); static volatile UTF8BytesString serializedForm; - static volatile List listForm; + static volatile List utf8ListForm; + static volatile List stringListForm; - private static Map loadTags() { - Map tags = new LinkedHashMap<>(); + private static SortedMap loadTags() { + SortedMap tags = new TreeMap<>(); if (enabled) { try { fillBaseTags(tags); @@ -86,15 +89,21 @@ private static void fillJbossTags(Map tags) { } static void calculate() { - if (listForm != null || TAGS.isEmpty()) { + if (serializedForm != null || TAGS.isEmpty()) { return; } synchronized (Lazy.TAGS) { - final Stream tagStream = + final Stream tagStream = TAGS.entrySet().stream() - .map(entry -> entry.getKey() + ":" + TraceUtils.normalizeTag(entry.getValue())); - listForm = Collections.unmodifiableList(tagStream.collect(Collectors.toList())); - serializedForm = UTF8BytesString.create(String.join(",", listForm)); + .map( + entry -> + UTF8BytesString.create( + entry.getKey() + ":" + TraceUtils.normalizeTag(entry.getValue()))); + utf8ListForm = Collections.unmodifiableList(tagStream.collect(Collectors.toList())); + stringListForm = + Collections.unmodifiableList( + utf8ListForm.stream().map(UTF8BytesString::toString).collect(Collectors.toList())); + serializedForm = UTF8BytesString.create(String.join(",", utf8ListForm)); } } } @@ -107,21 +116,34 @@ public static void addTag(String key, String value) { synchronized (Lazy.TAGS) { Lazy.TAGS.put(key, value); Lazy.serializedForm = null; - Lazy.listForm = null; + Lazy.stringListForm = null; + Lazy.utf8ListForm = null; } } } - public static List getTagsAsList() { + public static List getTagsAsUTF8ByteStringList() { if (!enabled) { return null; } - final List listForm = Lazy.listForm; + final List listForm = Lazy.utf8ListForm; if (listForm != null) { return listForm; } Lazy.calculate(); - return Lazy.listForm; + return Lazy.utf8ListForm; + } + + public static List getTagsAsStringList() { + if (!enabled) { + return null; + } + final List listForm = Lazy.stringListForm; + if (listForm != null) { + return listForm; + } + Lazy.calculate(); + return Lazy.stringListForm; } public static UTF8BytesString getTagsForSerialization() { @@ -141,7 +163,8 @@ static void empty() { synchronized (Lazy.TAGS) { Lazy.TAGS.clear(); Lazy.serializedForm = null; - Lazy.listForm = null; + Lazy.stringListForm = null; + Lazy.utf8ListForm = null; } } diff --git a/internal-api/src/test/groovy/datadog/trace/api/ProcessTagsForkedTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/ProcessTagsForkedTest.groovy index d0835bfbf48..e1725479e23 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/ProcessTagsForkedTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/ProcessTagsForkedTest.groovy @@ -30,7 +30,7 @@ class ProcessTagsForkedTest extends DDSpecification { tags =~ expected where: jar | cls | expected - Paths.get("my test", "my.jar").toFile() | null | "entrypoint.name:my,entrypoint.basedir:my_test,entrypoint.workdir:[^,]+" + Paths.get("my test", "my.jar").toFile() | null | "entrypoint.basedir:my_test,entrypoint.name:my,entrypoint.workdir:[^,]+" Paths.get("my.jar").toFile() | null | "entrypoint.name:my,entrypoint.workdir:[^,]+" null | "com.test.Main" | "entrypoint.name:com.test.main,entrypoint.workdir:[^,]+" null | null | "entrypoint.workdir:[^,]+" @@ -56,9 +56,9 @@ class ProcessTagsForkedTest extends DDSpecification { System.clearProperty("jboss.server.name") where: jbossHome | mode | serverName | expected - "/opt/jboss/myserver" | "[Standalone]" | "standalone" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:.+,jboss.home:myserver,server.name:standalone,jboss.mode:standalone" - "/opt/jboss/myserver" | "[server1:12345]" | "server1" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:.+,jboss.home:myserver,server.name:server1,jboss.mode:domain" - null | "[Standalone]" | "standalone" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:[^,]+" // don't expect jboss tags since home is missing + "/opt/jboss/myserver" | "[Standalone]" | "standalone" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:.+,jboss.home:myserver,jboss.mode:standalone,server.name:standalone" + "/opt/jboss/myserver" | "[server1:12345]" | "server1" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:.+,jboss.home:myserver,jboss.mode:domain,server.name:server1" + null | "[Standalone]" | "standalone" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:[^,]+" // don't expect jboss tags since home is missing } def 'should not calculate process tags by default'() { @@ -72,7 +72,8 @@ class ProcessTagsForkedTest extends DDSpecification { ProcessTags.addTag("test", "value") then: assert ProcessTags.tagsForSerialization == null - assert ProcessTags.tagsAsList == null + assert ProcessTags.tagsAsStringList == null + assert ProcessTags.tagsAsUTF8ByteStringList == null } def 'should lazily recalculate when a tag is added'() { @@ -81,18 +82,24 @@ class ProcessTagsForkedTest extends DDSpecification { ProcessTags.reset() when: def processTags = ProcessTags.tagsForSerialization - def tagsAsList = ProcessTags.tagsAsList + def tagsAsList = ProcessTags.tagsAsStringList + def tagsAsUtf8List = ProcessTags.tagsAsUTF8ByteStringList then: assert ProcessTags.enabled assert processTags != null assert tagsAsList != null assert tagsAsList.size() > 0 + assert tagsAsUtf8List != null + assert tagsAsUtf8List.size() == tagsAsList.size() when: - ProcessTags.addTag("test", "value") + // add it as first pos since 0 < any other a-z + ProcessTags.addTag("0test", "value") then: - assert ProcessTags.tagsForSerialization.toString() == "$processTags,test:value" - def size = ProcessTags.tagsAsList.size() + assert ProcessTags.tagsForSerialization.toString() == "0test:value,$processTags" + def size = ProcessTags.tagsAsStringList.size() assert size == tagsAsList.size() + 1 - assert ProcessTags.tagsAsList[size - 1] == "test:value" + assert size == ProcessTags.tagsAsUTF8ByteStringList.size() + assert ProcessTags.tagsAsStringList[0] == "0test:value" + assert ProcessTags.tagsAsUTF8ByteStringList[0].toString() == "0test:value" } } diff --git a/remote-config/remote-config-core/src/main/java/datadog/remoteconfig/tuf/RemoteConfigRequest.java b/remote-config/remote-config-core/src/main/java/datadog/remoteconfig/tuf/RemoteConfigRequest.java index 7fce349e7bf..c8b6c388d41 100644 --- a/remote-config/remote-config-core/src/main/java/datadog/remoteconfig/tuf/RemoteConfigRequest.java +++ b/remote-config/remote-config-core/src/main/java/datadog/remoteconfig/tuf/RemoteConfigRequest.java @@ -34,7 +34,7 @@ public static RemoteConfigRequest newRequest( serviceEnv, serviceVersion, tags, - ProcessTags.getTagsAsList()); + ProcessTags.getTagsAsStringList()); ClientInfo clientInfo = new RemoteConfigRequest.ClientInfo(