feat(plugin-openlineage-event-listener): Add OpenLineage event listener plugin#27249
Conversation
… listener plugin to Presto
Reviewer's GuideAdds a new presto-openlineage-event-listener plugin that emits OpenLineage run events for Presto queries, including HTTP/console transport configuration, job naming and facet controls, and wires the plugin into the Presto build and server packaging with accompanying tests. Sequence diagram for OpenLineage event emission on Presto query lifecyclesequenceDiagram
participant PrestoServer
participant OpenLineagePlugin as OpenLineageEventListenerPlugin
participant Factory as OpenLineageEventListenerFactory
participant Listener as OpenLineageEventListener
participant OLClient as OpenLineageClient
participant Transport
participant OpenLineageBackend
PrestoServer->>OpenLineagePlugin: load Plugin SPI
OpenLineagePlugin-->>PrestoServer: EventListenerFactory list
PrestoServer->>Factory: create(config Map)
Factory->>Factory: new OpenLineageEventListenerConfig(config)
Factory->>Factory: new OpenLineageTransportConfig(config)
Factory->>Factory: buildTransport(transportConfig, config)
Factory-->>Transport: ConsoleTransport or HttpTransport
Factory->>OLClient: build with transport and disabled facets
Factory-->>PrestoServer: new OpenLineageEventListener
PrestoServer->>Listener: queryCreated(QueryCreatedEvent)
Listener->>Listener: getStartEvent(event)
Listener->>OLClient: emit(RunEvent START)
OLClient->>Transport: send START event
Transport->>OpenLineageBackend: deliver START event
PrestoServer->>Listener: queryCompleted(QueryCompletedEvent)
Listener->>Listener: queryTypeSupported(queryType?)
alt supported query type
Listener->>Listener: getCompletedEvent(event)
Listener->>OLClient: emit(RunEvent COMPLETE or FAIL)
OLClient->>Transport: send COMPLETE/FAIL event
Transport->>OpenLineageBackend: deliver COMPLETE/FAIL event
else unsupported query type
Listener-->>PrestoServer: skip emission, log debug
end
Class diagram for presto-openlineage-event-listener plugin structureclassDiagram
direction LR
class OpenLineageEventListenerPlugin {
+getEventListenerFactories() Iterable~EventListenerFactory~
}
class OpenLineageEventListenerFactory {
+getName() String
+create(config Map~String,String~) EventListener
-buildTransport(transportConfig OpenLineageTransportConfig, config Map~String,String~) Transport
-buildHttpTransport(config OpenLineageHttpTransportConfig) HttpTransport
}
class OpenLineageEventListener {
-openLineage OpenLineage
-client OpenLineageClient
-prestoURI URI
-jobNamespace String
-datasetNamespace String
-includeQueryTypes Set~QueryType~
-interpolator FormatInterpolator
+OpenLineageEventListener(openLineage OpenLineage, client OpenLineageClient, listenerConfig OpenLineageEventListenerConfig)
+queryCreated(event QueryCreatedEvent) void
+queryCompleted(event QueryCompletedEvent) void
-queryTypeSupported(queryType Optional~QueryType~) boolean
-getRunId(queryCreateTime Instant, queryMetadata QueryMetadata) UUID
-getPrestoQueryContextFacet(queryContext QueryContext) RunFacet
-getPrestoMetadataFacet(queryMetadata QueryMetadata) RunFacet
-getPrestoQueryStatisticsFacet(queryStatistics QueryStatistics) RunFacet
+getStartEvent(event QueryCreatedEvent) RunEvent
+getCompletedEvent(event QueryCompletedEvent) RunEvent
-getBaseRunFacetsBuilder(queryContext QueryContext) RunFacetsBuilder
-getBaseJobBuilder(queryContext QueryContext, queryMetadata QueryMetadata) JobBuilder
-buildInputs(ioMetadata QueryIOMetadata) List~InputDataset~
-buildOutputs(ioMetadata QueryIOMetadata) List~OutputDataset~
-getDatasetName(catalogName String, schemaName String, tableName String) String
+defaultNamespace(uri URI) URI
-toQualifiedSchemaName(catalogName String, schemaName String) String
}
class OpenLineageEventListenerConfig {
-prestoURI URI
-disabledFacets Set~OpenLineagePrestoFacet~
-namespace Optional~String~
-jobNameFormat String
-includeQueryTypes Set~QueryType~
+OpenLineageEventListenerConfig()
+OpenLineageEventListenerConfig(config Map~String,String~)
+getPrestoURI() URI
+setPrestoURI(prestoURI URI) OpenLineageEventListenerConfig
+getIncludeQueryTypes() Set~QueryType~
+setIncludeQueryTypes(includeQueryTypes Set~QueryType~) OpenLineageEventListenerConfig
+getDisabledFacets() Set~OpenLineagePrestoFacet~
+setDisabledFacets(disabledFacets Set~OpenLineagePrestoFacet~) OpenLineageEventListenerConfig
+getNamespace() Optional~String~
+setNamespace(namespace String) OpenLineageEventListenerConfig
+getJobNameFormat() String
+setJobNameFormat(jobNameFormat String) OpenLineageEventListenerConfig
+isJobNameFormatValid() boolean
}
class OpenLineageTransportConfig {
-transport OpenLineageTransport
+OpenLineageTransportConfig()
+OpenLineageTransportConfig(config Map~String,String~)
+getTransport() OpenLineageTransport
+setTransport(transport OpenLineageTransport) OpenLineageTransportConfig
}
class OpenLineageHttpTransportConfig {
<<enum>> Compression
-url URI
-endpoint String
-apiKey Optional~String~
-timeoutMillis long
-headers Map~String,String~
-urlParams Map~String,String~
-compression Compression
+OpenLineageHttpTransportConfig()
+OpenLineageHttpTransportConfig(config Map~String,String~)
+getUrl() URI
+setUrl(url URI) OpenLineageHttpTransportConfig
+getEndpoint() String
+setEndpoint(endpoint String) OpenLineageHttpTransportConfig
+getApiKey() Optional~String~
+setApiKey(apiKey String) OpenLineageHttpTransportConfig
+getTimeoutMillis() long
+setTimeoutMillis(timeoutMillis long) OpenLineageHttpTransportConfig
+getHeaders() Map~String,String~
+setHeaders(headers Map~String,String~) OpenLineageHttpTransportConfig
+getUrlParams() Map~String,String~
+setUrlParams(urlParams Map~String,String~) OpenLineageHttpTransportConfig
+getCompression() Compression
+setCompression(compression Compression) OpenLineageHttpTransportConfig
}
class OpenLineageJobContext {
-queryContext QueryContext
-queryMetadata QueryMetadata
+OpenLineageJobContext(queryContext QueryContext, queryMetadata QueryMetadata)
+getQueryContext() QueryContext
+getQueryMetadata() QueryMetadata
}
class FormatInterpolator {
-format String
-values OpenLineageJobInterpolatedValues[]
+FormatInterpolator(format String, values OpenLineageJobInterpolatedValues[])
+interpolate(context OpenLineageJobContext) String
+hasValidPlaceholders(format String, values OpenLineageJobInterpolatedValues[]) boolean
}
class OpenLineageJobInterpolatedValues {
<<enum>>
QUERY_ID
SOURCE
CLIENT_IP
USER
+value(context OpenLineageJobContext) String
}
class OpenLineagePrestoFacet {
<<enum>>
PRESTO_METADATA
PRESTO_QUERY_STATISTICS
PRESTO_QUERY_CONTEXT
+asText() String
}
class OpenLineageTransport {
<<enum>>
CONSOLE
HTTP
}
OpenLineageEventListenerPlugin ..|> Plugin
OpenLineageEventListenerFactory ..|> EventListenerFactory
OpenLineageEventListener ..|> EventListener
OpenLineageEventListenerPlugin --> OpenLineageEventListenerFactory
OpenLineageEventListenerFactory --> OpenLineageEventListenerConfig
OpenLineageEventListenerFactory --> OpenLineageTransportConfig
OpenLineageEventListenerFactory --> OpenLineageHttpTransportConfig
OpenLineageEventListenerFactory --> OpenLineageEventListener
OpenLineageEventListenerFactory --> OpenLineageClient
OpenLineageEventListenerFactory --> OpenLineage
OpenLineageTransportConfig --> OpenLineageTransport
OpenLineageHttpTransportConfig --> OpenLineageHttpTransportConfig.Compression
OpenLineageEventListener --> OpenLineageEventListenerConfig
OpenLineageEventListener --> FormatInterpolator
OpenLineageEventListener --> OpenLineagePrestoFacet
OpenLineageEventListener --> OpenLineageJobContext
FormatInterpolator --> OpenLineageJobInterpolatedValues
OpenLineageJobInterpolatedValues --> OpenLineageJobContext
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 7 issues, and left some high level feedback:
- OpenLineageEventListener.defaultNamespace assumes URI.getScheme() is non-null and calls isEmpty(), which will throw a NullPointerException for schemeless URIs—consider explicitly handling the null-scheme case (e.g., via uri.getScheme() == null) instead of relying on isEmpty().
- FormatInterpolator.hasValidPlaceholders currently uses a hardcoded VALID_FORMAT_PATTERN that explicitly lists placeholder names instead of deriving them from the provided OpenLineageJobInterpolatedValues[], which means the validation logic will fall out of sync if new placeholders are added—either compute the regex from the enum values or validate by scanning for $PLACEHOLDER tokens and checking membership in the provided array.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- OpenLineageEventListener.defaultNamespace assumes URI.getScheme() is non-null and calls isEmpty(), which will throw a NullPointerException for schemeless URIs—consider explicitly handling the null-scheme case (e.g., via uri.getScheme() == null) instead of relying on isEmpty().
- FormatInterpolator.hasValidPlaceholders currently uses a hardcoded VALID_FORMAT_PATTERN that explicitly lists placeholder names instead of deriving them from the provided OpenLineageJobInterpolatedValues[], which means the validation logic will fall out of sync if new placeholders are added—either compute the regex from the enum values or validate by scanning for $PLACEHOLDER tokens and checking membership in the provided array.
## Individual Comments
### Comment 1
<location path="presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListener.java" line_range="391-396" />
<code_context>
+ return format("%s.%s.%s", catalogName, schemaName, tableName);
+ }
+
+ static URI defaultNamespace(URI uri)
+ {
+ if (!uri.getScheme().isEmpty()) {
+ return URI.create(uri.toString().replaceFirst(uri.getScheme(), "presto"));
+ }
+ return URI.create("presto://" + uri);
+ }
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle null URI scheme in defaultNamespace to avoid potential NullPointerException
In `defaultNamespace`, `uri.getScheme()` may be `null` (e.g., for URIs like `"//host"`), so calling `.isEmpty()` can throw a `NullPointerException`. Please add an explicit null check before using the scheme, or normalize the URI earlier so this method only receives URIs with non-null schemes.
</issue_to_address>
### Comment 2
<location path="presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListener.java" line_range="76-85" />
<code_context>
+ private final Set<QueryType> includeQueryTypes;
+ private final FormatInterpolator interpolator;
+
+ public OpenLineageEventListener(OpenLineage openLineage, OpenLineageClient client, OpenLineageEventListenerConfig listenerConfig)
+ {
+ this.openLineage = requireNonNull(openLineage, "openLineage is null");
+ this.client = requireNonNull(client, "client is null");
+ requireNonNull(listenerConfig, "listenerConfig is null");
+ this.prestoURI = defaultNamespace(listenerConfig.getPrestoURI());
+ this.jobNamespace = listenerConfig.getNamespace().orElse(prestoURI.toString());
+ this.datasetNamespace = prestoURI.toString();
+ this.includeQueryTypes = ImmutableSet.copyOf(listenerConfig.getIncludeQueryTypes());
+ this.interpolator = new FormatInterpolator(listenerConfig.getJobNameFormat(), OpenLineageJobInterpolatedValues.values());
+ }
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against null prestoURI in configuration before calling defaultNamespace
The constructor calls `defaultNamespace(listenerConfig.getPrestoURI())` without checking for null. If `prestoURI` is unset, this leads to a hard-to-debug `NullPointerException`. Please either make `prestoURI` a required config with a clear failure, supply a default, or add a null check that throws a descriptive configuration error.
</issue_to_address>
### Comment 3
<location path="presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/FormatInterpolator.java" line_range="64-66" />
<code_context>
+ return "$" + placeholder;
+ }
+
+ public static boolean hasValidPlaceholders(String format, OpenLineageJobInterpolatedValues[] values)
+ {
+ return VALID_FORMAT_PATTERN.matcher(format).matches();
+ }
+}
</code_context>
<issue_to_address>
**issue:** Validation ignores provided placeholder set and hardcodes allowed placeholders
`hasValidPlaceholders` accepts `OpenLineageJobInterpolatedValues[] values` but never uses it, and `VALID_FORMAT_PATTERN` hardcodes the allowed placeholders. This makes the signature misleading and requires manual regex updates when adding new placeholders. Either derive allowed placeholders from `values` to build the pattern dynamically, or remove the parameter if the hardcoded list is intentional.
</issue_to_address>
### Comment 4
<location path="presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerFactory.java" line_range="88-97" />
<code_context>
+ ? HttpConfig.Compression.GZIP
+ : null;
+
+ return new HttpTransport(
+ new HttpConfig(
+ config.getUrl(),
+ config.getEndpoint(),
+ (int) config.getTimeoutMillis(),
+ tokenProvider,
+ config.getUrlParams(),
+ config.getHeaders(),
+ httpCompression,
+ new HttpSslContextConfig()));
+ }
+}
</code_context>
<issue_to_address>
**issue (bug_risk):** Avoid narrowing cast from long timeoutMillis to int when building HttpConfig
`(int) config.getTimeoutMillis()` can overflow if `timeoutMillis` exceeds `Integer.MAX_VALUE`, resulting in negative or incorrect timeouts. Consider either validating and clamping the value before casting, or updating `HttpConfig` to accept a `long` timeout so oversized values fail fast instead of being silently truncated.
</issue_to_address>
### Comment 5
<location path="presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListener.java" line_range="28" />
<code_context>
+public class TestOpenLineageEventListener
+{
+ @Test
+ public void testGetCompleteEvent()
+ {
+ OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for FAILED queries to cover FAIL events and error facets
The listener has dedicated logic for `queryState = "FAILED"` with non-empty `failureInfo`. Please add a test that builds a `QueryCompletedEvent` in this state and asserts that the emitted event has `eventType = RunEvent.EventType.FAIL` and an `errorMessage` facet populated with the failure message, so the failure path remains covered.
Suggested implementation:
```java
assertThat(result.getJob().getName()).isEqualTo("queryId");
```
Add a dedicated test for FAILED queries in `TestOpenLineageEventListener` to cover the failure path and error facets. Place it alongside `testGetCompleteEvent()`:
```java
@Test
public void testGetFailedEvent()
{
OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
"openlineage-event-listener.transport.type", "CONSOLE",
"openlineage-event-listener.presto.uri", "http://testhost"));
// Build or reuse a QueryCompletedEvent representing a FAILED query with non-empty failureInfo.
// Prefer using a shared fixture similar to `PrestoEventData.queryCompleteEvent`.
QueryCompletedEvent failedEvent = PrestoEventData.queryFailedEvent;
RunEvent result = listener.getCompletedEvent(failedEvent);
// Verify event type is FAIL
assertThat(result.getEventType()).isEqualTo(RunEvent.EventType.FAIL);
// Verify error facet is present and populated with the failure message
Map<String, OpenLineage.RunFacet> facets = result.getRun().getFacets();
assertThat(facets).containsKey("errorMessage");
OpenLineage.ErrorMessageRunFacet errorFacet =
(OpenLineage.ErrorMessageRunFacet) facets.get("errorMessage");
assertThat(errorFacet.getMessage())
.isEqualTo(failedEvent.getFailureInfo().getFailureMessage());
}
```
You will likely need to:
1. Ensure the following imports (or equivalent fully-qualified names) are present at the top of the file:
```java
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.RunEvent;
import com.facebook.presto.event.query.QueryCompletedEvent;
import java.util.Map;
```
(Some may already exist; avoid duplicates.)
2. Provide a `PrestoEventData.queryFailedEvent` fixture (or similar) that returns a `QueryCompletedEvent` whose `getQueryState()` is `"FAILED"` and whose `getFailureInfo()` returns a non-empty failure info object with `getFailureMessage()` populated. For example, in `PrestoEventData`:
```java
public static final QueryCompletedEvent queryFailedEvent = createFailedQueryCompletedEvent();
private static QueryCompletedEvent createFailedQueryCompletedEvent() {
// Build a QueryCompletedEvent with state FAILED and a populated failureInfo
QueryFailureInfo failureInfo = new QueryFailureInfo(
"Some failure message",
... // other required fields
);
return new QueryCompletedEvent(
...,
"FAILED",
failureInfo,
... // other required fields
);
}
```
3. Ensure the error facet key `"errorMessage"` matches the key used inside `OpenLineageEventListener` when populating the error facet. If a different key is used (e.g., `"error"` or `"error_message"`), adjust the `containsKey` and `get` calls accordingly.
4. If the failure message is obtained differently inside the listener (for example, `failureInfo.getMessage()` or `failureInfo.getThrowable().getMessage()`), update the final `assertThat(errorFacet.getMessage())` to match that source so the test aligns with the listener implementation.
</issue_to_address>
### Comment 6
<location path="presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListener.java" line_range="76" />
<code_context>
+ }
+
+ @Test
+ public void testGetStartEvent()
+ {
+ OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests around queryType filtering and ensuring `queryCompleted` does not emit for unsupported types
Right now the tests call `getCompletedEvent` directly, so they don’t exercise the `includeQueryTypes` filtering in `queryCompleted`. Please add tests that configure the listener with:
- a restricted type set (e.g., only `SELECT`) and send a `QueryCompletedEvent` with a non-matching type (e.g., `INSERT`), asserting no events are emitted;
- a matching type and asserting an event is emitted.
Use `OpenLineageMemoryTransport` (or a simple mock `Transport`) to capture events when invoking `queryCompleted` so the tests cover the full filter logic.
Suggested implementation:
```java
assertThat(result.getJob().getNamespace()).isEqualTo("presto://testhost");
assertThat(result.getJob().getName()).isEqualTo("queryId");
}
@Test
public void testQueryCompletedDoesNotEmitEventForUnsupportedQueryType()
{
OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
"openlineage-event-listener.transport.type", "MEMORY",
"openlineage-event-listener.presto.uri", "http://testhost",
// Configure includeQueryTypes so that the query type of queryCompleteEvent does NOT match.
// For example, if queryCompleteEvent is a SELECT, we restrict to INSERT to force filtering.
"openlineage-event-listener.include.query.types", "INSERT"));
// Ensure the memory transport is empty before invoking queryCompleted
OpenLineageMemoryTransport transport = OpenLineageMemoryTransport.getInstance();
transport.clear();
listener.queryCompleted(PrestoEventData.queryCompleteEvent);
// Since the query type does not match includeQueryTypes, no events should be emitted
assertThat(transport.getEvents()).isEmpty();
}
@Test
public void testQueryCompletedEmitsEventForSupportedQueryType()
{
OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
"openlineage-event-listener.transport.type", "MEMORY",
"openlineage-event-listener.presto.uri", "http://testhost",
// Configure includeQueryTypes so that the query type of queryCompleteEvent DOES match.
// For example, if queryCompleteEvent is a SELECT, we include SELECT.
"openlineage-event-listener.include.query.types", "SELECT"));
// Ensure the memory transport is empty before invoking queryCompleted
OpenLineageMemoryTransport transport = OpenLineageMemoryTransport.getInstance();
transport.clear();
listener.queryCompleted(PrestoEventData.queryCompleteEvent);
// Since the query type matches includeQueryTypes, a single COMPLETE event should be emitted
assertThat(transport.getEvents()).hasSize(1);
RunEvent emittedEvent = (RunEvent) transport.getEvents().get(0);
assertThat(emittedEvent.getEventType()).isEqualTo(RunEvent.EventType.COMPLETE);
assertThat(emittedEvent.getEventTime().toInstant()).isEqualTo(PrestoEventData.queryCompleteEvent.getEndTime());
}
```
1. Ensure the following imports are present at the top of `TestOpenLineageEventListener.java` (or equivalent, depending on the existing package imports and OpenLineage client version):
```java
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.client.transport.OpenLineageMemoryTransport;
```
2. The property key for configuring query type filtering (`"openlineage-event-listener.include.query.types"`) must match the one used in `OpenLineageEventListener`. If the actual key differs (e.g., uses hyphens vs. dots or a different prefix), adjust the property name in both new tests accordingly.
3. The `OpenLineageMemoryTransport` API is assumed to provide:
- a static `getInstance()` method returning the singleton transport used by the listener when `transport.type=MEMORY`;
- a `clear()` method to reset state between tests;
- a `getEvents()` method returning the list of emitted events.
If the actual API differs, adapt the calls in the tests (`getInstance`, `clear`, `getEvents`, and cast to `RunEvent`) to the concrete methods/fields provided in your implementation.
4. The values `"INSERT"` and `"SELECT"` for `include.query.types` must correspond to how query types are represented in `OpenLineageEventListener` (e.g., uppercase strings, enum names, etc.). If `PrestoEventData.queryCompleteEvent` uses another type (e.g., `"QUERY"`, `"SELECT_TABLE"`, etc.), update the two tests so that:
- the first test uses a non-matching type;
- the second test uses the exact matching type for `queryCompleteEvent`.
5. If `createEventListener` does not automatically wire in the `OpenLineageMemoryTransport` when `transport.type=MEMORY`, you will need to adjust the helper or listener construction to inject the memory transport instance used by the tests so that `transport.getEvents()` actually reflects what `queryCompleted` emits.
</issue_to_address>
### Comment 7
<location path="presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/OpenLineageMemoryTransport.java" line_range="24" />
<code_context>
+import java.util.Collections;
+import java.util.List;
+
+public class OpenLineageMemoryTransport
+ extends Transport
+{
</code_context>
<issue_to_address>
**suggestion (testing):** Leverage `OpenLineageMemoryTransport` in an integration-style test to assert emitted inputs/outputs and lineage facets
This in-memory `Transport` seems ideal for asserting which events the listener emits, but it isn’t used in tests yet. To better cover lineage behavior, consider an integration-style test that wires `OpenLineageEventListener` to `OpenLineageMemoryTransport`, sends a `QueryCompletedEvent` with non-empty `QueryIOMetadata` (inputs, output, and column lineage), and then inspects the captured `RunEvent` to verify input namespaces/dataset names/schema facets and output schema/column lineage facets.
Suggested implementation:
```java
@Override
public void emit(OpenLineage.RunEvent runEvent)
{
processedEvents.add(runEvent);
}
@Override
public void emit(OpenLineage.DatasetEvent datasetEvent)
{
processedEvents.add(datasetEvent);
}
@Override
public void emit(OpenLineage.BaseEvent event)
{
processedEvents.add(event);
}
/**
* Returns an immutable snapshot of all OpenLineage events that have been
* emitted through this transport. Intended for use in tests to assert lineage behavior.
*/
public List<OpenLineage.BaseEvent> getProcessedEvents()
{
synchronized (processedEvents) {
return ImmutableList.copyOf(processedEvents);
}
}
/**
* Clears all previously captured events. Useful to reset state between tests.
*/
public void clear()
{
processedEvents.clear();
}
```
To fully implement the testing approach described in your review comment, you’ll also want to add an integration-style test, for example:
1. Create a new test class (e.g., `OpenLineageEventListenerIntegrationTest`) under `presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/`.
2. In that test, construct an `OpenLineageMemoryTransport` and wire it into an `OpenLineageEventListener` instance (via configuration or direct construction, depending on how the listener is built in the codebase).
3. Build a `QueryCompletedEvent` with non-empty `QueryIOMetadata` including:
- One or more input tables (with schema/columns filled in).
- An output table (with schema/columns).
- Column-level lineage.
4. Invoke the listener’s `queryCompleted` method with this event.
5. Use `OpenLineageMemoryTransport.getProcessedEvents()` to obtain the emitted `RunEvent` and add assertions on:
- Input dataset namespace and name.
- Output dataset namespace and name.
- Input/output schema facets.
- Column lineage facets.
The exact construction of `QueryCompletedEvent`, `QueryIOMetadata`, and the listener will need to align with existing test helpers and factory methods already in your codebase.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| static URI defaultNamespace(URI uri) | ||
| { | ||
| if (!uri.getScheme().isEmpty()) { | ||
| return URI.create(uri.toString().replaceFirst(uri.getScheme(), "presto")); | ||
| } | ||
| return URI.create("presto://" + uri); |
There was a problem hiding this comment.
issue (bug_risk): Handle null URI scheme in defaultNamespace to avoid potential NullPointerException
In defaultNamespace, uri.getScheme() may be null (e.g., for URIs like "//host"), so calling .isEmpty() can throw a NullPointerException. Please add an explicit null check before using the scheme, or normalize the URI earlier so this method only receives URIs with non-null schemes.
| public OpenLineageEventListener(OpenLineage openLineage, OpenLineageClient client, OpenLineageEventListenerConfig listenerConfig) | ||
| { | ||
| this.openLineage = requireNonNull(openLineage, "openLineage is null"); | ||
| this.client = requireNonNull(client, "client is null"); | ||
| requireNonNull(listenerConfig, "listenerConfig is null"); | ||
| this.prestoURI = defaultNamespace(listenerConfig.getPrestoURI()); | ||
| this.jobNamespace = listenerConfig.getNamespace().orElse(prestoURI.toString()); | ||
| this.datasetNamespace = prestoURI.toString(); | ||
| this.includeQueryTypes = ImmutableSet.copyOf(listenerConfig.getIncludeQueryTypes()); | ||
| this.interpolator = new FormatInterpolator(listenerConfig.getJobNameFormat(), OpenLineageJobInterpolatedValues.values()); |
There was a problem hiding this comment.
issue (bug_risk): Guard against null prestoURI in configuration before calling defaultNamespace
The constructor calls defaultNamespace(listenerConfig.getPrestoURI()) without checking for null. If prestoURI is unset, this leads to a hard-to-debug NullPointerException. Please either make prestoURI a required config with a clear failure, supply a default, or add a null check that throws a descriptive configuration error.
| public static boolean hasValidPlaceholders(String format, OpenLineageJobInterpolatedValues[] values) | ||
| { | ||
| return VALID_FORMAT_PATTERN.matcher(format).matches(); |
There was a problem hiding this comment.
issue: Validation ignores provided placeholder set and hardcodes allowed placeholders
hasValidPlaceholders accepts OpenLineageJobInterpolatedValues[] values but never uses it, and VALID_FORMAT_PATTERN hardcodes the allowed placeholders. This makes the signature misleading and requires manual regex updates when adding new placeholders. Either derive allowed placeholders from values to build the pattern dynamically, or remove the parameter if the hardcoded list is intentional.
| return new HttpTransport( | ||
| new HttpConfig( | ||
| config.getUrl(), | ||
| config.getEndpoint(), | ||
| (int) config.getTimeoutMillis(), | ||
| tokenProvider, | ||
| config.getUrlParams(), | ||
| config.getHeaders(), | ||
| httpCompression, | ||
| new HttpSslContextConfig())); |
There was a problem hiding this comment.
issue (bug_risk): Avoid narrowing cast from long timeoutMillis to int when building HttpConfig
(int) config.getTimeoutMillis() can overflow if timeoutMillis exceeds Integer.MAX_VALUE, resulting in negative or incorrect timeouts. Consider either validating and clamping the value before casting, or updating HttpConfig to accept a long timeout so oversized values fail fast instead of being silently truncated.
| public class TestOpenLineageEventListener | ||
| { | ||
| @Test | ||
| public void testGetCompleteEvent() |
There was a problem hiding this comment.
suggestion (testing): Add a test for FAILED queries to cover FAIL events and error facets
The listener has dedicated logic for queryState = "FAILED" with non-empty failureInfo. Please add a test that builds a QueryCompletedEvent in this state and asserts that the emitted event has eventType = RunEvent.EventType.FAIL and an errorMessage facet populated with the failure message, so the failure path remains covered.
Suggested implementation:
assertThat(result.getJob().getName()).isEqualTo("queryId");Add a dedicated test for FAILED queries in TestOpenLineageEventListener to cover the failure path and error facets. Place it alongside testGetCompleteEvent():
@Test
public void testGetFailedEvent()
{
OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
"openlineage-event-listener.transport.type", "CONSOLE",
"openlineage-event-listener.presto.uri", "http://testhost"));
// Build or reuse a QueryCompletedEvent representing a FAILED query with non-empty failureInfo.
// Prefer using a shared fixture similar to `PrestoEventData.queryCompleteEvent`.
QueryCompletedEvent failedEvent = PrestoEventData.queryFailedEvent;
RunEvent result = listener.getCompletedEvent(failedEvent);
// Verify event type is FAIL
assertThat(result.getEventType()).isEqualTo(RunEvent.EventType.FAIL);
// Verify error facet is present and populated with the failure message
Map<String, OpenLineage.RunFacet> facets = result.getRun().getFacets();
assertThat(facets).containsKey("errorMessage");
OpenLineage.ErrorMessageRunFacet errorFacet =
(OpenLineage.ErrorMessageRunFacet) facets.get("errorMessage");
assertThat(errorFacet.getMessage())
.isEqualTo(failedEvent.getFailureInfo().getFailureMessage());
}You will likely need to:
-
Ensure the following imports (or equivalent fully-qualified names) are present at the top of the file:
import io.openlineage.client.OpenLineage; import io.openlineage.client.OpenLineage.RunEvent; import com.facebook.presto.event.query.QueryCompletedEvent; import java.util.Map;
(Some may already exist; avoid duplicates.)
-
Provide a
PrestoEventData.queryFailedEventfixture (or similar) that returns aQueryCompletedEventwhosegetQueryState()is"FAILED"and whosegetFailureInfo()returns a non-empty failure info object withgetFailureMessage()populated. For example, inPrestoEventData:public static final QueryCompletedEvent queryFailedEvent = createFailedQueryCompletedEvent(); private static QueryCompletedEvent createFailedQueryCompletedEvent() { // Build a QueryCompletedEvent with state FAILED and a populated failureInfo QueryFailureInfo failureInfo = new QueryFailureInfo( "Some failure message", ... // other required fields ); return new QueryCompletedEvent( ..., "FAILED", failureInfo, ... // other required fields ); }
-
Ensure the error facet key
"errorMessage"matches the key used insideOpenLineageEventListenerwhen populating the error facet. If a different key is used (e.g.,"error"or"error_message"), adjust thecontainsKeyandgetcalls accordingly. -
If the failure message is obtained differently inside the listener (for example,
failureInfo.getMessage()orfailureInfo.getThrowable().getMessage()), update the finalassertThat(errorFacet.getMessage())to match that source so the test aligns with the listener implementation.
| } | ||
|
|
||
| @Test | ||
| public void testGetStartEvent() |
There was a problem hiding this comment.
suggestion (testing): Add tests around queryType filtering and ensuring queryCompleted does not emit for unsupported types
Right now the tests call getCompletedEvent directly, so they don’t exercise the includeQueryTypes filtering in queryCompleted. Please add tests that configure the listener with:
- a restricted type set (e.g., only
SELECT) and send aQueryCompletedEventwith a non-matching type (e.g.,INSERT), asserting no events are emitted; - a matching type and asserting an event is emitted.
UseOpenLineageMemoryTransport(or a simple mockTransport) to capture events when invokingqueryCompletedso the tests cover the full filter logic.
Suggested implementation:
assertThat(result.getJob().getNamespace()).isEqualTo("presto://testhost");
assertThat(result.getJob().getName()).isEqualTo("queryId");
}
@Test
public void testQueryCompletedDoesNotEmitEventForUnsupportedQueryType()
{
OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
"openlineage-event-listener.transport.type", "MEMORY",
"openlineage-event-listener.presto.uri", "http://testhost",
// Configure includeQueryTypes so that the query type of queryCompleteEvent does NOT match.
// For example, if queryCompleteEvent is a SELECT, we restrict to INSERT to force filtering.
"openlineage-event-listener.include.query.types", "INSERT"));
// Ensure the memory transport is empty before invoking queryCompleted
OpenLineageMemoryTransport transport = OpenLineageMemoryTransport.getInstance();
transport.clear();
listener.queryCompleted(PrestoEventData.queryCompleteEvent);
// Since the query type does not match includeQueryTypes, no events should be emitted
assertThat(transport.getEvents()).isEmpty();
}
@Test
public void testQueryCompletedEmitsEventForSupportedQueryType()
{
OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of(
"openlineage-event-listener.transport.type", "MEMORY",
"openlineage-event-listener.presto.uri", "http://testhost",
// Configure includeQueryTypes so that the query type of queryCompleteEvent DOES match.
// For example, if queryCompleteEvent is a SELECT, we include SELECT.
"openlineage-event-listener.include.query.types", "SELECT"));
// Ensure the memory transport is empty before invoking queryCompleted
OpenLineageMemoryTransport transport = OpenLineageMemoryTransport.getInstance();
transport.clear();
listener.queryCompleted(PrestoEventData.queryCompleteEvent);
// Since the query type matches includeQueryTypes, a single COMPLETE event should be emitted
assertThat(transport.getEvents()).hasSize(1);
RunEvent emittedEvent = (RunEvent) transport.getEvents().get(0);
assertThat(emittedEvent.getEventType()).isEqualTo(RunEvent.EventType.COMPLETE);
assertThat(emittedEvent.getEventTime().toInstant()).isEqualTo(PrestoEventData.queryCompleteEvent.getEndTime());
}- Ensure the following imports are present at the top of
TestOpenLineageEventListener.java(or equivalent, depending on the existing package imports and OpenLineage client version):
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.client.transport.OpenLineageMemoryTransport;-
The property key for configuring query type filtering (
"openlineage-event-listener.include.query.types") must match the one used inOpenLineageEventListener. If the actual key differs (e.g., uses hyphens vs. dots or a different prefix), adjust the property name in both new tests accordingly. -
The
OpenLineageMemoryTransportAPI is assumed to provide:- a static
getInstance()method returning the singleton transport used by the listener whentransport.type=MEMORY; - a
clear()method to reset state between tests; - a
getEvents()method returning the list of emitted events.
If the actual API differs, adapt the calls in the tests (getInstance,clear,getEvents, and cast toRunEvent) to the concrete methods/fields provided in your implementation.
- a static
-
The values
"INSERT"and"SELECT"forinclude.query.typesmust correspond to how query types are represented inOpenLineageEventListener(e.g., uppercase strings, enum names, etc.). IfPrestoEventData.queryCompleteEventuses another type (e.g.,"QUERY","SELECT_TABLE", etc.), update the two tests so that:- the first test uses a non-matching type;
- the second test uses the exact matching type for
queryCompleteEvent.
-
If
createEventListenerdoes not automatically wire in theOpenLineageMemoryTransportwhentransport.type=MEMORY, you will need to adjust the helper or listener construction to inject the memory transport instance used by the tests so thattransport.getEvents()actually reflects whatqueryCompletedemits.
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| public class OpenLineageMemoryTransport |
There was a problem hiding this comment.
suggestion (testing): Leverage OpenLineageMemoryTransport in an integration-style test to assert emitted inputs/outputs and lineage facets
This in-memory Transport seems ideal for asserting which events the listener emits, but it isn’t used in tests yet. To better cover lineage behavior, consider an integration-style test that wires OpenLineageEventListener to OpenLineageMemoryTransport, sends a QueryCompletedEvent with non-empty QueryIOMetadata (inputs, output, and column lineage), and then inspects the captured RunEvent to verify input namespaces/dataset names/schema facets and output schema/column lineage facets.
Suggested implementation:
@Override
public void emit(OpenLineage.RunEvent runEvent)
{
processedEvents.add(runEvent);
}
@Override
public void emit(OpenLineage.DatasetEvent datasetEvent)
{
processedEvents.add(datasetEvent);
}
@Override
public void emit(OpenLineage.BaseEvent event)
{
processedEvents.add(event);
}
/**
* Returns an immutable snapshot of all OpenLineage events that have been
* emitted through this transport. Intended for use in tests to assert lineage behavior.
*/
public List<OpenLineage.BaseEvent> getProcessedEvents()
{
synchronized (processedEvents) {
return ImmutableList.copyOf(processedEvents);
}
}
/**
* Clears all previously captured events. Useful to reset state between tests.
*/
public void clear()
{
processedEvents.clear();
}To fully implement the testing approach described in your review comment, you’ll also want to add an integration-style test, for example:
- Create a new test class (e.g.,
OpenLineageEventListenerIntegrationTest) underpresto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/. - In that test, construct an
OpenLineageMemoryTransportand wire it into anOpenLineageEventListenerinstance (via configuration or direct construction, depending on how the listener is built in the codebase). - Build a
QueryCompletedEventwith non-emptyQueryIOMetadataincluding:- One or more input tables (with schema/columns filled in).
- An output table (with schema/columns).
- Column-level lineage.
- Invoke the listener’s
queryCompletedmethod with this event. - Use
OpenLineageMemoryTransport.getProcessedEvents()to obtain the emittedRunEventand add assertions on:- Input dataset namespace and name.
- Output dataset namespace and name.
- Input/output schema facets.
- Column lineage facets.
The exact construction of QueryCompletedEvent, QueryIOMetadata, and the listener will need to align with existing test helpers and factory methods already in your codebase.
steveburnett
left a comment
There was a problem hiding this comment.
Thanks for the doc! Looks good, just a few rephrasings.
presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst
Outdated
Show resolved
Hide resolved
presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst
Outdated
Show resolved
Hide resolved
presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst
Outdated
Show resolved
Hide resolved
presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst
Outdated
Show resolved
Hide resolved
|
Please edit the PR title to follow semantic commit style to pass the failing and required CI check. See the failure in the test for advice. If you can't edit the PR title, let us know and we can help. |
|
Hi @imjalpreet @hantangwangd , do you mind help reviewing this PR since you help reviewed https://github.com/prestodb/presto/pull/25913/commits? Thanks |
steveburnett
left a comment
There was a problem hiding this comment.
LGTM! (docs)
Pull updated branch, new local doc build, looks good. Thanks!
beinan
left a comment
There was a problem hiding this comment.
let's merge this to unblock the development on prestissimo and velox side
|
rerun the ci |
|
CI passed! @beinan |
|
Hi @tdcmeehan , do you still want to take a look at this pr? otherwise, let's merge it, thanks! |
|
Thanks for the review @beinan, I only assigned it to make sure it gets a review, but I see you already got to it. No objections to merge. |
| .inputFields(column | ||
| .getSourceColumns() | ||
| .stream() | ||
| .map(inputColumn -> openLineage.newInputFieldBuilder() |
There was a problem hiding this comment.
Regarding the Column Level Lineage spec https://openlineage.io/docs/spec/facets/dataset-facets/column_lineage_facet, did we miss including the transformations field for each inputField of an output?
There was a problem hiding this comment.
Hi @evanvdia , right now the focus of this PR is to cherry-pick the change from Trino and we don't have transformation field for column lineage yet. We would need another PR to support that and we are working on it right now.
Description
Add an OpenLineage event listener plugin to Presto, ported from Trino's trino-openlineage plugin (commit 99b9b37).
co-author with @mgorsk1
The plugin emits query lifecycle events in the OpenLineage format, enabling integration with data lineage tracking systems such as Marquez, Atlan, and DataHub.
What it does:
STARTevents on query creation,COMPLETE/FAILevents on query completionOutputColumnMetadata.getSourceColumns())CONSOLE(stdout JSON) andHTTP(e.g., Marquez API) transport modesMotivation and Context
Presto currently has no OpenLineage support. OpenLineage is an open standard for lineage metadata collection that is widely adopted in the data ecosystem. This plugin enables Presto users to track data lineage without custom integration work.
Impact
presto-openlineage-event-listener— no changes to existing Presto behaviorio.openlineage:openlineage-java:1.44.1Test Plan
TestOpenLineageEventListener)TestOpenLineageEventListenerPlugin)TestOpenLineageEventListenerConfig)TestOpenLineageTransportConfig)TestOpenLineageHttpTransportConfig)./mvnw -pl presto-openlineage-event-listener test→ 12 tests, 0 failuresetc/event-listener.propertieswithCONSOLEtransport, run a query, verify JSON output on stdoutContributor checklist
Release Notes