Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ Property Name Description

``iceberg.metrics-max-inferred-column`` The maximum number of columns for which metrics ``100``
are collected.
``iceberg.max-statistics-file-cache-size`` Maximum size in bytes that should be consumed by the ``256MB``
statistics file cache.
======================================================= ============================================================= ============

Table Properties
Expand Down
5 changes: 5 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>stats</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.hive.HiveWrittenPartitions;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.iceberg.changelog.ChangelogUtil;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
Expand Down Expand Up @@ -182,6 +183,7 @@ public abstract class IcebergAbstractMetadata
protected final RowExpressionService rowExpressionService;
protected final FilterStatsCalculatorService filterStatsCalculatorService;
protected Transaction transaction;
protected final StatisticsFileCache statisticsFileCache;

private final StandardFunctionResolution functionResolution;
private final ConcurrentMap<SchemaTableName, Table> icebergTables = new ConcurrentHashMap<>();
Expand All @@ -192,14 +194,16 @@ public IcebergAbstractMetadata(
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService)
FilterStatsCalculatorService filterStatsCalculatorService,
StatisticsFileCache statisticsFileCache)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
}

protected final Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
Expand Down Expand Up @@ -733,7 +737,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
@Override
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<ConnectorTableLayoutHandle> tableLayoutHandle, List<ColumnHandle> columnHandles, Constraint<ColumnHandle> constraint)
{
    // Compute statistics from the table's metadata/statistics files (via the shared
    // statistics file cache), then refine them using the layout's filter predicate.
    IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
    TableStatistics base = calculateBaseTableStatistics(this, typeManager, session, statisticsFileCache, icebergTableHandle, tableLayoutHandle, columnHandles, constraint);
    return calculateStatisticsConsideringLayout(filterStatsCalculatorService, rowExpressionService, base, session, tableLayoutHandle);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.iceberg.statistics.StatisticsFileCacheKey;
import com.facebook.presto.orc.CachingStripeMetadataSource;
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
import com.facebook.presto.orc.EncryptionLibrary;
Expand Down Expand Up @@ -71,6 +73,7 @@
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.inject.Binder;
Expand Down Expand Up @@ -170,6 +173,20 @@ public void setup(Binder binder)
binder.bind(ConnectorPlanOptimizerProvider.class).to(IcebergPlanOptimizerProvider.class).in(Scopes.SINGLETON);
}

/**
 * Creates the connector-wide cache of column statistics loaded from statistics
 * files, bounded in total weight by {@code iceberg.max-statistics-file-cache-size}.
 * Cache hit/miss statistics are exported through JMX under a name derived from
 * the connector id.
 */
@Singleton
@Provides
public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBeanExporter exporter)
{
    Cache<StatisticsFileCacheKey, ColumnStatistics> delegate = CacheBuilder.newBuilder()
            .maximumWeight(config.getMaxStatisticsFileCacheSize().toBytes())
            // Guava weigher results are ints and must be non-negative; clamp instead
            // of casting directly so an unusually large estimated size cannot
            // overflow to a negative weight and break cache insertion.
            .<StatisticsFileCacheKey, ColumnStatistics>weigher((key, entry) ->
                    (int) Math.min(Integer.MAX_VALUE, entry.getEstimatedSize()))
            .recordStats()
            .build();
    CacheStatsMBean bean = new CacheStatsMBean(delegate);
    exporter.export(generatedNameOf(StatisticsFileCache.class, connectorId), bean);
    return new StatisticsFileCache(delegate);
}

@ForCachingHiveMetastore
@Singleton
@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.apache.iceberg.hadoop.HadoopFileIO;

import javax.validation.constraints.DecimalMax;
Expand All @@ -33,6 +34,8 @@
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET;
import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.airlift.units.DataSize.succinctDataSize;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
Expand Down Expand Up @@ -67,6 +70,7 @@ public class IcebergConfig
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
private int splitManagerThreads = Runtime.getRuntime().availableProcessors();
private DataSize maxStatisticsFileCacheSize = succinctDataSize(256, MEGABYTE);

@NotNull
public FileFormat getFileFormat()
Expand Down Expand Up @@ -395,4 +399,17 @@ public IcebergConfig setMetricsMaxInferredColumn(int metricsMaxInferredColumn)
this.metricsMaxInferredColumn = metricsMaxInferredColumn;
return this;
}

// Upper bound on the memory the statistics file cache may consume (default 256MB).
public DataSize getMaxStatisticsFileCacheSize()
{
return maxStatisticsFileCacheSize;
}

// Configures the maximum total weight of the statistics file cache; bound to the
// "iceberg.max-statistics-file-cache-size" connector property. Returns this for
// fluent-style configuration chaining.
@Config("iceberg.max-statistics-file-cache-size")
@ConfigDescription("The maximum size in bytes the statistics file cache should consume")
public IcebergConfig setMaxStatisticsFileCacheSize(DataSize maxStatisticsFileCacheSize)
{
this.maxStatisticsFileCacheSize = maxStatisticsFileCacheSize;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public enum IcebergErrorCode
ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR),
ICEBERG_ROLLBACK_ERROR(13, EXTERNAL),
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR),
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL);
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL),
ICEBERG_CACHE_WRITE_ERROR(16, INTERNAL_ERROR);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -160,9 +161,10 @@ public IcebergHiveMetadata(
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig hiveTableOeprationsConfig)
IcebergHiveTableOperationsConfig hiveTableOeprationsConfig,
StatisticsFileCache statisticsFileCache)
{
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService);
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
Expand Down Expand Up @@ -448,7 +450,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
TableStatistics icebergStatistics = calculateBaseTableStatistics(this, typeManager, session, (IcebergTableHandle) tableHandle, tableLayoutHandle, columnHandles, constraint);
TableStatistics icebergStatistics = calculateBaseTableStatistics(this, typeManager, session, statisticsFileCache, (IcebergTableHandle) tableHandle, tableLayoutHandle, columnHandles, constraint);
EnumSet<ColumnStatisticType> mergeFlags = getHiveStatisticsMergeStrategy(session);
TableStatistics mergedStatistics = Optional.of(mergeFlags)
.filter(set -> !set.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
Expand All @@ -39,6 +40,7 @@ public class IcebergHiveMetadataFactory
final NodeVersion nodeVersion;
final FilterStatsCalculatorService filterStatsCalculatorService;
final IcebergHiveTableOperationsConfig operationsConfig;
final StatisticsFileCache statisticsFileCache;

@Inject
public IcebergHiveMetadataFactory(
Expand All @@ -50,7 +52,8 @@ public IcebergHiveMetadataFactory(
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig operationsConfig)
IcebergHiveTableOperationsConfig operationsConfig,
StatisticsFileCache statisticsFileCache)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
Expand All @@ -61,6 +64,7 @@ public IcebergHiveMetadataFactory(
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.operationsConfig = requireNonNull(operationsConfig, "operationsConfig is null");
this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
}

public ConnectorMetadata create()
Expand All @@ -74,6 +78,7 @@ public ConnectorMetadata create()
commitTaskCodec,
nodeVersion,
filterStatsCalculatorService,
operationsConfig);
operationsConfig,
statisticsFileCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.TableAlreadyExistsException;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.iceberg.util.IcebergPrestoModelConverters;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -82,9 +83,10 @@ public IcebergNativeMetadata(
JsonCodec<CommitTaskData> commitTaskCodec,
CatalogType catalogType,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService)
FilterStatsCalculatorService filterStatsCalculatorService,
StatisticsFileCache statisticsFileCache)
{
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService);
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.catalogType = requireNonNull(catalogType, "catalogType is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
Expand All @@ -36,6 +37,7 @@ public class IcebergNativeMetadataFactory
final RowExpressionService rowExpressionService;
final NodeVersion nodeVersion;
final FilterStatsCalculatorService filterStatsCalculatorService;
final StatisticsFileCache statisticsFileCache;

@Inject
public IcebergNativeMetadataFactory(
Expand All @@ -46,7 +48,8 @@ public IcebergNativeMetadataFactory(
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService)
FilterStatsCalculatorService filterStatsCalculatorService,
StatisticsFileCache statisticsFileCache)
{
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand All @@ -57,10 +60,11 @@ public IcebergNativeMetadataFactory(
requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
}

public ConnectorMetadata create()
{
    // Assemble a fresh metadata instance; all collaborators (including the shared
    // statistics file cache) were validated as non-null in the constructor.
    return new IcebergNativeMetadata(
            catalogFactory,
            typeManager,
            functionResolution,
            rowExpressionService,
            commitTaskCodec,
            catalogType,
            nodeVersion,
            filterStatsCalculatorService,
            statisticsFileCache);
}
}
Loading