
Commit

Implement OpenLineage spark-extension-interfaces (#1267)
ddebowczyk92 authored Aug 29, 2024
1 parent 924cb13 commit 558c625
Showing 9 changed files with 77 additions and 42 deletions.
8 changes: 7 additions & 1 deletion spark-bigquery-connector-common/pom.xml
@@ -29,7 +29,13 @@
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark_2.12</artifactId>
<artifactId>spark-extension-interfaces</artifactId>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-entrypoint</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
SparkBigQueryLineageProvider.java (new file)
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spark.bigquery;

import io.openlineage.spark.extension.OpenLineageExtensionProvider;
import io.openlineage.spark.shade.extension.v1.lifecycle.plan.SparkOpenLineageExtensionVisitor;

public class SparkBigQueryLineageProvider implements OpenLineageExtensionProvider {
@Override
public String getVisitorClassName() {
return SparkOpenLineageExtensionVisitor.class.getCanonicalName();
}

@Override
public String shadedPackage() {
return "com.google.cloud.spark.bigquery.repackaged.io.openlineage.spark.shade";
}
}
OpenLineageExtensionProvider service registration under META-INF/services (new file)
@@ -0,0 +1 @@
com.google.cloud.spark.bigquery.SparkBigQueryLineageProvider
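
The provider above is registered through the Java service-loader mechanism, which is what the one-line registration file is for. The sketch below is illustrative only and not part of this commit: it assumes the connector jar is on the classpath and shows how a ServiceLoader-based consumer (such as the OpenLineage Spark integration) could discover the provider; after shading, getVisitorClassName() resolves to the relocated visitor class under the prefix returned by shadedPackage().

import io.openlineage.spark.extension.OpenLineageExtensionProvider;
import java.util.ServiceLoader;

// Hypothetical example class, not part of this commit.
public class ProviderDiscoveryExample {
  public static void main(String[] args) {
    // ServiceLoader reads the META-INF/services registration added above.
    for (OpenLineageExtensionProvider provider :
        ServiceLoader.load(OpenLineageExtensionProvider.class)) {
      // For this connector the output pairs the shaded package prefix with
      // the visitor class that handles BigQuery relations.
      System.out.println(
          provider.shadedPackage() + " -> " + provider.getVisitorClassName());
    }
  }
}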
Spark31BigQueryTableProvider.java
@@ -15,23 +15,32 @@
*/
package com.google.cloud.spark.bigquery.v2;

import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.cloud.spark.bigquery.InjectorBuilder;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import io.openlineage.spark.shade.client.OpenLineage;
import io.openlineage.spark.shade.client.utils.DatasetIdentifier;
import io.openlineage.spark.shade.extension.v1.LineageRelationProvider;
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.collection.JavaConverters;

public class Spark31BigQueryTableProvider extends BaseBigQuerySource
implements TableProvider, CreatableRelationProvider {
implements TableProvider, CreatableRelationProvider, LineageRelationProvider {

private static final Transform[] EMPTY_TRANSFORM_ARRAY = {};

@@ -64,4 +73,16 @@ public BaseRelation createRelation(
return new CreatableRelationProviderHelper()
.createRelation(sqlContext, mode, parameters, data, ImmutableMap.of());
}

@Override
public DatasetIdentifier getLineageDatasetIdentifier(
String sparkListenerEventName,
OpenLineage openLineage,
Object sqlContext,
Object parameters) {
Map<String, String> properties = JavaConverters.mapAsJavaMap((CaseInsensitiveMap) parameters);
Injector injector = new InjectorBuilder().withOptions(properties).build();
SparkBigQueryConfig config = injector.getInstance(SparkBigQueryConfig.class);
return new DatasetIdentifier(BigQueryUtil.friendlyTableName(config.getTableId()), "bigquery");
}
}
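
For reference, the DatasetIdentifier returned by getLineageDatasetIdentifier names the dataset after the BigQuery table and uses the fixed namespace "bigquery". A minimal sketch of that mapping, with a hypothetical table and without the injector wiring shown above:

import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import io.openlineage.spark.shade.client.utils.DatasetIdentifier;

// Hypothetical example class, not part of this commit.
public class LineageIdentifierSketch {
  public static void main(String[] args) {
    // Hypothetical table coordinates; in the provider they come from
    // SparkBigQueryConfig, which is built from the reader/writer options.
    TableId tableId = TableId.of("my-project", "my_dataset", "my_table");
    String name = BigQueryUtil.friendlyTableName(tableId);
    DatasetIdentifier id = new DatasetIdentifier(name, "bigquery");
    System.out.println("OpenLineage dataset: " + name + " (namespace: bigquery)");
  }
}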
4 changes: 4 additions & 0 deletions spark-bigquery-dsv2/spark-bigquery-dsv2-common/pom.xml
@@ -27,6 +27,10 @@
<artifactId>spark-bigquery-connector-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-interfaces</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>

This file was deleted.

BaseBigQuerySource.java
@@ -15,11 +15,9 @@
*/
package com.google.cloud.spark.bigquery.v2;

import com.google.cloud.spark.bigquery.BigQueryRelationProvider;
import org.apache.spark.sql.sources.DataSourceRegister;

public abstract class BaseBigQuerySource extends BigQueryRelationProvider
implements DataSourceRegister {
public abstract class BaseBigQuerySource implements DataSourceRegister {

@Override
public String shortName() {
1 change: 1 addition & 0 deletions spark-bigquery-dsv2/spark-bigquery-dsv2-parent/pom.xml
@@ -37,6 +37,7 @@
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
11 changes: 10 additions & 1 deletion spark-bigquery-parent/pom.xml
@@ -66,7 +66,7 @@
<guava.version>33.2.1-jre</guava.version>
<jackson.version>2.17.2</jackson.version>
<netty.version>4.1.112.Final</netty.version>
<openlineage-spark.version>1.13.1</openlineage-spark.version>
<openlineage-spark.version>1.19.0</openlineage-spark.version>
<paranamer.version>2.8</paranamer.version>
<!-- Don't upgrade protobuf until bigquerystorage is compatible with 4.x version of proto.
Ref : https://github.com/protocolbuffers/protobuf/issues/16452-->
@@ -300,6 +300,11 @@
<artifactId>mockito-core</artifactId>
<version>4.11.0</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-interfaces</artifactId>
<version>${openlineage-spark.version}</version>
</dependency>
<dependency>
<groupId>org.conscrypt</groupId>
<artifactId>conscrypt-openjdk-uber</artifactId>
@@ -703,6 +708,10 @@
<pattern>org.xerial.snappy</pattern>
<shadedPattern>com.google.cloud.spark.bigquery.repackaged.org.xerial.snappy</shadedPattern>
</relocation>
<relocation>
<pattern>io.openlineage.spark.shade</pattern>
<shadedPattern>com.google.cloud.spark.bigquery.repackaged.io.openlineage.spark.shade</shadedPattern>
</relocation>
<relocation>
<pattern>META-INF/native/libnetty</pattern>
<shadedPattern>META-INF/native/libcom_google_cloud_spark_bigquery_repackaged_netty
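
With spark-extension-interfaces shaded into the connector under the relocation above, emitting lineage still requires the openlineage-spark listener on the job side. The sketch below is an assumption about a typical deployment, not part of this commit: the listener class and the spark.openlineage.* options come from the standard openlineage-spark integration, and the table name is hypothetical.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical example class, not part of this commit.
public class OpenLineageBigQueryReadExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("openlineage-bigquery-example")
            // Standard openlineage-spark settings; the openlineage-spark jar
            // is assumed to be on the application classpath.
            .config("spark.extraListeners",
                "io.openlineage.spark.agent.OpenLineageSparkListener")
            .config("spark.openlineage.transport.type", "console")
            .getOrCreate();

    // Hypothetical table; the provider added in this commit reports it as an
    // OpenLineage dataset named after the table, in the "bigquery" namespace.
    Dataset<Row> df =
        spark.read()
            .format("bigquery")
            .option("table", "my-project.my_dataset.my_table")
            .load();
    df.show();
  }
}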
