diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml
index bbf7da4e68cf5..413d1393095d6 100644
--- a/presto-iceberg/pom.xml
+++ b/presto-iceberg/pom.xml
@@ -419,6 +419,12 @@
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.9.3</version>
+        </dependency>
+
        <dependency>
            <groupId>org.weakref</groupId>
            <artifactId>jmxutils</artifactId>
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsCachedInputFile.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsCachedInputFile.java
new file mode 100644
index 0000000000000..fd0195ad9daa7
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsCachedInputFile.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.airlift.log.Logger;
+import org.apache.iceberg.io.ByteBufferInputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.io.IOUtil.readRemaining;
+
+public class HdfsCachedInputFile
+ implements InputFile
+{
+ private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);
+
+ private final InputFile delegate;
+ private final ManifestFileCacheKey cacheKey;
+ private final ManifestFileCache cache;
+
+ public HdfsCachedInputFile(InputFile delegate, ManifestFileCacheKey cacheKey, ManifestFileCache cache)
+ {
+ this.delegate = requireNonNull(delegate, "delegate is null");
+ this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
+ this.cache = requireNonNull(cache, "cache is null");
+ }
+
+ @Override
+ public long getLength()
+ {
+ ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
+ if (cachedContent != null) {
+ return cachedContent.getLength();
+ }
+ return delegate.getLength();
+ }
+
+ @Override
+ public SeekableInputStream newStream()
+ {
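+ // Serve the manifest content from the cache when present; otherwise read from the delegate, caching small files along the way.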
+ ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
+ if (cachedContent != null) {
+ return ByteBufferInputStream.wrap(cachedContent.getData());
+ }
+
+ long fileLength = delegate.getLength();
+ if (fileLength <= cache.getMaxFileLength() && cache.isEnabled()) {
+ try {
+ ManifestFileCachedContent content = readFully(delegate, fileLength, cache.getBufferChunkSize());
+ cache.put(cacheKey, content);
+ cache.recordFileSize(content.getLength());
+ return ByteBufferInputStream.wrap(content.getData());
+ }
+ catch (IOException e) {
+ LOG.warn(e, "Failed to cache input file. Falling back to direct read.");
+ }
+ }
+
+ return delegate.newStream();
+ }
+
+ @Override
+ public String location()
+ {
+ return delegate.location();
+ }
+
+ @Override
+ public boolean exists()
+ {
+ return cache.getIfPresent(cacheKey) != null || delegate.exists();
+ }
+
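+ // Reads the whole file into ByteBuffer chunks of at most chunkSize bytes so the content can be cached.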
+ private static ManifestFileCachedContent readFully(InputFile input, long fileLength, long chunkSize)
+ throws IOException
+ {
+ try (SeekableInputStream stream = input.newStream()) {
+ long totalBytesToRead = fileLength;
+ List<ByteBuffer> buffers = new ArrayList<>(
+ ((int) (fileLength / chunkSize)) +
+ (fileLength % chunkSize == 0 ? 0 : 1));
+
+ while (totalBytesToRead > 0) {
+ int bytesToRead = (int) Math.min(chunkSize, totalBytesToRead);
+ byte[] buf = new byte[bytesToRead];
+ int bytesRead = readRemaining(stream, buf, 0, bytesToRead);
+ totalBytesToRead -= bytesRead;
+
+ if (bytesRead < bytesToRead) {
+ throw new IOException(
+ String.format(
+ "Failed to read %d bytes: %d bytes in stream",
+ fileLength, fileLength - totalBytesToRead));
+ }
+ else {
+ buffers.add(ByteBuffer.wrap(buf));
+ }
+ }
+ return new ManifestFileCachedContent(buffers, fileLength);
+ }
+ }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java
index 3eaebcf7b0a36..5e86b6e40b1da 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java
@@ -17,13 +17,16 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import java.io.IOException;
+import java.util.Optional;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
public class HdfsFileIO
@@ -31,11 +34,13 @@ public class HdfsFileIO
{
private final HdfsEnvironment environment;
private final HdfsContext context;
+ private final ManifestFileCache manifestFileCache;
- public HdfsFileIO(HdfsEnvironment environment, HdfsContext context)
+ public HdfsFileIO(ManifestFileCache manifestFileCache, HdfsEnvironment environment, HdfsContext context)
{
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
+ this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
}
@Override
@@ -44,6 +49,33 @@ public InputFile newInputFile(String path)
return new HdfsInputFile(new Path(path), environment, context);
}
+ @Override
+ public InputFile newInputFile(String path, long length)
+ {
+ return new HdfsInputFile(new Path(path), environment, context, Optional.of(length));
+ }
+
+ protected InputFile newCachedInputFile(String path)
+ {
+ InputFile inputFile = new HdfsInputFile(new Path(path), environment, context);
+ return manifestFileCache.isEnabled() ?
+ new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(path), manifestFileCache) :
+ inputFile;
+ }
+
+ @Override
+ public InputFile newInputFile(ManifestFile manifest)
+ {
+ checkArgument(
+ manifest.keyMetadata() == null,
+ "Cannot decrypt manifest: {} (use EncryptingFileIO)",
+ manifest.path());
+ InputFile inputFile = new HdfsInputFile(new Path(manifest.path()), environment, context, Optional.of(manifest.length()));
+ return manifestFileCache.isEnabled() ?
+ new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(manifest.path()), manifestFileCache) :
+ inputFile;
+ }
+
@Override
public OutputFile newOutputFile(String path)
{
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java
index 0529073e62e87..a07e2b17ad965 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java
@@ -22,6 +22,8 @@
import org.apache.iceberg.io.SeekableInputStream;
import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;
@@ -32,14 +34,21 @@ public class HdfsInputFile
private final InputFile delegate;
private final HdfsEnvironment environment;
private final String user;
+ private final AtomicLong length;
- public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
+ public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context, Optional<Long> length)
{
requireNonNull(path, "path is null");
this.environment = requireNonNull(environment, "environment is null");
+ this.length = new AtomicLong(length.orElse(-1L));
requireNonNull(context, "context is null");
try {
- this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
+ if (this.length.get() < 0) {
+ this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
+ }
+ else {
+ this.delegate = HadoopInputFile.fromPath(path, this.length.get(), environment.getFileSystem(context, path), environment.getConfiguration(context, path));
+ }
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create input file: " + path, e);
@@ -47,10 +56,20 @@ public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context
this.user = context.getIdentity().getUser();
}
+ public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
+ {
+ this(path, environment, context, Optional.empty());
+ }
+
@Override
public long getLength()
{
- return environment.doAs(user, delegate::getLength);
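+ // Lazily resolve the file length and memoize it so repeated calls avoid another filesystem round trip.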
+ return length.updateAndGet(value -> {
+ if (value < 0) {
+ return environment.doAs(user, delegate::getLength);
+ }
+ return value;
+ });
}
@Override
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java
index f7a153f0a1066..688a57cff011e 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java
@@ -105,7 +105,7 @@ public class HiveTableOperations
private final String tableName;
private final Optional<String> owner;
private final Optional<String> location;
- private final FileIO fileIO;
+ private final HdfsFileIO fileIO;
private final IcebergHiveTableOperationsConfig config;
private TableMetadata currentMetadata;
@@ -121,10 +121,11 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
+ ManifestFileCache manifestFileCache,
String database,
String table)
{
- this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
+ this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
@@ -140,12 +141,13 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
+ ManifestFileCache manifestFileCache,
String database,
String table,
String owner,
String location)
{
- this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
+ this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
@@ -156,7 +158,7 @@ public HiveTableOperations(
}
private HiveTableOperations(
- FileIO fileIO,
+ HdfsFileIO fileIO,
ExtendedHiveMetastore metastore,
MetastoreContext metastoreContext,
IcebergHiveTableOperationsConfig config,
@@ -409,7 +411,7 @@ private void refreshFromMetadataLocation(String newLocation)
config.getTableRefreshMaxRetryTime().toMillis(),
config.getTableRefreshBackoffScaleFactor())
.run(metadataLocation -> newMetadata.set(
- TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation))));
+ TableMetadataParser.read(fileIO, fileIO.newCachedInputFile(metadataLocation))));
}
catch (RuntimeException e) {
throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing", e);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
index 6ac0fc4622b24..26a417cb7391d 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
@@ -83,6 +83,7 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.statistics.ColumnStatistics;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.inject.Binder;
@@ -93,8 +94,11 @@
import javax.inject.Singleton;
+import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
@@ -198,7 +202,7 @@ protected void setup(Binder binder)
@Provides
public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBeanExporter exporter)
{
- Cache delegate = CacheBuilder.newBuilder()
+ com.github.benmanes.caffeine.cache.Cache delegate = Caffeine.newBuilder()
.maximumWeight(config.getMaxStatisticsFileCacheSize().toBytes())
.weigher((key, entry) -> (int) entry.getEstimatedSize())
.recordStats()
@@ -208,6 +212,26 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean
return statisticsFileCache;
}
+ @Singleton
+ @Provides
+ public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
+ {
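+ // Weigh entries by the total capacity of their cached buffers so maximumWeight bounds memory usage in bytes.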
+ com.github.benmanes.caffeine.cache.Caffeine<ManifestFileCacheKey, ManifestFileCachedContent> delegate = Caffeine.newBuilder()
+ .maximumWeight(config.getMaxManifestCacheSize())
+ .<ManifestFileCacheKey, ManifestFileCachedContent>weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
+ .recordStats();
+ if (config.getManifestCacheExpireDuration() > 0) {
+ delegate.expireAfterWrite(config.getManifestCacheExpireDuration(), TimeUnit.MILLISECONDS);
+ }
+ ManifestFileCache manifestFileCache = new ManifestFileCache(
+ delegate.build(),
+ config.getManifestCachingEnabled(),
+ config.getManifestCacheMaxContentLength(),
+ config.getManifestCacheMaxChunkSize().toBytes());
+ exporter.export(generatedNameOf(ManifestFileCache.class, connectorId), manifestFileCache);
+ return manifestFileCache;
+ }
+
@ForCachingHiveMetastore
@Singleton
@Provides
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
index 42328ddb4e7f4..019478786c3da 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
@@ -67,10 +67,11 @@ public class IcebergConfig
private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
- private boolean manifestCachingEnabled;
+ private boolean manifestCachingEnabled = true;
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
+ private DataSize manifestCacheMaxChunkSize = succinctDataSize(2, MEGABYTE);
private int splitManagerThreads = Runtime.getRuntime().availableProcessors();
private DataSize maxStatisticsFileCacheSize = succinctDataSize(256, MEGABYTE);
@@ -348,6 +349,20 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte
return this;
}
+ public DataSize getManifestCacheMaxChunkSize()
+ {
+ return manifestCacheMaxChunkSize;
+ }
+
+ @Min(1024)
+ @Config("iceberg.io.manifest.cache.max-chunk-size")
+ @ConfigDescription("Maximum length of a buffer used to cache manifest file content. Only applicable to HIVE catalog.")
+ public IcebergConfig setManifestCacheMaxChunkSize(DataSize manifestCacheMaxChunkSize)
+ {
+ this.manifestCacheMaxChunkSize = manifestCacheMaxChunkSize;
+ return this;
+ }
+
@Min(0)
public int getSplitManagerThreads()
{
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
index 5db5a45c24e85..ad4e1a7e8331a 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
@@ -162,6 +162,7 @@ public class IcebergHiveMetadata
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));
private final IcebergHiveTableOperationsConfig hiveTableOeprationsConfig;
private final Cache> tableCache;
+ private final ManifestFileCache manifestFileCache;
public IcebergHiveMetadata(
ExtendedHiveMetastore metastore,
@@ -173,13 +174,15 @@ public IcebergHiveMetadata(
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig hiveTableOeprationsConfig,
- StatisticsFileCache statisticsFileCache)
+ StatisticsFileCache statisticsFileCache,
+ ManifestFileCache manifestFileCache)
{
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
this.tableCache = CacheBuilder.newBuilder().maximumSize(MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE).build();
+ this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
}
public ExtendedHiveMetastore getMetastore()
@@ -187,10 +190,15 @@ public ExtendedHiveMetastore getMetastore()
return metastore;
}
+ public ManifestFileCache getManifestFileCache()
+ {
+ return manifestFileCache;
+ }
+
@Override
protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
{
- return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, session, schemaTableName);
+ return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, manifestFileCache, session, schemaTableName);
}
@Override
@@ -334,6 +342,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
hdfsEnvironment,
hdfsContext,
hiveTableOeprationsConfig,
+ manifestFileCache,
schemaName,
tableName,
session.getUser(),
@@ -614,7 +623,7 @@ public void registerTable(ConnectorSession clientSession, SchemaTableName schema
InputFile inputFile = new HdfsInputFile(metadataLocation, hdfsEnvironment, hdfsContext);
TableMetadata tableMetadata;
try {
- tableMetadata = TableMetadataParser.read(new HdfsFileIO(hdfsEnvironment, hdfsContext), inputFile);
+ tableMetadata = TableMetadataParser.read(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext), inputFile);
}
catch (Exception e) {
throw new PrestoException(ICEBERG_INVALID_METADATA, String.format("Unable to read metadata file %s", metadataLocation), e);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
index 5bc8c9e4a8b0c..c9ae6847824e0 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
@@ -41,6 +41,7 @@ public class IcebergHiveMetadataFactory
final FilterStatsCalculatorService filterStatsCalculatorService;
final IcebergHiveTableOperationsConfig operationsConfig;
final StatisticsFileCache statisticsFileCache;
+ final ManifestFileCache manifestFileCache;
@Inject
public IcebergHiveMetadataFactory(
@@ -53,7 +54,8 @@ public IcebergHiveMetadataFactory(
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig operationsConfig,
- StatisticsFileCache statisticsFileCache)
+ StatisticsFileCache statisticsFileCache,
+ ManifestFileCache manifestFileCache)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -65,6 +67,7 @@ public IcebergHiveMetadataFactory(
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.operationsConfig = requireNonNull(operationsConfig, "operationsConfig is null");
this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
+ this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
}
public ConnectorMetadata create()
@@ -79,6 +82,7 @@ public ConnectorMetadata create()
nodeVersion,
filterStatsCalculatorService,
operationsConfig,
- statisticsFileCache);
+ statisticsFileCache,
+ manifestFileCache);
}
}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java
index 4d23a97a3ff71..9c9d1f94467c7 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java
@@ -121,7 +121,7 @@ protected Map<String, String> getProperties(ConnectorSession session)
{
Map<String, String> properties = new HashMap<>();
if (icebergConfig.getManifestCachingEnabled()) {
- loadCachingProperties(properties, icebergConfig);
+ properties.putAll(loadCachingProperties(icebergConfig));
}
if (icebergConfig.getFileIOImpl() != null) {
properties.put(FILE_IO_IMPL, icebergConfig.getFileIOImpl());
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
index 229b77d61f8f4..9d7dd8b6d9dea 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
@@ -243,7 +243,7 @@ public static Table getShallowWrappedIcebergTable(Schema schema, PartitionSpec s
return new PrestoIcebergTableForMetricsConfig(schema, spec, properties, sortOrder);
}
- public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, IcebergHiveTableOperationsConfig config, ConnectorSession session, SchemaTableName table)
+ public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, IcebergHiveTableOperationsConfig config, ManifestFileCache manifestFileCache, ConnectorSession session, SchemaTableName table)
{
HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName());
TableOperations operations = new HiveTableOperations(
@@ -252,6 +252,7 @@ public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnv
hdfsEnvironment,
hdfsContext,
config,
+ manifestFileCache,
table.getSchemaName(),
table.getTableName());
return new BaseTable(operations, quotedTableName(table));
@@ -857,12 +858,14 @@ public static Map getPartitionKeys(PartitionSpec spec
return Collections.unmodifiableMap(partitionKeys);
}
- public static void loadCachingProperties(Map<String, String> properties, IcebergConfig icebergConfig)
+ public static Map<String, String> loadCachingProperties(IcebergConfig icebergConfig)
{
- properties.put(IO_MANIFEST_CACHE_ENABLED, "true");
- properties.put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(icebergConfig.getMaxManifestCacheSize()));
- properties.put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(icebergConfig.getManifestCacheMaxContentLength()));
- properties.put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(icebergConfig.getManifestCacheExpireDuration()));
+ return ImmutableMap.<String, String>builderWithExpectedSize(4)
+ .put(IO_MANIFEST_CACHE_ENABLED, "true")
+ .put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(icebergConfig.getMaxManifestCacheSize()))
+ .put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(icebergConfig.getManifestCacheMaxContentLength()))
+ .put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(icebergConfig.getManifestCacheExpireDuration()))
+ .build();
}
public static long getDataSequenceNumber(ContentFile> file)
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java
new file mode 100644
index 0000000000000..6de74f4fa84b9
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.airlift.stats.DistributionStat;
+import com.facebook.presto.iceberg.cache.CaffeineCacheStatsMBean;
+import com.facebook.presto.iceberg.cache.SimpleForwardingCache;
+import com.github.benmanes.caffeine.cache.Cache;
+import org.weakref.jmx.Managed;
+import org.weakref.jmx.Nested;
+
+public class ManifestFileCache
+ extends SimpleForwardingCache<ManifestFileCacheKey, ManifestFileCachedContent>
+{
+ private final DistributionStat fileSizes = new DistributionStat();
+ private final long maxFileLength;
+ private final boolean enabled;
+ private final long bufferChunkSize;
+ private final CaffeineCacheStatsMBean statsMBean;
+
+ public ManifestFileCache(Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate, boolean enabled, long maxFileLength, long bufferChunkSize)
+ {
+ super(delegate);
+ this.maxFileLength = maxFileLength;
+ this.enabled = enabled;
+ this.bufferChunkSize = bufferChunkSize;
+ this.statsMBean = new CaffeineCacheStatsMBean(delegate);
+ }
+
+ @Managed
+ @Nested
+ public CaffeineCacheStatsMBean getCacheStats()
+ {
+ return statsMBean;
+ }
+
+ @Managed
+ @Nested
+ public DistributionStat getFileSizeDistribution()
+ {
+ return fileSizes;
+ }
+
+ public void recordFileSize(long size)
+ {
+ fileSizes.add(size);
+ }
+
+ public long getMaxFileLength()
+ {
+ return maxFileLength;
+ }
+
+ public long getBufferChunkSize()
+ {
+ return bufferChunkSize;
+ }
+
+ public boolean isEnabled()
+ {
+ return enabled;
+ }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCacheKey.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCacheKey.java
new file mode 100644
index 0000000000000..915459a185c69
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCacheKey.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class ManifestFileCacheKey
+{
+ private final String path;
+
+ public ManifestFileCacheKey(String path)
+ {
+ this.path = requireNonNull(path, "path is null");
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(path);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof ManifestFileCacheKey)) {
+ return false;
+ }
+ ManifestFileCacheKey other = (ManifestFileCacheKey) obj;
+ return Objects.equals(path, other.path);
+ }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCachedContent.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCachedContent.java
new file mode 100644
index 0000000000000..56c9ea6ec0308
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCachedContent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ManifestFileCachedContent
+{
+ private final List<ByteBuffer> data;
+ private final long length;
+
+ public ManifestFileCachedContent(final List<ByteBuffer> data, long length)
+ {
+ this.data = requireNonNull(data, "data is null");
+ this.length = length;
+ }
+
+ public List<ByteBuffer> getData()
+ {
+ return data;
+ }
+
+ public long getLength()
+ {
+ return length;
+ }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/CaffeineCacheStatsMBean.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/CaffeineCacheStatsMBean.java
new file mode 100644
index 0000000000000..10a00ddcefaee
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/CaffeineCacheStatsMBean.java
@@ -0,0 +1,76 @@
+package com.facebook.presto.iceberg.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import org.weakref.jmx.Managed;
+
+import static java.util.Objects.requireNonNull;
+
+public class CaffeineCacheStatsMBean
+{
+ private final Cache<?, ?> cache;
+
+ public CaffeineCacheStatsMBean(Cache<?, ?> cache)
+ {
+ this.cache = requireNonNull(cache, "cache is null");
+ }
+
+ @Managed
+ public long getEstimatedSize()
+ {
+ return cache.estimatedSize();
+ }
+
+ @Managed
+ public long getHitCount()
+ {
+ return cache.stats().hitCount();
+ }
+
+ @Managed
+ public long getMissCount()
+ {
+ return cache.stats().missCount();
+ }
+
+ @Managed
+ public double getHitRate()
+ {
+ return cache.stats().hitRate();
+ }
+
+ @Managed
+ public double getMissRate()
+ {
+ return cache.stats().missRate();
+ }
+
+ @Managed
+ public double getEvictionCount()
+ {
+ return cache.stats().evictionCount();
+ }
+
+ @Managed
+ public double getEvictionWeight()
+ {
+ return cache.stats().evictionWeight();
+ }
+
+ @Managed
+ public double getLoadCount()
+ {
+ return cache.stats().loadCount();
+ }
+
+ @Managed
+ public double getRequestCount()
+ {
+ return cache.stats().requestCount();
+ }
+
+ @Managed
+ public double getAverageLoadPenalty()
+ {
+ return cache.stats().averageLoadPenalty();
+ }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/SimpleForwardingCache.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/SimpleForwardingCache.java
new file mode 100644
index 0000000000000..1925bf6b0a9ba
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/SimpleForwardingCache.java
@@ -0,0 +1,100 @@
+package com.facebook.presto.iceberg.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Policy;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import org.checkerframework.checker.index.qual.NonNegative;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+public class SimpleForwardingCache<K, V> implements Cache<K, V>
+{
+ private final Cache<K, V> delegate;
+
+ public SimpleForwardingCache(Cache<K, V> delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public @Nullable V getIfPresent(@NonNull Object key)
+ {
+ return delegate.getIfPresent(key);
+ }
+
+ @Override
+ public @Nullable V get(@NonNull K key, @NonNull Function<? super K, ? extends V> mappingFunction)
+ {
+ return delegate.get(key, mappingFunction);
+ }
+
+ @Override
+ public @NonNull Map<@NonNull K, @NonNull V> getAllPresent(@NonNull Iterable<@NonNull ?> keys)
+ {
+ return delegate.getAllPresent(keys);
+ }
+
+ @Override
+ public void put(@NonNull K key, @NonNull V value)
+ {
+ delegate.put(key, value);
+ }
+
+ @Override
+ public void putAll(@NonNull Map<? extends @NonNull K, ? extends @NonNull V> map)
+ {
+ delegate.putAll(map);
+ }
+
+ @Override
+ public void invalidate(@NonNull Object key)
+ {
+ delegate.invalidate(key);
+ }
+
+ @Override
+ public void invalidateAll(@NonNull Iterable<@NonNull ?> keys)
+ {
+ delegate.invalidateAll(keys);
+ }
+
+ @Override
+ public void invalidateAll()
+ {
+ delegate.invalidateAll();
+ }
+
+ @Override
+ public @NonNegative long estimatedSize()
+ {
+ return delegate.estimatedSize();
+ }
+
+ @Override
+ public @NonNull CacheStats stats()
+ {
+ return delegate.stats();
+ }
+
+ @Override
+ public @NonNull ConcurrentMap<@NonNull K, @NonNull V> asMap()
+ {
+ return delegate.asMap();
+ }
+
+ @Override
+ public void cleanUp()
+ {
+ delegate.cleanUp();
+ }
+
+ @Override
+ public @NonNull Policy<K, V> policy()
+ {
+ return delegate.policy();
+ }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java
index 28bf787a0257a..8e4bec899995a 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java
@@ -14,10 +14,10 @@
package com.facebook.presto.iceberg.statistics;
import com.facebook.airlift.stats.DistributionStat;
-import com.facebook.presto.hive.CacheStatsMBean;
+import com.facebook.presto.iceberg.cache.CaffeineCacheStatsMBean;
+import com.facebook.presto.iceberg.cache.SimpleForwardingCache;
import com.facebook.presto.spi.statistics.ColumnStatistics;
-import com.google.common.cache.Cache;
-import com.google.common.cache.ForwardingCache.SimpleForwardingCache;
+import com.github.benmanes.caffeine.cache.Cache;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;
@@ -26,17 +26,17 @@ public class StatisticsFileCache
{
private final DistributionStat fileSizes = new DistributionStat();
private final DistributionStat columnCounts = new DistributionStat();
- private final CacheStatsMBean cacheStats;
+ private final CaffeineCacheStatsMBean cacheStats;
public StatisticsFileCache(Cache delegate)
{
super(delegate);
- cacheStats = new CacheStatsMBean(delegate);
+ cacheStats = new CaffeineCacheStatsMBean(delegate);
}
@Managed
@Nested
- public CacheStatsMBean getCacheStats()
+ public CaffeineCacheStatsMBean getCacheStats()
{
return cacheStats;
}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
index 89a4d678808d7..397e07b6c8e11 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -52,9 +52,9 @@
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
-import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -175,8 +175,9 @@ public abstract class IcebergDistributedTestBase
extends AbstractTestQueryFramework
{
private static final String METADATA_FILE_EXTENSION = ".metadata.json";
- private final CatalogType catalogType;
- private final Map<String, String> extraConnectorProperties;
+ protected final CatalogType catalogType;
+ protected final Map<String, String> extraConnectorProperties;
+ protected IcebergQueryRunner icebergQueryRunner;
protected IcebergDistributedTestBase(CatalogType catalogType, Map<String, String> extraConnectorProperties)
{
@@ -193,10 +194,11 @@ protected IcebergDistributedTestBase(CatalogType catalogType)
protected QueryRunner createQueryRunner()
throws Exception
{
- return IcebergQueryRunner.builder()
+ this.icebergQueryRunner = IcebergQueryRunner.builder()
.setCatalogType(catalogType)
.setExtraConnectorProperties(extraConnectorProperties)
- .build().getQueryRunner();
+ .build();
+ return icebergQueryRunner.getQueryRunner();
}
@Test
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
index 588b7273d44c5..6d9f1167f9c66 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
@@ -60,7 +60,7 @@ public void testDefaults()
.setPushdownFilterEnabled(false)
.setDeleteAsJoinRewriteEnabled(true)
.setRowsForMetadataOptimizationThreshold(1000)
- .setManifestCachingEnabled(false)
+ .setManifestCachingEnabled(true)
.setFileIOImpl(HadoopFileIO.class.getName())
.setMaxManifestCacheSize(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
.setManifestCacheExpireDuration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
@@ -69,6 +69,7 @@ public void testDefaults()
.setMetadataPreviousVersionsMax(METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
.setMetadataDeleteAfterCommit(METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)
.setMetricsMaxInferredColumn(METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT)
+ .setManifestCacheMaxChunkSize(succinctDataSize(2, MEGABYTE))
.setMaxStatisticsFileCacheSize(succinctDataSize(256, MEGABYTE))
.setStatisticsKllSketchKParameter(1024));
}
@@ -92,11 +93,12 @@ public void testExplicitPropertyMappings()
.put("iceberg.pushdown-filter-enabled", "true")
.put("iceberg.delete-as-join-rewrite-enabled", "false")
.put("iceberg.rows-for-metadata-optimization-threshold", "500")
- .put("iceberg.io.manifest.cache-enabled", "true")
+ .put("iceberg.io.manifest.cache-enabled", "false")
.put("iceberg.io-impl", "com.facebook.presto.iceberg.HdfsFileIO")
.put("iceberg.io.manifest.cache.max-total-bytes", "1048576000")
.put("iceberg.io.manifest.cache.expiration-interval-ms", "600000")
.put("iceberg.io.manifest.cache.max-content-length", "10485760")
+ .put("iceberg.io.manifest.cache.max-chunk-size", "1MB")
.put("iceberg.split-manager-threads", "42")
.put("iceberg.metadata-previous-versions-max", "1")
.put("iceberg.metadata-delete-after-commit", "true")
@@ -121,11 +123,12 @@ public void testExplicitPropertyMappings()
.setPushdownFilterEnabled(true)
.setDeleteAsJoinRewriteEnabled(false)
.setRowsForMetadataOptimizationThreshold(500)
- .setManifestCachingEnabled(true)
+ .setManifestCachingEnabled(false)
.setFileIOImpl("com.facebook.presto.iceberg.HdfsFileIO")
.setMaxManifestCacheSize(1048576000)
.setManifestCacheExpireDuration(600000)
.setManifestCacheMaxContentLength(10485760)
+ .setManifestCacheMaxChunkSize(succinctDataSize(1, MEGABYTE))
.setSplitManagerThreads(42)
.setMetadataPreviousVersionsMax(1)
.setMetadataDeleteAfterCommit(true)
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
index f0f8627813bf7..f22151a97efbf 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
@@ -13,23 +13,42 @@
*/
package com.facebook.presto.iceberg.hive;
+import com.facebook.presto.Session;
+import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.iceberg.IcebergDistributedTestBase;
+import com.facebook.presto.iceberg.IcebergHiveMetadata;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergUtil;
+import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.metadata.CatalogManager;
+import com.facebook.presto.metadata.CatalogMetadata;
+import com.facebook.presto.metadata.MetadataUtil;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.TableHandle;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Joiner;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.testng.annotations.Test;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
@Test
public class TestIcebergDistributedHive
@@ -66,6 +85,94 @@ public void testStatisticsFileCache()
// so this test won't complete successfully.
}
+ @Test
+ public void testManifestFileCaching()
+ throws Exception
+ {
+ String catalogName = "iceberg_manifest_caching";
+ Map<String, String> catalogProperties = new HashMap<>(this.icebergQueryRunner.getIcebergCatalogs().get("iceberg"));
+ catalogProperties.put("iceberg.io.manifest.cache-enabled", "true");
+ getQueryRunner().createCatalog(catalogName, "iceberg", catalogProperties);
+ Session session = Session.builder(getSession())
+ .setCatalog(catalogName)
+ .setSchema("default")
+ .build();
+ assertQuerySucceeds(session, "CREATE SCHEMA IF NOT EXISTS default");
+ assertQuerySucceeds(session, "CREATE TABLE test_manifest_file_cache(i int)");
+ Session txnSession = Session.builder(session)
+ .setTransactionId(getQueryRunner().getTransactionManager().beginTransaction(false))
+ .build();
+ Optional<TableHandle> handle = MetadataUtil.getOptionalTableHandle(txnSession,
+ getQueryRunner().getTransactionManager(),
+ QualifiedObjectName.valueOf(txnSession.getCatalog().get(), txnSession.getSchema().get(), "test_manifest_file_cache"),
+ Optional.empty());
+ CatalogMetadata catalogMetadata = getQueryRunner().getTransactionManager()
+ .getCatalogMetadata(txnSession.getTransactionId().get(), handle.get().getConnectorId());
+ Field delegate = ClassLoaderSafeConnectorMetadata.class.getDeclaredField("delegate");
+ delegate.setAccessible(true);
+ IcebergHiveMetadata metadata = (IcebergHiveMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId()));
+ ManifestFileCache manifestFileCache = metadata.getManifestFileCache();
+ assertUpdate(session, "INSERT INTO test_manifest_file_cache VALUES 1, 2, 3, 4, 5", 5);
+ manifestFileCache.invalidateAll();
+ assertEquals(manifestFileCache.estimatedSize(), 0);
+ com.github.benmanes.caffeine.cache.stats.CacheStats initial = manifestFileCache.stats();
+ assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i");
+ com.github.benmanes.caffeine.cache.stats.CacheStats firstQuery = manifestFileCache.stats();
+ assertTrue(firstQuery.minus(initial).missCount() > 0);
+ assertTrue(manifestFileCache.estimatedSize() > 0);
+ assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i");
+ com.github.benmanes.caffeine.cache.stats.CacheStats secondQuery = manifestFileCache.stats();
+ assertEquals(secondQuery.minus(firstQuery).missCount(), 0);
+ assertTrue(secondQuery.minus(firstQuery).hitCount() > 0);
+ assertTrue(manifestFileCache.estimatedSize() > 0);
+
+ //drop table
+ assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache");
+ }
+
+ @Test
+ public void testManifestFileCachingDisabled()
+ throws Exception
+ {
+ String catalogName = "iceberg_no_manifest_caching";
+ Map<String, String> catalogProperties = new HashMap<>(this.icebergQueryRunner.getIcebergCatalogs().get("iceberg"));
+ catalogProperties.put("iceberg.io.manifest.cache-enabled", "false");
+ getQueryRunner().createCatalog(catalogName, "iceberg", catalogProperties);
+ Session session = Session.builder(getSession())
+ .setCatalog(catalogName)
+ .setSchema("default")
+ .build();
+ assertQuerySucceeds(session, "CREATE SCHEMA IF NOT EXISTS default");
+ assertQuerySucceeds(session, "CREATE TABLE test_manifest_file_cache_disabled(i int)");
+ assertUpdate(session, "INSERT INTO test_manifest_file_cache_disabled VALUES 1, 2, 3, 4, 5", 5);
+ Session metadataSession = Session.builder(session)
+ .setTransactionId(getQueryRunner().getTransactionManager().beginTransaction(false))
+ .build();
+ Optional<TableHandle> handle = MetadataUtil.getOptionalTableHandle(metadataSession,
+ getQueryRunner().getTransactionManager(),
+ QualifiedObjectName.valueOf(metadataSession.getCatalog().get(), metadataSession.getSchema().get(), "test_manifest_file_cache_disabled"),
+ Optional.empty());
+ CatalogMetadata catalogMetadata = getQueryRunner().getTransactionManager()
+ .getCatalogMetadata(metadataSession.getTransactionId().get(), handle.get().getConnectorId());
+ Field delegate = ClassLoaderSafeConnectorMetadata.class.getDeclaredField("delegate");
+ delegate.setAccessible(true);
+ IcebergHiveMetadata metadata = (IcebergHiveMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId()));
+ ManifestFileCache manifestFileCache = metadata.getManifestFileCache();
+ assertFalse(manifestFileCache.isEnabled());
+ com.github.benmanes.caffeine.cache.stats.CacheStats initial = manifestFileCache.stats();
+ assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i");
+ com.github.benmanes.caffeine.cache.stats.CacheStats firstQuery = manifestFileCache.stats();
+ assertEquals(firstQuery.minus(initial).hitCount(), 0);
+ assertEquals(manifestFileCache.estimatedSize(), 0);
+ assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i");
+ com.github.benmanes.caffeine.cache.stats.CacheStats secondQuery = manifestFileCache.stats();
+ assertEquals(secondQuery.minus(firstQuery).hitCount(), 0);
+ assertEquals(manifestFileCache.estimatedSize(), 0);
+
+ //drop table
+ assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache_disabled");
+ }
+
@Override
protected Table loadTable(String tableName)
{
@@ -75,6 +182,7 @@ protected Table loadTable(String tableName)
return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
+ new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024 * 1024),
getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
SchemaTableName.valueOf("tpch." + tableName));
}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
index a512a8679965e..094941bb10347 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
@@ -36,6 +36,7 @@
import com.facebook.presto.iceberg.IcebergMetadataColumn;
import com.facebook.presto.iceberg.IcebergQueryRunner;
import com.facebook.presto.iceberg.IcebergUtil;
+import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ColumnHandle;
@@ -54,6 +55,8 @@
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -581,6 +584,7 @@ private Table loadTable(String tableName)
return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
+ new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024),
getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
SchemaTableName.valueOf("tpch." + tableName));
}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java
index 406d522ecfc20..fddfc2e68bf54 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java
@@ -25,9 +25,12 @@
import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergUtil;
+import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.tests.DistributedQueryRunner;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.Table;
@@ -79,6 +82,7 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String
return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
+ new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024),
session,
SchemaTableName.valueOf(schema + "." + tableName));
}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java
index 4b04b327d82d7..3093eb08f9fb2 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java
@@ -44,6 +44,7 @@
import com.facebook.presto.iceberg.IcebergTableHandle;
import com.facebook.presto.iceberg.IcebergTableName;
import com.facebook.presto.iceberg.IcebergTableType;
+import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.MetadataManager;
@@ -69,7 +70,7 @@
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
import com.facebook.presto.sql.relational.RowExpressionOptimizer;
import com.facebook.presto.testing.TestingConnectorSession;
-import com.google.common.cache.CacheBuilder;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -373,7 +374,8 @@ private void testRenameTableWithFailSignalAndValidation(FailSignal failSignal, R
}
}
- private void createFile(FileSystem fileSystem, Path path, byte[] content) throws IOException
+ private void createFile(FileSystem fileSystem, Path path, byte[] content)
+ throws IOException
{
FSDataOutputStream outputStream = fileSystem.create(path, true, 1024);
outputStream.write(content);
@@ -411,7 +413,8 @@ private ConnectorMetadata getIcebergHiveMetadata(ExtendedHiveMetastore metastore
new NodeVersion("test_node_v1"),
FILTER_STATS_CALCULATOR_SERVICE,
new IcebergHiveTableOperationsConfig(),
- new StatisticsFileCache(CacheBuilder.newBuilder().build()));
+ new StatisticsFileCache(Caffeine.newBuilder().build()),
+ new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024));
return icebergHiveMetadataFactory.create();
}
@@ -490,25 +493,29 @@ public URI getUri()
}
@Override
- public FSDataInputStream open(Path f, int bufferSize) throws IOException
+ public FSDataInputStream open(Path f, int bufferSize)
+ throws IOException
{
return delegate.open(f, bufferSize);
}
@Override
- public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
+ throws IOException
{
return delegate.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
}
@Override
- public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+ throws IOException
{
return delegate.append(f, bufferSize, progress);
}
@Override
- public boolean rename(Path src, Path dst) throws IOException
+ public boolean rename(Path src, Path dst)
+ throws IOException
{
if (failSignal.get() == FailSignal.RENAME) {
return false;
@@ -517,7 +524,8 @@ public boolean rename(Path src, Path dst) throws IOException
}
@Override
- public boolean delete(Path f, boolean recursive) throws IOException
+ public boolean delete(Path f, boolean recursive)
+ throws IOException
{
if (failSignal.get() == FailSignal.DELETE) {
return false;
@@ -526,7 +534,8 @@ public boolean delete(Path f, boolean recursive) throws IOException
}
@Override
- public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
+ public FileStatus[] listStatus(Path f)
+ throws FileNotFoundException, IOException
{
return delegate.listStatus(f);
}
@@ -544,7 +553,8 @@ public Path getWorkingDirectory()
}
@Override
- public boolean mkdirs(Path f, FsPermission permission) throws IOException
+ public boolean mkdirs(Path f, FsPermission permission)
+ throws IOException
{
if (this.failSignal.get() == FailSignal.MKDIRS) {
return false;
@@ -553,7 +563,8 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException
}
@Override
- public FileStatus getFileStatus(Path f) throws IOException
+ public FileStatus getFileStatus(Path f)
+ throws IOException
{
return delegate.getFileStatus(f);
}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java
index d1f20a56494b8..53d311e2d64fa 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java
@@ -20,12 +20,15 @@
import com.facebook.presto.iceberg.HiveTableOperations;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergUtil;
+import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.iceberg.hive.IcebergFileHiveMetastore;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.PartitionSpec;
@@ -69,6 +72,7 @@ Table createTable(String tableName, String targetPath, Map table
getHdfsEnvironment(),
hdfsContext,
new IcebergHiveTableOperationsConfig(),
+ new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024),
"tpch",
tableName,
session.getUser(),
@@ -91,6 +95,7 @@ Table loadTable(String tableName)
return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
+ new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024),
getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
SchemaTableName.valueOf("tpch." + tableName));
}