diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index 57c5849c11aa1..7cb7abc426b7c 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -515,7 +515,8 @@ Property Name                                           Description
 ``iceberg.io.manifest.cache.max-total-bytes``           Maximum size of cache size in bytes.                        ``104857600``
 
 ``iceberg.io.manifest.cache.expiration-interval-ms``    Maximum time duration in milliseconds for which an entry    ``60000``
-                                                        stays in the manifest cache.
+                                                        stays in the manifest cache. Set to 0 to disable entry
+                                                        expiration.
 
 ``iceberg.io.manifest.cache.max-content-length``        Maximum length of a manifest file to be considered for      ``8388608``
                                                         caching in bytes. Manifest files with a length exceeding
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsCachedInputFile.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsCachedInputFile.java
new file mode 100644
index 0000000000000..b292213b8f08e
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsCachedInputFile.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.airlift.log.Logger;
+import org.apache.iceberg.io.ByteBufferInputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.io.IOUtil.readRemaining;
+
+/**
+ * {@link InputFile} decorator that serves file content from a shared
+ * {@link ManifestFileCache}, reading through to the wrapped file on a miss.
+ */
+public class HdfsCachedInputFile
+        implements InputFile
+{
+    private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);
+
+    private final InputFile delegate;
+    private final ManifestFileCacheKey cacheKey;
+    private final ManifestFileCache cache;
+
+    public HdfsCachedInputFile(InputFile delegate, ManifestFileCacheKey cacheKey, ManifestFileCache cache)
+    {
+        this.delegate = requireNonNull(delegate, "delegate is null");
+        this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
+        this.cache = requireNonNull(cache, "cache is null");
+    }
+
+    /**
+     * Returns the cached content length when the file is cached, otherwise the delegate's length.
+     */
+    @Override
+    public long getLength()
+    {
+        ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
+        if (cachedContent != null) {
+            return cachedContent.getLength();
+        }
+        return delegate.getLength();
+    }
+
+    /**
+     * Opens a stream over the cached content, populating the cache first when the file
+     * is small enough to be cached. Falls back to a direct read of the delegate when the
+     * file is too large, caching is disabled, or the read-through fails with an IOException.
+     */
+    @Override
+    public SeekableInputStream newStream()
+    {
+        ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
+        if (cachedContent != null) {
+            return ByteBufferInputStream.wrap(cachedContent.getData());
+        }
+
+        long fileLength = delegate.getLength();
+        if (cache.isEnabled() && fileLength <= cache.getMaxFileLength()) {
+            try {
+                ManifestFileCachedContent content = readFully(delegate, fileLength, cache.getBufferChunkSize());
+                cache.put(cacheKey, content);
+                cache.recordFileSize(content.getLength());
+                return ByteBufferInputStream.wrap(content.getData());
+            }
+            catch (IOException e) {
+                // The airlift Logger formats with String.format, so use %s (not {}) and pass
+                // the exception through the Throwable overload to keep its stack trace.
+                LOG.warn(e, "Failed to cache input file: %s. Falling back to direct read.", delegate.location());
+            }
+        }
+
+        return delegate.newStream();
+    }
+
+    @Override
+    public String location()
+    {
+        return delegate.location();
+    }
+
+    /**
+     * A cached file is reported as existing without consulting the delegate.
+     */
+    @Override
+    public boolean exists()
+    {
+        return cache.getIfPresent(cacheKey) != null || delegate.exists();
+    }
+
+    /**
+     * Reads exactly {@code fileLength} bytes from {@code input} into buffers of at most
+     * {@code chunkSize} bytes each.
+     *
+     * @throws IOException if the stream ends before {@code fileLength} bytes were read
+     */
+    private static ManifestFileCachedContent readFully(InputFile input, long fileLength, long chunkSize)
+            throws IOException
+    {
+        try (SeekableInputStream stream = input.newStream()) {
+            long totalBytesToRead = fileLength;
+            List<ByteBuffer> buffers = new ArrayList<>(
+                    ((int) (fileLength / chunkSize)) +
+                            (fileLength % chunkSize == 0 ? 0 : 1));
+
+            while (totalBytesToRead > 0) {
+                int bytesToRead = (int) Math.min(chunkSize, totalBytesToRead);
+                byte[] buf = new byte[bytesToRead];
+                int bytesRead = readRemaining(stream, buf, 0, bytesToRead);
+                totalBytesToRead -= bytesRead;
+
+                if (bytesRead < bytesToRead) {
+                    throw new IOException(
+                            format("Failed to read %d bytes from file %s : %d bytes read.",
+                                    fileLength, input.location(), fileLength - totalBytesToRead));
+                }
+                buffers.add(ByteBuffer.wrap(buf));
+            }
+            return new ManifestFileCachedContent(buffers, fileLength);
+        }
+    }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java
index 3eaebcf7b0a36..45e4c64d16859 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java
@@ -17,13 +17,16 @@
 import com.facebook.presto.hive.HdfsEnvironment;
 import com.facebook.presto.spi.PrestoException;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
+import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; public class HdfsFileIO @@ -31,11 +34,13 @@ public class HdfsFileIO { private final HdfsEnvironment environment; private final HdfsContext context; + private final ManifestFileCache manifestFileCache; - public HdfsFileIO(HdfsEnvironment environment, HdfsContext context) + public HdfsFileIO(ManifestFileCache manifestFileCache, HdfsEnvironment environment, HdfsContext context) { this.environment = requireNonNull(environment, "environment is null"); this.context = requireNonNull(context, "context is null"); + this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null"); } @Override @@ -44,6 +49,25 @@ public InputFile newInputFile(String path) return new HdfsInputFile(new Path(path), environment, context); } + @Override + public InputFile newInputFile(String path, long length) + { + return new HdfsInputFile(new Path(path), environment, context, Optional.of(length)); + } + + @Override + public InputFile newInputFile(ManifestFile manifest) + { + checkArgument( + manifest.keyMetadata() == null, + "Cannot decrypt manifest: %s (use EncryptingFileIO)", + manifest.path()); + InputFile inputFile = new HdfsInputFile(new Path(manifest.path()), environment, context, Optional.of(manifest.length())); + return manifestFileCache.isEnabled() ? + new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(manifest.path()), manifestFileCache) : + inputFile; + } + @Override public OutputFile newOutputFile(String path) { @@ -61,4 +85,12 @@ public void deleteFile(String pathString) throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete file: " + path, e); } } + + protected InputFile newCachedInputFile(String path) + { + InputFile inputFile = new HdfsInputFile(new Path(path), environment, context); + return manifestFileCache.isEnabled() ? 
+ new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(path), manifestFileCache) : + inputFile; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java index 0529073e62e87..a07e2b17ad965 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java @@ -22,6 +22,8 @@ import org.apache.iceberg.io.SeekableInputStream; import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; import static java.util.Objects.requireNonNull; @@ -32,14 +34,21 @@ public class HdfsInputFile private final InputFile delegate; private final HdfsEnvironment environment; private final String user; + private final AtomicLong length; - public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context) + public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context, Optional length) { requireNonNull(path, "path is null"); this.environment = requireNonNull(environment, "environment is null"); + this.length = new AtomicLong(length.orElse(-1L)); requireNonNull(context, "context is null"); try { - this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path)); + if (this.length.get() < 0) { + this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path)); + } + else { + this.delegate = HadoopInputFile.fromPath(path, this.length.get(), environment.getFileSystem(context, path), environment.getConfiguration(context, path)); + } } catch (IOException e) { throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create input file: " + path, e); @@ -47,10 +56,20 @@ public 
HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context this.user = context.getIdentity().getUser(); } + public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context) + { + this(path, environment, context, Optional.empty()); + } + @Override public long getLength() { - return environment.doAs(user, delegate::getLength); + return length.updateAndGet(value -> { + if (value < 0) { + return environment.doAs(user, delegate::getLength); + } + return value; + }); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java index f7a153f0a1066..688a57cff011e 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java @@ -105,7 +105,7 @@ public class HiveTableOperations private final String tableName; private final Optional owner; private final Optional location; - private final FileIO fileIO; + private final HdfsFileIO fileIO; private final IcebergHiveTableOperationsConfig config; private TableMetadata currentMetadata; @@ -121,10 +121,11 @@ public HiveTableOperations( HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, IcebergHiveTableOperationsConfig config, + ManifestFileCache manifestFileCache, String database, String table) { - this(new HdfsFileIO(hdfsEnvironment, hdfsContext), + this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext), metastore, metastoreContext, config, @@ -140,12 +141,13 @@ public HiveTableOperations( HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, IcebergHiveTableOperationsConfig config, + ManifestFileCache manifestFileCache, String database, String table, String owner, String location) { - this(new HdfsFileIO(hdfsEnvironment, hdfsContext), + this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext), metastore, 
metastoreContext, config, @@ -156,7 +158,7 @@ public HiveTableOperations( } private HiveTableOperations( - FileIO fileIO, + HdfsFileIO fileIO, ExtendedHiveMetastore metastore, MetastoreContext metastoreContext, IcebergHiveTableOperationsConfig config, @@ -409,7 +411,7 @@ private void refreshFromMetadataLocation(String newLocation) config.getTableRefreshMaxRetryTime().toMillis(), config.getTableRefreshBackoffScaleFactor()) .run(metadataLocation -> newMetadata.set( - TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation)))); + TableMetadataParser.read(fileIO, fileIO.newCachedInputFile(metadataLocation)))); } catch (RuntimeException e) { throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing", e); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 9f65aebba0e3a..59c19af45b08b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -93,6 +93,8 @@ import javax.inject.Singleton; +import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -213,6 +215,26 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean return statisticsFileCache; } + @Singleton + @Provides + public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter) + { + CacheBuilder delegate = CacheBuilder.newBuilder() + .maximumWeight(config.getMaxManifestCacheSize()) + .weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum()) + .recordStats(); + if (config.getManifestCacheExpireDuration() > 0) { + delegate.expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration())); + } + ManifestFileCache manifestFileCache = new 
ManifestFileCache( + delegate.build(), + config.getManifestCachingEnabled(), + config.getManifestCacheMaxContentLength(), + config.getManifestCacheMaxChunkSize().toBytes()); + exporter.export(generatedNameOf(ManifestFileCache.class, connectorId), manifestFileCache); + return manifestFileCache; + } + @ForCachingHiveMetastore @Singleton @Provides diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index d1bc37ff0e0be..a90c7c888e6bf 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -68,10 +68,11 @@ public class IcebergConfig private EnumSet hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class); private String fileIOImpl = HadoopFileIO.class.getName(); - private boolean manifestCachingEnabled; + private boolean manifestCachingEnabled = true; private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; + private DataSize manifestCacheMaxChunkSize = succinctDataSize(2, MEGABYTE); private int splitManagerThreads = Runtime.getRuntime().availableProcessors(); private DataSize maxStatisticsFileCacheSize = succinctDataSize(256, MEGABYTE); @@ -362,6 +363,20 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte return this; } + public DataSize getManifestCacheMaxChunkSize() + { + return manifestCacheMaxChunkSize; + } + + @Min(1024) + @Config("iceberg.io.manifest.cache.max-chunk-size") + @ConfigDescription("Maximum length of a buffer used to cache manifest file content. 
Only applicable to HIVE catalog.") + public IcebergConfig setManifestCacheMaxChunkSize(DataSize manifestCacheMaxChunkSize) + { + this.manifestCacheMaxChunkSize = manifestCacheMaxChunkSize; + return this; + } + @Min(0) public int getSplitManagerThreads() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index e7b9f3858fc3f..e8c4829c5cdca 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -59,6 +59,7 @@ import com.facebook.presto.spi.statistics.TableStatisticType; import com.facebook.presto.spi.statistics.TableStatistics; import com.facebook.presto.spi.statistics.TableStatisticsMetadata; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; @@ -161,6 +162,7 @@ public class IcebergHiveMetadata private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID()))); private final IcebergHiveTableOperationsConfig hiveTableOeprationsConfig; private final Cache> tableCache; + private final ManifestFileCache manifestFileCache; public IcebergHiveMetadata( ExtendedHiveMetastore metastore, @@ -172,13 +174,15 @@ public IcebergHiveMetadata( NodeVersion nodeVersion, FilterStatsCalculatorService filterStatsCalculatorService, IcebergHiveTableOperationsConfig hiveTableOeprationsConfig, - StatisticsFileCache statisticsFileCache) + StatisticsFileCache statisticsFileCache, + ManifestFileCache manifestFileCache) { super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment 
= requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null"); this.tableCache = CacheBuilder.newBuilder().maximumSize(MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE).build(); + this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null"); } public ExtendedHiveMetastore getMetastore() @@ -186,10 +190,16 @@ public ExtendedHiveMetastore getMetastore() return metastore; } + @VisibleForTesting + public ManifestFileCache getManifestFileCache() + { + return manifestFileCache; + } + @Override protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName) { - return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, session, schemaTableName); + return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, manifestFileCache, session, schemaTableName); } @Override @@ -333,6 +343,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con hdfsEnvironment, hdfsContext, hiveTableOeprationsConfig, + manifestFileCache, schemaName, tableName, session.getUser(), @@ -613,7 +624,7 @@ public void registerTable(ConnectorSession clientSession, SchemaTableName schema InputFile inputFile = new HdfsInputFile(metadataLocation, hdfsEnvironment, hdfsContext); TableMetadata tableMetadata; try { - tableMetadata = TableMetadataParser.read(new HdfsFileIO(hdfsEnvironment, hdfsContext), inputFile); + tableMetadata = TableMetadataParser.read(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext), inputFile); } catch (Exception e) { throw new PrestoException(ICEBERG_INVALID_METADATA, String.format("Unable to read metadata file %s", metadataLocation), e); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java index 
5bc8c9e4a8b0c..c9ae6847824e0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java @@ -41,6 +41,7 @@ public class IcebergHiveMetadataFactory final FilterStatsCalculatorService filterStatsCalculatorService; final IcebergHiveTableOperationsConfig operationsConfig; final StatisticsFileCache statisticsFileCache; + final ManifestFileCache manifestFileCache; @Inject public IcebergHiveMetadataFactory( @@ -53,7 +54,8 @@ public IcebergHiveMetadataFactory( NodeVersion nodeVersion, FilterStatsCalculatorService filterStatsCalculatorService, IcebergHiveTableOperationsConfig operationsConfig, - StatisticsFileCache statisticsFileCache) + StatisticsFileCache statisticsFileCache, + ManifestFileCache manifestFileCache) { this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); @@ -65,6 +67,7 @@ public IcebergHiveMetadataFactory( this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null"); this.operationsConfig = requireNonNull(operationsConfig, "operationsConfig is null"); this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null"); + this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null"); } public ConnectorMetadata create() @@ -79,6 +82,7 @@ public ConnectorMetadata create() nodeVersion, filterStatsCalculatorService, operationsConfig, - statisticsFileCache); + statisticsFileCache, + manifestFileCache); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java index 5a24e94f1d8ea..02f04f64f137b 100644 --- 
a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java @@ -128,7 +128,7 @@ protected Map getProperties(ConnectorSession session) { Map properties = new HashMap<>(); if (icebergConfig.getManifestCachingEnabled()) { - loadCachingProperties(properties, icebergConfig); + properties.putAll(loadCachingProperties(icebergConfig)); } if (icebergConfig.getFileIOImpl() != null) { properties.put(FILE_IO_IMPL, icebergConfig.getFileIOImpl()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 622aa3ff6a3a4..168e9ebfd4a93 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -238,7 +238,7 @@ public static Table getShallowWrappedIcebergTable(Schema schema, PartitionSpec s return new PrestoIcebergTableForMetricsConfig(schema, spec, properties, sortOrder); } - public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, IcebergHiveTableOperationsConfig config, ConnectorSession session, SchemaTableName table) + public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, IcebergHiveTableOperationsConfig config, ManifestFileCache manifestFileCache, ConnectorSession session, SchemaTableName table) { HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName()); TableOperations operations = new HiveTableOperations( @@ -247,6 +247,7 @@ public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnv hdfsEnvironment, hdfsContext, config, + manifestFileCache, table.getSchemaName(), table.getTableName()); return new BaseTable(operations, quotedTableName(table)); @@ -852,12 +853,14 @@ public 
static Map getPartitionKeys(PartitionSpec spec
         return Collections.unmodifiableMap(partitionKeys);
     }
 
-    public static void loadCachingProperties(Map<String, String> properties, IcebergConfig icebergConfig)
+    public static Map<String, String> loadCachingProperties(IcebergConfig icebergConfig)
     {
-        properties.put(IO_MANIFEST_CACHE_ENABLED, "true");
-        properties.put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(icebergConfig.getMaxManifestCacheSize()));
-        properties.put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(icebergConfig.getManifestCacheMaxContentLength()));
-        properties.put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(icebergConfig.getManifestCacheExpireDuration()));
+        return ImmutableMap.<String, String>builderWithExpectedSize(4)
+                .put(IO_MANIFEST_CACHE_ENABLED, "true")
+                .put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(icebergConfig.getMaxManifestCacheSize()))
+                .put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(icebergConfig.getManifestCacheMaxContentLength()))
+                .put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(icebergConfig.getManifestCacheExpireDuration()))
+                .build();
     }
 
     public static long getDataSequenceNumber(ContentFile file)
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java
new file mode 100644
index 0000000000000..130505b51ab38
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.airlift.stats.DistributionStat;
+import com.facebook.presto.hive.CacheStatsMBean;
+import com.google.common.cache.Cache;
+import com.google.common.cache.ForwardingCache;
+import org.weakref.jmx.Managed;
+import org.weakref.jmx.Nested;
+
+/**
+ * Cache of manifest file content keyed by {@link ManifestFileCacheKey}, bundled with
+ * the settings that govern which files are cached and with JMX-visible statistics.
+ */
+public class ManifestFileCache
+        extends ForwardingCache.SimpleForwardingCache<ManifestFileCacheKey, ManifestFileCachedContent>
+{
+    private final DistributionStat fileSizes = new DistributionStat();
+    private final long maxFileLength;
+    private final boolean enabled;
+    private final long bufferChunkSize;
+    private final CacheStatsMBean statsMBean;
+
+    /**
+     * @param delegate backing cache that all cache operations are forwarded to
+     * @param enabled whether callers should use the cache at all
+     * @param maxFileLength largest file length, in bytes, that callers should cache
+     * @param bufferChunkSize size, in bytes, of each content buffer; used as a divisor by readers, so it must be positive
+     */
+    public ManifestFileCache(Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate, boolean enabled, long maxFileLength, long bufferChunkSize)
+    {
+        super(delegate);
+        this.maxFileLength = maxFileLength;
+        this.enabled = enabled;
+        this.bufferChunkSize = bufferChunkSize;
+        this.statsMBean = new CacheStatsMBean(delegate);
+    }
+
+    @Managed
+    @Nested
+    public CacheStatsMBean getCacheStats()
+    {
+        return statsMBean;
+    }
+
+    @Managed
+    @Nested
+    public DistributionStat getFileSizeDistribution()
+    {
+        return fileSizes;
+    }
+
+    /**
+     * Adds the size of a newly cached file to the file size distribution.
+     */
+    public void recordFileSize(long size)
+    {
+        fileSizes.add(size);
+    }
+
+    public long getMaxFileLength()
+    {
+        return maxFileLength;
+    }
+
+    public long getBufferChunkSize()
+    {
+        return bufferChunkSize;
+    }
+
+    public boolean isEnabled()
+    {
+        return enabled;
+    }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCacheKey.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCacheKey.java
new file mode 100644
index 0000000000000..915459a185c69
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCacheKey.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Key of a {@link ManifestFileCache} entry; two keys are equal when their paths are equal.
+ */
+public class ManifestFileCacheKey
+{
+    private final String path;
+
+    public ManifestFileCacheKey(String path)
+    {
+        this.path = requireNonNull(path, "path is null");
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return path.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed
+        if (!(obj instanceof ManifestFileCacheKey)) {
+            return false;
+        }
+        return path.equals(((ManifestFileCacheKey) obj).path);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ManifestFileCacheKey{path=" + path + "}";
+    }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCachedContent.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCachedContent.java
new file mode 100644
index 0000000000000..56c9ea6ec0308
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCachedContent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Collections.unmodifiableList;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Content of a cached manifest file: the file's bytes held in a list of buffers,
+ * together with the total file length.
+ */
+public class ManifestFileCachedContent
+{
+    private final List<ByteBuffer> data;
+    private final long length;
+
+    /**
+     * @param data buffers holding the file content
+     * @param length total length of the file in bytes
+     */
+    public ManifestFileCachedContent(final List<ByteBuffer> data, long length)
+    {
+        // Snapshot the list so later changes to the caller's list cannot alter a cached entry
+        this.data = unmodifiableList(new ArrayList<>(requireNonNull(data, "data is null")));
+        this.length = length;
+    }
+
+    /**
+     * Returns an unmodifiable view of the content buffers.
+     */
+    public List<ByteBuffer> getData()
+    {
+        return data;
+    }
+
+    public long getLength()
+    {
+        return length;
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
index 9de94a10f6889..8a947912c969c 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -178,8 +178,9 @@ public abstract class IcebergDistributedTestBase
         extends AbstractTestQueryFramework
 {
     private static final String METADATA_FILE_EXTENSION = ".metadata.json";
-    private final CatalogType catalogType;
-    private final Map<String, String> extraConnectorProperties;
+    protected final CatalogType catalogType;
+    protected final Map<String, String> extraConnectorProperties;
+    protected IcebergQueryRunner icebergQueryRunner;
 
     protected IcebergDistributedTestBase(CatalogType catalogType, Map<String, String> extraConnectorProperties)
     {
@@ -196,10 +197,11 @@ protected IcebergDistributedTestBase(CatalogType catalogType)
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return IcebergQueryRunner.builder()
+        this.icebergQueryRunner = IcebergQueryRunner.builder()
                 .setCatalogType(catalogType)
                 .setExtraConnectorProperties(extraConnectorProperties)
-                .build().getQueryRunner();
+                .build();
+        return icebergQueryRunner.getQueryRunner();
     }
 
     @Test
diff --git
a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index bc66bae273792..43f06897ad3c6 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -61,7 +61,7 @@ public void testDefaults() .setPushdownFilterEnabled(false) .setDeleteAsJoinRewriteEnabled(true) .setRowsForMetadataOptimizationThreshold(1000) - .setManifestCachingEnabled(false) + .setManifestCachingEnabled(true) .setFileIOImpl(HadoopFileIO.class.getName()) .setMaxManifestCacheSize(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT) .setManifestCacheExpireDuration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) @@ -70,6 +70,7 @@ public void testDefaults() .setMetadataPreviousVersionsMax(METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT) .setMetadataDeleteAfterCommit(METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT) .setMetricsMaxInferredColumn(METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT) + .setManifestCacheMaxChunkSize(succinctDataSize(2, MEGABYTE)) .setMaxStatisticsFileCacheSize(succinctDataSize(256, MEGABYTE)) .setStatisticsKllSketchKParameter(1024)); } @@ -94,11 +95,12 @@ public void testExplicitPropertyMappings() .put("iceberg.pushdown-filter-enabled", "true") .put("iceberg.delete-as-join-rewrite-enabled", "false") .put("iceberg.rows-for-metadata-optimization-threshold", "500") - .put("iceberg.io.manifest.cache-enabled", "true") + .put("iceberg.io.manifest.cache-enabled", "false") .put("iceberg.io-impl", "com.facebook.presto.iceberg.HdfsFileIO") .put("iceberg.io.manifest.cache.max-total-bytes", "1048576000") .put("iceberg.io.manifest.cache.expiration-interval-ms", "600000") .put("iceberg.io.manifest.cache.max-content-length", "10485760") + .put("iceberg.io.manifest.cache.max-chunk-size", "1MB") .put("iceberg.split-manager-threads", "42") .put("iceberg.metadata-previous-versions-max", "1") 
.put("iceberg.metadata-delete-after-commit", "true") @@ -124,11 +126,12 @@ public void testExplicitPropertyMappings() .setPushdownFilterEnabled(true) .setDeleteAsJoinRewriteEnabled(false) .setRowsForMetadataOptimizationThreshold(500) - .setManifestCachingEnabled(true) + .setManifestCachingEnabled(false) .setFileIOImpl("com.facebook.presto.iceberg.HdfsFileIO") .setMaxManifestCacheSize(1048576000) .setManifestCacheExpireDuration(600000) .setManifestCacheMaxContentLength(10485760) + .setManifestCacheMaxChunkSize(succinctDataSize(1, MEGABYTE)) .setSplitManagerThreads(42) .setMetadataPreviousVersionsMax(1) .setMetadataDeleteAfterCommit(true) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java index 68fba51a4526a..32b4893126f3a 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java @@ -13,23 +13,41 @@ */ package com.facebook.presto.iceberg.hive; +import com.facebook.presto.Session; +import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.iceberg.IcebergDistributedTestBase; +import com.facebook.presto.iceberg.IcebergHiveMetadata; import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig; import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.iceberg.ManifestFileCache; import com.facebook.presto.metadata.CatalogManager; +import com.facebook.presto.metadata.CatalogMetadata; +import com.facebook.presto.metadata.MetadataUtil; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata; import 
com.google.common.base.Joiner; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheStats; import com.google.common.collect.ImmutableMap; import org.apache.iceberg.Table; import org.testng.annotations.Test; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore; import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; @Test public class TestIcebergDistributedHive @@ -66,6 +84,94 @@ public void testStatisticsFileCache() // so this test won't complete successfully. 
} + @Test + public void testManifestFileCaching() + throws Exception + { + String catalogName = "iceberg_manifest_caching"; + Map<String, String> catalogProperties = new HashMap<>(this.icebergQueryRunner.getIcebergCatalogs().get("iceberg")); + catalogProperties.put("iceberg.io.manifest.cache-enabled", "true"); + getQueryRunner().createCatalog(catalogName, "iceberg", catalogProperties); + Session session = Session.builder(getSession()) + .setCatalog(catalogName) + .setSchema("default") + .build(); + assertQuerySucceeds(session, "CREATE SCHEMA IF NOT EXISTS default"); + assertQuerySucceeds(session, "CREATE TABLE test_manifest_file_cache(i int)"); + Session txnSession = Session.builder(session) + .setTransactionId(getQueryRunner().getTransactionManager().beginTransaction(false)) + .build(); + Optional<TableHandle> handle = MetadataUtil.getOptionalTableHandle(txnSession, + getQueryRunner().getTransactionManager(), + QualifiedObjectName.valueOf(txnSession.getCatalog().get(), txnSession.getSchema().get(), "test_manifest_file_cache"), + Optional.empty()); + CatalogMetadata catalogMetadata = getQueryRunner().getTransactionManager() + .getCatalogMetadata(txnSession.getTransactionId().get(), handle.get().getConnectorId()); + Field delegate = ClassLoaderSafeConnectorMetadata.class.getDeclaredField("delegate"); + delegate.setAccessible(true); + IcebergHiveMetadata metadata = (IcebergHiveMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId())); + ManifestFileCache manifestFileCache = metadata.getManifestFileCache(); + assertUpdate(session, "INSERT INTO test_manifest_file_cache VALUES 1, 2, 3, 4, 5", 5); + manifestFileCache.invalidateAll(); + assertEquals(manifestFileCache.size(), 0); + CacheStats initial = manifestFileCache.stats(); + assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i"); + CacheStats firstQuery = manifestFileCache.stats(); + assertTrue(firstQuery.minus(initial).missCount() > 0); + assertTrue(manifestFileCache.size() > 0); + 
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i"); + CacheStats secondQuery = manifestFileCache.stats(); + assertEquals(secondQuery.minus(firstQuery).missCount(), 0); + assertTrue(secondQuery.minus(firstQuery).hitCount() > 0); + assertTrue(manifestFileCache.size() > 0); + + assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache"); + assertQuerySucceeds(session, "DROP SCHEMA default"); + } + + @Test + public void testManifestFileCachingDisabled() + throws Exception + { + String catalogName = "iceberg_no_manifest_caching"; + Map<String, String> catalogProperties = new HashMap<>(this.icebergQueryRunner.getIcebergCatalogs().get("iceberg")); + catalogProperties.put("iceberg.io.manifest.cache-enabled", "false"); + getQueryRunner().createCatalog(catalogName, "iceberg", catalogProperties); + Session session = Session.builder(getSession()) + .setCatalog(catalogName) + .setSchema("default") + .build(); + assertQuerySucceeds(session, "CREATE SCHEMA IF NOT EXISTS default"); + assertQuerySucceeds(session, "CREATE TABLE test_manifest_file_cache_disabled(i int)"); + assertUpdate(session, "INSERT INTO test_manifest_file_cache_disabled VALUES 1, 2, 3, 4, 5", 5); + Session metadataSession = Session.builder(session) + .setTransactionId(getQueryRunner().getTransactionManager().beginTransaction(false)) + .build(); + Optional<TableHandle> handle = MetadataUtil.getOptionalTableHandle(metadataSession, + getQueryRunner().getTransactionManager(), + QualifiedObjectName.valueOf(metadataSession.getCatalog().get(), metadataSession.getSchema().get(), "test_manifest_file_cache_disabled"), + Optional.empty()); + CatalogMetadata catalogMetadata = getQueryRunner().getTransactionManager() + .getCatalogMetadata(metadataSession.getTransactionId().get(), handle.get().getConnectorId()); + Field delegate = ClassLoaderSafeConnectorMetadata.class.getDeclaredField("delegate"); + delegate.setAccessible(true); + IcebergHiveMetadata metadata = (IcebergHiveMetadata) 
delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId())); + ManifestFileCache manifestFileCache = metadata.getManifestFileCache(); + assertFalse(manifestFileCache.isEnabled()); + CacheStats initial = manifestFileCache.stats(); + assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i"); + CacheStats firstQuery = manifestFileCache.stats(); + assertEquals(firstQuery.minus(initial).hitCount(), 0); + assertEquals(manifestFileCache.size(), 0); + assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i"); + CacheStats secondQuery = manifestFileCache.stats(); + assertEquals(secondQuery.minus(firstQuery).hitCount(), 0); + assertEquals(manifestFileCache.size(), 0); + + assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache_disabled"); + assertQuerySucceeds(session, "DROP SCHEMA default"); + } + @Override protected Table loadTable(String tableName) { @@ -75,6 +181,7 @@ protected Table loadTable(String tableName) return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), + new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024 * 1024), getQueryRunner().getDefaultSession().toConnectorSession(connectorId), SchemaTableName.valueOf("tpch." 
+ tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java index a512a8679965e..c473c8f5536ca 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java @@ -36,6 +36,7 @@ import com.facebook.presto.iceberg.IcebergMetadataColumn; import com.facebook.presto.iceberg.IcebergQueryRunner; import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.iceberg.ManifestFileCache; import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.spi.ColumnHandle; @@ -54,6 +55,7 @@ import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -581,6 +583,7 @@ private Table loadTable(String tableName) return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), + new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), getQueryRunner().getDefaultSession().toConnectorSession(connectorId), SchemaTableName.valueOf("tpch." 
+ tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java index af57eda755b20..e55ee11a09c71 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java @@ -18,9 +18,11 @@ import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase; import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig; import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.iceberg.ManifestFileCache; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.tests.DistributedQueryRunner; +import com.google.common.cache.CacheBuilder; import org.apache.iceberg.Table; import java.io.File; @@ -61,6 +63,7 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), + new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), session, SchemaTableName.valueOf(schema + "." 
+ tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java index 4b04b327d82d7..33d0ac12d5e8e 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java @@ -44,6 +44,7 @@ import com.facebook.presto.iceberg.IcebergTableHandle; import com.facebook.presto.iceberg.IcebergTableName; import com.facebook.presto.iceberg.IcebergTableType; +import com.facebook.presto.iceberg.ManifestFileCache; import com.facebook.presto.iceberg.statistics.StatisticsFileCache; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.MetadataManager; @@ -373,7 +374,8 @@ private void testRenameTableWithFailSignalAndValidation(FailSignal failSignal, R } } - private void createFile(FileSystem fileSystem, Path path, byte[] content) throws IOException + private void createFile(FileSystem fileSystem, Path path, byte[] content) + throws IOException { FSDataOutputStream outputStream = fileSystem.create(path, true, 1024); outputStream.write(content); @@ -411,7 +413,8 @@ private ConnectorMetadata getIcebergHiveMetadata(ExtendedHiveMetastore metastore new NodeVersion("test_node_v1"), FILTER_STATS_CALCULATOR_SERVICE, new IcebergHiveTableOperationsConfig(), - new StatisticsFileCache(CacheBuilder.newBuilder().build())); + new StatisticsFileCache(CacheBuilder.newBuilder().build()), + new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024)); return icebergHiveMetadataFactory.create(); } @@ -490,25 +493,29 @@ public URI getUri() } @Override - public FSDataInputStream open(Path f, int bufferSize) throws IOException + public FSDataInputStream open(Path f, int bufferSize) + throws IOException { return delegate.open(f, 
bufferSize); } @Override - public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) + throws IOException { return delegate.create(f, permission, overwrite, bufferSize, replication, blockSize, progress); } @Override - public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) + throws IOException { return delegate.append(f, bufferSize, progress); } @Override - public boolean rename(Path src, Path dst) throws IOException + public boolean rename(Path src, Path dst) + throws IOException { if (failSignal.get() == FailSignal.RENAME) { return false; @@ -517,7 +524,8 @@ public boolean rename(Path src, Path dst) throws IOException } @Override - public boolean delete(Path f, boolean recursive) throws IOException + public boolean delete(Path f, boolean recursive) + throws IOException { if (failSignal.get() == FailSignal.DELETE) { return false; @@ -526,7 +534,8 @@ public boolean delete(Path f, boolean recursive) throws IOException } @Override - public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException + public FileStatus[] listStatus(Path f) + throws FileNotFoundException, IOException { return delegate.listStatus(f); } @@ -544,7 +553,8 @@ public Path getWorkingDirectory() } @Override - public boolean mkdirs(Path f, FsPermission permission) throws IOException + public boolean mkdirs(Path f, FsPermission permission) + throws IOException { if (this.failSignal.get() == FailSignal.MKDIRS) { return false; @@ -553,7 +563,8 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException } @Override - public FileStatus getFileStatus(Path f) throws 
IOException + public FileStatus getFileStatus(Path f) + throws IOException { return delegate.getFileStatus(f); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java index d1f20a56494b8..625fbcd30514e 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java @@ -20,12 +20,14 @@ import com.facebook.presto.iceberg.HiveTableOperations; import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig; import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.iceberg.ManifestFileCache; import com.facebook.presto.iceberg.hive.IcebergFileHiveMetastore; import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.iceberg.PartitionSpec; @@ -69,6 +71,7 @@ Table createTable(String tableName, String targetPath, Map table getHdfsEnvironment(), hdfsContext, new IcebergHiveTableOperationsConfig(), + new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), "tpch", tableName, session.getUser(), @@ -91,6 +94,7 @@ Table loadTable(String tableName) return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), + new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), getQueryRunner().getDefaultSession().toConnectorSession(connectorId), SchemaTableName.valueOf("tpch." + tableName)); }