@@ -16,6 +16,9 @@
import com.google.common.base.Splitter;
import com.google.common.base.Suppliers;
import com.google.common.base.VerifyException;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -110,6 +113,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
@@ -201,6 +205,9 @@ public class IcebergMetadata

private final Map<String, Optional<Long>> snapshotIds = new ConcurrentHashMap<>();
private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();

private final LoadingCache<String, ConcurrentHashMap<SchemaTableName, Table>> tableCache;

Member:
Nested keying query id -> SchemaTableName -> Table is redundant.
Make SchemaTableName the key here, as in tableMetadataCache (and effectively in snapshotIds, although the code doesn't make it obvious)
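A minimal sketch of that suggestion (illustration only, field name kept from the PR): keyed directly by SchemaTableName and declared like the existing tableMetadataCache field above.

    private final Map<SchemaTableName, Table> tableCache = new ConcurrentHashMap<>();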

private final LoadingCache<String, ConcurrentHashMap<SchemaTableName, TableStatistics>> tableStatisticsCache;
private final ViewReaderUtil.PrestoViewReader viewReader = new ViewReaderUtil.PrestoViewReader();

private Transaction transaction;
@@ -221,6 +228,14 @@ public IcebergMetadata(
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.tableCache = CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(5, TimeUnit.MINUTES)

Member:
The IcebergMetadata object is per-query scoped, so you don't need time-based eviction here.
That's why tableMetadataCache uses an ordinary map.

Contributor Author:
Ah, I did not know that. Then we do not even need the two levels (query id -> table); one level is enough.
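As a follow-up sketch of where both comments point (illustration only, assuming the flat per-query map from the previous sketch): getHiveTable becomes a single-level lookup with no Guava cache and no eviction. As in the PR's current version, empty results are not cached, since ConcurrentHashMap cannot hold nulls.

    Optional<Table> getHiveTable(ConnectorSession session, SchemaTableName schemaTableName)
    {
        // The map lives only as long as this per-query IcebergMetadata instance
        Table cached = tableCache.get(schemaTableName);
        if (cached != null) {
            return Optional.of(cached);
        }
        Optional<Table> table = metastore.getTable(
                new HiveIdentity(session), schemaTableName.getSchemaName(), schemaTableName.getTableName());
        // Only present tables are cached; a missing table is re-checked on the next call
        table.ifPresent(value -> tableCache.put(schemaTableName, value));
        return table;
    }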

.build(new TableCacheLoader());
this.tableStatisticsCache = CacheBuilder.newBuilder()

Member:
If adding a stats cache is an improvement, I would expect this to be reflected by a test change.
Currently I see that TestIcebergMetadataFileOperations passes without modification.

Let's move statistics to a separate PR (or at least a separate commit), so that we can focus on metastore interactions here.

.maximumSize(100)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(new TableStatisticsCacheLoader());
}

@Override
@@ -270,7 +285,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
verify(name.getTableType() == DATA, "Wrong table type: " + name.getTableType());

Optional<Table> hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), name.getTableName());
Optional<Table> hiveTable = getHiveTable(session, tableName);
if (hiveTable.isEmpty()) {
return null;
}
Expand All @@ -293,6 +308,19 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
TupleDomain.all());
}

Optional<Table> getHiveTable(ConnectorSession session, SchemaTableName schemaTableName)
{
var queryTableCache = tableCache.getUnchecked(session.getQueryId());
Optional<Table> table = Optional.ofNullable(queryTableCache.get(schemaTableName));
if (table.isEmpty()) {
HiveIdentity identity = new HiveIdentity(session);
log.debug("Requesting table from metastore: " + schemaTableName);
table = metastore.getTable(identity, schemaTableName.getSchemaName(), schemaTableName.getTableName());
table.ifPresent(value -> queryTableCache.put(schemaTableName, value));
}
return table;
}

@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
@@ -304,7 +332,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
{
IcebergTableName name = IcebergTableName.from(tableName.getTableName());

Optional<Table> hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), name.getTableName());
Optional<Table> hiveTable = getHiveTable(session, tableName);
if (hiveTable.isEmpty() || !isIcebergTable(hiveTable.get())) {
return Optional.empty();
}
@@ -728,7 +756,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan

private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table)
{
if (metastore.getTable(new HiveIdentity(session), table.getSchemaName(), table.getTableName()).isEmpty()) {
if (getHiveTable(session, table).isEmpty()) {
throw new TableNotFoundException(table);
}

@@ -1005,8 +1033,12 @@ private static Set<Integer> identityPartitionColumnsInAllSpecs(org.apache.iceber
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable);
return tableStatisticsCache.getUnchecked(session.getQueryId()).computeIfAbsent(
handle.getSchemaTableName(),
stn -> {
org.apache.iceberg.Table icebergTable = getIcebergTable(session, stn);
return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable);
});
}

private Optional<Long> getSnapshotId(org.apache.iceberg.Table table, Optional<Long> snapshotId)
@@ -1029,7 +1061,7 @@ org.apache.iceberg.Table getIcebergTable(ConnectorSession session, SchemaTableNa
public void createMaterializedView(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting)
{
HiveIdentity identity = new HiveIdentity(session);
Optional<Table> existing = metastore.getTable(identity, viewName.getSchemaName(), viewName.getTableName());
Optional<Table> existing = getHiveTable(session, viewName);

// It's a create command where the materialized view already exists and 'if not exists' clause is not specified
if (!replace && existing.isPresent()) {
@@ -1100,7 +1132,7 @@ public void createMaterializedView(ConnectorSession session, SchemaTableName vie
public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
final HiveIdentity identity = new HiveIdentity(session);
Table view = metastore.getTable(identity, viewName.getSchemaName(), viewName.getTableName())
Table view = getHiveTable(session, viewName)
.orElseThrow(() -> new MaterializedViewNotFoundException(viewName));

String storageTableName = view.getParameters().get(STORAGE_TABLE);
@@ -1113,6 +1145,7 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
}
}
metastore.dropTable(identity, viewName.getSchemaName(), viewName.getTableName(), true);
tableCache.getUnchecked(session.getQueryId()).remove(viewName);
}

@Override
@@ -1215,7 +1248,7 @@ public List<SchemaTableName> listMaterializedViews(ConnectorSession session, Opt
@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
Optional<Table> tableOptional = metastore.getTable(new HiveIdentity(session), viewName.getSchemaName(), viewName.getTableName());
Optional<Table> tableOptional = getHiveTable(session, viewName);
if (tableOptional.isEmpty()) {
return Optional.empty();
}
@@ -1330,4 +1363,26 @@ public long getSnapshotId()
return this.snapshotId;
}
}

private static final class TableCacheLoader
extends CacheLoader<String, ConcurrentHashMap<SchemaTableName, Table>>
{
@Override
public ConcurrentHashMap<SchemaTableName, Table> load(final String unused)
throws Exception
{
return new ConcurrentHashMap<>();

Member:
I think I don't understand what this is doing.

Anyway, I guess this will go away once we move away from unnecessary Cache use.

Contributor Author:
The cache builder needs a function as an argument that creates the cache entry when there is a miss for a given key. Here we simply create a new ConcurrentHashMap if there is no map for a given query id yet.
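For readers unfamiliar with Guava's LoadingCache, a small self-contained illustration (not part of the PR, names invented) of what that means: the first getUnchecked call for a key runs load() and caches the result; later calls for the same key return the same instance.

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.concurrent.ConcurrentHashMap;

    public class LoadingCacheDemo
    {
        public static void main(String[] args)
        {
            LoadingCache<String, ConcurrentHashMap<String, String>> perQueryMaps = CacheBuilder.newBuilder()
                    .build(new CacheLoader<String, ConcurrentHashMap<String, String>>()
                    {
                        @Override
                        public ConcurrentHashMap<String, String> load(String queryId)
                        {
                            // Runs only on a cache miss for the given query id
                            return new ConcurrentHashMap<>();
                        }
                    });

            ConcurrentHashMap<String, String> first = perQueryMaps.getUnchecked("query-1");
            ConcurrentHashMap<String, String> second = perQueryMaps.getUnchecked("query-1");
            System.out.println(first == second); // true: load() ran once, the same map is reused
        }
    }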

}
}

private static final class TableStatisticsCacheLoader
extends CacheLoader<String, ConcurrentHashMap<SchemaTableName, TableStatistics>>
{
@Override
public ConcurrentHashMap<SchemaTableName, TableStatistics> load(final String unused)
throws Exception
{
return new ConcurrentHashMap<>();
}
}
}