Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename kafka propagation setting and clarify behavior #6957

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions buildscripts/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,12 @@
<module name="NonEmptyAtclauseDescription"/>
<module name="InvalidJavadocPosition"/>
<module name="JavadocTagContinuationIndentation"/>
<!-- The Error Prone MissingSummary check is better / more nuanced.
<module name="SummaryJavadoc">
<property name="forbiddenSummaryFragments"
value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
</module>
-->
<module name="JavadocParagraph"/>
<module name="RequireEmptyLineBeforeBlockTagGroup"/>
<module name="AtclauseOrder">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ tasks {
includeTestsMatching("KafkaClientPropagationDisabledTest")
}
include("**/KafkaClientPropagationDisabledTest.*")
jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
jvmArgs("-Dotel.instrumentation.kafka.producer-propagation.enabled=false")
}

val testReceiveSpansDisabled by registering(Test::class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public static void onEnter(
context = producerInstrumenter().start(parentContext, record);
scope = context.makeCurrent();

if (KafkaSingletons.isPropagationEnabled() && KafkaPropagation.shouldPropagate(apiVersions)) {
if (KafkaSingletons.isProducerPropagationEnabled()
&& KafkaPropagation.shouldPropagate(apiVersions)) {
record = KafkaPropagation.propagateContext(context, record);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.Map;
Expand All @@ -20,9 +21,12 @@
public final class KafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";

private static final boolean PROPAGATION_ENABLED =
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true);
private static final boolean PRODUCER_PROPAGATION_ENABLED =
DeprecatedConfigProperties.getBoolean(
InstrumentationConfig.get(),
"otel.instrumentation.kafka.client-propagation.enabled",
"otel.instrumentation.kafka.producer-propagation.enabled",
true);
private static final boolean METRICS_ENABLED =
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
Expand All @@ -38,16 +42,15 @@ public final class KafkaSingletons {
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.setPropagationEnabled(PROPAGATION_ENABLED)
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled());
PRODUCER_INSTRUMENTER = instrumenterFactory.createProducerInstrumenter();
CONSUMER_RECEIVE_INSTRUMENTER = instrumenterFactory.createConsumerReceiveInstrumenter();
CONSUMER_PROCESS_INSTRUMENTER = instrumenterFactory.createConsumerProcessInstrumenter();
}

public static boolean isPropagationEnabled() {
return PROPAGATION_ENABLED;
public static boolean isProducerPropagationEnabled() {
return PRODUCER_PROPAGATION_ENABLED;
}

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import java.time.Duration

abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest implements AgentTestTrait {

private static final boolean propagationEnabled = Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))
private static final boolean producerPropagationEnabled = Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.producer-propagation.enabled", "true"))

@Unroll
def "test kafka client header propagation manual config"() {
Expand All @@ -28,7 +28,7 @@ abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest implem
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.headers().iterator().hasNext() == propagationEnabled
assert record.headers().iterator().hasNext() == producerPropagationEnabled
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ public final class KafkaTelemetry {
private final OpenTelemetry openTelemetry;
private final Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter;
private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;
private final boolean producerPropagationEnabled;

KafkaTelemetry(
OpenTelemetry openTelemetry,
Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter,
Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter) {
Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter,
boolean producerPropagationEnabled) {
this.openTelemetry = openTelemetry;
this.producerInstrumenter = producerInstrumenter;
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
this.producerPropagationEnabled = producerPropagationEnabled;
}

/** Returns a new {@link KafkaTelemetry} configured with the given {@link OpenTelemetry}. */
Expand Down Expand Up @@ -162,23 +165,22 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
* @param record the producer record to inject span info.
*/
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record) {
Context currentContext = Context.current();
Context parentContext = Context.current();

if (!producerInstrumenter.shouldStart(currentContext, record)) {
if (!producerInstrumenter.shouldStart(parentContext, record)) {
return;
}

Context current = producerInstrumenter.start(currentContext, record);
try (Scope ignored = current.makeCurrent()) {
Context context = producerInstrumenter.start(parentContext, record);
if (producerPropagationEnabled) {
try {
propagator().inject(current, record.headers(), SETTER);
propagator().inject(context, record.headers(), SETTER);
} catch (Throwable t) {
// it can happen if headers are read only (when record is sent second time)
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
}
}

producerInstrumenter.end(current, record, null, null);
producerInstrumenter.end(context, record, null, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ public KafkaTelemetryBuilder setCaptureExperimentalSpanAttributes(
}

/**
* Sets whether the producer context should be propagated from the producer span to the consumer
* span. Enabled by default.
* Set whether to propagate trace context in producers. Enabled by default.
*
* <p>You will need to disable this if there are kafka consumers using kafka-clients version prior
* to 0.11, since those old versions do not support headers, and attaching trace context
* propagation headers upstream causes those consumers to fail when reading the messages.
*/
@CanIgnoreReturnValue
public KafkaTelemetryBuilder setPropagationEnabled(boolean propagationEnabled) {
Expand All @@ -85,13 +88,13 @@ public KafkaTelemetry build() {
KafkaInstrumenterFactory instrumenterFactory =
new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME)
.setCapturedHeaders(capturedHeaders)
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
.setPropagationEnabled(propagationEnabled);
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes);

return new KafkaTelemetry(
openTelemetry,
instrumenterFactory.createProducerInstrumenter(producerAttributesExtractors),
instrumenterFactory.createConsumerOperationInstrumenter(
MessageOperation.RECEIVE, consumerAttributesExtractors));
MessageOperation.RECEIVE, consumerAttributesExtractors),
propagationEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public final class KafkaInstrumenterFactory {
private ErrorCauseExtractor errorCauseExtractor = ErrorCauseExtractor.getDefault();
private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes = false;
private boolean propagationEnabled = true;
private boolean messagingReceiveInstrumentationEnabled = false;

public KafkaInstrumenterFactory(OpenTelemetry openTelemetry, String instrumentationName) {
Expand All @@ -63,9 +62,14 @@ public KafkaInstrumenterFactory setCaptureExperimentalSpanAttributes(
return this;
}

/**
* @deprecated if you have a need for this configuration option please open an issue in the <a
* href="https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues">opentelemetry-java-instrumentation</a>
* repository.
*/
@Deprecated
@CanIgnoreReturnValue
public KafkaInstrumenterFactory setPropagationEnabled(boolean propagationEnabled) {
this.propagationEnabled = propagationEnabled;
return this;
}

Expand Down Expand Up @@ -137,9 +141,7 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled(
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}

if (!propagationEnabled) {
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (messagingReceiveInstrumentationEnabled) {
if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<ConsumerRecord<?, ?>>(
openTelemetry.getPropagators().getTextMapPropagator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ public final class KafkaStreamsSingletons {
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.setPropagationEnabled(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.createConsumerProcessInstrumenter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettySslInstrumenter;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigPropertyWarning;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.Collections;

Expand All @@ -24,15 +24,12 @@ public final class NettyClientSingletons {

static {
InstrumentationConfig config = InstrumentationConfig.get();
DeprecatedConfigPropertyWarning.warnIfUsed(
config,
"otel.instrumentation.netty.always-create-connect-span",
"otel.instrumentation.netty.connection-telemetry.enabled");
boolean alwaysCreateConnectSpan =
config.getBoolean("otel.instrumentation.netty.always-create-connect-span", false);
connectionTelemetryEnabled =
config.getBoolean(
"otel.instrumentation.netty.connection-telemetry.enabled", alwaysCreateConnectSpan);
DeprecatedConfigProperties.getBoolean(
config,
"otel.instrumentation.netty.always-create-connect-span",
"otel.instrumentation.netty.connection-telemetry.enabled",
false);
sslTelemetryEnabled =
config.getBoolean("otel.instrumentation.netty.ssl-telemetry.enabled", false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettySslInstrumenter;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigPropertyWarning;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.Collections;

Expand All @@ -24,15 +24,12 @@ public final class NettyClientSingletons {

static {
InstrumentationConfig config = InstrumentationConfig.get();
DeprecatedConfigPropertyWarning.warnIfUsed(
config,
"otel.instrumentation.netty.always-create-connect-span",
"otel.instrumentation.netty.connection-telemetry.enabled");
boolean alwaysCreateConnectSpan =
config.getBoolean("otel.instrumentation.netty.always-create-connect-span", false);
connectionTelemetryEnabled =
config.getBoolean(
"otel.instrumentation.netty.connection-telemetry.enabled", alwaysCreateConnectSpan);
DeprecatedConfigProperties.getBoolean(
config,
"otel.instrumentation.netty.always-create-connect-span",
"otel.instrumentation.netty.connection-telemetry.enabled",
false);
sslTelemetryEnabled =
config.getBoolean("otel.instrumentation.netty.ssl-telemetry.enabled", false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumenter;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigPropertyWarning;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientResponse;
Expand All @@ -30,16 +30,12 @@ public final class ReactorNettySingletons {

static {
InstrumentationConfig config = InstrumentationConfig.get();
DeprecatedConfigPropertyWarning.warnIfUsed(
config,
"otel.instrumentation.reactor-netty.always-create-connect-span",
"otel.instrumentation.reactor-netty.connection-telemetry.enabled");
boolean alwaysCreateConnectSpan =
config.getBoolean("otel.instrumentation.reactor-netty.always-create-connect-span", false);
connectionTelemetryEnabled =
config.getBoolean(
DeprecatedConfigProperties.getBoolean(
config,
"otel.instrumentation.reactor-netty.always-create-connect-span",
"otel.instrumentation.reactor-netty.connection-telemetry.enabled",
alwaysCreateConnectSpan);
false);
}

private static final Instrumenter<HttpClientConfig, HttpClientResponse> INSTRUMENTER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ public final class SpringKafkaSingletons {
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.setPropagationEnabled(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public final class SpringKafkaTelemetryBuilder {
private final OpenTelemetry openTelemetry;
private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes = false;
private boolean propagationEnabled = true;
private boolean messagingReceiveInstrumentationEnabled = false;

SpringKafkaTelemetryBuilder(OpenTelemetry openTelemetry) {
Expand All @@ -40,9 +39,14 @@ public SpringKafkaTelemetryBuilder setCaptureExperimentalSpanAttributes(
return this;
}

/**
* @deprecated if you have a need for this configuration option please open an issue in the <a
* href="https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues">opentelemetry-java-instrumentation</a>
* repository.
*/
@Deprecated
@CanIgnoreReturnValue
public SpringKafkaTelemetryBuilder setPropagationEnabled(boolean propagationEnabled) {
this.propagationEnabled = propagationEnabled;
return this;
}

Expand All @@ -62,7 +66,6 @@ public SpringKafkaTelemetry build() {
new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME)
.setCapturedHeaders(capturedHeaders)
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
.setPropagationEnabled(propagationEnabled)
.setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled)
.setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ public final class VertxKafkaSingletons {
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.setPropagationEnabled(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled());
BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class DeprecatedConfigPropertyWarning {
public final class DeprecatedConfigProperties {

private static final Logger logger =
Logger.getLogger(DeprecatedConfigPropertyWarning.class.getName());
private static final Logger logger = Logger.getLogger(DeprecatedConfigProperties.class.getName());

public static void warnIfUsed(
InstrumentationConfig config, String deprecatedPropertyName, String newPropertyName) {
public static boolean getBoolean(
InstrumentationConfig config,
String deprecatedPropertyName,
String newPropertyName,
boolean defaultValue) {
if (config.getString(deprecatedPropertyName) != null) {
logger.log(
WARNING,
"Deprecated property \"{0}\" was used; use the \"{1}\" property instead",
new Object[] {deprecatedPropertyName, newPropertyName});
}
boolean value = config.getBoolean(deprecatedPropertyName, defaultValue);
return config.getBoolean(newPropertyName, value);
}

private DeprecatedConfigPropertyWarning() {}
private DeprecatedConfigProperties() {}
}