Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
Expand All @@ -125,6 +127,8 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.IntegerType;
Expand All @@ -133,6 +137,7 @@
import org.apache.iceberg.types.Types.StringType;
import org.apache.iceberg.types.Types.StructType;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -171,6 +176,7 @@
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.util.HiveUtil.isStructuralType;
import static io.trino.plugin.iceberg.ConstraintExtractor.extractTupleDomain;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
Expand Down Expand Up @@ -200,6 +206,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
Expand Down Expand Up @@ -233,6 +240,7 @@
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.UuidType.UUID;
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
Expand All @@ -248,6 +256,7 @@
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
import static org.apache.iceberg.util.SnapshotUtil.schemaFor;

public class IcebergMetadata
Expand All @@ -265,6 +274,10 @@ public class IcebergMetadata

private static final FunctionName NUMBER_OF_DISTINCT_VALUES = new FunctionName("approx_distinct");

private static final String METADATA_FOLDER_NAME = "metadata";
private static final String METADATA_FILE_EXTENSION = ".metadata.json";
public static final String EXISTING_LATEST_METADATA_LOCATION = "existingLatestMetadataLocation";

private final TypeManager typeManager;
private final TypeOperators typeOperators;
private final JsonCodec<CommitTaskData> commitTaskCodec;
Expand Down Expand Up @@ -670,20 +683,45 @@ public Optional<ConnectorTableLayout> getNewTableLayout(ConnectorSession session
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
verify(transaction == null, "transaction already set");
transaction = newCreateTableTransaction(catalog, tableMetadata, session);
String location = transaction.table().location();
Optional<String> providedLocation = getTableLocation(tableMetadata.getProperties());
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
if (fileSystem.listFiles(location).hasNext()) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("" +
"Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " +
"to use unique table locations for every table.", location));

if (providedLocation.isPresent()) {
try {
boolean locationExists = fileSystem.newInputFile(providedLocation.get()).exists();
if (locationExists) {
Optional<String> latestMetadataLocation = getLatestMetadataLocation(fileSystem, providedLocation.get());
if (latestMetadataLocation.isPresent()) {
ConnectorTableMetadata connectorTableMetadata = createConnectorMetadataWithExistingMetadata(fileSystem, tableMetadata, latestMetadataLocation.get());
transaction = newCreateTableTransaction(catalog, connectorTableMetadata, session);
}
else {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "No metadata file exists at location: " + providedLocation.get());
}
}
else {
transaction = newCreateTableTransaction(catalog, tableMetadata, session);
}
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create table with provided location: " + providedLocation.get());
}
return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e);
else {
transaction = newCreateTableTransaction(catalog, tableMetadata, session);
String location = transaction.table().location();
try {
if (fileSystem.listFiles(location).hasNext()) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("" +
"Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " +
"to use unique table locations for every table.", location));
}
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e);
}
}
return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode);
}

@Override
Expand Down Expand Up @@ -1450,6 +1488,63 @@ private List<ColumnMetadata> getColumnMetadatas(Schema schema)
return columns.build();
}

/**
 * Builds {@link ConnectorTableMetadata} for a CREATE TABLE that targets a location which
 * already contains an Iceberg table, by reading the existing table's latest metadata file.
 * Schema and table properties come from the existing metadata; properties supplied in the
 * CREATE statement are kept unless overridden by the existing table's properties.
 */
private ConnectorTableMetadata createConnectorMetadataWithExistingMetadata(TrinoFileSystem fileSystem, ConnectorTableMetadata tableMetadata, String metadataLocation)
{
    FileIO fileIO = fileSystem.toFileIo();
    InputFile metadataFile = fileIO.newInputFile(metadataLocation);
    TableMetadata existingMetadata = TableMetadataParser.read(fileIO, metadataFile);

    Map<String, Object> properties = new HashMap<>(tableMetadata.getProperties());
    // Map<String, String> is directly accepted by Map<String, Object>#putAll — no per-entry cast needed
    properties.putAll(existingMetadata.properties());
    // Record where the existing metadata lives so commit can reuse it instead of writing a new file
    properties.put(EXISTING_LATEST_METADATA_LOCATION, metadataLocation);

    Optional<String> comment = Optional.ofNullable((String) properties.get(TABLE_COMMENT));
    return new ConnectorTableMetadata(tableMetadata.getTable(), getColumnMetadatas(existingMetadata.schema()), properties, comment);
}

private static Optional<String> getLatestMetadataLocation(TrinoFileSystem fileSystem, String location)
{
String latestMetadataLocation = null;
try {
if (fileSystem.listFiles(location).hasNext()) {
String metadataDirectoryLocation = stripTrailingSlash(location) + File.separator + METADATA_FOLDER_NAME;
FileIterator fileIterator = fileSystem.listFiles(metadataDirectoryLocation);
int latestMetadataVersion = -1;
while (fileIterator.hasNext()) {
FileEntry fileEntry = fileIterator.next();
if (fileEntry.path().endsWith(METADATA_FILE_EXTENSION)) {
int version = parseVersion(fileEntry.path());
if (version > latestMetadataVersion) {
Comment on lines +1519 to +1520
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that there could be a tie here. For example, if two writers tried updating the table at the same time. If we detect that there is a tie for the highest version number, we should throw an exception.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will take care in register_table procedure.

latestMetadataVersion = version;
latestMetadataLocation = fileEntry.path();
}
}
}
}
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking table's location: " + location, e);
}
return Optional.ofNullable(latestMetadataLocation);
}

// TODO duplicate of AbstractIcebergTableOperations#parseVersion
/**
 * Extracts the numeric version prefix from a metadata file name of the form
 * {@code <version>-<uuid>.metadata.json}; returns -1 when the name does not parse.
 */
private static int parseVersion(String metadataLocation)
{
    int fileNameStart = metadataLocation.lastIndexOf('/') + 1; // 0 when no '/' is present
    try {
        // indexOf yields -1 when '-' is missing; substring then throws, handled below
        int dashPosition = metadataLocation.indexOf('-', fileNameStart);
        String versionDigits = metadataLocation.substring(fileNameStart, dashPosition);
        return parseInt(versionDigits);
    }
    catch (NumberFormatException | IndexOutOfBoundsException e) {
        log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation);
        return -1;
    }
}

@Override
public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> analyzeProperties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
import static io.trino.plugin.iceberg.IcebergMetadata.EXISTING_LATEST_METADATA_LOCATION;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
Expand Down Expand Up @@ -118,6 +119,11 @@ public static List<String> getPartitioning(Map<String, Object> tableProperties)
return partitioning == null ? ImmutableList.of() : ImmutableList.copyOf(partitioning);
}

/**
 * Returns the location of an existing table's latest metadata file, if one was recorded
 * in the table properties during CREATE TABLE over a pre-existing Iceberg location.
 */
public static Optional<String> getExistingLatestMetadataLocation(Map<String, Object> tableProperties)
{
    Object metadataLocation = tableProperties.get(EXISTING_LATEST_METADATA_LOCATION);
    return Optional.ofNullable((String) metadataLocation);
}

public static Optional<String> getTableLocation(Map<String, Object> tableProperties)
{
return Optional.ofNullable((String) tableProperties.get(LOCATION_PROPERTY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergMetadata.EXISTING_LATEST_METADATA_LOCATION;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
Expand All @@ -95,6 +96,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getExistingLatestMetadataLocation;
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterColumns;
import static io.trino.plugin.iceberg.IcebergTableProperties.getOrcBloomFilterFpp;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
Expand Down Expand Up @@ -573,6 +575,11 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString());
propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));

Optional<String> existingLatestMetadataLocation = getExistingLatestMetadataLocation(tableMetadata.getProperties());
if (existingLatestMetadataLocation.isPresent()) {
propertiesBuilder.put(EXISTING_LATEST_METADATA_LOCATION, existingLatestMetadataLocation.get());
}

// iceberg ORC format bloom filter properties used by create table
List<String> columns = getOrcBloomFilterColumns(tableMetadata.getProperties());
if (!columns.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMetadata.EXISTING_LATEST_METADATA_LOCATION;
import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -88,7 +89,10 @@ protected final String getRefreshedLocation(boolean invalidateCaches)
@Override
protected final void commitNewTable(TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);
String newMetadataLocation = metadata.properties().get(EXISTING_LATEST_METADATA_LOCATION);
if (newMetadataLocation == null) {
newMetadataLocation = writeNewMetadata(metadata, version + 1);
}

Table.Builder builder = Table.builder()
.setDatabaseName(database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,15 +1239,15 @@ public void testTableNameCollision()
}

@Test
public void testCreateTableSucceedsOnEmptyDirectory()
public void testCreateTableFailureOnEmptyDirectory()
{
File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile();
String tmpName = "test_rename_table_tmp_" + randomTableSuffix();
Path newPath = tempDir.toPath().resolve(tmpName);
File directory = newPath.toFile();
verify(directory.mkdirs(), "Could not make directory on filesystem");
try {
assertUpdate("CREATE TABLE " + tmpName + " WITH (location='" + directory + "') AS SELECT 1 as a", 1);
assertQueryFails("CREATE TABLE " + tmpName + " WITH (location='" + directory + "') AS SELECT 1 as a", "No metadata file exists at location.*");
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tmpName);
Expand Down Expand Up @@ -1290,11 +1290,9 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat)
format(" format = '%s',\n format_version = 2,\n location = '%s'\n)", format, getTableLocation("test_create_table_like_copy2")));
dropTable("test_create_table_like_copy2");

assertQueryFails("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)",
"Cannot create a table on a non-empty location.*");
assertQuerySucceeds("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)");

assertQueryFails(format("CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = '%s')", otherFormat),
"Cannot create a table on a non-empty location.*");
assertQuerySucceeds(format("CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = '%s')", otherFormat));
}

private String getTablePropertiesString(String tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Verify.verify;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.iceberg.DataFileRecord.toDataFileRecord;
Expand Down Expand Up @@ -113,4 +116,37 @@ public void testCreateAndDrop()
assertFalse(fileSystem.exists(new Path(dataFile.getFilePath())), "The data file should have been removed");
assertFalse(fileSystem.exists(tableLocation), "The directory corresponding to the dropped Iceberg table should be removed as we don't allow shared locations.");
}

@Test
public void testCreateDropAndRecreateTable()
{
    // Create a table, populate it, and verify its contents
    String sourceTableName = "testCreateDropAndRecreateTableOld_" + randomTableSuffix();
    assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", sourceTableName));
    assertUpdate(format("INSERT INTO %s values(1, 'string1', true)", sourceTableName), 1);
    assertUpdate(format("INSERT INTO %s values(2, 'string2', false)", sourceTableName), 1);
    assertThat(query(format("SELECT * FROM %s", sourceTableName)))
            .matches("VALUES ROW(INT '1', VARCHAR 'string1', BOOLEAN 'true'), ROW(INT '2', VARCHAR 'string2', BOOLEAN 'false')");

    // Drop only the metastore entry, leaving the table's files in place
    String existingLocation = getTableLocation(sourceTableName);
    metastore.dropTable("tpch", sourceTableName, false);
    assertThat(metastore.getTable("tpch", sourceTableName)).as("Table metastore in Hive should be dropped").isEmpty();

    // Re-registering a table at the old location must expose the original data,
    // ignoring the column list given in the new CREATE statement
    String recreatedTableName = "testCreateDropAndRecreateTableNew_" + randomTableSuffix();
    assertUpdate(format("CREATE TABLE %s (dummy int) WITH (location = '%s')", recreatedTableName, existingLocation));
    assertThat(query(format("SELECT * FROM %s", recreatedTableName)))
            .matches("VALUES ROW(INT '1', VARCHAR 'string1', BOOLEAN 'true'), ROW(INT '2', VARCHAR 'string2', BOOLEAN 'false')");
    assertUpdate(format("DROP TABLE %s", recreatedTableName));
}

/**
 * Extracts the {@code location} table property from SHOW CREATE TABLE output.
 * Fails when the property is absent or appears more than once.
 */
private String getTableLocation(String tableName)
{
    String createTableSql = (String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue();
    Matcher matcher = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL).matcher(createTableSql);
    if (!matcher.find()) {
        throw new IllegalStateException("Location not found in SHOW CREATE TABLE result");
    }
    String location = matcher.group(1);
    verify(!matcher.find(), "Unexpected second match");
    return location;
}
}