diff --git a/api/src/main/java/org/apache/iceberg/io/TableLocationSupplier.java b/api/src/main/java/org/apache/iceberg/io/TableLocationSupplier.java new file mode 100644 index 000000000000..4b08f3979139 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/TableLocationSupplier.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.io; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Supplier interface to allow for customizing table locations based on + * contextual information from the table. + */ +public interface TableLocationSupplier extends Supplier { + default TableLocationSupplier uuid(String uuid) { + return this; + } + + default TableLocationSupplier identifier(TableIdentifier identifier) { + return this; + } + + default TableLocationSupplier schema(Schema schema) { + return this; + } + + default TableLocationSupplier partitionSpec(PartitionSpec partitionSpec) { + return this; + } + + default TableLocationSupplier location(String currentLocation) { + return this; + } + + default TableLocationSupplier sortOrder(SortOrder sortOrder) { + return this; + } + + default TableLocationSupplier properties(Map properties) { + return this; + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 0730667a68fc..82949e86bb0b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -25,6 +25,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.TableLocationSupplier; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -154,6 +155,25 @@ public String toString() { protected abstract String defaultWarehouseLocation(TableIdentifier tableIdentifier); + protected TableLocationSupplier tableLocationSupplier(TableIdentifier tableIdentifier) { + // Return a supplier that mimics the current default warehouse location behavior + // and preserves the current location for existing replace behavior + return new TableLocationSupplier() { + private String currentLocation; + + @Override + public TableLocationSupplier location(String currentLocation) { + this.currentLocation = currentLocation; + return this; + } + + @Override + public String get() { + return currentLocation != null ? currentLocation : defaultWarehouseLocation(tableIdentifier); + } + }; + } + protected class BaseMetastoreCatalogTableBuilder implements TableBuilder { private final TableIdentifier identifier; private final Schema schema; @@ -208,7 +228,7 @@ public Table create() { throw new AlreadyExistsException("Table already exists: %s", identifier); } - String baseLocation = location != null ? location : defaultWarehouseLocation(identifier); + TableLocationSupplier baseLocation = location != null ? () -> location : tableLocationSupplier(identifier); Map properties = propertiesBuilder.build(); TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties); @@ -228,7 +248,7 @@ public Transaction createTransaction() { throw new AlreadyExistsException("Table already exists: %s", identifier); } - String baseLocation = location != null ? location : defaultWarehouseLocation(identifier); + TableLocationSupplier baseLocation = location != null ? () -> location : tableLocationSupplier(identifier); Map properties = propertiesBuilder.build(); TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties); return Transactions.createTableTransaction(identifier.toString(), ops, metadata); @@ -252,10 +272,19 @@ private Transaction newReplaceTableTransaction(boolean orCreate) { TableMetadata metadata; if (ops.current() != null) { - String baseLocation = location != null ? location : ops.current().location(); - metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation, propertiesBuilder.build()); + TableLocationSupplier baseLocation = location != null ? () -> location : + tableLocationSupplier(identifier) + .schema(schema) + .partitionSpec(spec) + .uuid(ops.current().uuid()) + .sortOrder(ops.current().sortOrder()) + .properties(ops.current().properties()) + .location(ops.current().location()); + + metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation.get(), + propertiesBuilder.build()); } else { - String baseLocation = location != null ? location : defaultWarehouseLocation(identifier); + TableLocationSupplier baseLocation = location != null ? () -> location : tableLocationSupplier(identifier); metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, propertiesBuilder.build()); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 5703cc4ad4d1..b1c786b80b90 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -31,6 +31,7 @@ import java.util.function.Predicate; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.TableLocationSupplier; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,7 +67,8 @@ public static TableMetadata newTableMetadata(TableOperations ops, PartitionSpec spec, String location, Map properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, SortOrder.unsorted(), () -> location, properties, + DEFAULT_TABLE_FORMAT_VERSION); } public static TableMetadata newTableMetadata(Schema schema, @@ -74,22 +76,33 @@ public static TableMetadata newTableMetadata(Schema schema, SortOrder sortOrder, String location, Map properties) { - return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, sortOrder, () -> location, properties, DEFAULT_TABLE_FORMAT_VERSION); + } + + public static TableMetadata newTableMetadata(Schema schema, + PartitionSpec spec, + SortOrder sortOrder, + TableLocationSupplier tableLocationSupplier, + Map properties) { + return newTableMetadata(schema, spec, sortOrder, tableLocationSupplier, properties, DEFAULT_TABLE_FORMAT_VERSION); } public static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, String location, Map properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, SortOrder.unsorted(), () -> location, properties, + DEFAULT_TABLE_FORMAT_VERSION); } static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, SortOrder sortOrder, - String location, + TableLocationSupplier tableLocationSupplier, Map properties, int formatVersion) { + String uuid = UUID.randomUUID().toString(); + // reassign all column ids to ensure consistency AtomicInteger lastColumnId = new AtomicInteger(0); Schema freshSchema = TypeUtil.assignFreshIds(INITIAL_SCHEMA_ID, schema, lastColumnId::incrementAndGet); @@ -116,7 +129,15 @@ static TableMetadata newTableMetadata(Schema schema, // break existing tables. MetricsConfig.fromProperties(properties).validateReferencedColumns(schema); - return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location, + // Seed all relevant information to the table location supplier + tableLocationSupplier + .uuid(uuid) + .schema(schema) + .partitionSpec(spec) + .sortOrder(sortOrder) + .properties(properties); + + return new TableMetadata(null, formatVersion, uuid, tableLocationSupplier.get(), INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(), lastColumnId.get(), freshSchema.schemaId(), ImmutableList.of(freshSchema), freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(), diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index d3cebec1f328..bec39c4e2ed5 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -621,7 +621,7 @@ public void testUpdateSortOrder() { SortOrder order = SortOrder.builderFor(schema).asc("x").build(); TableMetadata sortedByX = TableMetadata.newTableMetadata( - schema, PartitionSpec.unpartitioned(), order, null, ImmutableMap.of()); + schema, PartitionSpec.unpartitioned(), order, () -> null, ImmutableMap.of()); Assert.assertEquals("Should have 1 sort order", 1, sortedByX.sortOrders().size()); Assert.assertEquals("Should use orderId 1", 1, sortedByX.sortOrder().orderId()); Assert.assertEquals("Should be sorted by one field", 1, sortedByX.sortOrder().fields().size()); diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index 2e0ecd60359d..3c9744581e6e 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -58,7 +58,8 @@ public static TestTable create(File temp, String name, Schema schema, PartitionS throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); } - ops.commit(null, newTableMetadata(schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), formatVersion)); + ops.commit(null, newTableMetadata(schema, spec, sortOrder, () -> temp.toString(), ImmutableMap.of(), + formatVersion)); return new TestTable(ops, name); } @@ -74,7 +75,7 @@ public static Transaction beginCreate(File temp, String name, Schema schema, throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); } - TableMetadata metadata = newTableMetadata(schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), 1); + TableMetadata metadata = newTableMetadata(schema, spec, sortOrder, () -> temp.toString(), ImmutableMap.of(), 1); return Transactions.createTableTransaction(name, ops, metadata); }