Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/TableLocationSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.iceberg.io;

import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.catalog.TableIdentifier;

/**
 * Supplier interface to allow for customizing table locations based on
 * contextual information from the table.
 *
 * <p>Context is offered through the fluent methods below; each one returns a
 * {@code TableLocationSupplier} so calls can be chained, and the location itself is
 * obtained by calling {@link #get()}. Every fluent method has a default implementation
 * that ignores its argument and returns {@code this}, so implementations only need to
 * override the methods for the context they care about.
 *
 * <p>{@link #get()} is the only abstract method, which allows a supplier to be written
 * as a lambda, for example {@code () -> "some/location"}.
 */
@FunctionalInterface
public interface TableLocationSupplier extends Supplier<String> {

  /**
   * Offers the table UUID to this supplier.
   *
   * @param uuid the table UUID
   * @return a supplier to continue chaining on; the default implementation ignores
   *     {@code uuid} and returns {@code this}
   */
  default TableLocationSupplier uuid(String uuid) {
    return this;
  }

  /**
   * Offers the table identifier to this supplier.
   *
   * @param identifier the table identifier
   * @return a supplier to continue chaining on; the default implementation ignores
   *     {@code identifier} and returns {@code this}
   */
  default TableLocationSupplier identifier(TableIdentifier identifier) {
    return this;
  }

  /**
   * Offers the table schema to this supplier.
   *
   * @param schema the table schema
   * @return a supplier to continue chaining on; the default implementation ignores
   *     {@code schema} and returns {@code this}
   */
  default TableLocationSupplier schema(Schema schema) {
    return this;
  }

  /**
   * Offers the table partition spec to this supplier.
   *
   * @param partitionSpec the table partition spec
   * @return a supplier to continue chaining on; the default implementation ignores
   *     {@code partitionSpec} and returns {@code this}
   */
  default TableLocationSupplier partitionSpec(PartitionSpec partitionSpec) {
    return this;
  }

  /**
   * Offers the table's current location to this supplier.
   *
   * @param currentLocation the current table location
   * @return a supplier to continue chaining on; the default implementation ignores
   *     {@code currentLocation} and returns {@code this}
   */
  default TableLocationSupplier location(String currentLocation) {
    return this;
  }

  /**
   * Offers the table sort order to this supplier.
   *
   * @param sortOrder the table sort order
   * @return a supplier to continue chaining on; the default implementation ignores
   *     {@code sortOrder} and returns {@code this}
   */
  default TableLocationSupplier sortOrder(SortOrder sortOrder) {
    return this;
  }

  /**
   * Offers the table properties to this supplier.
   *
   * @param properties the table properties
   * @return a supplier to continue chaining on; the default implementation ignores
   *     {@code properties} and returns {@code this}
   */
  default TableLocationSupplier properties(Map<String, String> properties) {
    return this;
  }
}
39 changes: 34 additions & 5 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.TableLocationSupplier;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -154,6 +155,25 @@ public String toString() {

protected abstract String defaultWarehouseLocation(TableIdentifier tableIdentifier);

protected TableLocationSupplier tableLocationSupplier(TableIdentifier tableIdentifier) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you choose to make this a supplier rather than passing the incoming options to a method like defaultWarehouseLocation? Is that because the table UUID is created in the TableMetadata constructor?

Are there cases where you think the location would be based on the schema or sort order? I can't think of valid cases and I wonder if this could actually be a bad idea. For example, maybe this could look for a column pii struct<email string, ...> and choose to place the table in a "sensitive" bucket, but I don't think that would be a good pattern for people to follow because it is error-prone: using PII for the column name could break it.

I'm also skeptical that location would be based on the partition spec, especially because the spec may change.

// Default supplier: when a current location has been provided (the existing replace
// behavior) that location is kept; otherwise the default warehouse location is used
return new TableLocationSupplier() {
  private String existingLocation;

  @Override
  public TableLocationSupplier location(String currentLocation) {
    this.existingLocation = currentLocation;
    return this;
  }

  @Override
  public String get() {
    if (existingLocation == null) {
      return defaultWarehouseLocation(tableIdentifier);
    }

    return existingLocation;
  }
};
}

protected class BaseMetastoreCatalogTableBuilder implements TableBuilder {
private final TableIdentifier identifier;
private final Schema schema;
Expand Down Expand Up @@ -208,7 +228,7 @@ public Table create() {
throw new AlreadyExistsException("Table already exists: %s", identifier);
}

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
TableLocationSupplier baseLocation = location != null ? () -> location : tableLocationSupplier(identifier);
Map<String, String> properties = propertiesBuilder.build();
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);

Expand All @@ -228,7 +248,7 @@ public Transaction createTransaction() {
throw new AlreadyExistsException("Table already exists: %s", identifier);
}

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
TableLocationSupplier baseLocation = location != null ? () -> location : tableLocationSupplier(identifier);
Map<String, String> properties = propertiesBuilder.build();
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
return Transactions.createTableTransaction(identifier.toString(), ops, metadata);
Expand All @@ -252,10 +272,19 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {

TableMetadata metadata;
if (ops.current() != null) {
String baseLocation = location != null ? location : ops.current().location();
metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
TableLocationSupplier baseLocation = location != null ? () -> location :
tableLocationSupplier(identifier)
.schema(schema)
.partitionSpec(spec)
.uuid(ops.current().uuid())
.sortOrder(ops.current().sortOrder())
.properties(ops.current().properties())
.location(ops.current().location());

metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation.get(),
propertiesBuilder.build());
} else {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
TableLocationSupplier baseLocation = location != null ? () -> location : tableLocationSupplier(identifier);
metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
}

Expand Down
31 changes: 26 additions & 5 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.Predicate;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.TableLocationSupplier;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -66,30 +67,42 @@ public static TableMetadata newTableMetadata(TableOperations ops,
PartitionSpec spec,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(schema, spec, SortOrder.unsorted(), () -> location, properties,
DEFAULT_TABLE_FORMAT_VERSION);
}

public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
SortOrder sortOrder,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(schema, spec, sortOrder, () -> location, properties, DEFAULT_TABLE_FORMAT_VERSION);
}

public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
SortOrder sortOrder,
TableLocationSupplier tableLocationSupplier,
Map<String, String> properties) {
return newTableMetadata(schema, spec, sortOrder, tableLocationSupplier, properties, DEFAULT_TABLE_FORMAT_VERSION);
}

public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(schema, spec, SortOrder.unsorted(), () -> location, properties,
DEFAULT_TABLE_FORMAT_VERSION);
}

static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
SortOrder sortOrder,
String location,
TableLocationSupplier tableLocationSupplier,
Map<String, String> properties,
int formatVersion) {
String uuid = UUID.randomUUID().toString();

// reassign all column ids to ensure consistency
AtomicInteger lastColumnId = new AtomicInteger(0);
Schema freshSchema = TypeUtil.assignFreshIds(INITIAL_SCHEMA_ID, schema, lastColumnId::incrementAndGet);
Expand All @@ -116,7 +129,15 @@ static TableMetadata newTableMetadata(Schema schema,
// break existing tables.
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);

return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
// Seed all relevant information to the table location supplier
tableLocationSupplier
.uuid(uuid)
.schema(schema)
.partitionSpec(spec)
.sortOrder(sortOrder)
.properties(properties);

return new TableMetadata(null, formatVersion, uuid, tableLocationSupplier.get(),
INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
lastColumnId.get(), freshSchema.schemaId(), ImmutableList.of(freshSchema),
freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ public void testUpdateSortOrder() {
SortOrder order = SortOrder.builderFor(schema).asc("x").build();

TableMetadata sortedByX = TableMetadata.newTableMetadata(
schema, PartitionSpec.unpartitioned(), order, null, ImmutableMap.of());
schema, PartitionSpec.unpartitioned(), order, () -> null, ImmutableMap.of());
Assert.assertEquals("Should have 1 sort order", 1, sortedByX.sortOrders().size());
Assert.assertEquals("Should use orderId 1", 1, sortedByX.sortOrder().orderId());
Assert.assertEquals("Should be sorted by one field", 1, sortedByX.sortOrder().fields().size());
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public static TestTable create(File temp, String name, Schema schema, PartitionS
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
}

ops.commit(null, newTableMetadata(schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), formatVersion));
ops.commit(null, newTableMetadata(schema, spec, sortOrder, () -> temp.toString(), ImmutableMap.of(),
formatVersion));

return new TestTable(ops, name);
}
Expand All @@ -74,7 +75,7 @@ public static Transaction beginCreate(File temp, String name, Schema schema,
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
}

TableMetadata metadata = newTableMetadata(schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), 1);
TableMetadata metadata = newTableMetadata(schema, spec, sortOrder, () -> temp.toString(), ImmutableMap.of(), 1);

return Transactions.createTableTransaction(name, ops, metadata);
}
Expand Down