Skip to content

Commit c34a423

Browse files
committed
classloader and code review fixes
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 9392bcd commit c34a423

File tree

9 files changed

+23
-54
lines changed

9 files changed

+23
-54
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ public static void start(
252252

253253
if (Config.get().isDataJobsOpenLineageEnabled()) {
254254
setSystemPropertyDefault(
255-
propertyNameToSystemPropertyName("integration.openlineage-spark.enabled"), "true");
255+
propertyNameToSystemPropertyName("integration.spark-openlineage.enabled"), "true");
256256
}
257257

258258
String javaCommand = System.getProperty("sun.java.command");

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,6 @@ public static class InjectListener {
4646
@Advice.OnMethodEnter(suppress = Throwable.class)
4747
public static void enter(@Advice.This SparkContext sparkContext) {
4848
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
49-
log.debug(
50-
"AbstractDatadogSparkListener classloader is: ({}) {}",
51-
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
52-
AbstractDatadogSparkListener.class.getClassLoader());
53-
5449
if (Config.get().isDataJobsOpenLineageEnabled()
5550
&& AbstractDatadogSparkListener.classIsLoadable(
5651
"io.openlineage.spark.agent.OpenLineageSparkListener")

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ public static class InjectListener {
4747
public static void enter(@Advice.This SparkContext sparkContext) {
4848
// checking whether OpenLineage integration is enabled, available and that it supports tags
4949
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
50-
log.debug(
51-
"AbstractDatadogSparkListener classloader is: ({}) {}",
52-
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
53-
AbstractDatadogSparkListener.class.getClassLoader());
54-
5550
if (Config.get().isDataJobsOpenLineageEnabled()
5651
&& AbstractDatadogSparkListener.classIsLoadable(
5752
"io.openlineage.spark.agent.OpenLineageSparkListener")

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import datadog.trace.api.DDTraceId;
1313
import datadog.trace.api.sampling.PrioritySampling;
1414
import datadog.trace.api.sampling.SamplingMechanism;
15+
import datadog.trace.bootstrap.InstanceStore;
1516
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1617
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
1718
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
@@ -69,8 +70,6 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
6970
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
7071
private static final ObjectMapper objectMapper = new ObjectMapper();
7172
public static volatile AbstractDatadogSparkListener listener = null;
72-
public static volatile SparkListenerInterface openLineageSparkListener = null;
73-
public static volatile SparkConf openLineageSparkConf = null;
7473

7574
public static volatile boolean finishTraceOnApplicationEnd = true;
7675
public static volatile boolean isPysparkShell = false;
@@ -80,6 +79,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
8079
private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
8180
private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";
8281

82+
public volatile SparkListenerInterface openLineageSparkListener = null;
83+
public volatile SparkConf openLineageSparkConf = null;
84+
8385
private final SparkConf sparkConf;
8486
private final String sparkVersion;
8587
private final String appId;
@@ -180,8 +182,10 @@ public void setupOpenLineage(DDTraceId traceId) {
180182
"spark.openlineage.run.tags",
181183
"_dd.trace_id:"
182184
+ traceId.toString()
183-
+ ";_dd.ol_intake.emit_spans:false;dd.ol_service:"
184-
+ sparkServiceName);
185+
+ ";_dd.ol_intake.emit_spans:false;_dd.ol_service:"
186+
+ sparkServiceName
187+
+ ";_dd.ol_app_id:"
188+
+ appId);
185189
return;
186190
}
187191
log.debug(
@@ -214,6 +218,12 @@ public void setupOpenLineage(DDTraceId traceId) {
214218
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
215219
this.applicationStart = applicationStart;
216220

221+
if (openLineageSparkListener == null) {
222+
openLineageSparkListener =
223+
InstanceStore.of(SparkListenerInterface.class).get("openLineageListener");
224+
openLineageSparkConf = InstanceStore.of(SparkConf.class).get("openLineageSparkConf");
225+
}
226+
217227
if (openLineageSparkListener != null) {
218228
setupOpenLineage(
219229
OpenlineageParentContext.from(sparkConf)

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import datadog.trace.agent.tooling.Instrumenter;
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import datadog.trace.api.Config;
12+
import datadog.trace.bootstrap.InstanceStore;
1213
import net.bytebuddy.asm.Advice;
1314
import org.apache.spark.deploy.SparkSubmitArguments;
1415
import org.apache.spark.scheduler.SparkListenerInterface;
@@ -121,15 +122,12 @@ public static class LiveListenerBusAdvice {
121122
// If OL is disabled in tracer config but user set it up manually don't interfere
122123
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
123124
Logger log = LoggerFactory.getLogger("LiveListenerBusAdvice");
124-
log.debug(
125-
"AbstractDatadogSparkListener classloader for LiveListenerBusAdvice is: ({}) {}",
126-
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
127-
AbstractDatadogSparkListener.class.getClassLoader());
128125
if (Config.get().isDataJobsOpenLineageEnabled()
129126
&& listener != null
130127
&& "io.openlineage.spark.agent.OpenLineageSparkListener"
131128
.equals(listener.getClass().getCanonicalName())) {
132129
log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus");
130+
InstanceStore.of(SparkListenerInterface.class).put("openLineageListener", listener);
133131
return true;
134132
}
135133
return false;

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import datadog.trace.agent.tooling.Instrumenter;
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import datadog.trace.api.Config;
12-
import java.lang.reflect.Field;
12+
import datadog.trace.bootstrap.InstanceStore;
1313
import net.bytebuddy.asm.Advice;
1414
import org.apache.spark.SparkConf;
15-
import org.apache.spark.scheduler.SparkListenerInterface;
1615
import org.slf4j.Logger;
1716
import org.slf4j.LoggerFactory;
1817

@@ -21,7 +20,7 @@ public class OpenLineageInstrumentation extends InstrumenterModule.Tracing
2120
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
2221

2322
public OpenLineageInstrumentation() {
24-
super("openlineage-spark");
23+
super("spark-openlineage");
2524
}
2625

2726
@Override
@@ -70,35 +69,7 @@ public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") Spa
7069
"OpenLineage - Data Jobs integration disabled. Not manipulating OpenLineageSparkListener");
7170
return;
7271
}
73-
74-
log.debug("Checking for OpenLineageSparkListener");
75-
ClassLoader cl = Thread.currentThread().getContextClassLoader();
76-
if (cl.getClass().getName().contains("MutableURLClassLoader")
77-
|| cl.getClass().getName().contains("ChildFirstURLClassLoader")) {
78-
log.debug(
79-
"Detected MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class of parent classloader");
80-
try {
81-
log.debug(
82-
"Parent classloader: ({}) {}",
83-
System.identityHashCode(cl.getParent()),
84-
cl.getParent());
85-
Class clazz = cl.getParent().loadClass(AbstractDatadogSparkListener.class.getName());
86-
Field openLineageSparkListener = clazz.getDeclaredField("openLineageSparkListener");
87-
openLineageSparkListener.setAccessible(true);
88-
openLineageSparkListener.set(null, self);
89-
90-
Field openLineageSparkConf = clazz.getDeclaredField("openLineageSparkConf");
91-
openLineageSparkConf.setAccessible(true);
92-
openLineageSparkConf.set(null, conf);
93-
} catch (Throwable e) {
94-
log.info("Failed to setup OpenLineage", e);
95-
}
96-
} else {
97-
log.debug(
98-
"Detected other classloader than MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class");
99-
AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self;
100-
AbstractDatadogSparkListener.openLineageSparkConf = conf;
101-
}
72+
InstanceStore.of(SparkConf.class).put("openLineageSparkConf", conf);
10273
}
10374
}
10475
}

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ abstract class AbstractSpark32SqlTest extends AgentTestRunner {
1010
void configurePreAgent() {
1111
super.configurePreAgent()
1212
injectSysConfig("dd.integration.spark.enabled", "true")
13-
injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
13+
injectSysConfig("dd.integration.spark-openlineage.enabled", "true")
1414
}
1515

1616
def "compute a GROUP BY sql query plan"() {

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
2525
void configurePreAgent() {
2626
super.configurePreAgent()
2727
injectSysConfig("dd.integration.spark.enabled", "true")
28-
injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
28+
injectSysConfig("dd.integration.spark-openlineage.enabled", "true")
2929
}
3030

3131
private SparkSession createSparkSession(String appName) {

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ abstract class AbstractSparkTest extends AgentTestRunner {
3434
void configurePreAgent() {
3535
super.configurePreAgent()
3636
injectSysConfig("dd.integration.spark.enabled", "true")
37-
injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
37+
injectSysConfig("dd.integration.spark-openlineage.enabled", "true")
3838
}
3939

4040
def "generate application span with child job and stages"() {

0 commit comments

Comments
 (0)