7 changes: 7 additions & 0 deletions pom.xml
@@ -224,6 +224,7 @@
<module>presto-native-tests</module>
<module>presto-router</module>
<module>presto-open-telemetry</module>
<module>presto-openlineage-event-listener</module>
<module>redis-hbo-provider</module>
<module>presto-singlestore</module>
<module>presto-hana</module>
@@ -1192,6 +1193,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-openlineage-event-listener</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-native-sidecar-plugin</artifactId>
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/develop.rst
@@ -17,6 +17,7 @@ This guide is intended for Presto contributors and plugin developers.
develop/system-access-control
develop/password-authenticator
develop/event-listener
develop/openlineage-event-listener
develop/client-protocol
develop/worker-protocol
develop/serialized-page
163 changes: 163 additions & 0 deletions presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst
@@ -0,0 +1,163 @@
==========================
OpenLineage Event Listener
==========================

The OpenLineage event listener plugin emits query events in the
`OpenLineage <https://openlineage.io/>`_ format, enabling integration with
lineage tracking systems such as `Marquez <https://marquezproject.ai/>`_,
`Atlan <https://atlan.com/>`_, and `DataHub <https://datahubproject.io/>`_.

The plugin captures:

* Query start events (``START``)
* Query completion events (``COMPLETE`` or ``FAIL``)
* Input and output dataset information including column-level lineage

Installation
------------

The OpenLineage event listener plugin is bundled with Presto and requires
no additional installation.

Configuration
-------------

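Create an ``etc/event-listener.properties`` file on the coordinator. The
``event-listener.name`` and ``openlineage-event-listener.presto.uri``
properties are required; ``openlineage-event-listener.transport.type`` is
optional and defaults to ``CONSOLE``:

.. code-block:: none

    event-listener.name=openlineage-event-listener
    openlineage-event-listener.presto.uri=http://presto-coordinator:8080
    openlineage-event-listener.transport.type=CONSOLE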

Transport Types
^^^^^^^^^^^^^^^

The plugin supports two transport types for emitting OpenLineage events:

**Console Transport**

Writes OpenLineage events as JSON to stdout. Useful for debugging and
development.

.. code-block:: none

    event-listener.name=openlineage-event-listener
    openlineage-event-listener.presto.uri=http://presto-coordinator:8080
    openlineage-event-listener.transport.type=CONSOLE

**HTTP Transport**

Sends OpenLineage events to an HTTP endpoint such as the Marquez API.

.. code-block:: none

    event-listener.name=openlineage-event-listener
    openlineage-event-listener.presto.uri=http://presto-coordinator:8080
    openlineage-event-listener.transport.type=HTTP
    openlineage-event-listener.transport.url=http://marquez:5000
    openlineage-event-listener.transport.endpoint=/api/v1/lineage

Configuration Properties
^^^^^^^^^^^^^^^^^^^^^^^^

.. list-table::
   :widths: 40 10 10 40
   :header-rows: 1

   * - Property
     - Required
     - Default
     - Description
   * - ``openlineage-event-listener.presto.uri``
     - Yes
     -
     - URI of the Presto server. Used for namespace rendering in OpenLineage events.
   * - ``openlineage-event-listener.transport.type``
     - No
     - ``CONSOLE``
     - Transport type for emitting events. Supported values: ``CONSOLE``, ``HTTP``.
   * - ``openlineage-event-listener.namespace``
     - No
     -
     - Override the default namespace for OpenLineage jobs. Defaults to the Presto URI with ``presto://`` scheme.
   * - ``openlineage-event-listener.job.name-format``
     - No
     - ``$QUERY_ID``
     - Format string for the OpenLineage job name. Supported placeholders: ``$QUERY_ID``, ``$USER``, ``$SOURCE``, ``$CLIENT_IP``.
   * - ``openlineage-event-listener.presto.include-query-types``
     - No
     - ``DELETE,INSERT,MERGE,UPDATE,DATA_DEFINITION``
     - Comma-separated list of query types that generate OpenLineage events. Other query types are filtered out on completion.
   * - ``openlineage-event-listener.disabled-facets``
     - No
     -
     - Comma-separated list of facets to exclude from events. Supported values: ``PRESTO_METADATA``, ``PRESTO_QUERY_STATISTICS``, ``PRESTO_QUERY_CONTEXT``.
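
As an illustration only, the optional properties above might be combined as
follows. The values are assumptions chosen for this example, not recommended
settings; the job name format uses only placeholders listed in the table.

.. code-block:: none

    openlineage-event-listener.job.name-format=$USER-$QUERY_ID
    openlineage-event-listener.presto.include-query-types=INSERT,MERGE
    openlineage-event-listener.disabled-facets=PRESTO_QUERY_STATISTICS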

HTTP Transport Properties
^^^^^^^^^^^^^^^^^^^^^^^^^

These properties apply when ``openlineage-event-listener.transport.type`` is set to ``HTTP``.

.. list-table::
   :widths: 40 10 10 40
   :header-rows: 1

   * - Property
     - Required
     - Default
     - Description
   * - ``openlineage-event-listener.transport.url``
     - Yes
     -
     - URL of the OpenLineage API server.
   * - ``openlineage-event-listener.transport.endpoint``
     - No
     -
     - Custom API path for receiving events.
   * - ``openlineage-event-listener.transport.api-key``
     - No
     -
     - API key for authentication. Sent as a ``Bearer`` token.
   * - ``openlineage-event-listener.transport.timeout``
     - No
     - ``5s``
     - HTTP request timeout. Accepts duration strings. For example: ``5s``, ``30s``, ``1m``.
   * - ``openlineage-event-listener.transport.headers``
     - No
     -
     - Custom HTTP headers as comma-separated ``key:value`` pairs.
   * - ``openlineage-event-listener.transport.url-params``
     - No
     -
     - Custom URL query parameters as comma-separated ``key:value`` pairs.
   * - ``openlineage-event-listener.transport.compression``
     - No
     - ``NONE``
     - HTTP body compression. Supported values: ``NONE``, ``GZIP``.
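
A fuller HTTP configuration might look like the following sketch. The API key
placeholder, the header name ``X-Team``, and the URL parameter ``source`` are
hypothetical values used only to show the ``key:value`` syntax described above.

.. code-block:: none

    event-listener.name=openlineage-event-listener
    openlineage-event-listener.presto.uri=http://presto-coordinator:8080
    openlineage-event-listener.transport.type=HTTP
    openlineage-event-listener.transport.url=http://marquez:5000
    openlineage-event-listener.transport.api-key=<your-api-key>
    openlineage-event-listener.transport.timeout=30s
    openlineage-event-listener.transport.headers=X-Team:data-platform
    openlineage-event-listener.transport.url-params=source:presto
    openlineage-event-listener.transport.compression=GZIP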

Event Details
-------------

The plugin emits the following OpenLineage facets:

**Run Facets**

* ``processing_engine`` - Presto server version information
* ``presto_metadata`` - Query ID, transaction ID, and query plan
* ``presto_query_context`` - User, server address, environment, source, client info
* ``presto_query_statistics`` - Detailed query execution statistics (on completion only)
* ``nominalTime`` - Query start and end times (on completion only)
* ``errorMessage`` - Failure message (on failure only)

**Job Facets**

* ``jobType`` - Processing type ``BATCH``, integration ``PRESTO``, job type ``QUERY``
* ``sql`` - The SQL query text with dialect ``presto``

**Dataset Facets**

* ``schema`` - Column names and types for input and output datasets
* ``dataSource`` - Catalog and schema information
* ``columnLineage`` - Column-level lineage mapping from input to output columns
119 changes: 119 additions & 0 deletions presto-openlineage-event-listener/pom.xml
@@ -0,0 +1,119 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-root</artifactId>
<version>0.297-SNAPSHOT</version>
</parent>

<artifactId>presto-openlineage-event-listener</artifactId>
<name>presto-openlineage-event-listener</name>
<description>Presto - OpenLineage Event Listener</description>
<packaging>presto-plugin</packaging>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<project.build.targetJdk>17</project.build.targetJdk>
<air.check.skip-modernizer>true</air.check.skip-modernizer>
</properties>

<dependencies>
<!-- OpenLineage client -->
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-java</artifactId>
<version>1.44.1</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- Presto SPI (provided) -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-common</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift.drift</groupId>
<artifactId>drift-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>units</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-testng-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.plugin.openlineage;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static java.util.Objects.requireNonNull;

/**
 * Simple replacement for Trino's FormatInterpolator from trino-plugin-toolkit.
 * Replaces $PLACEHOLDER tokens in a format string with values from the provided context.
 */
public class FormatInterpolator
{
    private static final Pattern PLACEHOLDER_PATTERN = Pattern.compile("\\$([A-Z_]+)");
    // Valid format: only letters, digits, underscores, hyphens, commas, spaces, equal signs, and $PLACEHOLDER tokens
    private static final Pattern VALID_FORMAT_PATTERN = Pattern.compile("^([a-zA-Z0-9_\\-,= ]|\\$(" +
            "QUERY_ID|USER|SOURCE|CLIENT_IP))*$");

    private final String format;
    private final OpenLineageJobInterpolatedValues[] values;

    public FormatInterpolator(String format, OpenLineageJobInterpolatedValues[] values)
    {
        this.format = requireNonNull(format, "format is null");
        this.values = requireNonNull(values, "values is null");
    }

    public String interpolate(OpenLineageJobContext context)
    {
        Matcher matcher = PLACEHOLDER_PATTERN.matcher(format);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String placeholder = matcher.group(1);
            String replacement = getValueForPlaceholder(placeholder, context);
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    private String getValueForPlaceholder(String placeholder, OpenLineageJobContext context)
    {
        for (OpenLineageJobInterpolatedValues value : values) {
            if (value.name().equals(placeholder)) {
                return value.value(context);
            }
        }
        return "$" + placeholder;
    }

    public static boolean hasValidPlaceholders(String format, OpenLineageJobInterpolatedValues[] values)
    {
        return VALID_FORMAT_PATTERN.matcher(format).matches();
Comment on lines +64 to +66
Contributor


issue: Validation ignores provided placeholder set and hardcodes allowed placeholders

hasValidPlaceholders accepts OpenLineageJobInterpolatedValues[] values but never uses it, and VALID_FORMAT_PATTERN hardcodes the allowed placeholders. This makes the signature misleading and requires manual regex updates when adding new placeholders. Either derive allowed placeholders from values to build the pattern dynamically, or remove the parameter if the hardcoded list is intentional.
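
One possible shape for the first option, offered as a minimal sketch and not as the PR's implementation: build the alternation from the names of the supplied values. The class `DynamicPlaceholderValidation` and the enum `Placeholder` are hypothetical stand-ins (the latter for `OpenLineageJobInterpolatedValues`, of which only `name()` is assumed); the literal character class is copied from `VALID_FORMAT_PATTERN`.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class DynamicPlaceholderValidation
{
    // Hypothetical stand-in for OpenLineageJobInterpolatedValues; only name() is used.
    public enum Placeholder { QUERY_ID, USER, SOURCE, CLIENT_IP }

    // Same literal characters as VALID_FORMAT_PATTERN in the diff above.
    private static final String LITERAL_CHARS = "[a-zA-Z0-9_\\-,= ]";

    // Derives the allowed $PLACEHOLDER alternation from the supplied values
    // instead of hardcoding it in the regex.
    public static Pattern buildValidFormatPattern(Enum<?>[] values)
    {
        if (values.length == 0) {
            // No placeholders allowed: accept literal characters only.
            return Pattern.compile("^" + LITERAL_CHARS + "*$");
        }
        String alternation = Arrays.stream(values)
                .map(value -> Pattern.quote(value.name()))
                .collect(Collectors.joining("|"));
        return Pattern.compile("^(" + LITERAL_CHARS + "|\\$(" + alternation + "))*$");
    }

    public static boolean hasValidPlaceholders(String format, Enum<?>[] values)
    {
        return buildValidFormatPattern(values).matcher(format).matches();
    }

    public static void main(String[] args)
    {
        Placeholder[] all = Placeholder.values();
        System.out.println(hasValidPlaceholders("$USER-$QUERY_ID", all)); // prints "true"
        System.out.println(hasValidPlaceholders("$UNKNOWN", all)); // prints "false"
    }
}
```

Compiling the pattern on every call keeps the sketch short; if validation runs often, the pattern could be built once per value set and cached.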

    }
}
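
The interpolation loop in `interpolate` can be exercised in isolation. The following is a minimal sketch under stated assumptions, not the plugin's code: the class `InterpolationSketch` is hypothetical, and a plain `Map<String, String>` stands in for `OpenLineageJobContext` and `OpenLineageJobInterpolatedValues`. It shows why `Matcher.quoteReplacement` matters: a substituted value containing `$` would otherwise be read as a group reference by `appendReplacement`.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class InterpolationSketch
{
    // Same placeholder syntax as PLACEHOLDER_PATTERN in the diff above.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$([A-Z_]+)");

    // Replaces each $NAME token with its value from the map; unknown tokens are
    // left in place, mirroring the fallback in getValueForPlaceholder.
    public static String interpolate(String format, Map<String, String> values)
    {
        Matcher matcher = PLACEHOLDER.matcher(format);
        StringBuilder result = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String replacement = values.getOrDefault(name, "$" + name);
            // quoteReplacement makes "$" and "\" in the value literal.
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args)
    {
        Map<String, String> values = Map.of("USER", "alice", "QUERY_ID", "q$1");
        System.out.println(interpolate("$USER-$QUERY_ID-$OTHER", values)); // prints "alice-q$1-$OTHER"
    }
}
```

The sketch uses `StringBuilder`, which `appendReplacement` and `appendTail` accept on Java 9 and later; the diff above uses `StringBuffer`, which behaves the same here.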