 import org.apache.spark.sql.streaming.StateOperatorProgress;
 import org.apache.spark.sql.streaming.StreamingQueryListener;
 import org.apache.spark.sql.streaming.StreamingQueryProgress;
-import org.apache.spark.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
@@ -87,6 +86,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
 
   private final AgentTracer.TracerAPI tracer;
 
+  // Created by the constructor; used when we're not in another known parent
+  // context such as Databricks or OpenLineage
+  private final PredeterminedTraceIdParentContext predeterminedTraceIdParentContext;
+
   private AgentSpan applicationSpan;
   private SparkListenerApplicationStart applicationStart;
   private final HashMap<String, AgentSpan> streamingBatchSpans = new HashMap<>();
@@ -142,6 +145,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
     databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
     databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
     sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
+    predeterminedTraceIdParentContext =
+        new PredeterminedTraceIdParentContext(
+            Config.get().getIdGenerationStrategy().generateTraceId());
 
     // If the JVM exits with System.exit(code), it bypasses the code that closes the application span
     //
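
Note: `PredeterminedTraceIdParentContext` is referenced in this hunk but its body is not part of the diff. A minimal sketch of the idea, assuming the class only needs to carry a trace ID generated up front; the real class must also implement whatever parent-context interface `builder.asChildOf(...)` accepts, which is omitted here:

```java
import datadog.trace.api.DDTraceId;

// Illustrative sketch only -- not the actual class from this PR.
public class PredeterminedTraceIdParentContext {
  // Trace ID generated once, up front, so the application span and the
  // OpenLineage run.tags can share the same trace ID even though the
  // span itself is started later.
  private final DDTraceId traceId;

  public PredeterminedTraceIdParentContext(DDTraceId traceId) {
    this.traceId = traceId;
  }

  public DDTraceId getTraceId() {
    return traceId;
  }
}
```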
@@ -160,60 +166,23 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
         }));
   }
 
-  static void setupSparkConf(SparkConf sparkConf) {
-    sparkConf.set("spark.openlineage.transport.type", "composite");
-    sparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
-    sparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
-    sparkConf.set("spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
-    sparkConf.set("spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
-    sparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
-    sparkConf.set(
-        "spark.openlineage.run.tags",
-        "_dd.trace_id:"
-            + listener.applicationSpan.context().getTraceId().toString()
-            + ";_dd.ol_intake.emit_spans:false");
-    for (Tuple2<String, String> tuple : sparkConf.getAll()) {
-      log.error("Set Spark conf: Key: " + tuple._1 + ", Value: " + tuple._2);
-    }
-  }
-
-  public void setupOpenLineage() {
-    log.error("Setting up OpenLineage-Datadog integration");
+  public void setupOpenLineage(DDTraceId traceId) {
+    log.error("Setting up OpenLineage tags");
     if (openLineageSparkListener != null) {
-      log.error("No init needed");
-      setupSparkConf(openLineageSparkConf);
+      openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
+      openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
+      openLineageSparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
+      openLineageSparkConf.set(
+          "spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
+      openLineageSparkConf.set(
+          "spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
+      openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
+      openLineageSparkConf.set(
+          "spark.openlineage.run.tags",
+          "_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false");
       return;
     }
-
-    String className = "io.openlineage.spark.agent.OpenLineageSparkListener";
-    Class clazz;
-    try {
-      try {
-        clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
-      } catch (ClassNotFoundException e) {
-        clazz = Class.forName(className, true, Utils.class.getClassLoader());
-      }
-    } catch (ClassNotFoundException e) {
-      log.info("OpenLineage integration is not present on the classpath");
-      return;
-    }
-
-    openLineageSparkConf = sparkConf;
-    if (clazz == null) {
-      log.info("OpenLineage integration is not present on the classpath: class is null");
-      return;
-    }
-    try {
-      setupSparkConf(openLineageSparkConf);
-      openLineageSparkListener =
-          (SparkListenerInterface)
-              clazz.getConstructor(SparkConf.class).newInstance(openLineageSparkConf);
-
-      log.info(
-          "Created OL spark listener: {}", openLineageSparkListener.getClass().getSimpleName());
-    } catch (Exception e) {
-      log.warn("Failed to instantiate OL Spark Listener: {}", e.toString());
-    }
+    log.error("No OpenLineageSparkListener!");
   }
 
   /** Resource name of the spark job. Provide an implementation based on a specific scala version */
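
For reference, the `spark.openlineage.run.tags` value built above is a semicolon-separated list of `key:value` pairs, e.g. `_dd.trace_id:<trace id>;_dd.ol_intake.emit_spans:false`. A self-contained sketch of how a consumer could split such a string; the parsing code is illustrative only, not the agent's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class RunTagsParseExample {
  public static void main(String[] args) {
    // Example value in the shape produced by setupOpenLineage (trace ID made up)
    String runTags = "_dd.trace_id:1234567890abcdef;_dd.ol_intake.emit_spans:false";

    // Split on ';' into pairs, then on the first ':' into key and value
    Map<String, String> tags = new HashMap<>();
    for (String pair : runTags.split(";")) {
      int sep = pair.indexOf(':');
      tags.put(pair.substring(0, sep), pair.substring(sep + 1));
    }

    System.out.println(tags.get("_dd.trace_id")); // 1234567890abcdef
    System.out.println(tags.get("_dd.ol_intake.emit_spans")); // false
  }
}
```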
@@ -237,6 +206,12 @@ public void setupOpenLineage() {
   @Override
   public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
     this.applicationStart = applicationStart;
+
+    setupOpenLineage(
+        OpenlineageParentContext.from(sparkConf)
+            .map(context -> context.getTraceId())
+            .orElse(predeterminedTraceIdParentContext.getTraceId()));
+    notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
   }
 
   private void initApplicationSpanIfNotInitialized() {
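
The argument to `setupOpenLineage` above prefers the trace ID from an OpenLineage parent context when `OpenlineageParentContext.from(sparkConf)` yields one, and otherwise falls back to the predetermined trace ID. A standalone sketch of the same `Optional` fallback pattern with stand-in types; the names below are hypothetical:

```java
import java.util.Optional;

class TraceIdSelectionExample {
  // Stand-in for OpenlineageParentContext.from(sparkConf): empty when the
  // Spark conf carries no OpenLineage parent run information
  static Optional<String> openlineageTraceId() {
    return Optional.empty();
  }

  public static void main(String[] args) {
    String predetermined = "pre-generated-trace-id";
    // Same shape as the call in onApplicationStart: prefer the OpenLineage
    // parent's trace ID, otherwise use the predetermined one
    String chosen = openlineageTraceId().orElse(predetermined);
    System.out.println(chosen); // pre-generated-trace-id
  }
}
```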
@@ -260,38 +235,36 @@ private void initApplicationSpanIfNotInitialized() {
     }
 
     captureApplicationParameters(builder);
-    captureOpenlineageContextIfPresent(builder);
+
+    Optional<OpenlineageParentContext> openlineageParentContext =
+        OpenlineageParentContext.from(sparkConf);
+    // We know we're not in a Databricks context at this point
+    if (openlineageParentContext.isPresent()) {
+      captureOpenlineageContextIfPresent(builder, openlineageParentContext.get());
+    } else {
+      builder.asChildOf(predeterminedTraceIdParentContext);
+    }
 
     applicationSpan = builder.start();
     setDataJobsSamplingPriority(applicationSpan);
     applicationSpan.setMeasured(true);
-    // We need to set it up after we create application span to have correlation.
-    setupOpenLineage();
-    notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
   }
 
-  private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) {
-    Optional<OpenlineageParentContext> openlineageParentContext =
-        OpenlineageParentContext.from(sparkConf);
+  private void captureOpenlineageContextIfPresent(
+      AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
+    builder.asChildOf(context);
 
-    if (openlineageParentContext.isPresent()) {
-      OpenlineageParentContext context = openlineageParentContext.get();
-      builder.asChildOf(context);
+    builder.withSpanId(context.getChildRootSpanId());
 
-      builder.withSpanId(context.getChildRootSpanId());
+    log.debug(
+        "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
+        context,
+        context.getTraceId(),
+        context.getChildRootSpanId());
 
-      log.debug(
-          "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
-          context,
-          context.getTraceId(),
-          context.getChildRootSpanId());
-
-      builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
-      builder.withTag("openlineage_parent_job_name", context.getParentJobName());
-      builder.withTag("openlineage_parent_run_id", context.getParentRunId());
-    } else {
-      log.debug("Openlineage context not found");
-    }
+    builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
+    builder.withTag("openlineage_parent_job_name", context.getParentJobName());
+    builder.withTag("openlineage_parent_run_id", context.getParentRunId());
   }
 
   @Override
@@ -779,15 +752,15 @@ public void onOtherEvent(SparkListenerEvent event) {
 
   private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
     if (isRunningOnDatabricks || isStreamingJob) {
-      log.error("Not emitting event when running on databricks or on streaming jobs");
+      log.debug("Not emitting events when running on Databricks or for streaming jobs");
       return;
     }
-    initApplicationSpanIfNotInitialized();
     if (openLineageSparkListener != null) {
-      log.error("Notifying with event `{}`", event.getClass().getCanonicalName());
+      log.debug(
+          "Passing event `{}` to OpenLineageSparkListener", event.getClass().getCanonicalName());
       ol.accept(event);
     } else {
-      log.error("OpenLineageSparkListener is null");
+      log.trace("OpenLineageSparkListener is null");
     }
   }
 