diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java index 8f201406d64b..95c01a5397f1 100644 --- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java +++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java @@ -99,36 +99,35 @@ public Table loadTable(TableIdentifier ident) { @Override public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec, String location, Map properties) { - AtomicBoolean created = new AtomicBoolean(false); - Table table = tableCache.get(canonicalizeIdentifier(ident), identifier -> { - created.set(true); - return catalog.createTable(identifier, schema, spec, location, properties); - }); - - if (!created.get()) { - throw new AlreadyExistsException("Table already exists: %s", ident); - } - - return table; + return buildTable(ident, schema) + .withPartitionSpec(spec) + .withLocation(location) + .withProperties(properties) + .create(); } @Override public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec, String location, Map properties) { - // create a new transaction without altering the cache. the table doesn't exist until the transaction is committed. - // if the table is created before the transaction commits, any cached version is correct and the transaction create - // will fail. if the transaction commits before another create, then the cache will be empty. - return catalog.newCreateTableTransaction(ident, schema, spec, location, properties); + return buildTable(ident, schema) + .withPartitionSpec(spec) + .withLocation(location) + .withProperties(properties) + .createTransaction(); } @Override public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec, String location, Map properties, boolean orCreate) { - // create a new transaction without altering the cache. the table doesn't change until the transaction is committed. - // when the transaction commits, invalidate the table in the cache if it is present. - return CommitCallbackTransaction.addCallback( - catalog.newReplaceTableTransaction(ident, schema, spec, location, properties, orCreate), - () -> invalidate(canonicalizeIdentifier(ident))); + TableBuilder builder = buildTable(ident, schema) + .withPartitionSpec(spec) + .withLocation(location) + .withProperties(properties); + if (orCreate) { + return builder.createOrReplaceTransaction(); + } else { + return builder.replaceTransaction(); + } } @Override @@ -160,4 +159,90 @@ private Iterable metadataTableIdentifiers(TableIdentifier ident return builder.build(); } + + @Override + public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + return new CachingTableBuilder(identifier, schema); + } + + private class CachingTableBuilder implements TableBuilder { + private final TableIdentifier ident; + private final TableBuilder innerBuilder; + + private CachingTableBuilder(TableIdentifier identifier, Schema schema) { + this.innerBuilder = catalog.buildTable(identifier, schema); + this.ident = identifier; + } + + @Override + public TableBuilder withPartitionSpec(PartitionSpec spec) { + innerBuilder.withPartitionSpec(spec); + return this; + } + + @Override + public TableBuilder withSortOrder(SortOrder sortOrder) { + innerBuilder.withSortOrder(sortOrder); + return this; + } + + @Override + public TableBuilder withLocation(String location) { + innerBuilder.withLocation(location); + return this; + } + + @Override + public TableBuilder withProperties(Map properties) { + innerBuilder.withProperties(properties); + return this; + } + + @Override + public TableBuilder withProperty(String key, String value) { + innerBuilder.withProperty(key, value); + return this; + } + + @Override + public Table create() { + AtomicBoolean created = new AtomicBoolean(false); + Table table = tableCache.get(canonicalizeIdentifier(ident), identifier -> { + created.set(true); + return innerBuilder.create(); + }); + + if (!created.get()) { + throw new AlreadyExistsException("Table already exists: %s", ident); + } + + return table; + } + + @Override + public Transaction createTransaction() { + // create a new transaction without altering the cache. the table doesn't exist until the transaction is + // committed. if the table is created before the transaction commits, any cached version is correct and the + // transaction create will fail. if the transaction commits before another create, then the cache will be empty. + return innerBuilder.createTransaction(); + } + + @Override + public Transaction replaceTransaction() { + // create a new transaction without altering the cache. the table doesn't change until the transaction is + // committed. when the transaction commits, invalidate the table in the cache if it is present. + return CommitCallbackTransaction.addCallback( + innerBuilder.replaceTransaction(), + () -> invalidate(canonicalizeIdentifier(ident))); + } + + @Override + public Transaction createOrReplaceTransaction() { + // create a new transaction without altering the cache. the table doesn't change until the transaction is + // committed. when the transaction commits, invalidate the table in the cache if it is present. + return CommitCallbackTransaction.addCallback( + innerBuilder.createOrReplaceTransaction(), + () -> invalidate(canonicalizeIdentifier(ident))); + } + } }