diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index e9bc7d556ba3..95fe6a074c0a 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -57,6 +57,60 @@ private CatalogProperties() {} public static final long CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30); public static final long CACHE_EXPIRATION_INTERVAL_MS_OFF = -1; + /** + * Controls whether to use caching during manifest reads or not. + * + *

Enabling manifest file caching requires the following configuration constraints to be true: + * + *

+ */ + public static final String IO_MANIFEST_CACHE_ENABLED = "io.manifest.cache-enabled"; + + public static final boolean IO_MANIFEST_CACHE_ENABLED_DEFAULT = false; + + /** + * Controls the maximum duration for which an entry stays in the manifest cache. + * + *

Must be a non-negative value. The following are the specific behaviors of this config: + * + *

+ */ + public static final String IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS = + "io.manifest.cache.expiration-interval-ms"; + + public static final long IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(60); + + /** + * Controls the maximum total number of bytes to cache in the manifest cache. + * + *

Must be a positive value. + */ + public static final String IO_MANIFEST_CACHE_MAX_TOTAL_BYTES = + "io.manifest.cache.max-total-bytes"; + + public static final long IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT = 100 * 1024 * 1024; + + /** + * Controls the maximum length of a file to be considered for caching. + * + *

An {@link org.apache.iceberg.io.InputFile} will not be cached if the length is longer than + * this limit. Must be a positive value. + */ + public static final String IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH = + "io.manifest.cache.max-content-length"; + + public static final long IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT = 8 * 1024 * 1024; + public static final String URI = "uri"; public static final String CLIENT_POOL_SIZE = "clients"; public static final int CLIENT_POOL_SIZE_DEFAULT = 2; diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index f039907b8682..85e268d43378 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import java.io.IOException; import java.util.Map; import org.apache.iceberg.ManifestReader.FileType; @@ -25,16 +27,23 @@ import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.ContentCache; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ManifestFiles { private ManifestFiles() {} + private static final Logger LOG = LoggerFactory.getLogger(ManifestFiles.class); + private static final org.apache.avro.Schema MANIFEST_AVRO_SCHEMA = AvroSchemaUtil.convert( ManifestFile.schema(), @@ -44,6 +53,36 @@ private ManifestFiles() {} ManifestFile.PARTITION_SUMMARY_TYPE, GenericPartitionFieldSummary.class.getName())); + @VisibleForTesting + static Cache newManifestCache() { + return Caffeine.newBuilder() + .weakKeys() + .softValues() + .maximumSize(maxFileIO()) + .removalListener( + (io, contentCache, cause) -> + LOG.debug("Evicted {} from FileIO-level cache ({})", io, cause)) + .recordStats() + .build(); + } + + private static final Cache CONTENT_CACHES = newManifestCache(); + + @VisibleForTesting + static ContentCache contentCache(FileIO io) { + return CONTENT_CACHES.get( + io, + fileIO -> + new ContentCache( + cacheDurationMs(fileIO), cacheTotalBytes(fileIO), cacheMaxContentLength(fileIO))); + } + + /** Drop manifest file cache object for a FileIO if exists. */ + public static synchronized void dropCache(FileIO fileIO) { + CONTENT_CACHES.invalidate(fileIO); + CONTENT_CACHES.cleanUp(); + } + /** * Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}. 
* @@ -86,7 +125,7 @@ public static ManifestReader read( manifest.content() == ManifestContent.DATA, "Cannot read a delete manifest with a ManifestReader: %s", manifest); - InputFile file = io.newInputFile(manifest.path(), manifest.length()); + InputFile file = newInputFile(io, manifest.path(), manifest.length()); InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES); } @@ -140,7 +179,7 @@ public static ManifestReader readDeleteManifest( manifest.content() == ManifestContent.DELETES, "Cannot read a data manifest with a DeleteManifestReader: %s", manifest); - InputFile file = io.newInputFile(manifest.path(), manifest.length()); + InputFile file = newInputFile(io, manifest.path(), manifest.length()); InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES); } @@ -300,4 +339,67 @@ private static ManifestFile copyManifestInternal( return writer.toManifestFile(); } + + private static InputFile newInputFile(FileIO io, String path, long length) { + boolean enabled = false; + + try { + enabled = cachingEnabled(io); + } catch (UnsupportedOperationException e) { + // There is an issue reading io.properties(). Disable caching. + enabled = false; + } + + if (enabled) { + ContentCache cache = contentCache(io); + Preconditions.checkNotNull( + cache, + "ContentCache creation failed. Check that all manifest caching configurations has valid value."); + LOG.debug("FileIO-level cache stats: {}", CONTENT_CACHES.stats()); + return cache.tryCache(io, path, length); + } + + // caching is not enable for this io or caught RuntimeException. + return io.newInputFile(path, length); + } + + private static int maxFileIO() { + String value = System.getProperty(SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO); + if (value != null) { + try { + return Integer.parseUnsignedInt(value); + } catch (NumberFormatException e) { + // will return the default + } + } + return SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT; + } + + static boolean cachingEnabled(FileIO io) { + return PropertyUtil.propertyAsBoolean( + io.properties(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT); + } + + static long cacheDurationMs(FileIO io) { + return PropertyUtil.propertyAsLong( + io.properties(), + CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, + CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT); + } + + static long cacheTotalBytes(FileIO io) { + return PropertyUtil.propertyAsLong( + io.properties(), + CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, + CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT); + } + + static long cacheMaxContentLength(FileIO io) { + return PropertyUtil.propertyAsLong( + io.properties(), + CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, + CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT); + } } diff --git a/core/src/main/java/org/apache/iceberg/SystemProperties.java b/core/src/main/java/org/apache/iceberg/SystemProperties.java index 3d44b195ffe1..1d3c00b97cf7 100644 --- a/core/src/main/java/org/apache/iceberg/SystemProperties.java +++ b/core/src/main/java/org/apache/iceberg/SystemProperties.java @@ -33,6 +33,14 @@ private SystemProperties() {} /** Whether to use the shared worker pool when planning table scans. 
*/ public static final String SCAN_THREAD_POOL_ENABLED = "iceberg.scan.plan-in-worker-pool"; + /** + * Maximum number of distinct {@link org.apache.iceberg.io.FileIO} that is allowed to have + * associated {@link org.apache.iceberg.io.ContentCache} in memory at a time. + */ + public static final String IO_MANIFEST_CACHE_MAX_FILEIO = "iceberg.io.manifest.cache.fileio-max"; + + public static final int IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT = 8; + static boolean getBoolean(String systemProperty, boolean defaultValue) { String value = System.getProperty(systemProperty); if (value != null) { diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index fc4c0d2f4879..8f34994d6374 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -35,11 +35,13 @@ import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations { private SerializableSupplier hadoopConf; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); /** * Constructor used for dynamic FileIO loading. @@ -61,6 +63,11 @@ public Configuration conf() { return hadoopConf.get(); } + @Override + public void initialize(Map props) { + this.properties = SerializableMap.copyOf(props); + } + @Override public InputFile newInputFile(String path) { return HadoopInputFile.fromLocation(path, hadoopConf.get()); @@ -89,7 +96,7 @@ public void deleteFile(String path) { @Override public Map properties() { - return ImmutableMap.of(); + return properties.immutableMap(); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java new file mode 100644 index 000000000000..c999f3f333f6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.io; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Weigher; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.function.Function; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that provides file-content caching during reading. + * + *

The file-content caching is initiated by calling {@link ContentCache#tryCache(FileIO, String, + * long)}. Given a FileIO, a file location string, and a file length that is within the allowed limit, + * ContentCache will return a {@link CachingInputFile} that is backed by the cache. Calling {@link + * CachingInputFile#newStream()} will return a {@link ByteBufferInputStream} backed by a list of + * {@link ByteBuffer} from the cache if such file-content exists in the cache. If the file-content + * does not exist in the cache yet, a regular InputFile will be instantiated, read-ahead, and loaded + * into the cache before returning ByteBufferInputStream. The regular InputFile is also used as a + * fallback if cache loading fails. + */ +public class ContentCache { + private static final Logger LOG = LoggerFactory.getLogger(ContentCache.class); + private static final int BUFFER_CHUNK_SIZE = 4 * 1024 * 1024; // 4MB + + private final long expireAfterAccessMs; + private final long maxTotalBytes; + private final long maxContentLength; + private final Cache cache; + + /** + * Constructor for ContentCache class. + * + * @param expireAfterAccessMs controls the duration for which entries in the ContentCache are held + * since last access. Must be greater than or equal to 0. Setting 0 means cache entries expire + * only if they get evicted due to memory pressure. + * @param maxTotalBytes controls the maximum total number of bytes to cache in ContentCache. Must + * be greater than 0. + * @param maxContentLength controls the maximum length of a file to be considered for caching. Must + * be greater than 0. + */ + public ContentCache(long expireAfterAccessMs, long maxTotalBytes, long maxContentLength) { + ValidationException.check(expireAfterAccessMs >= 0, "expireAfterAccessMs is less than 0"); + ValidationException.check(maxTotalBytes > 0, "maxTotalBytes is equal or less than 0"); + ValidationException.check(maxContentLength > 0, "maxContentLength is equal or less than 0"); + this.expireAfterAccessMs = expireAfterAccessMs; + this.maxTotalBytes = maxTotalBytes; + this.maxContentLength = maxContentLength; + + Caffeine builder = Caffeine.newBuilder(); + if (expireAfterAccessMs > 0) { + builder = builder.expireAfterAccess(Duration.ofMillis(expireAfterAccessMs)); + } + + this.cache = + builder + .maximumWeight(maxTotalBytes) + .weigher( + (Weigher) + (key, value) -> (int) Math.min(value.length, Integer.MAX_VALUE)) + .softValues() + .removalListener( + (location, cacheEntry, cause) -> + LOG.debug("Evicted {} from ContentCache ({})", location, cause)) + .recordStats() + .build(); + } + + public long expireAfterAccess() { + return expireAfterAccessMs; + } + + public long maxContentLength() { + return maxContentLength; + } + + public long maxTotalBytes() { + return maxTotalBytes; + } + + public CacheStats stats() { + return cache.stats(); + } + + public CacheEntry get(String key, Function mappingFunction) { + return cache.get(key, mappingFunction); + } + + public CacheEntry getIfPresent(String location) { + return cache.getIfPresent(location); + } + + /** + * Try to cache the file content of the file at the given location upon stream reading. + * + *

If the length is longer than the maximum length allowed by ContentCache, a regular {@link InputFile} + * will be returned and no caching will be done for that file. Otherwise, this method will return a {@link + * CachingInputFile} that serves file reads backed by ContentCache. + * + * @param io a FileIO associated with the location. + * @param location URL/path of a file accessible by io. + * @param length the known length of the file. + * @return a {@link CachingInputFile} if the length is within the allowed limit. Otherwise, a regular + * {@link InputFile} for the given location. + */ + public InputFile tryCache(FileIO io, String location, long length) { + if (length <= maxContentLength) { + return new CachingInputFile(this, io, location, length); + } + return io.newInputFile(location, length); + } + + public void invalidate(String key) { + cache.invalidate(key); + } + + public void invalidateAll() { + cache.invalidateAll(); + } + + public void cleanUp() { + cache.cleanUp(); + } + + public long estimatedCacheSize() { + return cache.estimatedSize(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("expireAfterAccessMs", expireAfterAccessMs) + .add("maxContentLength", maxContentLength) + .add("maxTotalBytes", maxTotalBytes) + .add("cacheStats", cache.stats()) + .toString(); + } + + private static class CacheEntry { + private final long length; + private final List buffers; + + private CacheEntry(long length, List buffers) { + this.length = length; + this.buffers = buffers; + } + } + + /** + * A subclass of {@link InputFile} that is backed by a {@link ContentCache}. + * + *

Calling {@link CachingInputFile#newStream()} will return a {@link ByteBufferInputStream} + * backed by a list of {@link ByteBuffer} from the cache if such file-content exists in the cache. If + * the file-content does not exist in the cache, a regular InputFile will be instantiated, + * read-ahead, and loaded into the cache before returning ByteBufferInputStream. The regular + * InputFile is also used as a fallback if cache loading fails. + */ + private static class CachingInputFile implements InputFile { + private final ContentCache contentCache; + private final FileIO io; + private final String location; + private final long length; + private InputFile fallbackInputFile = null; + + private CachingInputFile(ContentCache cache, FileIO io, String location, long length) { + this.contentCache = cache; + this.io = io; + this.location = location; + this.length = length; + } + + private InputFile wrappedInputFile() { + if (fallbackInputFile == null) { + fallbackInputFile = io.newInputFile(location, length); + } + return fallbackInputFile; + } + + @Override + public long getLength() { + CacheEntry buf = contentCache.getIfPresent(location); + if (buf != null) { + return buf.length; + } else if (fallbackInputFile != null) { + return fallbackInputFile.getLength(); + } else { + return length; + } + } + + /** + * Opens a new {@link SeekableInputStream} for the underlying data file, either from cache or + * from the inner FileIO. + * + *

If the data file is not cached yet, and it can fit in the cache, the file content will be + * cached first before returning a {@link ByteBufferInputStream}. Otherwise, return a new + * SeekableInputStream from the inner FileIO. + * + * @return a {@link ByteBufferInputStream} if the file exists in the cache or can fit in the cache. + * Otherwise, return a new SeekableInputStream from the inner FileIO. + */ + @Override + public SeekableInputStream newStream() { + try { + // read from cache if file length is less than or equal to maximum length allowed to + // cache. + if (getLength() <= contentCache.maxContentLength()) { + return cachedStream(); + } + + // fallback to non-caching input stream. + return wrappedInputFile().newStream(); + } catch (FileNotFoundException e) { + throw new NotFoundException( + e, "Failed to open input stream for file %s: %s", location, e.toString()); + } catch (IOException e) { + throw new UncheckedIOException( + String.format("Failed to open input stream for file %s: %s", location, e), e); + } + } + + @Override + public String location() { + return location; + } + + @Override + public boolean exists() { + CacheEntry buf = contentCache.getIfPresent(location); + return buf != null || wrappedInputFile().exists(); + } + + private CacheEntry cacheEntry() { + long start = System.currentTimeMillis(); + try (SeekableInputStream stream = wrappedInputFile().newStream()) { + long fileLength = getLength(); + long totalBytesToRead = fileLength; + List buffers = Lists.newArrayList(); + + while (totalBytesToRead > 0) { + // read the stream in 4MB chunks + int bytesToRead = (int) Math.min(BUFFER_CHUNK_SIZE, totalBytesToRead); + byte[] buf = new byte[bytesToRead]; + int bytesRead = IOUtil.readRemaining(stream, buf, 0, bytesToRead); + totalBytesToRead -= bytesRead; + + if (bytesRead < bytesToRead) { + // Read less than expected, possibly hitting EOF. Abandon caching by throwing an + // IOException and let the caller fall back to the non-caching input file. + throw new IOException( + String.format( + "Expected to read %d bytes, but only %d bytes read.", + fileLength, fileLength - totalBytesToRead)); + } else { + buffers.add(ByteBuffer.wrap(buf)); + } + } + + CacheEntry newEntry = new CacheEntry(fileLength, buffers); + LOG.debug("cacheEntry took {} ms for {}", (System.currentTimeMillis() - start), location); + return newEntry; + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + private SeekableInputStream cachedStream() throws IOException { + try { + CacheEntry entry = contentCache.get(location, k -> cacheEntry()); + Preconditions.checkNotNull( + entry, "CacheEntry should not be null when no RuntimeException occurs"); + LOG.debug("Cache stats: {}", contentCache.stats()); + return ByteBufferInputStream.wrap(entry.buffers); + } catch (UncheckedIOException ex) { + throw ex.getCause(); + } catch (RuntimeException ex) { + throw new IOException("Caught an error while reading from cache", ex); + } + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestCaching.java b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java new file mode 100644 index 000000000000..9e0f335b1c89 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import com.github.benmanes.caffeine.cache.Cache; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hadoop.HadoopTableTestBase; +import org.apache.iceberg.io.ContentCache; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestManifestCaching extends HadoopTableTestBase { + + // Schema passed to create tables + static final Schema SCHEMA = + new Schema( + required(3, "id", Types.IntegerType.get(), "unique ID"), + required(4, "data", Types.StringType.get())); + + // Partition spec used to create tables + static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(); + + @Test + public void testPlanWithCache() throws Exception { + Map properties = + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, + HadoopFileIO.class.getName(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + "true"); + Table table = createTable(properties); + ContentCache cache = ManifestFiles.contentCache(table.io()); + Assert.assertEquals(0, cache.estimatedCacheSize()); + + int numFiles = 4; + List files16Mb = newFiles(numFiles, 16 * 1024 * 1024); + appendFiles(files16Mb, table); + + // planTask with SPLIT_SIZE half of the file size + TableScan scan1 = + table.newScan().option(TableProperties.SPLIT_SIZE, String.valueOf(8 * 1024 * 1024)); + Assert.assertEquals( + "Should get 2 tasks per file", numFiles * 2, Iterables.size(scan1.planTasks())); + Assert.assertEquals( + "All manifest files should be cached", numFiles, cache.estimatedCacheSize()); + Assert.assertEquals( + "All manifest files should be recently loaded", numFiles, cache.stats().loadCount()); + long missCount = cache.stats().missCount(); + + // planFiles and verify that cache size still the same + TableScan scan2 = table.newScan(); + Assert.assertEquals("Should get 1 tasks per file", numFiles, Iterables.size(scan2.planFiles())); + Assert.assertEquals("Cache size should remain the same", numFiles, cache.estimatedCacheSize()); + Assert.assertEquals( + "All manifest file reads should hit cache", missCount, cache.stats().missCount()); + + ManifestFiles.dropCache(table.io()); + } + + @Test + public void testPlanWithSmallCache() throws Exception { + Map properties = + ImmutableMap.of( + 
CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true", + CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, "1", + CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, "1"); + Table table = createTable(properties); + + int numFiles = 4; + List files16Mb = newFiles(numFiles, 16 * 1024 * 1024); + appendFiles(files16Mb, table); + + // We should never hit cache. + TableScan scan = table.newScan(); + ContentCache cache = ManifestFiles.contentCache(scan.table().io()); + Assert.assertEquals(1, cache.maxContentLength()); + Assert.assertEquals(1, cache.maxTotalBytes()); + Assert.assertEquals("Should get 1 tasks per file", numFiles, Iterables.size(scan.planFiles())); + Assert.assertEquals("Cache should be empty", 0, cache.estimatedCacheSize()); + Assert.assertEquals("File should not be loaded through cache", 0, cache.stats().loadCount()); + Assert.assertEquals("Cache should not serve file", 0, cache.stats().requestCount()); + ManifestFiles.dropCache(scan.table().io()); + } + + @Test + public void testUniqueCache() throws Exception { + Map properties1 = + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, + HadoopFileIO.class.getName(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + "true"); + Table table1 = createTable(properties1); + + Map properties2 = + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true", + CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, "1", + CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, "1"); + Table table2 = createTable(properties2); + + ContentCache cache1 = ManifestFiles.contentCache(table1.io()); + ContentCache cache2 = ManifestFiles.contentCache(table2.io()); + ContentCache cache3 = ManifestFiles.contentCache(table2.io()); + Assert.assertNotSame(cache1, cache2); + Assert.assertSame(cache2, cache3); + + ManifestFiles.dropCache(table1.io()); + ManifestFiles.dropCache(table2.io()); + } + + @Test + public void testRecreateCache() throws Exception { + Map properties = + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, + HadoopFileIO.class.getName(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + "true"); + Table table = createTable(properties); + + ContentCache cache1 = ManifestFiles.contentCache(table.io()); + ManifestFiles.dropCache(table.io()); + + ContentCache cache2 = ManifestFiles.contentCache(table.io()); + Assert.assertNotSame(cache1, cache2); + ManifestFiles.dropCache(table.io()); + } + + @Test + public void testWeakFileIOReferenceCleanUp() { + Cache manifestCache = ManifestFiles.newManifestCache(); + int maxIO = SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT; + FileIO firstIO = null; + ContentCache firstCache = null; + for (int i = 0; i < maxIO; i++) { + FileIO io = cacheEnabledHadoopFileIO(); + ContentCache cache = contentCache(manifestCache, io); + if (i == 0) { + firstIO = io; + firstCache = cache; + } + } + + System.gc(); + manifestCache.cleanUp(); + awaitQuiescence(); + Assert.assertEquals(maxIO, manifestCache.estimatedSize()); + Assert.assertEquals(maxIO, manifestCache.stats().loadCount()); + Assert.assertEquals( + "No entries should be evicted before IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT exceeded.", + 0, + manifestCache.stats().evictionCount()); + + // Insert one more FileIO to trigger cache eviction. 
+ FileIO lastIO = cacheEnabledHadoopFileIO(); + ContentCache lastCache = contentCache(manifestCache, lastIO); + System.gc(); + manifestCache.cleanUp(); + awaitQuiescence(); + + // Verify that manifestCache evicts all FileIO except the firstIO and lastIO. + ContentCache cache1 = contentCache(manifestCache, firstIO); + ContentCache cacheN = contentCache(manifestCache, lastIO); + Assert.assertEquals(firstCache, cache1); + Assert.assertEquals(lastCache, cacheN); + Assert.assertEquals(2, manifestCache.estimatedSize()); + Assert.assertEquals(maxIO + 1, manifestCache.stats().loadCount()); + Assert.assertEquals(maxIO - 1, manifestCache.stats().evictionCount()); + } + + /** + * Helper to get existing or insert new {@link ContentCache} into the given manifestCache. + * + * @return an existing or new {@link ContentCache} associated with given io. + */ + private static ContentCache contentCache(Cache manifestCache, FileIO io) { + return manifestCache.get( + io, + fileIO -> + new ContentCache( + ManifestFiles.cacheDurationMs(fileIO), + ManifestFiles.cacheTotalBytes(fileIO), + ManifestFiles.cacheMaxContentLength(fileIO))); + } + + private FileIO cacheEnabledHadoopFileIO() { + Map properties = + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, + HadoopFileIO.class.getName(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + "true"); + HadoopFileIO io = new HadoopFileIO(new Configuration()); + io.initialize(properties); + return io; + } + + /** + * Wait until Caffeine cache complete its background maintenance tasks. + * + *

By default, Caffeine use {@link ForkJoinPool#commonPool()} as its executor. + */ + private void awaitQuiescence() { + boolean quiescent = ForkJoinPool.commonPool().awaitQuiescence(10, TimeUnit.SECONDS); + Assert.assertTrue("ForkJoinPool.commonPool() does not quiesce within 10s.", quiescent); + } + + private Table createTable(Map properties) throws Exception { + TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl"); + return hadoopCatalog(properties) + .buildTable(tableIdent, SCHEMA) + .withPartitionSpec(SPEC) + .create(); + } + + private void appendFiles(Iterable files, Table table) { + for (DataFile file : files) { + AppendFiles appendFile = table.newAppend(); + appendFile.appendFile(file); + appendFile.commit(); + } + } + + private List newFiles(int numFiles, long sizeInBytes) { + return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET, 1); + } + + private List newFiles( + int numFiles, long sizeInBytes, FileFormat fileFormat, int numOffset) { + List files = Lists.newArrayList(); + for (int fileNum = 0; fileNum < numFiles; fileNum++) { + files.add(newFile(sizeInBytes, fileFormat, numOffset)); + } + return files; + } + + private DataFile newFile(long sizeInBytes, FileFormat fileFormat, int numOffsets) { + String fileName = UUID.randomUUID().toString(); + DataFiles.Builder builder = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(fileFormat.addExtension(fileName)) + .withFileSizeInBytes(sizeInBytes) + .withRecordCount(2); + + if (numOffsets > 1) { + long stepSize = sizeInBytes / numOffsets; + List offsets = + LongStream.range(0, numOffsets) + .map(i -> i * stepSize) + .boxed() + .collect(Collectors.toList()); + builder.withSplitOffsets(offsets); + } + + return builder.build(); + } +}
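For reviewers, a minimal sketch of how a caller could enable and tune manifest caching through the new properties introduced above. The property keys, the HadoopFileIO constructor and initialize() call, and ManifestFiles.dropCache() all come from this patch; the surrounding class, method, and variable names are illustrative only, and the values shown simply restate the defaults.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ManifestCachingExample {
  public static void main(String[] args) {
    // FileIO properties that turn on manifest caching and tune its limits.
    Map<String, String> properties =
        ImmutableMap.of(
            CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true",
            CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, "60000",
            CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(100 * 1024 * 1024),
            CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(8 * 1024 * 1024));

    // Any FileIO initialized with these properties gets its own ContentCache;
    // ManifestFiles.read(...) then serves manifest bytes from that cache.
    HadoopFileIO io = new HadoopFileIO(new Configuration());
    io.initialize(properties);

    // ... plan scans / read manifests through tables backed by this FileIO ...

    // Drop the ContentCache associated with this FileIO once it is no longer needed.
    ManifestFiles.dropCache(io);
  }
}

A catalog that builds its FileIO from the same property map (for example via CatalogProperties.FILE_IO_IMPL, as TestManifestCaching does) picks up the same behavior without touching the FileIO directly.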