Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 105 additions & 20 deletions core/src/main/java/org/apache/iceberg/CachingCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,36 +99,35 @@ public Table loadTable(TableIdentifier ident) {
@Override
public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec, String location,
Map<String, String> properties) {
AtomicBoolean created = new AtomicBoolean(false);
Table table = tableCache.get(canonicalizeIdentifier(ident), identifier -> {
created.set(true);
return catalog.createTable(identifier, schema, spec, location, properties);
});

if (!created.get()) {
throw new AlreadyExistsException("Table already exists: %s", ident);
}

return table;
return buildTable(ident, schema)
.withPartitionSpec(spec)
.withLocation(location)
.withProperties(properties)
.create();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider moving these to be default implementations in the interface.

}

@Override
public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
String location, Map<String, String> properties) {
// create a new transaction without altering the cache. the table doesn't exist until the transaction is committed.
// if the table is created before the transaction commits, any cached version is correct and the transaction create
// will fail. if the transaction commits before another create, then the cache will be empty.
return catalog.newCreateTableTransaction(ident, schema, spec, location, properties);
return buildTable(ident, schema)
.withPartitionSpec(spec)
.withLocation(location)
.withProperties(properties)
.createTransaction();
}

@Override
public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
String location, Map<String, String> properties, boolean orCreate) {
// create a new transaction without altering the cache. the table doesn't change until the transaction is committed.
// when the transaction commits, invalidate the table in the cache if it is present.
return CommitCallbackTransaction.addCallback(
catalog.newReplaceTableTransaction(ident, schema, spec, location, properties, orCreate),
() -> invalidate(canonicalizeIdentifier(ident)));
TableBuilder builder = buildTable(ident, schema)
.withPartitionSpec(spec)
.withLocation(location)
.withProperties(properties);
if (orCreate) {
return builder.createOrReplaceTransaction();
} else {
return builder.replaceTransaction();
}
}

@Override
Expand Down Expand Up @@ -160,4 +159,90 @@ private Iterable<TableIdentifier> metadataTableIdentifiers(TableIdentifier ident

return builder.build();
}

@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return new CachingTableBuilder(identifier, schema);
}

private class CachingTableBuilder implements TableBuilder {
private final TableIdentifier ident;
private final TableBuilder innerBuilder;

private CachingTableBuilder(TableIdentifier identifier, Schema schema) {
this.innerBuilder = catalog.buildTable(identifier, schema);
this.ident = identifier;
}

@Override
public TableBuilder withPartitionSpec(PartitionSpec spec) {
innerBuilder.withPartitionSpec(spec);
return this;
}

@Override
public TableBuilder withSortOrder(SortOrder sortOrder) {
innerBuilder.withSortOrder(sortOrder);
return this;
}

@Override
public TableBuilder withLocation(String location) {
innerBuilder.withLocation(location);
return this;
}

@Override
public TableBuilder withProperties(Map<String, String> properties) {
innerBuilder.withProperties(properties);
return this;
}

@Override
public TableBuilder withProperty(String key, String value) {
innerBuilder.withProperty(key, value);
return this;
}

@Override
public Table create() {
AtomicBoolean created = new AtomicBoolean(false);
Table table = tableCache.get(canonicalizeIdentifier(ident), identifier -> {
created.set(true);
return innerBuilder.create();
});

if (!created.get()) {
throw new AlreadyExistsException("Table already exists: %s", ident);
}

return table;
}

@Override
public Transaction createTransaction() {
// create a new transaction without altering the cache. the table doesn't exist until the transaction is
// committed. if the table is created before the transaction commits, any cached version is correct and the
// transaction create will fail. if the transaction commits before another create, then the cache will be empty.
return innerBuilder.createTransaction();
}

@Override
public Transaction replaceTransaction() {
// create a new transaction without altering the cache. the table doesn't change until the transaction is
// committed. when the transaction commits, invalidate the table in the cache if it is present.
return CommitCallbackTransaction.addCallback(
innerBuilder.replaceTransaction(),
() -> invalidate(canonicalizeIdentifier(ident)));
}

@Override
public Transaction createOrReplaceTransaction() {
// create a new transaction without altering the cache. the table doesn't change until the transaction is
// committed. when the transaction commits, invalidate the table in the cache if it is present.
return CommitCallbackTransaction.addCallback(
innerBuilder.createOrReplaceTransaction(),
() -> invalidate(canonicalizeIdentifier(ident)));
}
}
}