diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java index 80c8284bce39..6d318e558fc8 100644 --- a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java +++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java @@ -22,11 +22,14 @@ import java.io.Serializable; import java.util.Collection; import java.util.Iterator; +import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; public class CharSequenceSet implements Set<CharSequence>, Serializable { private static final ThreadLocal<CharSequenceWrapper> wrappers = ThreadLocal.withInitial( @@ -152,4 +155,28 @@ public boolean removeAll(Collection<?> objects) { public void clear() { wrapperSet.clear(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + CharSequenceSet that = (CharSequenceSet) o; + return wrapperSet.equals(that.wrapperSet); + } + + @Override + public int hashCode() { + return Objects.hash(wrapperSet); + } + + @Override + public String toString() { + return Streams.stream(iterator()).collect(Collectors.joining(", ", "CharSequenceSet({", "})")); + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 9cea03d2f1ae..8fad3839f82d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -196,7 +196,7 @@ public Transaction createOrReplaceTransaction() { private Transaction newReplaceTableTransaction(boolean orCreate) { TableOperations ops = newTableOps(identifier); if (!orCreate && ops.current() == null) { - throw new NoSuchTableException("No such table: %s", identifier); + throw new NoSuchTableException("Table does not exist: %s", identifier); } TableMetadata metadata; diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 978c58807905..9e65ac87dae0 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -25,6 +25,7 @@ import java.util.function.Function; import java.util.function.Predicate; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; @@ -115,7 +116,12 @@ protected void doRefresh() { public void commit(TableMetadata base, TableMetadata metadata) { // if the metadata is already out of date, reject it if (base != current()) { - throw new CommitFailedException("Cannot commit: stale table metadata"); + if (base != null) { + throw new CommitFailedException("Cannot commit: stale table metadata"); + } else { + // when current is non-null, the table exists. 
but when base is null, the commit is trying to create the table + throw new AlreadyExistsException("Table already exists: %s", tableName()); + } } // if the metadata is not changed, return early if (base == metadata) { diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java index af939f36f879..28d516377c1f 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java @@ -94,7 +94,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { try { Map<String, String> table = getTable(); - if (!table.isEmpty()) { + if (base != null) { validateMetadataLocation(table, base); String oldMetadataLocation = base.metadataFileLocation(); // Start atomic update @@ -123,6 +123,10 @@ } catch (SQLWarning e) { throw new UncheckedSQLException(e, "Database warning"); } catch (SQLException e) { + // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException + if (e.getMessage().contains("constraint failed")) { + throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); + } throw new UncheckedSQLException(e, "Unknown failure"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index fe941cae5247..5171849c8c57 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -19,22 +19,43 @@ package org.apache.iceberg.catalog; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReplaceSortOrder; import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.CharSequenceSet; +import org.assertj.core.api.Assertions; import org.junit.Assert; import 
org.junit.Assume; import org.junit.Test; @@ -43,32 +64,95 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> { private static final Namespace NS = Namespace.of("newdb"); + private static final TableIdentifier TABLE = TableIdentifier.of(NS, "table"); + // Schema passed to create tables - static final Schema SCHEMA = new Schema( + private static final Schema SCHEMA = new Schema( required(3, "id", Types.IntegerType.get(), "unique ID"), required(4, "data", Types.StringType.get()) ); // This is the actual schema for the table, with column IDs reassigned - static final Schema TABLE_SCHEMA = new Schema( + private static final Schema TABLE_SCHEMA = new Schema( required(1, "id", Types.IntegerType.get(), "unique ID"), required(2, "data", Types.StringType.get()) ); + // This is the actual schema for the table, with column IDs reassigned + private static final Schema REPLACE_SCHEMA = new Schema( + required(2, "id", Types.IntegerType.get(), "unique ID"), + required(3, "data", Types.StringType.get()) + ); + + // another schema that is not the same + private static final Schema OTHER_SCHEMA = new Schema( + required(1, "some_id", Types.IntegerType.get()) + ); + + // Partition spec used to create tables + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .bucket("id", 16) + .build(); + + private static final PartitionSpec TABLE_SPEC = PartitionSpec.builderFor(TABLE_SCHEMA) + .bucket("id", 16) + .build(); + + private static final PartitionSpec REPLACE_SPEC = PartitionSpec.builderFor(REPLACE_SCHEMA) + .bucket("id", 16) + .withSpecId(1) + .build(); + // Partition spec used to create tables - static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) - .bucket("data", 16) + static final SortOrder WRITE_ORDER = SortOrder.builderFor(SCHEMA) + .asc(Expressions.bucket("id", 16)) + .asc("id") + .build(); + + static final SortOrder TABLE_WRITE_ORDER = SortOrder.builderFor(TABLE_SCHEMA) + .asc(Expressions.bucket("id", 16)) + .asc("id") + .build(); + + static final SortOrder REPLACE_WRITE_ORDER = SortOrder.builderFor(REPLACE_SCHEMA) + .asc(Expressions.bucket("id", 16)) + .asc("id") .build(); static final DataFile FILE_A = DataFiles.builder(SPEC) .withPath("/path/to/data-a.parquet") - .withFileSizeInBytes(0) - .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withFileSizeInBytes(10) + .withPartitionPath("id_bucket=0") // easy way to set partition data for now + .withRecordCount(2) // needs at least one record or else metrics will filter it out + .build(); + + static final DataFile FILE_B = DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("id_bucket=1") // easy way to set partition data for now + .withRecordCount(2) // needs at least one record or else metrics will filter it out + .build(); + + static final DataFile FILE_C = DataFiles.builder(SPEC) + .withPath("/path/to/data-c.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("id_bucket=2") // easy way to set partition data for now .withRecordCount(2) // needs at least one record or else metrics will filter it out .build(); protected abstract C catalog(); - protected abstract boolean supportsNamespaceProperties(); + + protected boolean supportsNamespaceProperties() { + return true; + } + + protected boolean requiresNamespaceCreate() { + return false; + } + + protected boolean supportsServerSideRetry() { + return false; + } @Test public void testCreateNamespace() { @@ -319,6 +403,1346 @@ public void testNamespaceWithDot() { 
Assert.assertFalse("Namespace should not exist", catalog.namespaceExists(withDot)); } + @Test + public void testBasicCreateTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + Table table = catalog.buildTable(ident, SCHEMA).create(); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + + // validate table settings + Assert.assertEquals("Table name should report its full name", catalog.name() + "." + ident, table.name()); + Assert.assertEquals("Schema should match expected ID assignment", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertNotNull("Should have a location", table.location()); + Assert.assertTrue("Should be unpartitioned", table.spec().isUnpartitioned()); + Assert.assertTrue("Should be unsorted", table.sortOrder().isUnsorted()); + Assert.assertNotNull("Should have table properties", table.properties()); + } + + @Test + public void testBasicCreateTableThatAlreadyExists() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + catalog.buildTable(ident, SCHEMA).create(); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + + AssertHelpers.assertThrows("Should fail to create a table that already exists", + AlreadyExistsException.class, "ns.table", + () -> catalog.buildTable(ident, OTHER_SCHEMA).create()); + + Table table = catalog.loadTable(ident); + Assert.assertEquals("Schema should match original table schema", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + } + + @Test + public void testCompleteCreateTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + Map properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Table table = catalog.buildTable(ident, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .create(); + + // validate table settings + Assert.assertEquals("Table name should report its full name", catalog.name() + "." 
+ ident, table.name()); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + Assert.assertEquals("Schema should match expected ID assignment", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertNotNull("Should have a location", table.location()); + Assert.assertEquals("Should use requested partition spec", TABLE_SPEC, table.spec()); + Assert.assertEquals("Should use requested write order", TABLE_WRITE_ORDER, table.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), table.properties().entrySet())); + } + + @Test + public void testLoadTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + Map<String, String> properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + catalog.buildTable(ident, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .create(); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + + Table table = catalog.loadTable(ident); + // validate table settings + Assert.assertEquals("Table name should report its full name", catalog.name() + "." + ident, table.name()); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + Assert.assertEquals("Schema should match expected ID assignment", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertNotNull("Should have a location", table.location()); + Assert.assertEquals("Should use requested partition spec", TABLE_SPEC, table.spec()); + Assert.assertEquals("Should use requested write order", TABLE_WRITE_ORDER, table.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), table.properties().entrySet())); + } + + @Test + public void testLoadMissingTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + AssertHelpers.assertThrows("Should fail to load a nonexistent table", + NoSuchTableException.class, ident.toString(), + () -> catalog.loadTable(ident)); + } + + @Test + public void testDropTable() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + Assert.assertFalse("Table should not exist before create", catalog.tableExists(TABLE)); + + catalog.buildTable(TABLE, SCHEMA).create(); + Assert.assertTrue("Table should exist after create", catalog.tableExists(TABLE)); + + boolean dropped = catalog.dropTable(TABLE); + Assert.assertTrue("Should drop a table that does exist", dropped); + Assert.assertFalse("Table should not exist after drop", catalog.tableExists(TABLE)); + } + + @Test + public void testDropMissingTable() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + TableIdentifier noSuchTableIdent = TableIdentifier.of(NS, "notable"); + Assert.assertFalse("Table should not exist", catalog.tableExists(noSuchTableIdent)); + Assert.assertFalse("Should not drop a table that does not exist", catalog.dropTable(noSuchTableIdent)); + } + + @Test + public void testListTables() { + C catalog = catalog(); + + Namespace ns1 = Namespace.of("ns_1"); + Namespace ns2 = Namespace.of("ns_2"); + 
TableIdentifier ns1Table1 = TableIdentifier.of(ns1, "table_1"); + TableIdentifier ns1Table2 = TableIdentifier.of(ns1, "table_2"); + TableIdentifier ns2Table1 = TableIdentifier.of(ns2, "table_1"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(ns1); + catalog.createNamespace(ns2); + } + + assertEmpty("Should not have tables in a new namespace, ns_1", catalog, ns1); + assertEmpty("Should not have tables in a new namespace, ns_2", catalog, ns2); + + catalog.buildTable(ns1Table1, SCHEMA).create(); + + Assert.assertEquals("Should contain ns_1.table_1 after create", + ImmutableSet.of(ns1Table1), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.buildTable(ns2Table1, SCHEMA).create(); + + Assert.assertEquals("Should contain ns_2.table_1 after create", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + Assert.assertEquals("Should not show changes to ns_2 in ns_1", + ImmutableSet.of(ns1Table1), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.buildTable(ns1Table2, SCHEMA).create(); + + Assert.assertEquals("Should not show changes to ns_1 in ns_2", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + Assert.assertEquals("Should contain ns_1.table_2 after create", + ImmutableSet.of(ns1Table1, ns1Table2), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.dropTable(ns1Table1); + + Assert.assertEquals("Should not show changes to ns_1 in ns_2", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + Assert.assertEquals("Should not contain ns_1.table_1 after drop", + ImmutableSet.of(ns1Table2), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.dropTable(ns1Table2); + + Assert.assertEquals("Should not show changes to ns_1 in ns_2", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + assertEmpty("Should not contain ns_1.table_2 after drop", catalog, ns1); + + catalog.dropTable(ns2Table1); + assertEmpty("Should not contain ns_2.table_1 after drop", catalog, ns2); + } + + @Test + public void testUpdateTableSchema() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + Schema expected = update.apply(); + + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUUIDValidation() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + Assert.assertTrue("Should successfully drop table", catalog.dropTable(TABLE)); + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + String expectedMessage = supportsServerSideRetry() ? 
"Requirement failed: UUID does not match" : "Cannot commit"; + AssertHelpers.assertThrows("Should reject changes to tables that have been dropped and recreated", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", + OTHER_SCHEMA.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSchemaServerSideRetry() { + Assume.assumeTrue("Schema update recovery is only supported with server-side retry", supportsServerSideRetry()); + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + Schema expected = update.apply(); + + // update the spec concurrently so that the first update fails, but can succeed on retry + catalog.loadTable(TABLE).updateSpec() + .addField("shard", Expressions.bucket("id", 16)) + .commit(); + + // commit the original update + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSchemaConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + // update the schema concurrently so that the original update fails + UpdateSchema concurrent = catalog.loadTable(TABLE).updateSchema() + .deleteColumn("data"); + Schema expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? + "Requirement failed: current schema changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second schema update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSchemaAssignmentConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + // update the schema concurrently so that the original update fails + UpdateSchema concurrent = catalog.loadTable(TABLE).updateSchema() + .addColumn("another_col", Types.StringType.get()); + Schema expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? 
+ "Requirement failed: last assigned field id changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second schema update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSpec() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("id", 16)); + + PartitionSpec expected = update.apply(); + + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableSpecServerSideRetry() { + Assume.assumeTrue("Spec update recovery is only supported with server-side retry", supportsServerSideRetry()); + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("id", 16)); + PartitionSpec expected = update.apply(); + + // update the schema concurrently so that the first update fails, but can succeed on retry + catalog.loadTable(TABLE).updateSchema() + .addColumn("another_col", Types.StringType.get()) + .commit(); + + // commit the original update + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableSpecConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create(); + + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("data", 16)); + + // update the spec concurrently so that the original update fails + UpdatePartitionSpec concurrent = catalog.loadTable(TABLE).updateSpec() + .removeField(Expressions.bucket("id", 16)); + PartitionSpec expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? + "Requirement failed: default partition spec changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second partition spec update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableAssignmentSpecConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("id", 16)); + + // update the spec concurrently so that the original update fails + UpdatePartitionSpec concurrent = catalog.loadTable(TABLE).updateSpec() + .addField("shard", Expressions.truncate("id", 100)); + PartitionSpec expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? 
+ "Requirement failed: last assigned partition id changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second partition spec update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableSortOrder() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + ReplaceSortOrder update = table.replaceSortOrder() + .asc(Expressions.bucket("id", 16)) + .asc("id"); + + SortOrder expected = update.apply(); + + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the sort order ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected order", expected.fields(), loaded.sortOrder().fields()); + } + + @Test + public void testUpdateTableSortOrderServerSideRetry() { + Assume.assumeTrue("Sort order update recovery is only supported with server-side retry", supportsServerSideRetry()); + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + ReplaceSortOrder update = table.replaceSortOrder() + .asc(Expressions.bucket("id", 16)) + .asc("id"); + SortOrder expected = update.apply(); + + // update the schema concurrently so that the first update fails, but can succeed on retry + catalog.loadTable(TABLE).updateSchema() + .addColumn("another_col", Types.StringType.get()) + .commit(); + + // commit the original update + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the sort order ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected order", expected.fields(), loaded.sortOrder().fields()); + } + + @Test + public void testAppend() throws IOException { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .create(); + + try (CloseableIterable tasks = table.newScan().planFiles()) { + Assert.assertFalse("Should contain no files", tasks.iterator().hasNext()); + } + + table.newFastAppend().appendFile(FILE_A).commit(); + + assertFiles(table, FILE_A); + } + + @Test + public void testConcurrentAppendEmptyTable() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .create(); + + assertNoFiles(table); + + // create an uncommitted append + AppendFiles append = table.newFastAppend().appendFile(FILE_A); + append.apply(); // apply changes to eagerly write metadata + + catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit(); + assertFiles(catalog.loadTable(TABLE), FILE_B); + + // the uncommitted append should retry and succeed + append.commit(); + assertFiles(catalog.loadTable(TABLE), FILE_A, FILE_B); + } + + @Test + public void testConcurrentAppendNonEmptyTable() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .create(); + + assertNoFiles(table); + + // TODO: skip the initial refresh in FastAppend so that commits actually fail + + // create an initial snapshot + catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).commit(); + + // create an uncommitted append + AppendFiles append = table.newFastAppend().appendFile(FILE_A); + append.apply(); // apply changes to eagerly write metadata + + 
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit(); + assertFiles(catalog.loadTable(TABLE), FILE_B, FILE_C); + + // the uncommitted append should retry and succeed + append.commit(); + assertFiles(catalog.loadTable(TABLE), FILE_A, FILE_B, FILE_C); + } + + @Test + public void testUpdateTransaction() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + Transaction transaction = table.newTransaction(); + + UpdateSchema updateSchema = transaction.updateSchema() + .addColumn("new_col", Types.LongType.get()); + Schema expectedSchema = updateSchema.apply(); + updateSchema.commit(); + + UpdatePartitionSpec updateSpec = transaction.updateSpec() + .addField("shard", Expressions.bucket("id", 16)); + PartitionSpec expectedSpec = updateSpec.apply(); + updateSpec.commit(); + + transaction.commitTransaction(); + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Loaded table should have expected schema", + expectedSchema.asStruct(), loaded.schema().asStruct()); + Assert.assertEquals("Loaded table should have expected spec", + expectedSpec.fields(), loaded.spec().fields()); + + assertPreviousMetadataFileCount(loaded, 1); + } + + @Test + public void testCreateTransaction() { + C catalog = catalog(); + + Transaction create = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + + Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + create.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + create.commitTransaction(); + + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + Table table = catalog.loadTable(TABLE); + assertFiles(table, FILE_A); + assertPreviousMetadataFileCount(table, 0); + } + + @Test + public void testCompleteCreateTransaction() { + C catalog = catalog(); + + Map<String, String> properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Transaction create = catalog.buildTable(TABLE, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .createTransaction(); + + Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + create.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + create.commitTransaction(); + + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + Table table = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertEquals("Table should have create partition spec", TABLE_SPEC.fields(), table.spec().fields()); + Assert.assertEquals("Table should have create sort order", TABLE_WRITE_ORDER, table.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), table.properties().entrySet())); + Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location()); + assertFiles(table, FILE_A); + assertPreviousMetadataFileCount(table, 0); + } + + @Test + public void testConcurrentCreateTransaction() { + C catalog = catalog(); + + Transaction create = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + 
Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + create.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Assertions.setMaxStackTraceElementsDisplayed(Integer.MAX_VALUE); + AssertHelpers.assertThrows("Should fail because table was created concurrently", + AlreadyExistsException.class, "Table already exists", create::commitTransaction); + + // validate the concurrently created table is unmodified + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + assertNoFiles(table); + } + + @Test + public void testCreateOrReplaceTransactionCreate() { + C catalog = catalog(); + + Transaction create = catalog.buildTable(TABLE, SCHEMA).createOrReplaceTransaction(); + + Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + create.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + create.commitTransaction(); + + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + Table table = catalog.loadTable(TABLE); + assertFiles(table, FILE_A); + assertPreviousMetadataFileCount(table, 0); + } + + @Test + public void testCompleteCreateOrReplaceTransactionCreate() { + C catalog = catalog(); + + Map properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .createOrReplaceTransaction(); + + Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + createOrReplace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + createOrReplace.commitTransaction(); + + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + Table table = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertEquals("Table should have create partition spec", TABLE_SPEC.fields(), table.spec().fields()); + Assert.assertEquals("Table should have create sort order", TABLE_WRITE_ORDER, table.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), table.properties().entrySet())); + Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location()); + assertFiles(table, FILE_A); + assertPreviousMetadataFileCount(table, 0); + } + + @Test + public void testCreateOrReplaceReplaceTransactionReplace() { + C catalog = catalog(); + + Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE)); + + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA).createOrReplaceTransaction(); + + Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE)); + + 
createOrReplace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + // validate table has not changed + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + assertUUIDsMatch(original, table); + assertNoFiles(table); + + createOrReplace.commitTransaction(); + + // validate the table after replace + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + table.refresh(); // refresh should work with UUID validation + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct()); + assertUUIDsMatch(original, loaded); + assertFiles(loaded, FILE_A); + assertPreviousMetadataFileCount(loaded, 1); + } + + @Test + public void testCompleteCreateOrReplaceTransactionReplace() { + C catalog = catalog(); + + Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE)); + + Map<String, String> properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .createOrReplaceTransaction(); + + Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE)); + + createOrReplace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + // validate table has not changed + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertTrue("Table should be unpartitioned", table.spec().isUnpartitioned()); + Assert.assertTrue("Table should be unsorted", table.sortOrder().isUnsorted()); + Assert.assertNotEquals("Created at should not match", + table.properties().get("created-at"), + "2022-02-25T00:38:19"); + assertUUIDsMatch(original, table); + assertNoFiles(table); + + createOrReplace.commitTransaction(); + + // validate the table after replace + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + table.refresh(); // refresh should work with UUID validation + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct()); + Assert.assertEquals("Table should have replace partition spec", REPLACE_SPEC, loaded.spec()); + Assert.assertEquals("Table should have replace sort order", REPLACE_WRITE_ORDER, loaded.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), loaded.properties().entrySet())); + Assert.assertEquals("Table location should be replaced", "file:/tmp/ns/table", table.location()); + assertUUIDsMatch(original, loaded); + assertFiles(loaded, FILE_A); + assertPreviousMetadataFileCount(loaded, 1); + } + + @Test + public void testCreateOrReplaceTransactionConcurrentCreate() { + Assume.assumeTrue("Conversion to replace transaction is not supported by REST catalog", supportsServerSideRetry()); + + C catalog = catalog(); + + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA).createOrReplaceTransaction(); + + Assert.assertFalse("Table should not 
exist after createTransaction", catalog.tableExists(TABLE)); + + createOrReplace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + AssertHelpers.assertThrows("Should fail because table was created concurrently", + CommitFailedException.class, "Table already exists", createOrReplace::commitTransaction); + + // validate the concurrently created table is unmodified + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + assertNoFiles(table); + } + + @Test + public void testReplaceTransaction() { + C catalog = catalog(); + + Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE)); + + Transaction replace = catalog.buildTable(TABLE, SCHEMA).replaceTransaction(); + + Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE)); + + replace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + // validate table has not changed + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + assertUUIDsMatch(original, table); + assertNoFiles(table); + + replace.commitTransaction(); + + // validate the table after replace + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + table.refresh(); // refresh should work with UUID validation + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct()); + assertUUIDsMatch(original, loaded); + assertFiles(loaded, FILE_A); + assertPreviousMetadataFileCount(loaded, 1); + } + + @Test + public void testCompleteReplaceTransaction() { + C catalog = catalog(); + + Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE)); + + Map properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Transaction replace = catalog.buildTable(TABLE, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .replaceTransaction(); + + Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE)); + + replace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + // validate table has not changed + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertTrue("Table should be unpartitioned", table.spec().isUnpartitioned()); + Assert.assertTrue("Table should be unsorted", table.sortOrder().isUnsorted()); + Assert.assertNotEquals("Created at should not match", + table.properties().get("created-at"), + "2022-02-25T00:38:19"); + assertUUIDsMatch(original, table); + assertNoFiles(table); + + replace.commitTransaction(); + + // validate the table after replace + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + table.refresh(); // refresh should work with UUID validation + + Table loaded = catalog.loadTable(TABLE); + + 
Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct()); + Assert.assertEquals("Table should have replace partition spec", REPLACE_SPEC, loaded.spec()); + Assert.assertEquals("Table should have replace sort order", REPLACE_WRITE_ORDER, loaded.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), loaded.properties().entrySet())); + Assert.assertEquals("Table location should be replaced", "file:/tmp/ns/table", table.location()); + assertUUIDsMatch(original, loaded); + assertFiles(loaded, FILE_A); + assertPreviousMetadataFileCount(loaded, 1); + } + + @Test + public void testReplaceTransactionRequiresTableExists() { + C catalog = catalog(); + + AssertHelpers.assertThrows("Should fail to create replace transaction with a missing table", + NoSuchTableException.class, "Table does not exist", + () -> catalog.buildTable(TABLE, SCHEMA).replaceTransaction()); + } + + @Test + public void testConcurrentReplaceTransactions() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match the original schema", + original.schema().asStruct(), afterFirstReplace.schema().asStruct()); + Assert.assertTrue("Table should be unpartitioned", + afterFirstReplace.spec().isUnpartitioned()); + Assert.assertTrue("Table should be unsorted", + afterFirstReplace.sortOrder().isUnsorted()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match the original schema", + original.schema().asStruct(), afterSecondReplace.schema().asStruct()); + Assert.assertTrue("Table should be unpartitioned", + afterSecondReplace.spec().isUnpartitioned()); + Assert.assertTrue("Table should be unsorted", + afterSecondReplace.sortOrder().isUnsorted()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + @Test + public void testConcurrentReplaceTransactionSchema() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, OTHER_SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, OTHER_SCHEMA) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + 
Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), afterFirstReplace.schema().asStruct()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match the original schema", + original.schema().asStruct(), afterSecondReplace.schema().asStruct()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + @Test + public void testConcurrentReplaceTransactionSchema2() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, OTHER_SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, OTHER_SCHEMA) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match the original schema", + original.schema().asStruct(), afterFirstReplace.schema().asStruct()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), afterSecondReplace.schema().asStruct()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + @Test + public void testConcurrentReplaceTransactionSchemaConflict() { + Assume.assumeTrue("Schema conflicts are detected server-side", supportsServerSideRetry()); + + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, OTHER_SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match the original schema", + REPLACE_SCHEMA.asStruct(), afterFirstReplace.schema().asStruct()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + // even though the new schema is identical, the assertion that the last assigned id has not changed will fail + AssertHelpers.assertThrows("Should reject concurrent schema update", + CommitFailedException.class, "last assigned field id changed", secondReplace::commitTransaction); + } + + @Test + public void testConcurrentReplaceTransactionPartitionSpec() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + 
transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table spec should match the new spec", + TABLE_SPEC.fields(), afterFirstReplace.spec().fields()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertTrue("Table should be unpartitioned", + afterSecondReplace.spec().isUnpartitioned()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + @Test + public void testConcurrentReplaceTransactionPartitionSpec2() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertTrue("Table should be unpartitioned", + afterFirstReplace.spec().isUnpartitioned()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table spec should match the new spec", + TABLE_SPEC.fields(), afterSecondReplace.spec().fields()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + @Test + public void testConcurrentReplaceTransactionPartitionSpecConflict() { + Assume.assumeTrue("Spec conflicts are detected server-side", supportsServerSideRetry()); + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table spec should match the new spec", + TABLE_SPEC.fields(), afterFirstReplace.spec().fields()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + // even though the new spec is identical, the assertion 
that the last assigned id has not changed will fail + AssertHelpers.assertThrows("Should reject concurrent spec update", + CommitFailedException.class, "last assigned partition id changed", secondReplace::commitTransaction); + } + + @Test + public void testConcurrentReplaceTransactionSortOrder() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .withSortOrder(WRITE_ORDER) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table order should match the new order", + TABLE_WRITE_ORDER, afterFirstReplace.sortOrder()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertTrue("Table should be unsorted", + afterSecondReplace.sortOrder().isUnsorted()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + @Test + public void testConcurrentReplaceTransactionSortOrderConflict() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .withSortOrder(WRITE_ORDER) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .withSortOrder(SortOrder.builderFor(SCHEMA) + .desc(Expressions.bucket("id", 16)) + .desc("id") + .build()) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertTrue("Table order should be set", + afterFirstReplace.sortOrder().isSorted()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table order should match the new order", + TABLE_WRITE_ORDER.fields(), afterSecondReplace.sortOrder().fields()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + private static void assertEmpty(String context, Catalog catalog, Namespace ns) { + try { + Assert.assertEquals(context, 0, catalog.listTables(ns).size()); + } catch (NoSuchNamespaceException e) { + // it is okay if the catalog throws NoSuchNamespaceException when it is empty + } + } + + public void assertUUIDsMatch(Table expected, Table actual) { + Assert.assertEquals("Table UUID should not change", + ((BaseTable) expected).operations().current().uuid(), + ((BaseTable) actual).operations().current().uuid()); + } + + public void assertPreviousMetadataFileCount(Table table, int 
metadataFileCount) { + TableOperations ops = ((BaseTable) table).operations(); + Assert.assertEquals("Table should have correct number of previous metadata locations", + metadataFileCount, ops.current().previousFiles().size()); + } + + public void assertNoFiles(Table table) { + try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) { + Assert.assertFalse("Should contain no files", tasks.iterator().hasNext()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public void assertFiles(Table table, DataFile... files) { + try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) { + List<CharSequence> paths = Streams.stream(tasks) + .map(FileScanTask::file) + .map(DataFile::path) + .collect(Collectors.toList()); + Assert.assertEquals("Should contain expected number of data files", files.length, paths.size()); + Assert.assertEquals("Should contain correct file paths", + CharSequenceSet.of(Iterables.transform(Arrays.asList(files), DataFile::path)), + CharSequenceSet.of(paths)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private List<Namespace> concat(List<Namespace> starting, Namespace... additional) { List<Namespace> namespaces = Lists.newArrayList(); namespaces.addAll(starting); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java index f291baf51f6e..e0e03e9206ce 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java @@ -168,7 +168,7 @@ public void testReplaceTableTxnTableNotExists() { AssertHelpers.assertThrows( "Should not be possible to start a new replace table txn", NoSuchTableException.class, - "No such table: hivedb.tbl", + "Table does not exist: hivedb.tbl", () -> catalog.newReplaceTableTransaction(TABLE_IDENTIFIER, SCHEMA, SPEC, false)); }
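
Note on the CharSequenceSet.toString() added above: it depends on the JDK's Collectors.joining(delimiter, prefix, suffix) argument order, where the element separator comes first and the prefix second. A minimal standalone sketch of the expected output, not part of the patch (the class name JoiningDemo is illustrative only):

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JoiningDemo {
  public static void main(String[] args) {
    // Collectors.joining(delimiter, prefix, suffix): ", " separates the elements,
    // "CharSequenceSet({" opens the result, and "})" closes it
    String formatted = Stream.of("a", "b", "c")
        .collect(Collectors.joining(", ", "CharSequenceSet({", "})"));
    System.out.println(formatted); // prints: CharSequenceSet({a, b, c})
  }
}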