diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index 645efc964778..ac5e285c9c09 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -37,6 +37,9 @@
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.Tables;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.Transactions;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -131,20 +134,10 @@ private Table loadMetadataTable(String location, String metadataTableName, MetadataTableType type) {
   @Override
   public Table create(Schema schema, PartitionSpec spec, SortOrder order, Map<String, String> properties,
                       String location) {
-    Preconditions.checkNotNull(schema, "A table schema is required");
-
-    TableOperations ops = newTableOps(location);
-    if (ops.current() != null) {
-      throw new AlreadyExistsException("Table already exists at location: %s", location);
-    }
-
-    Map<String, String> tableProps = properties == null ? ImmutableMap.of() : properties;
-    PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() : spec;
-    SortOrder sortOrder = order == null ? SortOrder.unsorted() : order;
-    TableMetadata metadata = TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, location, tableProps);
-    ops.commit(null, metadata);
-
-    return new BaseTable(ops, location);
+    return buildTable(location, schema).withPartitionSpec(spec)
+        .withSortOrder(order)
+        .withProperties(properties)
+        .create();
   }
 
   /**
@@ -200,6 +193,165 @@ TableOperations newTableOps(String location) {
     }
   }
 
+  private TableMetadata tableMetadata(Schema schema, PartitionSpec spec, SortOrder order,
+                                      Map<String, String> properties, String location) {
+    Preconditions.checkNotNull(schema, "A table schema is required");
+
+    Map<String, String> tableProps = properties == null ? ImmutableMap.of() : properties;
+    PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() : spec;
+    SortOrder sortOrder = order == null ? SortOrder.unsorted() : order;
+    return TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, location, tableProps);
+  }
+
+  /**
+   * Start a transaction to create a table.
+   *
+   * @param location a location for the table
+   * @param schema a schema
+   * @param spec a partition spec
+   * @param properties a string map of table properties
+   * @return a {@link Transaction} to create the table
+   * @throws AlreadyExistsException if the table already exists
+   */
+  public Transaction newCreateTableTransaction(
+      String location,
+      Schema schema,
+      PartitionSpec spec,
+      Map<String, String> properties) {
+    return buildTable(location, schema).withPartitionSpec(spec).withProperties(properties).createTransaction();
+  }
+
+  /**
+   * Start a transaction to replace a table.
+   *
+   * @param location a location for the table
+   * @param schema a schema
+   * @param spec a partition spec
+   * @param properties a string map of table properties
+   * @param orCreate whether to create the table if not exists
+   * @return a {@link Transaction} to replace the table
+   * @throws NoSuchTableException if the table doesn't exist and orCreate is false
+   */
+  public Transaction newReplaceTableTransaction(
+      String location,
+      Schema schema,
+      PartitionSpec spec,
+      Map<String, String> properties,
+      boolean orCreate) {
+    Catalog.TableBuilder builder = buildTable(location, schema).withPartitionSpec(spec).withProperties(properties);
+    return orCreate ? builder.createOrReplaceTransaction() : builder.replaceTransaction();
+  }
+
+  public Catalog.TableBuilder buildTable(String location, Schema schema) {
+    return new HadoopTableBuilder(location, schema);
+  }
+
+  private class HadoopTableBuilder implements Catalog.TableBuilder {
+    private final String location;
+    private final Schema schema;
+    private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
+    private PartitionSpec spec = PartitionSpec.unpartitioned();
+    private SortOrder sortOrder = SortOrder.unsorted();
+
+    HadoopTableBuilder(String location, Schema schema) {
+      this.location = location;
+      this.schema = schema;
+    }
+
+    @Override
+    public Catalog.TableBuilder withPartitionSpec(PartitionSpec newSpec) {
+      this.spec = newSpec != null ? newSpec : PartitionSpec.unpartitioned();
+      return this;
+    }
+
+    @Override
+    public Catalog.TableBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder != null ? newSortOrder : SortOrder.unsorted();
+      return this;
+    }
+
+    @Override
+    public Catalog.TableBuilder withLocation(String newLocation) {
+      Preconditions.checkArgument(newLocation == null || location.equals(newLocation),
+          String.format("Table location %s differs from the table location (%s) from the PathIdentifier",
+              newLocation, location));
+      return this;
+    }
+
+    @Override
+    public Catalog.TableBuilder withProperties(Map<String, String> properties) {
+      if (properties != null) {
+        propertiesBuilder.putAll(properties);
+      }
+      return this;
+    }
+
+    @Override
+    public Catalog.TableBuilder withProperty(String key, String value) {
+      propertiesBuilder.put(key, value);
+      return this;
+    }
+
+    @Override
+    public Table create() {
+      TableOperations ops = newTableOps(location);
+      if (ops.current() != null) {
+        throw new AlreadyExistsException("Table already exists at location: %s", location);
+      }
+
+      Map<String, String> properties = propertiesBuilder.build();
+      TableMetadata metadata = tableMetadata(schema, spec, sortOrder, properties, location);
+      ops.commit(null, metadata);
+      return new BaseTable(ops, location);
+    }
+
+    @Override
+    public Transaction createTransaction() {
+      TableOperations ops = newTableOps(location);
+      if (ops.current() != null) {
+        throw new AlreadyExistsException("Table already exists: %s", location);
+      }
+
+      Map<String, String> properties = propertiesBuilder.build();
+      TableMetadata metadata = tableMetadata(schema, spec, null, properties, location);
+      return Transactions.createTableTransaction(location, ops, metadata);
+    }
+
+    @Override
+    public Transaction replaceTransaction() {
+      return newReplaceTableTransaction(false);
+    }
+
+    @Override
+    public Transaction createOrReplaceTransaction() {
+      return newReplaceTableTransaction(true);
+    }
+
+    private Transaction newReplaceTableTransaction(boolean orCreate) {
+      TableOperations ops = newTableOps(location);
+      if (!orCreate && ops.current() == null) {
+        throw new NoSuchTableException("No such table: %s", location);
+      }
+
+      Map<String, String> properties = propertiesBuilder.build();
+      TableMetadata metadata;
+      if (ops.current() != null) {
+        metadata = ops.current().buildReplacement(schema, spec, SortOrder.unsorted(), location, properties);
+      } else {
+        metadata = tableMetadata(schema, spec, null, properties, location);
+      }
+
+      if (orCreate) {
+        return Transactions.createOrReplaceTableTransaction(location, ops, metadata);
+      } else {
+        return Transactions.replaceTableTransaction(location, ops, metadata);
+      }
+    }
+  }
+
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
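Review note: below is a minimal usage sketch (not part of the diff) of the `HadoopTables` builder API added above. The warehouse paths, schema, and chosen table property are illustrative assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class HadoopTablesBuilderSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()));
    HadoopTables tables = new HadoopTables(new Configuration());

    // create() commits new table metadata directly at the given location
    Table table = tables.buildTable("/tmp/warehouse/events", schema) // hypothetical path
        .withPartitionSpec(PartitionSpec.unpartitioned())
        .withProperty("commit.retry.num-retries", "4")
        .create();

    // createTransaction() stages the creation; nothing is visible at the
    // location until the transaction commits
    Transaction txn = tables.buildTable("/tmp/warehouse/staged", schema)
        .createTransaction();
    txn.commitTransaction();
  }
}
```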
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/PathIdentifier.java b/spark3/src/main/java/org/apache/iceberg/spark/PathIdentifier.java
new file mode 100644
index 000000000000..235097ea46cc
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/PathIdentifier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.connector.catalog.Identifier;
+
+public class PathIdentifier implements Identifier {
+  private static final Splitter SPLIT = Splitter.on("/");
+  private static final Joiner JOIN = Joiner.on("/");
+  private final String[] namespace;
+  private final String location;
+  private final String name;
+
+  public PathIdentifier(String location) {
+    this.location = location;
+    List<String> pathParts = SPLIT.splitToList(location);
+    name = Iterables.getLast(pathParts);
+    namespace = pathParts.size() > 1 ?
+        new String[]{JOIN.join(pathParts.subList(0, pathParts.size() - 1))} :
+        new String[0];
+  }
+
+  @Override
+  public String[] namespace() {
+    return namespace;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  public String location() {
+    return location;
+  }
+}
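Review note: the constructor's namespace/name split can be non-obvious, so here is a small sketch (not part of the diff; the location is illustrative) of what `PathIdentifier` reports for a typical table path.

```java
import org.apache.iceberg.spark.PathIdentifier;

public class PathIdentifierSketch {
  public static void main(String[] args) {
    PathIdentifier ident = new PathIdentifier("hdfs://nn:8020/warehouse/db/table");
    // Everything before the last "/" is joined into a one-element namespace
    System.out.println(ident.namespace()[0]); // hdfs://nn:8020/warehouse/db
    // The last path segment becomes the identifier name
    System.out.println(ident.name());         // table
    // The original location is preserved verbatim
    System.out.println(ident.location());     // hdfs://nn:8020/warehouse/db/table
  }
}
```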
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 73b3019b6ff9..656f3c46a5f6 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -36,6 +36,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
@@ -89,6 +90,7 @@ public class SparkCatalog extends BaseCatalog {
   private boolean cacheEnabled = true;
   private SupportsNamespaces asNamespaceCatalog = null;
   private String[] defaultNamespace = null;
+  private HadoopTables tables;
 
   /**
    * Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter.
@@ -135,7 +137,7 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
   @Override
   public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
     try {
-      Table icebergTable = icebergCatalog.loadTable(buildIdentifier(ident));
+      Table icebergTable = load(ident);
       return new SparkTable(icebergTable, !cacheEnabled);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
@@ -148,12 +150,12 @@ public SparkTable createTable(Identifier ident, StructType schema, Transform[] transforms,
       Map<String, String> properties) throws TableAlreadyExistsException {
     Schema icebergSchema = SparkSchemaUtil.convert(schema);
     try {
-      Table icebergTable = icebergCatalog.createTable(
-          buildIdentifier(ident),
-          icebergSchema,
-          Spark3Util.toPartitionSpec(icebergSchema, transforms),
-          properties.get("location"),
-          Spark3Util.rebuildCreateProperties(properties));
+      Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
+      Table icebergTable = builder
+          .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms))
+          .withLocation(properties.get("location"))
+          .withProperties(Spark3Util.rebuildCreateProperties(properties))
+          .create();
       return new SparkTable(icebergTable, !cacheEnabled);
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(ident);
@@ -165,12 +167,12 @@ public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] transforms,
       Map<String, String> properties) throws TableAlreadyExistsException {
     Schema icebergSchema = SparkSchemaUtil.convert(schema);
     try {
-      return new StagedSparkTable(icebergCatalog.newCreateTableTransaction(
-          buildIdentifier(ident),
-          icebergSchema,
-          Spark3Util.toPartitionSpec(icebergSchema, transforms),
-          properties.get("location"),
-          Spark3Util.rebuildCreateProperties(properties)));
+      Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
+      Transaction transaction = builder.withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms))
+          .withLocation(properties.get("location"))
+          .withProperties(Spark3Util.rebuildCreateProperties(properties))
+          .createTransaction();
+      return new StagedSparkTable(transaction);
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(ident);
     }
@@ -181,13 +183,12 @@ public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] transforms,
       Map<String, String> properties) throws NoSuchTableException {
     Schema icebergSchema = SparkSchemaUtil.convert(schema);
     try {
-      return new StagedSparkTable(icebergCatalog.newReplaceTableTransaction(
-          buildIdentifier(ident),
-          icebergSchema,
-          Spark3Util.toPartitionSpec(icebergSchema, transforms),
-          properties.get("location"),
-          Spark3Util.rebuildCreateProperties(properties),
-          false /* do not create */));
+      Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
+      Transaction transaction = builder.withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms))
+          .withLocation(properties.get("location"))
+          .withProperties(Spark3Util.rebuildCreateProperties(properties))
+          .replaceTransaction();
+      return new StagedSparkTable(transaction);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
@@ -197,13 +198,12 @@ public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] transforms,
   public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] transforms,
       Map<String, String> properties) {
     Schema icebergSchema = SparkSchemaUtil.convert(schema);
-    return new StagedSparkTable(icebergCatalog.newReplaceTableTransaction(
-        buildIdentifier(ident),
-        icebergSchema,
-        Spark3Util.toPartitionSpec(icebergSchema, transforms),
-        properties.get("location"),
-        Spark3Util.rebuildCreateProperties(properties),
-        true /* create or replace */));
+    Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
+    Transaction transaction = builder.withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms))
+        .withLocation(properties.get("location"))
+        .withProperties(Spark3Util.rebuildCreateProperties(properties))
+        .createOrReplaceTransaction();
+    return new StagedSparkTable(transaction);
   }
 
   @Override
@@ -236,7 +236,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
     }
 
     try {
-      Table table = icebergCatalog.loadTable(buildIdentifier(ident));
+      Table table = load(ident);
       commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
@@ -248,7 +248,9 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
   @Override
   public boolean dropTable(Identifier ident) {
     try {
-      return icebergCatalog.dropTable(buildIdentifier(ident), true);
+      return isPathIdentifier(ident) ?
+          tables.dropTable(((PathIdentifier) ident).location()) :
+          icebergCatalog.dropTable(buildIdentifier(ident));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       return false;
     }
@@ -257,6 +259,8 @@ public boolean dropTable(Identifier ident) {
   @Override
   public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
     try {
+      checkNotPathIdentifier(from, "renameTable");
+      checkNotPathIdentifier(to, "renameTable");
       icebergCatalog.renameTable(buildIdentifier(from), buildIdentifier(to));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(from);
@@ -268,7 +272,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
   @Override
   public void invalidateTable(Identifier ident) {
     try {
-      icebergCatalog.loadTable(buildIdentifier(ident)).refresh();
+      load(ident).refresh();
     } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
       // ignore if the table doesn't exist, it is not cached
    }
@@ -400,6 +404,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
     Catalog catalog = buildIcebergCatalog(name, options);
 
     this.catalogName = name;
+    this.tables = new HadoopTables(SparkSession.active().sessionState().newHadoopConf());
     this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(catalog) : catalog;
     if (catalog instanceof SupportsNamespaces) {
       this.asNamespaceCatalog = (SupportsNamespaces) catalog;
@@ -453,4 +458,27 @@ private static void commitChanges(Table table, SetProperty setLocation, SetProperty setSnapshotId,
     transaction.commitTransaction();
   }
+
+  private static boolean isPathIdentifier(Identifier ident) {
+    return ident instanceof PathIdentifier;
+  }
+
+  private static void checkNotPathIdentifier(Identifier identifier, String method) {
+    if (identifier instanceof PathIdentifier) {
+      throw new IllegalArgumentException(String.format("Cannot pass path based identifier to %s method. %s is a path.",
+          method, identifier));
+    }
+  }
+
+  private Table load(Identifier ident) {
+    return isPathIdentifier(ident) ?
+        tables.load(((PathIdentifier) ident).location()) :
+        icebergCatalog.loadTable(buildIdentifier(ident));
+  }
+
+  private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) {
+    return isPathIdentifier(ident) ?
+        tables.buildTable(((PathIdentifier) ident).location(), schema) :
+        icebergCatalog.buildTable(buildIdentifier(ident), schema);
+  }
 }
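Review note: a hedged sketch (not part of the diff) of how the `SparkCatalog` changes above route identifiers. It assumes an active `SparkSession` (which `initialize` now requires for the `HadoopTables` field) and existing tables at the illustrative names and paths.

```java
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class PathRoutingSketch {
  public static void main(String[] args) throws Exception {
    SparkCatalog catalog = new SparkCatalog();
    catalog.initialize("demo", new CaseInsensitiveStringMap(ImmutableMap.of()));

    // A PathIdentifier bypasses the configured catalog: load(ident) goes
    // through HadoopTables.load(location)
    Identifier byPath = new PathIdentifier("/tmp/warehouse/events"); // hypothetical path
    catalog.loadTable(byPath);

    // A regular identifier still resolves through the Iceberg catalog
    Identifier byName = Identifier.of(new String[] {"db"}, "events");
    catalog.loadTable(byName);

    // Path identifiers are rejected where a path makes no sense:
    // catalog.renameTable(byPath, byName) throws IllegalArgumentException
  }
}
```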
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
new file mode 100644
index 000000000000..f858809c23f9
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.PathIdentifier;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestPathIdentifier extends SparkTestBase {
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      required(2, "data", Types.StringType.get()));
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private File tableLocation;
+  private PathIdentifier identifier;
+  private SparkCatalog sparkCatalog;
+
+  @Before
+  public void before() throws IOException {
+    tableLocation = temp.newFolder();
+    identifier = new PathIdentifier(tableLocation.getAbsolutePath());
+    sparkCatalog = new SparkCatalog();
+    sparkCatalog.initialize("test", new CaseInsensitiveStringMap(ImmutableMap.of()));
+  }
+
+  @After
+  public void after() {
+    tableLocation.delete();
+    sparkCatalog = null;
+  }
+
+  @Test
+  public void testPathIdentifier() throws TableAlreadyExistsException, NoSuchTableException {
+    SparkTable table = sparkCatalog.createTable(identifier,
+        SparkSchemaUtil.convert(SCHEMA),
+        new Transform[0],
+        ImmutableMap.of());
+
+    Assert.assertEquals(table.table().location(), tableLocation.getAbsolutePath());
+    Assert.assertTrue(table.table() instanceof BaseTable);
+    Assert.assertTrue(((BaseTable) table.table()).operations() instanceof HadoopTableOperations);
+
+    Assert.assertEquals(sparkCatalog.loadTable(identifier), table);
+    Assert.assertTrue(sparkCatalog.dropTable(identifier));
+  }
+}
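Review note: end to end, this change is what lets Spark address an Iceberg table by its filesystem path. A hedged sketch, assuming an Iceberg-enabled Spark 3 runtime on the classpath and an existing table at the illustrative path:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PathReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local").getOrCreate();
    // The "iceberg" source treats the load() argument as a table path when it
    // is not a catalog table name
    Dataset<Row> df = spark.read().format("iceberg").load("/tmp/warehouse/events");
    df.show();
  }
}
```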