77import datadog .trace .bootstrap .instrumentation .api .AgentSpanContext ;
88import datadog .trace .bootstrap .instrumentation .api .AgentTraceCollector ;
99import datadog .trace .bootstrap .instrumentation .api .AgentTracer ;
10- import java .nio .ByteBuffer ;
11- import java .nio .charset .StandardCharsets ;
12- import java .security .MessageDigest ;
13- import java .security .NoSuchAlgorithmException ;
10+ import datadog .trace .util .FNV64Hash ;
1411import java .util .Collections ;
1512import java .util .Map ;
1613import java .util .Optional ;
@@ -27,16 +24,23 @@ public class OpenlineageParentContext implements AgentSpanContext {
2724
2825 private final DDTraceId traceId ;
2926 private final long spanId ;
30- private final long childRootSpanId ;
3127
3228 private final String parentJobNamespace ;
3329 private final String parentJobName ;
3430 private final String parentRunId ;
31+ private final String rootParentJobNamespace ;
32+ private final String rootParentJobName ;
33+ private final String rootParentRunId ;
3534
3635 public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE =
3736 "spark.openlineage.parentJobNamespace" ;
3837 public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName" ;
3938 public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId" ;
39+ public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE =
40+ "spark.openlineage.rootParentJobNamespace" ;
41+ public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME =
42+ "spark.openlineage.rootParentJobName" ;
43+ public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId" ;
4044
4145 public static Optional <OpenlineageParentContext > from (SparkConf sparkConf ) {
4246 if (!sparkConf .contains (OPENLINEAGE_PARENT_JOB_NAMESPACE )
@@ -45,68 +49,84 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
4549 return Optional .empty ();
4650 }
4751
52+ if (!sparkConf .contains (OPENLINEAGE_ROOT_PARENT_RUN_ID )) {
53+ log .debug ("Found parent info, but not root parent info. Can't construct valid trace id." );
54+ return Optional .empty ();
55+ }
56+
4857 String parentJobNamespace = sparkConf .get (OPENLINEAGE_PARENT_JOB_NAMESPACE );
4958 String parentJobName = sparkConf .get (OPENLINEAGE_PARENT_JOB_NAME );
5059 String parentRunId = sparkConf .get (OPENLINEAGE_PARENT_RUN_ID );
5160
5261 if (!UUID .matcher (parentRunId ).matches ()) {
62+ log .debug ("OpenLineage parent run id is not a valid UUID: {}" , parentRunId );
63+ return Optional .empty ();
64+ }
65+
66+ String rootParentJobNamespace = sparkConf .get (OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE );
67+ String rootParentJobName = sparkConf .get (OPENLINEAGE_ROOT_PARENT_JOB_NAME );
68+ String rootParentRunId = sparkConf .get (OPENLINEAGE_ROOT_PARENT_RUN_ID );
69+
70+ if (!UUID .matcher (rootParentRunId ).matches ()) {
71+ log .debug ("OpenLineage root parent run id is not a valid UUID: {}" , parentRunId );
5372 return Optional .empty ();
5473 }
5574
5675 return Optional .of (
57- new OpenlineageParentContext (parentJobNamespace , parentJobName , parentRunId ));
76+ new OpenlineageParentContext (
77+ parentJobNamespace ,
78+ parentJobName ,
79+ parentRunId ,
80+ rootParentJobNamespace ,
81+ rootParentJobName ,
82+ rootParentRunId ));
5883 }
5984
60- OpenlineageParentContext (String parentJobNamespace , String parentJobName , String parentRunId ) {
85+ OpenlineageParentContext (
86+ String parentJobNamespace ,
87+ String parentJobName ,
88+ String parentRunId ,
89+ String rootParentJobNamespace ,
90+ String rootParentJobName ,
91+ String rootParentRunId ) {
6192 log .debug (
62- "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}" ,
93+ "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {} " ,
6394 parentJobNamespace ,
6495 parentJobName ,
65- parentRunId );
96+ parentRunId ,
97+ rootParentJobNamespace ,
98+ rootParentJobName ,
99+ rootParentRunId );
66100
67101 this .parentJobNamespace = parentJobNamespace ;
68102 this .parentJobName = parentJobName ;
69103 this .parentRunId = parentRunId ;
70104
71- MessageDigest digest = null ;
72- try {
73- digest = MessageDigest .getInstance ("SHA-256" );
74- } catch (NoSuchAlgorithmException e ) {
75- log .debug ("Unable to find SHA-256 algorithm" , e );
76- }
105+ this .rootParentJobNamespace = rootParentJobNamespace ;
106+ this .rootParentJobName = rootParentJobName ;
107+ this .rootParentRunId = rootParentRunId ;
77108
78- if (digest != null && parentJobNamespace != null && parentRunId != null ) {
79- traceId = computeTraceId (digest , parentJobNamespace , parentJobName , parentRunId );
80- spanId = DDSpanId . ZERO ;
81-
82- childRootSpanId =
83- computeChildRootSpanId ( digest , parentJobNamespace , parentJobName , parentRunId );
109+ if (this . rootParentRunId != null ) {
110+ traceId = computeTraceId (this . rootParentRunId );
111+ spanId = computeSpanId ( this . parentRunId ) ;
112+ } else if ( this . parentRunId != null ) {
113+ traceId = computeTraceId ( this . parentRunId );
114+ spanId = computeSpanId ( this . parentRunId );
84115 } else {
85116 traceId = DDTraceId .ZERO ;
86117 spanId = DDSpanId .ZERO ;
87-
88- childRootSpanId = DDSpanId .ZERO ;
89118 }
90119
91120 log .debug ("Created OpenlineageParentContext with traceId: {}, spanId: {}" , traceId , spanId );
92121 }
93122
94- private long computeChildRootSpanId (
95- MessageDigest digest , String parentJobNamespace , String parentJobName , String parentRunId ) {
96- byte [] inputBytes =
97- (parentJobNamespace + parentJobName + parentRunId ).getBytes (StandardCharsets .UTF_8 );
98- byte [] hash = digest .digest (inputBytes );
99-
100- return ByteBuffer .wrap (hash ).getLong ();
123+ private long computeSpanId (String runId ) {
124+ return FNV64Hash .generateHash (runId , FNV64Hash .Version .v1A );
101125 }
102126
103- private DDTraceId computeTraceId (
104- MessageDigest digest , String parentJobNamespace , String parentJobName , String parentRunId ) {
105- byte [] inputBytes =
106- (parentJobNamespace + parentJobName + parentRunId ).getBytes (StandardCharsets .UTF_8 );
107- byte [] hash = digest .digest (inputBytes );
108-
109- return DDTraceId .from (ByteBuffer .wrap (hash ).getLong ());
127+ private DDTraceId computeTraceId (String runId ) {
128+ log .debug ("Generating traceID from runId: {}" , runId );
129+ return DDTraceId .from (FNV64Hash .generateHash (runId , FNV64Hash .Version .v1A ));
110130 }
111131
112132 @ Override
@@ -119,10 +139,6 @@ public long getSpanId() {
119139 return spanId ;
120140 }
121141
122- public long getChildRootSpanId () {
123- return childRootSpanId ;
124- }
125-
126142 @ Override
127143 public AgentTraceCollector getTraceCollector () {
128144 return AgentTracer .NoopAgentTraceCollector .INSTANCE ;
@@ -159,4 +175,16 @@ public String getParentJobName() {
159175 public String getParentRunId () {
160176 return parentRunId ;
161177 }
178+
179+ public String getRootParentJobNamespace () {
180+ return rootParentJobNamespace ;
181+ }
182+
183+ public String getRootParentJobName () {
184+ return rootParentJobName ;
185+ }
186+
187+ public String getRootParentRunId () {
188+ return rootParentRunId ;
189+ }
162190}
0 commit comments