diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index c17ed59b2674..6f6b817a04f9 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -65,6 +65,60 @@ private CatalogProperties() {}
   public static final long CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
   public static final long CACHE_EXPIRATION_INTERVAL_MS_OFF = -1;
 
+  /**
+   * Controls whether to use caching during table metadata reads or not.
+   *
+   * <p>Enabling table metadata file caching requires the following configuration constraints to be true:
+   *
+   * <ul>
+   *   <li>{@link #IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS} must be a non-negative value.
+   *   <li>{@link #IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES} must be a positive value.
+   *   <li>{@link #IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH} must be a positive value.
+   * </ul>
+   */
+  public static final String IO_TABLE_METADATA_CACHE_ENABLED = "io.table-metadata.cache-enabled";
+
+  public static final boolean IO_TABLE_METADATA_CACHE_ENABLED_DEFAULT = false;
+
+  /**
+   * Controls the maximum duration for which an entry stays in the table metadata cache.
+   *
+   * <p>Must be a non-negative value. Following are specific behaviors of this config:
+   *
+   * <ul>
+   *   <li>Zero - Cache entries expire only when evicted due to memory pressure from the {@link
+   *       #IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES} setting.
+   *   <li>Positive values - Cache entries expire if not accessed via the cache after this many
+   *       milliseconds.
+   * </ul>
+   */
+  public static final String IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS =
+      "io.table-metadata.cache.expiration-interval-ms";
+
+  public static final long IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(10);
+
+  /**
+   * Controls the maximum total amount of bytes to cache in the table metadata cache.
+   *
+   * <p>Must be a positive value.
+   */
+  public static final String IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES =
+      "io.table-metadata.cache.max-total-bytes";
+
+  public static final long IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES_DEFAULT = 100 * 1024 * 1024;
+
+  /**
+   * Controls the maximum length of file to be considered for caching a table metadata file.
+   *
+   * <p>An {@link org.apache.iceberg.io.InputFile} will not be cached if the length is longer than
+   * this limit. Must be a positive value.
+   */
+  public static final String IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH =
+      "io.table-metadata.cache.max-content-length";
+
+  public static final long IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH_DEFAULT = 8 * 1024 * 1024;
+
   /**
    * Controls whether to use caching during manifest reads or not.
    *
@@ -109,7 +163,7 @@ private CatalogProperties() {}
   public static final long IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT = 100 * 1024 * 1024;
 
   /**
-   * Controls the maximum length of file to be considered for caching.
+   * Controls the maximum length of file to be considered for caching a manifest file.
    *
    * <p>An {@link org.apache.iceberg.io.InputFile} will not be cached if the length is longer than
    * this limit. Must be a positive value.
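The four properties above mirror the existing manifest cache settings and are read from the
FileIO's own properties map, so they can be supplied wherever catalog or FileIO properties are
configured. A minimal, illustrative sketch of enabling the cache on a HadoopFileIO (not part of
the diff; the metadata path is hypothetical and the numeric values simply restate the defaults):

    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableMetadataParser;
    import org.apache.iceberg.hadoop.HadoopFileIO;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    public class TableMetadataCacheExample {
      public static void main(String[] args) {
        FileIO io = new HadoopFileIO();
        io.initialize(
            ImmutableMap.of(
                CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true",
                CatalogProperties.IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS, "600000",
                CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES, "104857600",
                CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH, "8388608"));

        // Reads through the new read(FileIO, InputFile) overload consult the per-FileIO cache.
        TableMetadata metadata =
            TableMetadataParser.read(io, io.newInputFile("/tmp/db/table/metadata/v1.metadata.json"));
        System.out.println(metadata.location());

        io.close();
      }
    }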
diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
index ad40c17e3076..d0e5f5985a6b 100644
--- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -71,7 +71,20 @@ private SystemConfigs() {}
 
   /**
    * Maximum number of distinct {@link org.apache.iceberg.io.FileIO} that is allowed to have
-   * associated {@link org.apache.iceberg.io.ContentCache} in memory at a time.
+   * associated {@link org.apache.iceberg.io.ContentCache} in memory at a time in the table
+   * metadata cache.
+   */
+  public static final ConfigEntry<Integer> IO_TABLE_METADATA_CACHE_MAX_FILEIO =
+      new ConfigEntry<>(
+          "iceberg.io.table-metadata.cache.fileio-max",
+          "ICEBERG_IO_TABLE_METADATA_CACHE_FILEIO_MAX",
+          8,
+          Integer::parseUnsignedInt);
+
+  /**
+   * Maximum number of distinct {@link org.apache.iceberg.io.FileIO} that is allowed to have
+   * associated {@link org.apache.iceberg.io.ContentCache} in memory at a time in the manifest
+   * cache.
    */
   public static final ConfigEntry<Integer> IO_MANIFEST_CACHE_MAX_FILEIO =
       new ConfigEntry<>(
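Unlike the per-FileIO properties above, this cap is a process-wide setting resolved through
SystemConfigs when TableMetadataParser builds its Caffeine cache, so it has to be in place before
the first table metadata read in the process. For illustration (the property and environment
variable names come from the entry above), it could be raised like this:

    // JVM flag:     -Diceberg.io.table-metadata.cache.fileio-max=16
    // environment:  export ICEBERG_IO_TABLE_METADATA_CACHE_FILEIO_MAX=16
    // or programmatically, before TableMetadataParser is first used:
    System.setProperty("iceberg.io.table-metadata.cache.fileio-max", "16");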
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index eeeeeab8a699..220d88fe8534 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -20,6 +20,8 @@
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -36,14 +38,19 @@
 import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
 import org.apache.iceberg.encryption.EncryptedKey;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.ContentCache;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TableMetadataParser {
 
@@ -84,6 +91,8 @@ public static Codec fromFileName(String fileName) {
 
   private TableMetadataParser() {}
 
+  private static final Logger LOG = LoggerFactory.getLogger(TableMetadataParser.class);
+
   // visible for testing
   static final String FORMAT_VERSION = "format-version";
   static final String TABLE_UUID = "table-uuid";
@@ -115,6 +124,21 @@ private TableMetadataParser() {}
   static final String NEXT_ROW_ID = "next-row-id";
   static final int MIN_NULL_CURRENT_SNAPSHOT_VERSION = 3;
 
+  private static Caffeine<Object, Object> newTableMetadataCacheBuilder() {
+    int maxSize = SystemConfigs.IO_TABLE_METADATA_CACHE_MAX_FILEIO.value();
+    return Caffeine.newBuilder()
+        .weakKeys()
+        .softValues()
+        .maximumSize(maxSize)
+        .removalListener(
+            (io, contentCache, cause) ->
+                LOG.debug("Evicted {} from FileIO-level cache ({})", io, cause))
+        .recordStats();
+  }
+
+  private static final Cache<FileIO, ContentCache> CONTENT_CACHES =
+      newTableMetadataCacheBuilder().build();
+
   public static void overwrite(TableMetadata metadata, OutputFile outputFile) {
     internalWrite(metadata, outputFile, true);
   }
@@ -294,6 +318,15 @@ public static TableMetadata read(FileIO io, String path) {
     return read(io.newInputFile(path));
   }
 
+  public static TableMetadata read(FileIO io, InputFile file) {
+    InputFile wrapped = newInputFile(io, file);
+    return read(wrapped);
+  }
+
+  /**
+   * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link #read(FileIO, InputFile)} instead.
+   */
+  @Deprecated
   public static TableMetadata read(InputFile file) {
     Codec codec = Codec.fromFileName(file.location());
     try (InputStream is =
@@ -304,6 +337,55 @@ public static TableMetadata read(InputFile file) {
     }
   }
 
+  private static InputFile newInputFile(FileIO io, InputFile input) {
+    if (cachingEnabled(io)) {
+      return contentCache(io).tryCache(input);
+    }
+
+    return input;
+  }
+
+  private static boolean cachingEnabled(FileIO io) {
+    try {
+      return PropertyUtil.propertyAsBoolean(
+          io.properties(),
+          CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED,
+          CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED_DEFAULT);
+    } catch (UnsupportedOperationException e) {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  static ContentCache contentCache(FileIO io) {
+    return CONTENT_CACHES.get(
+        io,
+        fileIO ->
+            new ContentCache(
+                cacheDurationMs(fileIO), cacheTotalBytes(fileIO), cacheMaxContentLength(fileIO)));
+  }
+
+  private static long cacheDurationMs(FileIO io) {
+    return PropertyUtil.propertyAsLong(
+        io.properties(),
+        CatalogProperties.IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS,
+        CatalogProperties.IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
+  }
+
+  private static long cacheTotalBytes(FileIO io) {
+    return PropertyUtil.propertyAsLong(
+        io.properties(),
+        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES,
+        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES_DEFAULT);
+  }
+
+  private static long cacheMaxContentLength(FileIO io) {
+    return PropertyUtil.propertyAsLong(
+        io.properties(),
+        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH,
+        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH_DEFAULT);
+  }
+
   /**
    * Read TableMetadata from a JSON string.
    *
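With caching enabled, read(FileIO, InputFile) routes the input through ContentCache.tryCache, so
repeated reads of the same metadata location are served from memory as long as the file length
stays within io.table-metadata.cache.max-content-length. Because contentCache(FileIO) is
package-private and marked @VisibleForTesting, the read-through behavior can be observed from code
in the org.apache.iceberg package, which is what the tests below assert. A minimal sketch,
assuming io was initialized with io.table-metadata.cache-enabled=true and path points at an
existing metadata file:

    ContentCache cache = TableMetadataParser.contentCache(io);

    TableMetadataParser.read(io, io.newInputFile(path)); // first read: cache miss, content loaded
    TableMetadataParser.read(io, io.newInputFile(path)); // second read: served from the cache

    System.out.printf(
        "hits=%d misses=%d loads=%d%n",
        cache.stats().hitCount(), cache.stats().missCount(), cache.stats().loadCount());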
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadataParser.java b/core/src/test/java/org/apache/iceberg/TestTableMetadataParser.java
index 87c618b9adcd..2ebc68c53903 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadataParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadataParser.java
@@ -24,22 +24,26 @@
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipException;
 import org.apache.iceberg.TableMetadataParser.Codec;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.ContentCache;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types.BooleanType;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -55,27 +59,110 @@ private static List<Object> parameters() {
 
   @Parameter private String codecName;
 
+  private FileIO io;
+  private final List<String> fileNames = new ArrayList<>();
+
   @TestTemplate
   public void testGzipCompressionProperty() throws IOException {
+    io = new HadoopFileIO();
+    String fileName = getFileName("v3");
+    TableMetadata metadata = createTableMetadata();
+    writeTableMetadata(fileName, metadata);
+    readAndTestTableMetadata(fileName, metadata);
+
+    ContentCache cache = TableMetadataParser.contentCache(io);
+    assertThat(cache.stats().loadCount()).isEqualTo(0);
+    assertThat(cache.stats().hitCount()).isEqualTo(0);
+    assertThat(cache.stats().missCount()).isEqualTo(0);
+  }
+
+  @TestTemplate
+  public void testGzipCompressionPropertyWithCache() throws IOException {
+    io = new HadoopFileIO();
+    io.initialize(ImmutableMap.of(CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true"));
+    String fileName = getFileName("v3");
+    TableMetadata metadata = createTableMetadata();
+    writeTableMetadata(fileName, metadata);
+    readAndTestTableMetadata(fileName, metadata);
+
+    ContentCache cache = TableMetadataParser.contentCache(io);
+    assertThat(cache.stats().loadCount()).isEqualTo(1);
+    assertThat(cache.stats().hitCount()).isEqualTo(0);
+    assertThat(cache.stats().missCount()).isEqualTo(1);
+
+    readAndTestTableMetadata(fileName, metadata);
+    assertThat(cache.stats().loadCount()).isEqualTo(1);
+    assertThat(cache.stats().hitCount()).isEqualTo(1);
+    assertThat(cache.stats().missCount()).isEqualTo(1);
+
+    String anotherFileName = getFileName("another-v3");
+    writeTableMetadata(anotherFileName, metadata);
+    readAndTestTableMetadata(anotherFileName, metadata);
+    assertThat(cache.stats().loadCount()).isEqualTo(2);
+    assertThat(cache.stats().hitCount()).isEqualTo(1);
+    assertThat(cache.stats().missCount()).isEqualTo(2);
+  }
+
+  @TestTemplate
+  public void testGzipCompressionPropertyWithSmallCache() throws IOException {
+    io = new HadoopFileIO();
+    io.initialize(ImmutableMap.of(
+        CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true",
+        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES, "1",
+        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH, "1"
+    ));
+    String fileName = getFileName("v3");
+    TableMetadata metadata = createTableMetadata();
+    writeTableMetadata(fileName, metadata);
+    readAndTestTableMetadata(fileName, metadata);
+
+    ContentCache cache = TableMetadataParser.contentCache(io);
+    assertThat(cache.stats().loadCount()).isEqualTo(0);
+    assertThat(cache.stats().hitCount()).isEqualTo(0);
+    assertThat(cache.stats().missCount()).isEqualTo(0);
+  }
+
+  @BeforeEach
+  public void setup() {
+    fileNames.clear();
+  }
+
+  @AfterEach
+  public void cleanup() throws IOException {
+    for (String fileName : fileNames) {
+      java.nio.file.Files.deleteIfExists(Paths.get(fileName));
+      java.nio.file.Files.deleteIfExists(Paths.get("." + fileName + ".crc"));
+    }
+    io.close();
+  }
+
+  private String getFileName(String prefix) {
     Codec codec = Codec.fromName(codecName);
     String fileExtension = getFileExtension(codec);
-    String fileName = "v3" + fileExtension;
-    OutputFile outputFile = Files.localOutput(fileName);
+    String fileName = prefix + fileExtension;
+    fileNames.add(fileName);
+    return fileName;
+  }
+
+  private TableMetadata createTableMetadata() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(TableProperties.METADATA_COMPRESSION, codecName);
     String location = "file://tmp/db/table";
-    TableMetadata metadata = newTableMetadata(SCHEMA, unpartitioned(), location, properties);
+    return newTableMetadata(SCHEMA, unpartitioned(), location, properties);
+  }
+
+  private void writeTableMetadata(String fileName, TableMetadata metadata) {
+    OutputFile outputFile = io.newOutputFile(fileName);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, codecName);
     TableMetadataParser.write(metadata, outputFile);
-    assertThat(isCompressed(fileName)).isEqualTo(codec == Codec.GZIP);
-    TableMetadata actualMetadata = TableMetadataParser.read(Files.localInput(new File(fileName)));
-    verifyMetadata(metadata, actualMetadata);
   }
 
-  @AfterEach
-  public void cleanup() throws IOException {
-    Codec codec = Codec.fromName(codecName);
-    Path metadataFilePath = Paths.get("v3" + getFileExtension(codec));
-    java.nio.file.Files.deleteIfExists(metadataFilePath);
+  private void readAndTestTableMetadata(String fileName, TableMetadata metadata) throws IOException {
+    assertThat(isCompressed(fileName)).isEqualTo(Codec.fromName(codecName) == Codec.GZIP);
+    TableMetadata actualMetadata =
+        TableMetadataParser.read(io, io.newInputFile(fileName));
+    verifyMetadata(metadata, actualMetadata);
   }
 
   private void verifyMetadata(TableMetadata expected, TableMetadata actual) {
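A design note on the cache key: CONTENT_CACHES uses weakKeys, so entries are keyed by FileIO
identity. Two FileIO instances with identical properties therefore get independent ContentCache
instances, each sized from its own properties, and an entry becomes collectible once its FileIO is
no longer referenced. A small sketch of that behavior, callable only from the org.apache.iceberg
package because contentCache(FileIO) is package-private:

    FileIO first = new HadoopFileIO();
    first.initialize(ImmutableMap.of(CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true"));

    FileIO second = new HadoopFileIO();
    second.initialize(ImmutableMap.of(CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true"));

    // Identical properties but distinct FileIO instances, hence distinct ContentCache instances.
    assertThat(TableMetadataParser.contentCache(first))
        .isNotSameAs(TableMetadataParser.contentCache(second));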