From d5f0fbdad858d08230adcb48128c4a5b174c8ce1 Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Mon, 2 Mar 2026 16:34:01 -0800 Subject: [PATCH 1/3] feat(plugin-openlineage-event-listener): Port Trino OpenLineage event listener plugin to Presto --- pom.xml | 7 + presto-openlineage-event-listener/pom.xml | 119 ++++++ .../openlineage/FormatInterpolator.java | 68 +++ .../openlineage/OpenLineageEventListener.java | 403 ++++++++++++++++++ .../OpenLineageEventListenerConfig.java | 146 +++++++ .../OpenLineageEventListenerFactory.java | 99 +++++ .../OpenLineageEventListenerPlugin.java | 28 ++ .../OpenLineageHttpTransportConfig.java | 193 +++++++++ .../openlineage/OpenLineageJobContext.java | 41 ++ .../OpenLineageJobInterpolatedValues.java | 38 ++ .../openlineage/OpenLineagePrestoFacet.java | 28 ++ .../openlineage/OpenLineageTransport.java | 21 + .../OpenLineageTransportConfig.java | 47 ++ .../services/com.facebook.presto.spi.Plugin | 1 + .../OpenLineageMemoryTransport.java | 56 +++ .../plugin/openlineage/PrestoEventData.java | 158 +++++++ .../TestOpenLineageEventListener.java | 109 +++++ .../TestOpenLineageEventListenerConfig.java | 90 ++++ .../TestOpenLineageEventListenerPlugin.java | 50 +++ .../TestOpenLineageHttpTransportConfig.java | 62 +++ .../TestOpenLineageTransportConfig.java | 42 ++ presto-server/src/main/provisio/presto.xml | 6 + 22 files changed, 1812 insertions(+) create mode 100644 presto-openlineage-event-listener/pom.xml create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/FormatInterpolator.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListener.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerConfig.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerFactory.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerPlugin.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageHttpTransportConfig.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobContext.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobInterpolatedValues.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineagePrestoFacet.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransport.java create mode 100644 presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransportConfig.java create mode 100644 presto-openlineage-event-listener/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin create mode 100644 presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/OpenLineageMemoryTransport.java create mode 100644 presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/PrestoEventData.java create mode 100644 presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListener.java create mode 100644 presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerConfig.java create mode 100644 presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerPlugin.java create mode 100644 presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageHttpTransportConfig.java create mode 100644 presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageTransportConfig.java diff --git a/pom.xml b/pom.xml index 880d2928cc193..0b9105d66371d 100644 --- a/pom.xml +++ b/pom.xml @@ -224,6 +224,7 @@ presto-native-tests presto-router presto-open-telemetry + presto-openlineage-event-listener redis-hbo-provider presto-singlestore presto-hana @@ -1192,6 +1193,12 @@ ${project.version} + + com.facebook.presto + presto-openlineage-event-listener + ${project.version} + + com.facebook.presto presto-native-sidecar-plugin diff --git a/presto-openlineage-event-listener/pom.xml b/presto-openlineage-event-listener/pom.xml new file mode 100644 index 0000000000000..4f85a9f1c6bf5 --- /dev/null +++ b/presto-openlineage-event-listener/pom.xml @@ -0,0 +1,119 @@ + + + 4.0.0 + + + com.facebook.presto + presto-root + 0.297-SNAPSHOT + + + presto-openlineage-event-listener + presto-openlineage-event-listener + Presto - OpenLineage Event Listener + presto-plugin + + + ${project.parent.basedir} + 17 + true + + + + + + io.openlineage + openlineage-java + 1.44.1 + + + commons-logging + commons-logging + + + + + + com.facebook.airlift + log + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.google.guava + guava + + + + + com.facebook.presto + presto-spi + provided + + + + com.facebook.presto + presto-common + provided + + + + com.facebook.airlift.drift + drift-api + provided + + + + io.airlift + slice + provided + + + + com.facebook.airlift + units + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + org.openjdk.jol + jol-core + provided + + + + + com.facebook.presto + presto-testng-services + test + + + + org.testng + testng + test + + + + com.facebook.airlift + testing + test + + + + org.assertj + assertj-core + test + + + diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/FormatInterpolator.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/FormatInterpolator.java new file mode 100644 index 0000000000000..6a8d013423359 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/FormatInterpolator.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.util.Objects.requireNonNull; + +/** + * Simple replacement for Trino's FormatInterpolator from trino-plugin-toolkit. + * Replaces $PLACEHOLDER tokens in a format string with values from the provided context. + */ +public class FormatInterpolator +{ + private static final Pattern PLACEHOLDER_PATTERN = Pattern.compile("\\$([A-Z_]+)"); + // Valid format: only letters, digits, underscores, hyphens, commas, spaces, equal signs, and $PLACEHOLDER tokens + private static final Pattern VALID_FORMAT_PATTERN = Pattern.compile("^([a-zA-Z0-9_\\-,= ]|\\$(" + + "QUERY_ID|USER|SOURCE|CLIENT_IP))*$"); + + private final String format; + private final OpenLineageJobInterpolatedValues[] values; + + public FormatInterpolator(String format, OpenLineageJobInterpolatedValues[] values) + { + this.format = requireNonNull(format, "format is null"); + this.values = requireNonNull(values, "values is null"); + } + + public String interpolate(OpenLineageJobContext context) + { + Matcher matcher = PLACEHOLDER_PATTERN.matcher(format); + StringBuffer result = new StringBuffer(); + while (matcher.find()) { + String placeholder = matcher.group(1); + String replacement = getValueForPlaceholder(placeholder, context); + matcher.appendReplacement(result, Matcher.quoteReplacement(replacement)); + } + matcher.appendTail(result); + return result.toString(); + } + + private String getValueForPlaceholder(String placeholder, OpenLineageJobContext context) + { + for (OpenLineageJobInterpolatedValues value : values) { + if (value.name().equals(placeholder)) { + return value.value(context); + } + } + return "$" + placeholder; + } + + public static boolean hasValidPlaceholders(String format, OpenLineageJobInterpolatedValues[] values) + { + return VALID_FORMAT_PATTERN.matcher(format).matches(); + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListener.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListener.java new file mode 100644 index 0000000000000..0120dadd1dc44 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListener.java @@ -0,0 +1,403 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.resourceGroups.QueryType; +import com.facebook.presto.spi.eventlistener.EventListener; +import com.facebook.presto.spi.eventlistener.OutputColumnMetadata; +import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; +import com.facebook.presto.spi.eventlistener.QueryContext; +import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryFailureInfo; +import com.facebook.presto.spi.eventlistener.QueryIOMetadata; +import com.facebook.presto.spi.eventlistener.QueryMetadata; +import com.facebook.presto.spi.eventlistener.QueryOutputMetadata; +import com.facebook.presto.spi.eventlistener.QueryStatistics; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineage.DatasetFacetsBuilder; +import io.openlineage.client.OpenLineage.InputDataset; +import io.openlineage.client.OpenLineage.InputDatasetBuilder; +import io.openlineage.client.OpenLineage.JobBuilder; +import io.openlineage.client.OpenLineage.OutputDataset; +import io.openlineage.client.OpenLineage.RunEvent; +import io.openlineage.client.OpenLineage.RunFacet; +import io.openlineage.client.OpenLineage.RunFacetsBuilder; +import io.openlineage.client.OpenLineageClient; + +import java.net.URI; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.openlineage.client.utils.UUIDUtils.generateStaticUUID; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.time.ZoneOffset.UTC; +import static java.util.Objects.requireNonNull; + +public class OpenLineageEventListener + implements EventListener +{ + private static final Logger logger = Logger.get(OpenLineageEventListener.class); + private static final ObjectMapper QUERY_STATISTICS_MAPPER = new ObjectMapper() + .findAndRegisterModules() + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + private final OpenLineage openLineage; + private final OpenLineageClient client; + private final URI prestoURI; + private final String jobNamespace; + private final String datasetNamespace; + private final Set includeQueryTypes; + private final FormatInterpolator interpolator; + + public OpenLineageEventListener(OpenLineage openLineage, OpenLineageClient client, OpenLineageEventListenerConfig listenerConfig) + { + this.openLineage = requireNonNull(openLineage, "openLineage is null"); + this.client = requireNonNull(client, "client is null"); + requireNonNull(listenerConfig, "listenerConfig is null"); + this.prestoURI = defaultNamespace(listenerConfig.getPrestoURI()); + this.jobNamespace = listenerConfig.getNamespace().orElse(prestoURI.toString()); + this.datasetNamespace = prestoURI.toString(); + this.includeQueryTypes = ImmutableSet.copyOf(listenerConfig.getIncludeQueryTypes()); + this.interpolator = new FormatInterpolator(listenerConfig.getJobNameFormat(), OpenLineageJobInterpolatedValues.values()); + } + + @Override + public void queryCreated(QueryCreatedEvent queryCreatedEvent) + { + // Presto does not have queryType on QueryCreatedEvent (only on QueryCompletedEvent), + // so we always emit the START event — we can't filter by type until completion. + RunEvent event = getStartEvent(queryCreatedEvent); + client.emit(event); + } + + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + if (queryTypeSupported(queryCompletedEvent.getQueryType())) { + RunEvent event = getCompletedEvent(queryCompletedEvent); + client.emit(event); + return; + } + logger.debug("Query type %s not supported. Supported query types %s", + queryCompletedEvent.getQueryType().toString(), + this.includeQueryTypes); + } + + private boolean queryTypeSupported(Optional queryType) + { + return queryType + .map(this.includeQueryTypes::contains) + .orElse(false); + } + + /* + * Construct UUIDv7 from query creation time and queryId hash. + * UUIDv7 are both globally unique and ordered. + */ + private UUID getRunId(Instant queryCreateTime, QueryMetadata queryMetadata) + { + return generateStaticUUID(queryCreateTime, queryMetadata.getQueryId().getBytes(UTF_8)); + } + + private RunFacet getPrestoQueryContextFacet(QueryContext queryContext) + { + RunFacet queryContextFacet = openLineage.newRunFacet(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + + properties.put("server_address", queryContext.getServerAddress()); + properties.put("environment", queryContext.getEnvironment()); + + // Presto has no getQueryType() on QueryContext; queryType is on the event itself. + // We omit it from the context facet since it's not available here. + + properties.put("user", queryContext.getUser()); + // Presto has no getOriginalUser() — omitted + + queryContext.getPrincipal().ifPresent(principal -> + properties.put("principal", principal)); + + queryContext.getSource().ifPresent(source -> + properties.put("source", source)); + + queryContext.getClientInfo().ifPresent(clientInfo -> + properties.put("client_info", clientInfo)); + + queryContext.getRemoteClientAddress().ifPresent(remoteClientAddress -> + properties.put("remote_client_address", remoteClientAddress)); + + queryContext.getUserAgent().ifPresent(userAgent -> + properties.put("user_agent", userAgent)); + + // Presto has no getTraceToken() — omitted + + queryContextFacet + .getAdditionalProperties() + .putAll(properties.buildOrThrow()); + + return queryContextFacet; + } + + private RunFacet getPrestoMetadataFacet(QueryMetadata queryMetadata) + { + RunFacet prestoMetadataFacet = openLineage.newRunFacet(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + + properties.put("query_id", queryMetadata.getQueryId()); + + queryMetadata.getPlan().ifPresent( + queryPlan -> properties.put("query_plan", queryPlan)); + + queryMetadata.getTransactionId().ifPresent( + transactionId -> properties.put("transaction_id", transactionId)); + + prestoMetadataFacet + .getAdditionalProperties() + .putAll(properties.buildOrThrow()); + + return prestoMetadataFacet; + } + + @SuppressWarnings("unchecked") + private RunFacet getPrestoQueryStatisticsFacet(QueryStatistics queryStatistics) + { + RunFacet prestoQueryStatisticsFacet = openLineage.newRunFacet(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + + QUERY_STATISTICS_MAPPER.convertValue(queryStatistics, HashMap.class).forEach( + (key, value) -> { + if (key != null && value != null) { + properties.put(key.toString(), value.toString()); + } + }); + + prestoQueryStatisticsFacet + .getAdditionalProperties() + .putAll(properties.buildOrThrow()); + + return prestoQueryStatisticsFacet; + } + + public RunEvent getStartEvent(QueryCreatedEvent queryCreatedEvent) + { + UUID runID = getRunId(queryCreatedEvent.getCreateTime(), queryCreatedEvent.getMetadata()); + RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCreatedEvent.getContext()); + + runFacetsBuilder.put(OpenLineagePrestoFacet.PRESTO_METADATA.asText(), + getPrestoMetadataFacet(queryCreatedEvent.getMetadata())); + runFacetsBuilder.put(OpenLineagePrestoFacet.PRESTO_QUERY_CONTEXT.asText(), + getPrestoQueryContextFacet(queryCreatedEvent.getContext())); + + return openLineage.newRunEventBuilder() + .eventType(RunEvent.EventType.START) + .eventTime(queryCreatedEvent.getCreateTime().atZone(UTC)) + .run(openLineage.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job(getBaseJobBuilder(queryCreatedEvent.getContext(), queryCreatedEvent.getMetadata()).build()) + .build(); + } + + public RunEvent getCompletedEvent(QueryCompletedEvent queryCompletedEvent) + { + UUID runID = getRunId(queryCompletedEvent.getCreateTime(), queryCompletedEvent.getMetadata()); + RunFacetsBuilder runFacetsBuilder = getBaseRunFacetsBuilder(queryCompletedEvent.getContext()); + + runFacetsBuilder.put(OpenLineagePrestoFacet.PRESTO_METADATA.asText(), + getPrestoMetadataFacet(queryCompletedEvent.getMetadata())); + runFacetsBuilder.put(OpenLineagePrestoFacet.PRESTO_QUERY_CONTEXT.asText(), + getPrestoQueryContextFacet(queryCompletedEvent.getContext())); + runFacetsBuilder.put(OpenLineagePrestoFacet.PRESTO_QUERY_STATISTICS.asText(), + getPrestoQueryStatisticsFacet(queryCompletedEvent.getStatistics())); + runFacetsBuilder.nominalTime( + openLineage.newNominalTimeRunFacet( + queryCompletedEvent.getCreateTime().atZone(ZoneOffset.UTC), + queryCompletedEvent.getEndTime().atZone(ZoneOffset.UTC))); + + boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED"); + if (failed) { + queryCompletedEvent + .getFailureInfo() + .flatMap(QueryFailureInfo::getFailureMessage) + .ifPresent(failureMessage -> runFacetsBuilder + .errorMessage(openLineage + .newErrorMessageRunFacetBuilder() + .message(failureMessage) + .build())); + } + + return openLineage.newRunEventBuilder() + .eventType( + failed + ? RunEvent.EventType.FAIL + : RunEvent.EventType.COMPLETE) + .eventTime(queryCompletedEvent.getEndTime().atZone(UTC)) + .run(openLineage.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job(getBaseJobBuilder(queryCompletedEvent.getContext(), queryCompletedEvent.getMetadata()).build()) + .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) + .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) + .build(); + } + + private RunFacetsBuilder getBaseRunFacetsBuilder(QueryContext queryContext) + { + return openLineage.newRunFacetsBuilder() + .processing_engine(openLineage.newProcessingEngineRunFacetBuilder() + .name("presto") + .version(queryContext.getServerVersion()) + .build()); + } + + private JobBuilder getBaseJobBuilder(QueryContext queryContext, QueryMetadata queryMetadata) + { + return openLineage.newJobBuilder() + .namespace(this.jobNamespace) + .name(interpolator.interpolate(new OpenLineageJobContext(queryContext, queryMetadata))) + .facets(openLineage.newJobFacetsBuilder() + .jobType(openLineage.newJobTypeJobFacet("BATCH", "PRESTO", "QUERY")) + .sql(openLineage.newSQLJobFacet(queryMetadata.getQuery(), "presto")) + .build()); + } + + /** + * Build inputs from QueryIOMetadata. + * Unlike Trino which uses queryMetadata.getTables() (compile-time analysis), + * Presto has no equivalent. We use ioMetadata.getInputs() (runtime) which is + * only available in queryCompleted events. + */ + private List buildInputs(QueryIOMetadata ioMetadata) + { + return ioMetadata + .getInputs() + .stream() + .map(input -> { + String datasetName = getDatasetName(input.getCatalogName(), input.getSchema(), input.getTable()); + InputDatasetBuilder inputDatasetBuilder = openLineage + .newInputDatasetBuilder() + .namespace(this.datasetNamespace) + .name(datasetName); + + DatasetFacetsBuilder datasetFacetsBuilder = openLineage.newDatasetFacetsBuilder() + .dataSource(openLineage.newDatasourceDatasetFacet( + toQualifiedSchemaName(input.getCatalogName(), input.getSchema()), + prestoURI.resolve(toQualifiedSchemaName(input.getCatalogName(), input.getSchema())))) + .schema(openLineage.newSchemaDatasetFacetBuilder() + .fields( + input.getColumnObjects() + .stream() + .map(column -> openLineage.newSchemaDatasetFacetFieldsBuilder() + .name(column.getName()) + .type(column.getType()) + .build()) + .collect(toImmutableList())) + .build()); + + return inputDatasetBuilder + .facets(datasetFacetsBuilder.build()) + .build(); + }) + .collect(toImmutableList()); + } + + private List buildOutputs(QueryIOMetadata ioMetadata) + { + Optional outputs = ioMetadata.getOutput(); + if (outputs.isPresent()) { + QueryOutputMetadata outputMetadata = outputs.get(); + List outputColumns = outputMetadata.getColumns().orElse(List.of()); + + OpenLineage.ColumnLineageDatasetFacetFieldsBuilder columnLineageDatasetFacetFieldsBuilder = openLineage.newColumnLineageDatasetFacetFieldsBuilder(); + outputColumns.forEach(column -> + columnLineageDatasetFacetFieldsBuilder.put(column.getColumnName(), + openLineage.newColumnLineageDatasetFacetFieldsAdditionalBuilder() + .inputFields(column + .getSourceColumns() + .stream() + .map(inputColumn -> openLineage.newInputFieldBuilder() + .field(inputColumn.getColumnName()) + .namespace(this.datasetNamespace) + .name(getDatasetName( + inputColumn.getTableName().getCatalogName(), + inputColumn.getTableName().getSchemaName(), + inputColumn.getTableName().getObjectName())) + .build()) + .collect(toImmutableList())) + .build())); + + ImmutableList.Builder inputFields = ImmutableList.builder(); + ioMetadata.getInputs().forEach(input -> { + for (com.facebook.presto.spi.eventlistener.Column column : input.getColumnObjects()) { + inputFields.add(openLineage.newInputFieldBuilder() + .field(column.getName()) + .namespace(this.datasetNamespace) + .name(getDatasetName(input.getCatalogName(), input.getSchema(), input.getTable())) + .build()); + } + }); + + return ImmutableList.of( + openLineage.newOutputDatasetBuilder() + .namespace(this.datasetNamespace) + .name(getDatasetName(outputMetadata.getCatalogName(), outputMetadata.getSchema(), outputMetadata.getTable())) + .facets(openLineage.newDatasetFacetsBuilder() + .columnLineage(openLineage.newColumnLineageDatasetFacet(columnLineageDatasetFacetFieldsBuilder.build(), inputFields.build())) + .schema(openLineage.newSchemaDatasetFacetBuilder() + .fields( + outputColumns.stream() + .map(column -> openLineage.newSchemaDatasetFacetFieldsBuilder() + .name(column.getColumnName()) + .type(column.getColumnType()) + .build()) + .collect(toImmutableList())) + .build()) + .dataSource(openLineage.newDatasourceDatasetFacet( + toQualifiedSchemaName(outputMetadata.getCatalogName(), outputMetadata.getSchema()), + prestoURI.resolve(toQualifiedSchemaName(outputMetadata.getCatalogName(), outputMetadata.getSchema())))) + .build()) + .build()); + } + return ImmutableList.of(); + } + + private String getDatasetName(String catalogName, String schemaName, String tableName) + { + return format("%s.%s.%s", catalogName, schemaName, tableName); + } + + static URI defaultNamespace(URI uri) + { + if (!uri.getScheme().isEmpty()) { + return URI.create(uri.toString().replaceFirst(uri.getScheme(), "presto")); + } + return URI.create("presto://" + uri); + } + + private static String toQualifiedSchemaName(String catalogName, String schemaName) + { + return catalogName + "." + schemaName; + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerConfig.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerConfig.java new file mode 100644 index 0000000000000..0b0a57be74d55 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerConfig.java @@ -0,0 +1,146 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.facebook.presto.common.resourceGroups.QueryType; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +public class OpenLineageEventListenerConfig +{ + private URI prestoURI; + private Set disabledFacets = ImmutableSet.of(); + private Optional namespace = Optional.empty(); + private String jobNameFormat = "$QUERY_ID"; + + private Set includeQueryTypes = ImmutableSet.builder() + .add(QueryType.DELETE) + .add(QueryType.INSERT) + .add(QueryType.MERGE) + .add(QueryType.UPDATE) + .add(QueryType.DATA_DEFINITION) + .build(); + + public OpenLineageEventListenerConfig() + { + } + + public OpenLineageEventListenerConfig(Map config) + { + requireNonNull(config, "config is null"); + + String uriStr = config.get("openlineage-event-listener.presto.uri"); + if (uriStr != null) { + try { + this.prestoURI = new URI(uriStr); + } + catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid Presto URI: " + uriStr, e); + } + } + + String queryTypesStr = config.get("openlineage-event-listener.presto.include-query-types"); + if (queryTypesStr != null && !queryTypesStr.isEmpty()) { + this.includeQueryTypes = Arrays.stream(queryTypesStr.split(",")) + .map(String::trim) + .map(QueryType::valueOf) + .collect(Collectors.toSet()); + } + + String disabledFacetsStr = config.get("openlineage-event-listener.disabled-facets"); + if (disabledFacetsStr != null && !disabledFacetsStr.isEmpty()) { + this.disabledFacets = Arrays.stream(disabledFacetsStr.split(",")) + .map(String::trim) + .map(s -> OpenLineagePrestoFacet.valueOf(s.toUpperCase())) + .collect(Collectors.toSet()); + } + + String namespaceStr = config.get("openlineage-event-listener.namespace"); + this.namespace = Optional.ofNullable(namespaceStr); + + String jobNameFormatStr = config.get("openlineage-event-listener.job.name-format"); + if (jobNameFormatStr != null) { + this.jobNameFormat = jobNameFormatStr; + } + } + + public URI getPrestoURI() + { + return prestoURI; + } + + public OpenLineageEventListenerConfig setPrestoURI(URI prestoURI) + { + this.prestoURI = prestoURI; + return this; + } + + public Set getIncludeQueryTypes() + { + return includeQueryTypes; + } + + public OpenLineageEventListenerConfig setIncludeQueryTypes(Set includeQueryTypes) + { + this.includeQueryTypes = ImmutableSet.copyOf(includeQueryTypes); + return this; + } + + public Set getDisabledFacets() + { + return disabledFacets; + } + + public OpenLineageEventListenerConfig setDisabledFacets(Set disabledFacets) + { + this.disabledFacets = ImmutableSet.copyOf(disabledFacets); + return this; + } + + public Optional getNamespace() + { + return namespace; + } + + public OpenLineageEventListenerConfig setNamespace(String namespace) + { + this.namespace = Optional.ofNullable(namespace); + return this; + } + + public String getJobNameFormat() + { + return jobNameFormat; + } + + public OpenLineageEventListenerConfig setJobNameFormat(String jobNameFormat) + { + this.jobNameFormat = jobNameFormat; + return this; + } + + public boolean isJobNameFormatValid() + { + return FormatInterpolator.hasValidPlaceholders(jobNameFormat, OpenLineageJobInterpolatedValues.values()); + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerFactory.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerFactory.java new file mode 100644 index 0000000000000..7236d8f504705 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerFactory.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.facebook.presto.spi.eventlistener.EventListener; +import com.facebook.presto.spi.eventlistener.EventListenerFactory; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClient; +import io.openlineage.client.transports.ConsoleTransport; +import io.openlineage.client.transports.HttpConfig; +import io.openlineage.client.transports.HttpSslContextConfig; +import io.openlineage.client.transports.HttpTransport; +import io.openlineage.client.transports.TokenProvider; +import io.openlineage.client.transports.Transport; + +import java.net.URI; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class OpenLineageEventListenerFactory + implements EventListenerFactory +{ + private static final URI PRODUCER_URI = URI.create("https://github.com/prestodb/presto/plugin/presto-openlineage-event-listener"); + + @Override + public String getName() + { + return "openlineage-event-listener"; + } + + @Override + public EventListener create(Map config) + { + requireNonNull(config, "config is null"); + + OpenLineageEventListenerConfig listenerConfig = new OpenLineageEventListenerConfig(config); + OpenLineageTransportConfig transportConfig = new OpenLineageTransportConfig(config); + Transport transport = buildTransport(transportConfig, config); + + String[] disabledFacets = listenerConfig.getDisabledFacets().stream() + .map(OpenLineagePrestoFacet::asText) + .toArray(String[]::new); + + OpenLineageClient client = OpenLineageClient.builder() + .transport(transport) + .disableFacets(disabledFacets) + .build(); + + OpenLineage openLineage = new OpenLineage(PRODUCER_URI); + return new OpenLineageEventListener(openLineage, client, listenerConfig); + } + + private static Transport buildTransport(OpenLineageTransportConfig transportConfig, Map config) + { + switch (transportConfig.getTransport()) { + case CONSOLE: + return new ConsoleTransport(); + case HTTP: + return buildHttpTransport(new OpenLineageHttpTransportConfig(config)); + default: + throw new IllegalArgumentException("Unsupported transport type: " + transportConfig.getTransport()); + } + } + + private static HttpTransport buildHttpTransport(OpenLineageHttpTransportConfig config) + { + TokenProvider tokenProvider = config.getApiKey() + .map(key -> (TokenProvider) () -> "Bearer " + key) + .orElse(null); + + OpenLineageHttpTransportConfig.Compression configCompression = config.getCompression(); + HttpConfig.Compression httpCompression = configCompression == OpenLineageHttpTransportConfig.Compression.GZIP + ? HttpConfig.Compression.GZIP + : null; + + return new HttpTransport( + new HttpConfig( + config.getUrl(), + config.getEndpoint(), + (int) config.getTimeoutMillis(), + tokenProvider, + config.getUrlParams(), + config.getHeaders(), + httpCompression, + new HttpSslContextConfig())); + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerPlugin.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerPlugin.java new file mode 100644 index 0000000000000..91eac3df2f504 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageEventListenerPlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.eventlistener.EventListenerFactory; +import com.google.common.collect.ImmutableList; + +public class OpenLineageEventListenerPlugin + implements Plugin +{ + @Override + public Iterable getEventListenerFactories() + { + return ImmutableList.of(new OpenLineageEventListenerFactory()); + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageHttpTransportConfig.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageHttpTransportConfig.java new file mode 100644 index 0000000000000..27de344d7dbea --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageHttpTransportConfig.java @@ -0,0 +1,193 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class OpenLineageHttpTransportConfig +{ + public enum Compression + { + NONE, GZIP + } + + private URI url; + private String endpoint; + private Optional apiKey = Optional.empty(); + private long timeoutMillis = TimeUnit.SECONDS.toMillis(5); + private Map headers = new HashMap<>(); + private Map urlParams = new HashMap<>(); + private Compression compression = Compression.NONE; + + public OpenLineageHttpTransportConfig() + { + } + + public OpenLineageHttpTransportConfig(Map config) + { + requireNonNull(config, "config is null"); + String urlStr = config.get("openlineage-event-listener.transport.url"); + if (urlStr != null) { + try { + this.url = new URI(urlStr); + } + catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid transport URL: " + urlStr, e); + } + } + this.endpoint = config.get("openlineage-event-listener.transport.endpoint"); + String apiKeyStr = config.get("openlineage-event-listener.transport.api-key"); + this.apiKey = Optional.ofNullable(apiKeyStr); + + String timeoutStr = config.get("openlineage-event-listener.transport.timeout"); + if (timeoutStr != null) { + this.timeoutMillis = parseDurationToMillis(timeoutStr); + } + + String headersStr = config.get("openlineage-event-listener.transport.headers"); + if (headersStr != null && !headersStr.isEmpty()) { + this.headers = parseKeyValuePairs(headersStr, "headers"); + } + + String urlParamsStr = config.get("openlineage-event-listener.transport.url-params"); + if (urlParamsStr != null && !urlParamsStr.isEmpty()) { + this.urlParams = parseKeyValuePairs(urlParamsStr, "url-params"); + } + + String compressionStr = config.get("openlineage-event-listener.transport.compression"); + if (compressionStr != null) { + this.compression = Compression.valueOf(compressionStr.toUpperCase()); + } + } + + public URI getUrl() + { + return url; + } + + public OpenLineageHttpTransportConfig setUrl(URI url) + { + this.url = url; + return this; + } + + public String getEndpoint() + { + return endpoint; + } + + public OpenLineageHttpTransportConfig setEndpoint(String endpoint) + { + this.endpoint = endpoint; + return this; + } + + public Optional getApiKey() + { + return apiKey; + } + + public OpenLineageHttpTransportConfig setApiKey(String apiKey) + { + this.apiKey = Optional.ofNullable(apiKey); + return this; + } + + public long getTimeoutMillis() + { + return timeoutMillis; + } + + public OpenLineageHttpTransportConfig setTimeoutMillis(long timeoutMillis) + { + this.timeoutMillis = timeoutMillis; + return this; + } + + public Map getHeaders() + { + return headers; + } + + public OpenLineageHttpTransportConfig setHeaders(Map headers) + { + this.headers = headers; + return this; + } + + public Map getUrlParams() + { + return urlParams; + } + + public OpenLineageHttpTransportConfig setUrlParams(Map urlParams) + { + this.urlParams = urlParams; + return this; + } + + public Compression getCompression() + { + return compression; + } + + public OpenLineageHttpTransportConfig setCompression(Compression compression) + { + this.compression = compression; + return this; + } + + private static Map parseKeyValuePairs(String input, String propertyName) + { + Map result = new HashMap<>(); + String[] pairs = input.split(","); + for (String pair : pairs) { + String[] parts = pair.split(":", 2); + if (parts.length != 2) { + throw new IllegalArgumentException(format( + "Cannot parse %s from property; value provided was %s, " + + "expected format is \"key1:value1,key2:value2,...\"", + propertyName, input)); + } + result.put(parts[0].trim(), parts[1].trim()); + } + return result; + } + + private static long parseDurationToMillis(String duration) + { + duration = duration.trim().toLowerCase(); + if (duration.endsWith("ms")) { + return Long.parseLong(duration.substring(0, duration.length() - 2)); + } + else if (duration.endsWith("s")) { + return TimeUnit.SECONDS.toMillis(Long.parseLong(duration.substring(0, duration.length() - 1))); + } + else if (duration.endsWith("m")) { + return TimeUnit.MINUTES.toMillis(Long.parseLong(duration.substring(0, duration.length() - 1))); + } + else if (duration.endsWith("h")) { + return TimeUnit.HOURS.toMillis(Long.parseLong(duration.substring(0, duration.length() - 1))); + } + return Long.parseLong(duration); + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobContext.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobContext.java new file mode 100644 index 0000000000000..797e47f77577c --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobContext.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.facebook.presto.spi.eventlistener.QueryContext; +import com.facebook.presto.spi.eventlistener.QueryMetadata; + +import static java.util.Objects.requireNonNull; + +public class OpenLineageJobContext +{ + private final QueryContext queryContext; + private final QueryMetadata queryMetadata; + + public OpenLineageJobContext(QueryContext queryContext, QueryMetadata queryMetadata) + { + this.queryContext = requireNonNull(queryContext, "queryContext is null"); + this.queryMetadata = requireNonNull(queryMetadata, "queryMetadata is null"); + } + + public QueryContext getQueryContext() + { + return queryContext; + } + + public QueryMetadata getQueryMetadata() + { + return queryMetadata; + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobInterpolatedValues.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobInterpolatedValues.java new file mode 100644 index 0000000000000..c6e994dd22f72 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageJobInterpolatedValues.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +public enum OpenLineageJobInterpolatedValues +{ + QUERY_ID(jobContext -> jobContext.getQueryMetadata().getQueryId()), + SOURCE(jobContext -> jobContext.getQueryContext().getSource().orElse("")), + CLIENT_IP(jobContext -> jobContext.getQueryContext().getRemoteClientAddress().orElse("")), + USER(jobContext -> jobContext.getQueryContext().getUser()); + + private final Function valueProvider; + + OpenLineageJobInterpolatedValues(Function valueProvider) + { + this.valueProvider = requireNonNull(valueProvider, "valueProvider is null"); + } + + public String value(OpenLineageJobContext context) + { + return valueProvider.apply(context); + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineagePrestoFacet.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineagePrestoFacet.java new file mode 100644 index 0000000000000..ec5cd0eaf041c --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineagePrestoFacet.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import static java.util.Locale.ENGLISH; + +public enum OpenLineagePrestoFacet +{ + PRESTO_METADATA, + PRESTO_QUERY_STATISTICS, + PRESTO_QUERY_CONTEXT; + + public String asText() + { + return name().toLowerCase(ENGLISH); + } +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransport.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransport.java new file mode 100644 index 0000000000000..fc950740a6a3d --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransport.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +public enum OpenLineageTransport +{ + CONSOLE, + HTTP, + /**/ +} diff --git a/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransportConfig.java b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransportConfig.java new file mode 100644 index 0000000000000..d8e6dd4f07149 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/java/com/facebook/presto/plugin/openlineage/OpenLineageTransportConfig.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class OpenLineageTransportConfig +{ + private OpenLineageTransport transport = OpenLineageTransport.CONSOLE; + + public OpenLineageTransportConfig() + { + } + + public OpenLineageTransportConfig(Map config) + { + requireNonNull(config, "config is null"); + String transportType = config.get("openlineage-event-listener.transport.type"); + if (transportType != null) { + this.transport = OpenLineageTransport.valueOf(transportType.toUpperCase()); + } + } + + public OpenLineageTransport getTransport() + { + return transport; + } + + public OpenLineageTransportConfig setTransport(OpenLineageTransport transport) + { + this.transport = transport; + return this; + } +} diff --git a/presto-openlineage-event-listener/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin b/presto-openlineage-event-listener/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin new file mode 100644 index 0000000000000..b8cf1f6abf7e3 --- /dev/null +++ b/presto-openlineage-event-listener/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin @@ -0,0 +1 @@ +com.facebook.presto.plugin.openlineage.OpenLineageEventListenerPlugin diff --git a/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/OpenLineageMemoryTransport.java b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/OpenLineageMemoryTransport.java new file mode 100644 index 0000000000000..d1da13a95d512 --- /dev/null +++ b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/OpenLineageMemoryTransport.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.google.common.collect.ImmutableList; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.transports.Transport; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class OpenLineageMemoryTransport + extends Transport +{ + private final List processedEvents = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void emit(OpenLineage.RunEvent runEvent) + { + processedEvents.add(runEvent); + } + + @Override + public void emit(OpenLineage.DatasetEvent datasetEvent) + { + processedEvents.add(datasetEvent); + } + + @Override + public void emit(OpenLineage.JobEvent jobEvent) + { + processedEvents.add(jobEvent); + } + + public void clearProcessedEvents() + { + processedEvents.clear(); + } + + public List getProcessedEvents() + { + return ImmutableList.copyOf(processedEvents); + } +} diff --git a/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/PrestoEventData.java b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/PrestoEventData.java new file mode 100644 index 0000000000000..fcbfee384a847 --- /dev/null +++ b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/PrestoEventData.java @@ -0,0 +1,158 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.facebook.presto.common.RuntimeStats; +import com.facebook.presto.common.resourceGroups.QueryType; +import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; +import com.facebook.presto.spi.eventlistener.QueryContext; +import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryIOMetadata; +import com.facebook.presto.spi.eventlistener.QueryMetadata; +import com.facebook.presto.spi.eventlistener.QueryStatistics; +import com.facebook.presto.spi.resourceGroups.ResourceGroupId; +import com.facebook.presto.spi.session.ResourceEstimates; + +import java.net.URI; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; + +public final class PrestoEventData +{ + public static final QueryIOMetadata queryIOMetadata; + public static final QueryContext queryContext; + public static final QueryMetadata queryMetadata; + public static final QueryStatistics queryStatistics; + public static final QueryCompletedEvent queryCompleteEvent; + public static final QueryCreatedEvent queryCreatedEvent; + + private PrestoEventData() + { + throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); + } + + static { + queryIOMetadata = new QueryIOMetadata(Collections.emptyList(), Optional.empty()); + + queryContext = new QueryContext( + "user", + Optional.of("principal"), + Optional.of("127.0.0.1"), + Optional.of("Some-User-Agent"), + Optional.of("Some client info"), + new HashSet<>(), // clientTags + Optional.of("some-presto-client"), + Optional.of("catalog"), + Optional.of("schema"), + Optional.of(new ResourceGroupId("name")), + new HashMap<>(), // sessionProperties + new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()), + "serverAddress", + "serverVersion", + "environment", + "worker"); // workerType + + queryMetadata = new QueryMetadata( + "queryId", + Optional.of("transactionId"), + "create table b.c as select * from y.z", + "queryHash", + Optional.of("preparedQuery"), + "COMPLETED", + URI.create("http://localhost"), + Optional.of("queryPlan"), + Optional.empty(), // jsonPlan + Optional.empty(), // graphvizPlan + Optional.empty(), // payload + List.of(), // runtimeOptimizedStages + Optional.empty(), // tracingId + Optional.of("updateType")); + + queryStatistics = new QueryStatistics( + Duration.ofSeconds(1), // cpuTime + Duration.ofSeconds(1), // retriedCpuTime + Duration.ofSeconds(1), // wallTime + Duration.ofSeconds(1), // totalScheduledTime + Duration.ofSeconds(0), // waitingForPrerequisitesTime + Duration.ofSeconds(0), // queuedTime + Duration.ofSeconds(0), // waitingForResourcesTime + Duration.ofSeconds(0), // semanticAnalyzingTime + Duration.ofSeconds(0), // columnAccessPermissionCheckingTime + Duration.ofSeconds(0), // dispatchingTime + Duration.ofSeconds(0), // planningTime + Optional.empty(), // analysisTime + Duration.ofSeconds(1), // executionTime + 0, // peakRunningTasks + 0L, // peakUserMemoryBytes + 0L, // peakTotalNonRevocableMemoryBytes + 0L, // peakTaskUserMemory + 0L, // peakTaskTotalMemory + 0L, // peakNodeTotalMemory + 0L, // shuffledBytes + 0L, // shuffledRows + 0L, // totalBytes + 0L, // totalRows + 0L, // outputBytes + 0L, // outputRows + 0L, // writtenOutputBytes + 0L, // writtenOutputRows + 0L, // writtenIntermediateBytes + 0L, // spilledBytes + 0.0, // cumulativeMemory + 0.0, // cumulativeTotalMemory + 0, // completedSplits + true, // complete + new RuntimeStats()); + + queryCompleteEvent = new QueryCompletedEvent( + queryMetadata, + queryStatistics, + queryContext, + queryIOMetadata, + Optional.empty(), // failureInfo + Collections.emptyList(), // warnings + Optional.of(QueryType.INSERT), // queryType + Collections.emptyList(), // failedTasks + Instant.parse("2025-04-28T11:23:55.384424Z"), // createTime + Instant.parse("2025-04-28T11:24:16.256207Z"), // executionStartTime + Instant.parse("2025-04-28T11:24:26.993340Z"), // endTime + Collections.emptyList(), // stageStatistics + Collections.emptyList(), // operatorStatistics + Collections.emptyList(), // planStatisticsRead + Collections.emptyList(), // planStatisticsWritten + Collections.emptyMap(), // planNodeHash + Collections.emptyMap(), // canonicalPlan + Optional.empty(), // statsEquivalentPlan + Optional.empty(), // expandedQuery + Collections.emptyList(), // optimizerInformation + Collections.emptyList(), // cteInformationList + Collections.emptySet(), // scalarFunctions + Collections.emptySet(), // aggregateFunctions + Collections.emptySet(), // windowFunctions + Optional.empty(), // prestoSparkExecutionContext + Collections.emptyMap(), // hboPlanHash + Optional.empty(), // planNodeIdMap + Optional.empty()); // qualifiedName + + queryCreatedEvent = new QueryCreatedEvent( + Instant.parse("2025-04-28T11:23:55.384424Z"), + queryContext, + queryMetadata); + } +} diff --git a/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListener.java b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListener.java new file mode 100644 index 0000000000000..c94b4dd8a0cb1 --- /dev/null +++ b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListener.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.google.common.collect.ImmutableMap; +import io.openlineage.client.OpenLineage.RunEvent; +import org.testng.annotations.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +public class TestOpenLineageEventListener +{ + @Test + public void testGetCompleteEvent() + { + OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of( + "openlineage-event-listener.transport.type", "CONSOLE", + "openlineage-event-listener.presto.uri", "http://testhost")); + + RunEvent result = listener.getCompletedEvent(PrestoEventData.queryCompleteEvent); + + assertThat(result.getEventType()).isEqualTo(RunEvent.EventType.COMPLETE); + assertThat(result.getEventTime().toInstant()).isEqualTo(PrestoEventData.queryCompleteEvent.getEndTime()); + assertThat(result.getRun().getRunId().toString()).startsWith("01967c23-ae78-7"); + assertThat(result.getJob().getNamespace()).isEqualTo("presto://testhost"); + assertThat(result.getJob().getName()).isEqualTo("queryId"); + + Map prestoQueryMetadata = result + .getRun() + .getFacets() + .getAdditionalProperties() + .get("presto_metadata") + .getAdditionalProperties(); + + assertThat(prestoQueryMetadata) + .containsOnly( + entry("query_id", "queryId"), + entry("transaction_id", "transactionId"), + entry("query_plan", "queryPlan")); + + Map prestoQueryContext = + result + .getRun() + .getFacets() + .getAdditionalProperties() + .get("presto_query_context") + .getAdditionalProperties(); + + assertThat(prestoQueryContext) + .containsOnly( + entry("server_address", "serverAddress"), + entry("environment", "environment"), + entry("user", "user"), + entry("principal", "principal"), + entry("source", "some-presto-client"), + entry("client_info", "Some client info"), + entry("remote_client_address", "127.0.0.1"), + entry("user_agent", "Some-User-Agent")); + } + + @Test + public void testGetStartEvent() + { + OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of( + "openlineage-event-listener.transport.type", OpenLineageTransport.CONSOLE.toString(), + "openlineage-event-listener.presto.uri", "http://testhost:8080")); + + RunEvent result = listener.getStartEvent(PrestoEventData.queryCreatedEvent); + + assertThat(result.getEventType()).isEqualTo(RunEvent.EventType.START); + assertThat(result.getEventTime().toInstant()).isEqualTo(PrestoEventData.queryCreatedEvent.getCreateTime()); + assertThat(result.getRun().getRunId().toString()).startsWith("01967c23-ae78-7"); + assertThat(result.getJob().getNamespace()).isEqualTo("presto://testhost:8080"); + assertThat(result.getJob().getName()).isEqualTo("queryId"); + } + + @Test + public void testJobNameFormatting() + { + OpenLineageEventListener listener = (OpenLineageEventListener) createEventListener(Map.of( + "openlineage-event-listener.transport.type", "CONSOLE", + "openlineage-event-listener.presto.uri", "http://testhost:8080", + "openlineage-event-listener.job.name-format", "$QUERY_ID-$USER-$SOURCE-$CLIENT_IP-abc123")); + + RunEvent result = listener.getCompletedEvent(PrestoEventData.queryCompleteEvent); + + assertThat(result.getJob().getNamespace()).isEqualTo("presto://testhost:8080"); + assertThat(result.getJob().getName()).isEqualTo("queryId-user-some-presto-client-127.0.0.1-abc123"); + } + + private static com.facebook.presto.spi.eventlistener.EventListener createEventListener(Map config) + { + return new OpenLineageEventListenerFactory().create(ImmutableMap.copyOf(config)); + } +} diff --git a/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerConfig.java b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerConfig.java new file mode 100644 index 0000000000000..c2751858b46e2 --- /dev/null +++ b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerConfig.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.Map; + +import static com.facebook.presto.common.resourceGroups.QueryType.DELETE; +import static com.facebook.presto.common.resourceGroups.QueryType.INSERT; +import static com.facebook.presto.common.resourceGroups.QueryType.SELECT; +import static com.facebook.presto.plugin.openlineage.OpenLineagePrestoFacet.PRESTO_METADATA; +import static com.facebook.presto.plugin.openlineage.OpenLineagePrestoFacet.PRESTO_QUERY_STATISTICS; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestOpenLineageEventListenerConfig +{ + @Test + public void testDefaults() + { + OpenLineageEventListenerConfig config = new OpenLineageEventListenerConfig(); + assertThat(config.getPrestoURI()).isNull(); + assertThat(config.getNamespace()).isEmpty(); + assertThat(config.getJobNameFormat()).isEqualTo("$QUERY_ID"); + assertThat(config.getDisabledFacets()).isEmpty(); + assertThat(config.getIncludeQueryTypes()).containsExactlyInAnyOrder( + DELETE, INSERT, + com.facebook.presto.common.resourceGroups.QueryType.MERGE, + com.facebook.presto.common.resourceGroups.QueryType.UPDATE, + com.facebook.presto.common.resourceGroups.QueryType.DATA_DEFINITION); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("openlineage-event-listener.presto.uri", "http://testpresto") + .put("openlineage-event-listener.presto.include-query-types", "SELECT,DELETE") + .put("openlineage-event-listener.disabled-facets", "PRESTO_METADATA,PRESTO_QUERY_STATISTICS") + .put("openlineage-event-listener.namespace", "testnamespace") + .put("openlineage-event-listener.job.name-format", "$QUERY_ID-$USER-$SOURCE-$CLIENT_IP-abc123") + .build(); + + OpenLineageEventListenerConfig config = new OpenLineageEventListenerConfig(properties); + + assertThat(config.getPrestoURI()).isEqualTo(URI.create("http://testpresto")); + assertThat(config.getIncludeQueryTypes()).containsExactlyInAnyOrder(SELECT, DELETE); + assertThat(config.getDisabledFacets()).containsExactlyInAnyOrder(PRESTO_METADATA, PRESTO_QUERY_STATISTICS); + assertThat(config.getNamespace()).hasValue("testnamespace"); + assertThat(config.getJobNameFormat()).isEqualTo("$QUERY_ID-$USER-$SOURCE-$CLIENT_IP-abc123"); + } + + @Test + public void testIsJobNameFormatValid() + { + assertThat(configWithFormat("abc123").isJobNameFormatValid()).isTrue(); + assertThat(configWithFormat("$QUERY_ID").isJobNameFormatValid()).isTrue(); + assertThat(configWithFormat("$USER").isJobNameFormatValid()).isTrue(); + assertThat(configWithFormat("$SOURCE").isJobNameFormatValid()).isTrue(); + assertThat(configWithFormat("$CLIENT_IP").isJobNameFormatValid()).isTrue(); + assertThat(configWithFormat("$QUERY_ID-$USER-$SOURCE-$CLIENT_IP-abc123").isJobNameFormatValid()).isTrue(); + assertThat(configWithFormat("$QUERY_ID $USER $SOURCE $CLIENT_IP abc123").isJobNameFormatValid()).isTrue(); + + assertThat(configWithFormat("$query_id").isJobNameFormatValid()).isFalse(); + assertThat(configWithFormat("$UNKNOWN").isJobNameFormatValid()).isFalse(); + assertThat(configWithFormat("${QUERY_ID}").isJobNameFormatValid()).isFalse(); + assertThat(configWithFormat("$$QUERY_ID").isJobNameFormatValid()).isFalse(); + assertThat(configWithFormat("\\$QUERY_ID").isJobNameFormatValid()).isFalse(); + } + + private static OpenLineageEventListenerConfig configWithFormat(String format) + { + return new OpenLineageEventListenerConfig() + .setPrestoURI(URI.create("http://testpresto")) + .setJobNameFormat(format); + } +} diff --git a/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerPlugin.java b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerPlugin.java new file mode 100644 index 0000000000000..54de0767a77b7 --- /dev/null +++ b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageEventListenerPlugin.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.facebook.presto.spi.eventlistener.EventListenerFactory; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import static com.google.common.collect.Iterables.getOnlyElement; + +public class TestOpenLineageEventListenerPlugin +{ + @Test + public void testCreateConsoleEventListener() + { + OpenLineageEventListenerPlugin plugin = new OpenLineageEventListenerPlugin(); + + EventListenerFactory factory = getOnlyElement(plugin.getEventListenerFactories()); + factory.create( + ImmutableMap.builder() + .put("openlineage-event-listener.presto.uri", "http://localhost:8080") + .put("openlineage-event-listener.transport.type", "console") + .build()); + } + + @Test + public void testCreateHttpEventListener() + { + OpenLineageEventListenerPlugin plugin = new OpenLineageEventListenerPlugin(); + + EventListenerFactory factory = getOnlyElement(plugin.getEventListenerFactories()); + factory.create( + ImmutableMap.builder() + .put("openlineage-event-listener.presto.uri", "http://localhost:8080") + .put("openlineage-event-listener.transport.type", "http") + .put("openlineage-event-listener.transport.url", "http://testurl") + .build()); + } +} diff --git a/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageHttpTransportConfig.java b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageHttpTransportConfig.java new file mode 100644 index 0000000000000..e572a30bcb3fb --- /dev/null +++ b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageHttpTransportConfig.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestOpenLineageHttpTransportConfig +{ + @Test + public void testDefaults() + { + OpenLineageHttpTransportConfig config = new OpenLineageHttpTransportConfig(); + assertThat(config.getUrl()).isNull(); + assertThat(config.getEndpoint()).isNull(); + assertThat(config.getTimeoutMillis()).isEqualTo(5000L); + assertThat(config.getApiKey()).isEmpty(); + assertThat(config.getHeaders()).isEmpty(); + assertThat(config.getUrlParams()).isEmpty(); + assertThat(config.getCompression()).isEqualTo(OpenLineageHttpTransportConfig.Compression.NONE); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("openlineage-event-listener.transport.url", "http://testurl") + .put("openlineage-event-listener.transport.endpoint", "/test/endpoint") + .put("openlineage-event-listener.transport.api-key", "dummy") + .put("openlineage-event-listener.transport.timeout", "30s") + .put("openlineage-event-listener.transport.headers", "header1:value1,header2:value2") + .put("openlineage-event-listener.transport.url-params", "urlParam1:urlVal1,urlParam2:urlVal2") + .put("openlineage-event-listener.transport.compression", "gzip") + .build(); + + OpenLineageHttpTransportConfig config = new OpenLineageHttpTransportConfig(properties); + + assertThat(config.getUrl()).isEqualTo(URI.create("http://testurl")); + assertThat(config.getEndpoint()).isEqualTo("/test/endpoint"); + assertThat(config.getApiKey()).hasValue("dummy"); + assertThat(config.getTimeoutMillis()).isEqualTo(30000L); + assertThat(config.getHeaders()).containsEntry("header1", "value1").containsEntry("header2", "value2"); + assertThat(config.getUrlParams()).containsEntry("urlParam1", "urlVal1").containsEntry("urlParam2", "urlVal2"); + assertThat(config.getCompression()).isEqualTo(OpenLineageHttpTransportConfig.Compression.GZIP); + } +} diff --git a/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageTransportConfig.java b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageTransportConfig.java new file mode 100644 index 0000000000000..7ac7782a5fd74 --- /dev/null +++ b/presto-openlineage-event-listener/src/test/java/com/facebook/presto/plugin/openlineage/TestOpenLineageTransportConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.openlineage; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestOpenLineageTransportConfig +{ + @Test + public void testDefaults() + { + OpenLineageTransportConfig config = new OpenLineageTransportConfig(); + assertThat(config.getTransport()).isEqualTo(OpenLineageTransport.CONSOLE); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("openlineage-event-listener.transport.type", "HTTP") + .build(); + + OpenLineageTransportConfig config = new OpenLineageTransportConfig(properties); + assertThat(config.getTransport()).isEqualTo(OpenLineageTransport.HTTP); + } +} diff --git a/presto-server/src/main/provisio/presto.xml b/presto-server/src/main/provisio/presto.xml index b7176785fedd4..c7181b7898f56 100644 --- a/presto-server/src/main/provisio/presto.xml +++ b/presto-server/src/main/provisio/presto.xml @@ -41,6 +41,12 @@ + + + + + + From 2e8c439526f95ad6453236120bb5c2e5a98cc5eb Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Mon, 2 Mar 2026 16:42:46 -0800 Subject: [PATCH 2/3] docs: Add OpenLineage event listener documentation --- presto-docs/src/main/sphinx/develop.rst | 1 + .../develop/openlineage-event-listener.rst | 163 ++++++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst diff --git a/presto-docs/src/main/sphinx/develop.rst b/presto-docs/src/main/sphinx/develop.rst index 8f58863395ad8..2ef0136f8a6ee 100644 --- a/presto-docs/src/main/sphinx/develop.rst +++ b/presto-docs/src/main/sphinx/develop.rst @@ -17,6 +17,7 @@ This guide is intended for Presto contributors and plugin developers. develop/system-access-control develop/password-authenticator develop/event-listener + develop/openlineage-event-listener develop/client-protocol develop/worker-protocol develop/serialized-page diff --git a/presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst b/presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst new file mode 100644 index 0000000000000..b2129ee6dfc47 --- /dev/null +++ b/presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst @@ -0,0 +1,163 @@ +============================== +OpenLineage Event Listener +============================== + +The OpenLineage event listener plugin emits query events in the +`OpenLineage `_ format, enabling integration with +lineage tracking systems such as `Marquez `_, +`Atlan `_, and `DataHub `_. + +The plugin captures: + +* Query start events (``START``) +* Query completion events (``COMPLETE`` or ``FAIL``) +* Input and output dataset information including column-level lineage + +Installation +------------ + +The OpenLineage event listener plugin is bundled with Presto and requires +no additional installation. + +Configuration +------------- + +Create an ``etc/event-listener.properties`` file on the coordinator with the +following required properties: + +.. code-block:: none + + event-listener.name=openlineage-event-listener + openlineage-event-listener.presto.uri=http://presto-coordinator:8080 + openlineage-event-listener.transport.type=CONSOLE + +Transport Types +^^^^^^^^^^^^^^^ + +The plugin supports two transport types for emitting OpenLineage events: + +**Console Transport** + +Writes OpenLineage events as JSON to stdout. Useful for debugging and +development. + +.. code-block:: none + + event-listener.name=openlineage-event-listener + openlineage-event-listener.presto.uri=http://presto-coordinator:8080 + openlineage-event-listener.transport.type=CONSOLE + +**HTTP Transport** + +Sends OpenLineage events to an HTTP endpoint (e.g., Marquez API). + +.. code-block:: none + + event-listener.name=openlineage-event-listener + openlineage-event-listener.presto.uri=http://presto-coordinator:8080 + openlineage-event-listener.transport.type=HTTP + openlineage-event-listener.transport.url=http://marquez:5000 + openlineage-event-listener.transport.endpoint=/api/v1/lineage + +Configuration Properties +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. list-table:: + :widths: 40 10 10 40 + :header-rows: 1 + + * - Property + - Required + - Default + - Description + * - ``openlineage-event-listener.presto.uri`` + - Yes + - + - URI of the Presto server. Used for namespace rendering in OpenLineage events. + * - ``openlineage-event-listener.transport.type`` + - No + - ``CONSOLE`` + - Transport type for emitting events. Supported values: ``CONSOLE``, ``HTTP``. + * - ``openlineage-event-listener.namespace`` + - No + - + - Override the default namespace for OpenLineage jobs. Defaults to the Presto URI with ``presto://`` scheme. + * - ``openlineage-event-listener.job.name-format`` + - No + - ``$QUERY_ID`` + - Format string for the OpenLineage job name. Supported placeholders: ``$QUERY_ID``, ``$USER``, ``$SOURCE``, ``$CLIENT_IP``. + * - ``openlineage-event-listener.presto.include-query-types`` + - No + - ``DELETE,INSERT,MERGE,UPDATE,DATA_DEFINITION`` + - Comma-separated list of query types that generate OpenLineage events. Other query types are filtered out on completion. + * - ``openlineage-event-listener.disabled-facets`` + - No + - + - Comma-separated list of facets to exclude from events. Supported values: ``PRESTO_METADATA``, ``PRESTO_QUERY_STATISTICS``, ``PRESTO_QUERY_CONTEXT``. + +HTTP Transport Properties +^^^^^^^^^^^^^^^^^^^^^^^^^ + +These properties apply when ``openlineage-event-listener.transport.type`` is set to ``HTTP``. + +.. list-table:: + :widths: 40 10 10 40 + :header-rows: 1 + + * - Property + - Required + - Default + - Description + * - ``openlineage-event-listener.transport.url`` + - Yes + - + - URL of the OpenLineage API server. + * - ``openlineage-event-listener.transport.endpoint`` + - No + - + - Custom API path for receiving events. + * - ``openlineage-event-listener.transport.api-key`` + - No + - + - API key for authentication. Sent as a ``Bearer`` token. + * - ``openlineage-event-listener.transport.timeout`` + - No + - ``5s`` + - HTTP request timeout. Accepts duration strings (e.g., ``5s``, ``30s``, ``1m``). + * - ``openlineage-event-listener.transport.headers`` + - No + - + - Custom HTTP headers as comma-separated ``key:value`` pairs. + * - ``openlineage-event-listener.transport.url-params`` + - No + - + - Custom URL query parameters as comma-separated ``key:value`` pairs. + * - ``openlineage-event-listener.transport.compression`` + - No + - ``NONE`` + - HTTP body compression. Supported values: ``NONE``, ``GZIP``. + +Event Details +------------- + +The plugin emits the following OpenLineage facets: + +**Run Facets** + +* ``processing_engine`` - Presto server version information +* ``presto_metadata`` - Query ID, transaction ID, and query plan +* ``presto_query_context`` - User, server address, environment, source, client info +* ``presto_query_statistics`` - Detailed query execution statistics (on completion only) +* ``nominalTime`` - Query start and end times (on completion only) +* ``errorMessage`` - Failure message (on failure only) + +**Job Facets** + +* ``jobType`` - ``BATCH`` / ``PRESTO`` / ``QUERY`` +* ``sql`` - The SQL query text with dialect ``presto`` + +**Dataset Facets** + +* ``schema`` - Column names and types for input and output datasets +* ``dataSource`` - Catalog and schema information +* ``columnLineage`` - Column-level lineage mapping from input to output columns From 6e153b8c1da85cfedeed7afa3c4f9fe03e23fddb Mon Sep 17 00:00:00 2001 From: "jianjian.xie" Date: Tue, 3 Mar 2026 10:43:37 -0800 Subject: [PATCH 3/3] docs: Fix RST title formatting and remove Latin abbreviations --- .../main/sphinx/develop/openlineage-event-listener.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst b/presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst index b2129ee6dfc47..45b11a2b7b598 100644 --- a/presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst +++ b/presto-docs/src/main/sphinx/develop/openlineage-event-listener.rst @@ -1,6 +1,6 @@ -============================== +========================== OpenLineage Event Listener -============================== +========================== The OpenLineage event listener plugin emits query events in the `OpenLineage `_ format, enabling integration with @@ -49,7 +49,7 @@ development. **HTTP Transport** -Sends OpenLineage events to an HTTP endpoint (e.g., Marquez API). +Sends OpenLineage events to an HTTP endpoint such as the Marquez API. .. code-block:: none @@ -123,7 +123,7 @@ These properties apply when ``openlineage-event-listener.transport.type`` is set * - ``openlineage-event-listener.transport.timeout`` - No - ``5s`` - - HTTP request timeout. Accepts duration strings (e.g., ``5s``, ``30s``, ``1m``). + - HTTP request timeout. Accepts duration strings. For example: ``5s``, ``30s``, ``1m``. * - ``openlineage-event-listener.transport.headers`` - No -