Skip to content

Commit c470369

Browse files
dolfinusPraveen2112
authored andcommitted
Allow OpenLineage job name customization
1 parent 88261cc commit c470369

File tree

9 files changed

+242
-38
lines changed

9 files changed

+242
-38
lines changed

docs/src/main/sphinx/admin/event-listeners-openlineage.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ not limited to) Spark, Airflow, Flink.
3030
- Run Event Time
3131
*
3232
- Query Id
33-
- Job Facet Name
33+
- Job Facet Name (default, can be overriden)
3434
*
3535
- `trino:// + {openlineage-event-listener.trino.uri.getHost()} + ":" + {openlineage-event-listener.trino.uri.getPort()}`
3636
- Job Facet Namespace (default, can be overridden)
@@ -157,6 +157,13 @@ event-listener.config-files=etc/openlineage-event-listener.properties,...
157157
- Custom namespace to be used for Job `namespace` attribute. If blank will
158158
default to Dataset Namespace.
159159
- None.
160+
*
161+
- openlineage-event-listener.job.name-format
162+
- Custom namespace to use for the job `name` attribute.
163+
Use any string with, with optional substitution
164+
variables: `$QUERY_ID`, `$USER`, `$SOURCE`, `$CLIENT_IP`.
165+
For example: `As $USER from $CLIENT_IP via $SOURCE`.
166+
- `$QUERY_ID`.
160167

161168
:::
162169

plugin/trino-openlineage/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@
7676
</exclusions>
7777
</dependency>
7878

79+
<dependency>
80+
<groupId>io.trino</groupId>
81+
<artifactId>trino-plugin-toolkit</artifactId>
82+
</dependency>
83+
7984
<dependency>
8085
<groupId>jakarta.validation</groupId>
8186
<artifactId>jakarta.validation-api</artifactId>

plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListener.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import io.openlineage.client.OpenLineage.RunFacet;
3131
import io.openlineage.client.OpenLineage.RunFacetsBuilder;
3232
import io.openlineage.client.OpenLineageClient;
33+
import io.trino.plugin.base.logging.FormatInterpolator;
34+
import io.trino.plugin.openlineage.job.OpenLineageJobContext;
35+
import io.trino.plugin.openlineage.job.OpenLineageJobInterpolatedValues;
3336
import io.trino.spi.connector.CatalogSchemaName;
3437
import io.trino.spi.eventlistener.EventListener;
3538
import io.trino.spi.eventlistener.OutputColumnMetadata;
@@ -73,6 +76,7 @@ public class OpenLineageListener
7376
private final String jobNamespace;
7477
private final String datasetNamespace;
7578
private final Set<QueryType> includeQueryTypes;
79+
private final FormatInterpolator<OpenLineageJobContext> interpolator;
7680

7781
@Inject
7882
public OpenLineageListener(OpenLineage openLineage, OpenLineageClient client, OpenLineageListenerConfig listenerConfig)
@@ -84,6 +88,9 @@ public OpenLineageListener(OpenLineage openLineage, OpenLineageClient client, Op
8488
this.jobNamespace = listenerConfig.getNamespace().orElse(trinoURI.toString());
8589
this.datasetNamespace = trinoURI.toString();
8690
this.includeQueryTypes = ImmutableSet.copyOf(listenerConfig.getIncludeQueryTypes());
91+
this.interpolator = new FormatInterpolator(
92+
listenerConfig.getJobNameFormat(),
93+
OpenLineageJobInterpolatedValues.values());
8794
}
8895

8996
@Override
@@ -224,7 +231,7 @@ public RunEvent getStartEvent(QueryCreatedEvent queryCreatedEvent)
224231
.eventType(RunEvent.EventType.START)
225232
.eventTime(queryCreatedEvent.getCreateTime().atZone(UTC))
226233
.run(openLineage.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build())
227-
.job(getBaseJobBuilder(queryCreatedEvent.getMetadata()).build())
234+
.job(getBaseJobBuilder(queryCreatedEvent.getContext(), queryCreatedEvent.getMetadata()).build())
228235
.build();
229236
}
230237

@@ -263,7 +270,7 @@ public RunEvent getCompletedEvent(QueryCompletedEvent queryCompletedEvent)
263270
: RunEvent.EventType.COMPLETE)
264271
.eventTime(queryCompletedEvent.getEndTime().atZone(UTC))
265272
.run(openLineage.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build())
266-
.job(getBaseJobBuilder(queryCompletedEvent.getMetadata()).build())
273+
.job(getBaseJobBuilder(queryCompletedEvent.getContext(), queryCompletedEvent.getMetadata()).build())
267274
.inputs(buildInputs(queryCompletedEvent.getMetadata()))
268275
.outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
269276
.build();
@@ -278,11 +285,11 @@ private RunFacetsBuilder getBaseRunFacetsBuilder(QueryContext queryContext)
278285
.build());
279286
}
280287

281-
private JobBuilder getBaseJobBuilder(QueryMetadata queryMetadata)
288+
private JobBuilder getBaseJobBuilder(QueryContext queryContext, QueryMetadata queryMetadata)
282289
{
283290
return openLineage.newJobBuilder()
284291
.namespace(this.jobNamespace)
285-
.name(queryMetadata.getQueryId())
292+
.name(interpolator.interpolate(new OpenLineageJobContext(queryContext, queryMetadata)))
286293
.facets(openLineage.newJobFacetsBuilder()
287294
.jobType(openLineage.newJobTypeJobFacet("BATCH", "TRINO", "QUERY"))
288295
.sql(openLineage.newSQLJobFacet(queryMetadata.getQuery(), "trino"))

plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/OpenLineageListenerConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,24 @@
1616
import com.google.common.collect.ImmutableSet;
1717
import io.airlift.configuration.Config;
1818
import io.airlift.configuration.ConfigDescription;
19+
import io.trino.plugin.openlineage.job.OpenLineageJobInterpolatedValues;
1920
import io.trino.spi.resourcegroups.QueryType;
21+
import jakarta.validation.constraints.AssertTrue;
22+
import jakarta.validation.constraints.NotEmpty;
2023
import jakarta.validation.constraints.NotNull;
2124

2225
import java.net.URI;
2326
import java.util.Optional;
2427
import java.util.Set;
2528

29+
import static io.trino.plugin.base.logging.FormatInterpolator.hasValidPlaceholders;
30+
2631
public class OpenLineageListenerConfig
2732
{
2833
private URI trinoURI;
2934
private Set<OpenLineageTrinoFacet> disabledFacets = ImmutableSet.of();
3035
private Optional<String> namespace = Optional.empty();
36+
private String jobNameFormat = "$QUERY_ID";
3137

3238
private Set<QueryType> includeQueryTypes = ImmutableSet.<QueryType>builder()
3339
.add(QueryType.ALTER_TABLE_EXECUTE)
@@ -91,4 +97,24 @@ public OpenLineageListenerConfig setNamespace(String namespace)
9197
this.namespace = Optional.ofNullable(namespace);
9298
return this;
9399
}
100+
101+
@NotEmpty
102+
public String getJobNameFormat()
103+
{
104+
return jobNameFormat;
105+
}
106+
107+
@Config("openlineage-event-listener.job.name-format")
108+
@ConfigDescription("Set name format for OpenLineage job")
109+
public OpenLineageListenerConfig setJobNameFormat(String jobNameFormat)
110+
{
111+
this.jobNameFormat = jobNameFormat;
112+
return this;
113+
}
114+
115+
@AssertTrue(message = "Correct job name format may consist of only letters, digits, underscores, commas, spaces, equal signs and predefined values")
116+
public boolean isJobNameFormatValid()
117+
{
118+
return hasValidPlaceholders(jobNameFormat, OpenLineageJobInterpolatedValues.values());
119+
}
94120
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.openlineage.job;
15+
16+
import io.trino.spi.eventlistener.QueryContext;
17+
import io.trino.spi.eventlistener.QueryMetadata;
18+
19+
import static java.util.Objects.requireNonNull;
20+
21+
public record OpenLineageJobContext(QueryContext queryContext, QueryMetadata queryMetadata)
22+
{
23+
public OpenLineageJobContext
24+
{
25+
requireNonNull(queryContext, "queryContext is null");
26+
requireNonNull(queryMetadata, "queryMetadata is null");
27+
}
28+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.openlineage.job;
15+
16+
import io.trino.plugin.base.logging.FormatInterpolator.InterpolatedValue;
17+
18+
import java.util.function.Function;
19+
20+
import static java.util.Objects.requireNonNull;
21+
22+
public enum OpenLineageJobInterpolatedValues
23+
implements InterpolatedValue<OpenLineageJobContext>
24+
{
25+
QUERY_ID(jobContext -> jobContext.queryMetadata().getQueryId()),
26+
SOURCE(jobContext -> jobContext.queryContext().getSource().orElse("")),
27+
CLIENT_IP(jobContext -> jobContext.queryContext().getRemoteClientAddress().orElse("")),
28+
USER(jobContext -> jobContext.queryContext().getUser());
29+
30+
private final Function<OpenLineageJobContext, String> valueProvider;
31+
32+
OpenLineageJobInterpolatedValues(Function<OpenLineageJobContext, String> valueProvider)
33+
{
34+
this.valueProvider = requireNonNull(valueProvider, "valueProvider is null");
35+
}
36+
37+
@Override
38+
public String value(OpenLineageJobContext context)
39+
{
40+
return valueProvider.apply(context);
41+
}
42+
}

plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageEventListenerMarquezIntegration.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ protected QueryRunner createQueryRunner()
5353
@Override
5454
public void assertCreateTableAsSelectFromTable(String queryId, String query, String fullTableName, LineageTestTableType tableType, SessionRepresentation session)
5555
{
56-
String expectedQueryId = URLEncoder.encode(queryId, UTF_8);
56+
String expectedJobName = URLEncoder.encode(queryId, UTF_8);
5757

58-
checkJobRegistration(client, expectedQueryId);
58+
checkJobRegistration(client, expectedJobName);
5959
}
6060

6161
@Override
@@ -70,45 +70,45 @@ public void assertCreateTableAsSelectFromView(
7070
SessionRepresentation session)
7171
{
7272
{
73-
String expectedQueryId = URLEncoder.encode(createViewQueryId, UTF_8);
74-
checkJobRegistration(client, expectedQueryId);
73+
String expectedJobName = URLEncoder.encode(createViewQueryId, UTF_8);
74+
checkJobRegistration(client, expectedJobName);
7575
}
7676
{
77-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
78-
checkJobRegistration(client, expectedQueryId);
77+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
78+
checkJobRegistration(client, expectedJobName);
7979
}
8080
}
8181

8282
@Override
8383
public void assertCreateTableWithJoin(String createTableQueryId, String createTableQuery, SessionRepresentation session)
8484
{
85-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
85+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
8686

87-
checkJobRegistration(client, expectedQueryId);
87+
checkJobRegistration(client, expectedJobName);
8888
}
8989

9090
@Override
9191
public void assertCreateTableWithCTE(String createTableQueryId, String createTableQuery, SessionRepresentation session)
9292
{
93-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
93+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
9494

95-
checkJobRegistration(client, expectedQueryId);
95+
checkJobRegistration(client, expectedJobName);
9696
}
9797

9898
@Override
9999
public void assertCreateTableWithSubquery(String createTableQueryId, String createTableQuery, SessionRepresentation session)
100100
{
101-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
101+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
102102

103-
checkJobRegistration(client, expectedQueryId);
103+
checkJobRegistration(client, expectedJobName);
104104
}
105105

106106
@Override
107107
public void assertCreateTableWithUnion(String createTableQueryId, String createTableQuery, String fullTableName, SessionRepresentation session)
108108
{
109-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
109+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
110110

111-
checkJobRegistration(client, expectedQueryId);
111+
checkJobRegistration(client, expectedJobName);
112112
}
113113

114114
@Override
@@ -121,12 +121,12 @@ public void assertInsertIntoTable(
121121
SessionRepresentation session)
122122
{
123123
{
124-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
125-
checkJobRegistration(client, expectedQueryId);
124+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
125+
checkJobRegistration(client, expectedJobName);
126126
}
127127
{
128-
String expectedQueryId = URLEncoder.encode(insertQueryId, UTF_8);
129-
checkJobRegistration(client, expectedQueryId);
128+
String expectedJobName = URLEncoder.encode(insertQueryId, UTF_8);
129+
checkJobRegistration(client, expectedJobName);
130130
}
131131
}
132132

@@ -142,16 +142,16 @@ void assertDeleteFromTable(
142142
SessionRepresentation session)
143143
{
144144
{
145-
String expectedQueryId = URLEncoder.encode(createSchemaQueryId, UTF_8);
146-
checkJobRegistration(client, expectedQueryId);
145+
String expectedJobName = URLEncoder.encode(createSchemaQueryId, UTF_8);
146+
checkJobRegistration(client, expectedJobName);
147147
}
148148
{
149-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
150-
checkJobRegistration(client, expectedQueryId);
149+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
150+
checkJobRegistration(client, expectedJobName);
151151
}
152152
{
153-
String expectedQueryId = URLEncoder.encode(deleteQueryId, UTF_8);
154-
checkJobRegistration(client, expectedQueryId);
153+
String expectedJobName = URLEncoder.encode(deleteQueryId, UTF_8);
154+
checkJobRegistration(client, expectedJobName);
155155
}
156156
}
157157

@@ -167,25 +167,25 @@ void assertMergeIntoTable(
167167
SessionRepresentation session)
168168
{
169169
{
170-
String expectedQueryId = URLEncoder.encode(createSchemaQueryId, UTF_8);
171-
checkJobRegistration(client, expectedQueryId);
170+
String expectedJobName = URLEncoder.encode(createSchemaQueryId, UTF_8);
171+
checkJobRegistration(client, expectedJobName);
172172
}
173173
{
174-
String expectedQueryId = URLEncoder.encode(createTableQueryId, UTF_8);
175-
checkJobRegistration(client, expectedQueryId);
174+
String expectedJobName = URLEncoder.encode(createTableQueryId, UTF_8);
175+
checkJobRegistration(client, expectedJobName);
176176
}
177177
{
178-
String expectedQueryId = URLEncoder.encode(mergeQueryId, UTF_8);
179-
checkJobRegistration(client, expectedQueryId);
178+
String expectedJobName = URLEncoder.encode(mergeQueryId, UTF_8);
179+
checkJobRegistration(client, expectedJobName);
180180
}
181181
}
182182

183-
private void checkJobRegistration(HttpClient client, String expectedQueryId)
183+
private void checkJobRegistration(HttpClient client, String expectedJobName)
184184
{
185185
try {
186186
String encodedNamespace = URLEncoder.encode(OPEN_LINEAGE_NAMESPACE, UTF_8);
187187
HttpRequest requestJob = HttpRequest.newBuilder()
188-
.uri(new URI(marquezURI + "/api/v1/namespaces/" + encodedNamespace + "/jobs/" + expectedQueryId))
188+
.uri(new URI(marquezURI + "/api/v1/namespaces/" + encodedNamespace + "/jobs/" + expectedJobName))
189189
.GET()
190190
.build();
191191

0 commit comments

Comments
 (0)