@@ -71,7 +71,12 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException {
try {
GenericTable genericTable =
this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
return PolarisCatalogUtils.loadSparkTable(genericTable);
// Hudi currently supports only Spark DataSource V1, so we return a V1Table
if (PolarisCatalogUtils.useHudi(genericTable.getFormat())) {
return PolarisCatalogUtils.loadV1SparkTable(genericTable, identifier, name());
} else {
return PolarisCatalogUtils.loadV2SparkTable(genericTable);
}
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(identifier);
}
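
For context, a hedged caller-side sketch of the new dispatch; the db.events identifier is hypothetical:

// Hypothetical usage, assuming an initialized PolarisSparkCatalog instance:
Identifier id = Identifier.of(new String[] {"db"}, "events");
Table table = polarisSparkCatalog.loadTable(id);
// For format "hudi" the result is a V1Table resolved through Spark's DataSource V1 path;
// other generic formats (e.g. delta) come back as DataSourceV2 tables.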
@@ -111,7 +116,12 @@ public Table createTable(
baseLocation,
null,
properties);
return PolarisCatalogUtils.loadSparkTable(genericTable);
// Hudi currently supports only Spark DataSource V1, so we return a V1Table
if (PolarisCatalogUtils.useHudi(format)) {
return PolarisCatalogUtils.loadV1SparkTable(genericTable, identifier, name());
} else {
return PolarisCatalogUtils.loadV2SparkTable(genericTable);
}
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(identifier);
}
@@ -30,6 +30,7 @@
import org.apache.iceberg.spark.SupportsReplaceView;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.polaris.spark.utils.DeltaHelper;
import org.apache.polaris.spark.utils.HudiHelper;
import org.apache.polaris.spark.utils.PolarisCatalogUtils;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -69,6 +70,7 @@ public class SparkCatalog
@VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
@VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
@VisibleForTesting protected DeltaHelper deltaHelper = null;
@VisibleForTesting protected HudiHelper hudiHelper = null;

@Override
public String name() {
@@ -130,6 +132,7 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
initRESTCatalog(name, options);
this.deltaHelper = new DeltaHelper(options);
this.hudiHelper = new HudiHelper(options);
}

@Override
@@ -154,12 +157,16 @@ public Table createTable(
throw new UnsupportedOperationException(
"Create table without location key is not supported by Polaris. Please provide location or path on table creation.");
}

if (PolarisCatalogUtils.useDelta(provider)) {
// For Delta tables, we load the Delta catalog to help deal with the
// delta log creation.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.createTable(ident, schema, transforms, properties);
} else if (PolarisCatalogUtils.useHudi(provider)) {
// For Hudi table creation, we load HoodieCatalog
// to create the .hoodie metadata folder in storage
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
return hudiCatalog.createTable(ident, schema, transforms, properties);
} else {
return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
}
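
A minimal end-to-end sketch of this create path, assuming the catalog is registered under the hypothetical name polaris and the Hudi Spark bundle (with HoodieSparkSessionExtension) is on the classpath:

SparkSession spark = SparkSession.builder()
    .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate();
// Polaris rejects table creation without a location (see the check above), so one is supplied:
spark.sql("CREATE TABLE polaris.db.events (id BIGINT, ts TIMESTAMP) USING hudi"
    + " LOCATION 's3://example-bucket/db/events'");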
@@ -180,8 +187,12 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
// using ALTER TABLE ... SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.alterTable(ident, changes);
} else if (PolarisCatalogUtils.useHudi(provider)) {
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
return hudiCatalog.alterTable(ident, changes);
} else {
return this.polarisSparkCatalog.alterTable(ident);
}
}
}

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.spark.utils;

import org.apache.iceberg.common.DynConstructors;
import org.apache.polaris.spark.PolarisSparkCatalog;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class HudiHelper {
public static final String HUDI_CATALOG_IMPL_KEY = "hudi-catalog-impl";
private static final String DEFAULT_HUDI_CATALOG_CLASS =
"org.apache.spark.sql.hudi.catalog.HoodieCatalog";

private TableCatalog hudiCatalog = null;
private String hudiCatalogImpl = DEFAULT_HUDI_CATALOG_CLASS;

public HudiHelper(CaseInsensitiveStringMap options) {
if (options.get(HUDI_CATALOG_IMPL_KEY) != null) {
this.hudiCatalogImpl = options.get(HUDI_CATALOG_IMPL_KEY);
}
}

public TableCatalog loadHudiCatalog(PolarisSparkCatalog polarisSparkCatalog) {
if (this.hudiCatalog != null) {
return this.hudiCatalog;
}

DynConstructors.Ctor<TableCatalog> ctor;
try {
ctor = DynConstructors.builder(TableCatalog.class).impl(hudiCatalogImpl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize Hudi Catalog %s: %s", hudiCatalogImpl, e.getMessage()),
e);
}

try {
this.hudiCatalog = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize Hudi Catalog, %s does not implement TableCatalog.",
hudiCatalogImpl),
e);
}

// Set the Polaris Spark catalog as the delegate catalog of the Hudi catalog;
// it will be used in HoodieCatalog's loadTable
((DelegatingCatalogExtension) this.hudiCatalog).setDelegateCatalog(polarisSparkCatalog);
return this.hudiCatalog;
}
}
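
The options map comes from Spark's spark.sql.catalog.<name>.* configuration, so the Hudi catalog implementation can be overridden per catalog. A hedged sketch; com.example.MyHoodieCatalog is hypothetical:

// Equivalent Spark conf: spark.sql.catalog.polaris.hudi-catalog-impl=com.example.MyHoodieCatalog
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
    java.util.Map.of(HudiHelper.HUDI_CATALOG_IMPL_KEY, "com.example.MyHoodieCatalog"));
HudiHelper helper = new HudiHelper(options);
// Without the override, the default org.apache.spark.sql.hudi.catalog.HoodieCatalog is loaded.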
@@ -29,14 +29,20 @@
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.polaris.spark.rest.GenericTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.execution.datasources.DataSource;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.Option;

public class PolarisCatalogUtils {

public static final String TABLE_PROVIDER_KEY = "provider";
public static final String TABLE_PATH_KEY = "path";

@@ -50,6 +56,10 @@ public static boolean useDelta(String provider) {
return "delta".equalsIgnoreCase(provider);
}

public static boolean useHudi(String provider) {
return "hudi".equalsIgnoreCase(provider);
}

/**
* For tables whose location is managed by Spark Session Catalog, there will be no location or
* path in the properties.
@@ -61,16 +71,11 @@ public static boolean isTableWithSparkManagedLocation(Map<String, String> properties) {
}

/**
* Normalize table properties for loading Spark tables by ensuring the TABLE_PATH_KEY is properly
* set. DataSourceV2 requires the path property on table loading.
*/
private static Map<String, String> normalizeTablePropertiesForLoadSparkTable(
GenericTable genericTable) {
Map<String, String> properties = genericTable.getProperties();
boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != null;
boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
@@ -87,10 +92,80 @@ public static Table loadSparkTable(GenericTable genericTable) {
tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
}
}
return tableProperties;
}
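
The effect of the normalization, sketched on hypothetical property maps (only the location-backfill branch is visible in this hunk):

// Input from the Polaris catalog:  {"location": "s3://bucket/db/t"}
// After normalization:             {"location": "s3://bucket/db/t", "path": "s3://bucket/db/t"}
// A map that already carries "path" passes through unchanged; DataSourceV2
// providers read the "path" key when loading the table.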

/**
* Load spark table using DataSourceV2.
*
* @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
* DeltaTableV2.
*/
public static Table loadV2SparkTable(GenericTable genericTable) {
SparkSession sparkSession = SparkSession.active();
TableProvider provider =
DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
.get();
Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
return DataSourceV2Utils.getTableFromProvider(
provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
}

/**
* Return a Spark V1Table for formats that do not use DataSourceV2. Currently this is used for
* Hudi tables.
*/
public static Table loadV1SparkTable(
GenericTable genericTable, Identifier identifier, String catalogName) {
Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);

// Need full identifier in order to construct CatalogTable
String namespacePath = String.join(".", identifier.namespace());
TableIdentifier tableIdentifier =
new TableIdentifier(
identifier.name(), Option.apply(namespacePath), Option.apply(catalogName));

scala.collection.immutable.Map<String, String> scalaOptions =
(scala.collection.immutable.Map<String, String>)
scala.collection.immutable.Map$.MODULE$.apply(
scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());

org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
DataSource.buildStorageFormatFromOptions(scalaOptions);

// Polaris generic tables currently carry no schema information, partition columns,
// stats, etc. For now we fill in the parameters we get from the Polaris catalog and
// let the underlying client resolve the rest within its own catalog implementation.
org.apache.spark.sql.types.StructType emptySchema = new org.apache.spark.sql.types.StructType();
scala.collection.immutable.Seq<String> emptyStringSeq =
scala.collection.JavaConverters.asScalaBuffer(new java.util.ArrayList<String>()).toList();
CatalogTable catalogTable =
new CatalogTable(
tableIdentifier,
CatalogTableType.EXTERNAL(),
storage,
emptySchema,
Option.apply(genericTable.getFormat()),
emptyStringSeq,
scala.Option.empty(),
genericTable.getProperties().get(TableCatalog.PROP_OWNER),
System.currentTimeMillis(),
-1L,
"",
scalaOptions,
scala.Option.empty(),
scala.Option.empty(),
scala.Option.empty(),
emptyStringSeq,
false,
true,
scala.collection.immutable.Map$.MODULE$.empty(),
scala.Option.empty());

return new org.apache.spark.sql.connector.catalog.V1Table(catalogTable);
}
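
A hedged read-path note, reusing the hypothetical session and table from the earlier sketch:

spark.sql("SELECT id, ts FROM polaris.db.events").show();
// Schema, partition columns, and stats are absent from the Polaris generic table;
// Hudi recovers them from the .hoodie metadata under the table's location at resolution time.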

/**
* Get the catalogAuth field inside the RESTSessionCatalog used by the Iceberg Spark Catalog
* using reflection. TODO: Deprecate this function once the iceberg client is updated to 1.9.0 to use
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark;

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableChange;

/**
* This is a fake hudi catalog class that is used for testing. This class is a noop class that
* directly passes all calls to the delegate CatalogPlugin configured as part of
* DelegatingCatalogExtension.
*/
public class NoopHudiCatalog extends DelegatingCatalogExtension {

@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
return super.loadTable(ident);
}
}
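
A hedged sketch of wiring the noop catalog through HudiHelper in a test; the polarisSparkCatalog instance is assumed to come from the test fixture:

CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
    java.util.Map.of("hudi-catalog-impl", "org.apache.polaris.spark.NoopHudiCatalog"));
TableCatalog catalog = new HudiHelper(options).loadHudiCatalog(polarisSparkCatalog);
// Every call now falls through to PolarisSparkCatalog via DelegatingCatalogExtension,
// so routing can be exercised without a real Hudi dependency on the classpath.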