56 changes: 55 additions & 1 deletion core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -65,6 +65,60 @@ private CatalogProperties() {}
public static final long CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
public static final long CACHE_EXPIRATION_INTERVAL_MS_OFF = -1;

/**
* Controls whether to use caching during table metadata reads or not.
*
* <p>Enabling table metadata file caching requires the following configuration constraints to be true:
*
* <ul>
* <li>{@link #IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS} must be a non-negative value.
* <li>{@link #IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES} must be a positive value.
* <li>{@link #IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH} must be a positive value.
* </ul>
*/
public static final String IO_TABLE_METADATA_CACHE_ENABLED = "io.table-metadata.cache-enabled";

public static final boolean IO_TABLE_METADATA_CACHE_ENABLED_DEFAULT = false;

/**
* Controls the maximum duration for which an entry stays in the table metadata cache.
*
* <p>Must be a non-negative value. The specific behaviors of this config are:
*
* <ul>
* <li>Zero - Cache entries expire only when evicted due to memory pressure from the {@link
* #IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES} setting.
* <li>Positive values - Cache entries expire if not accessed via the cache within this many
* milliseconds.
* </ul>
*/
public static final String IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS =
"io.table-metadata.cache.expiration-interval-ms";

public static final long IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(10);

/**
* Controls the maximum total number of bytes to cache in the table metadata cache.
*
* <p>Must be a positive value.
*/
public static final String IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES =
"io.table-metadata.cache.max-total-bytes";

public static final long IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES_DEFAULT = 100 * 1024 * 1024;

/**
* Controls the maximum length of a table metadata file to be considered for caching.
*
* <p>An {@link org.apache.iceberg.io.InputFile} will not be cached if the length is longer than
* this limit. Must be a positive value.
*/
public static final String IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH =
"io.table-metadata.cache.max-content-length";

public static final long IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH_DEFAULT = 8 * 1024 * 1024;
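For illustration, a minimal sketch (not part of this change) of enabling the new cache; the property keys are the constants above, while the FileIO instance and the 5-minute/64 MB/4 MB values are assumptions for the example, not recommendations:

// Hypothetical configuration passed to FileIO.initialize(), e.g. by a catalog
// that forwards its properties to its FileIO.
Map<String, String> props =
    ImmutableMap.of(
        CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true",
        CatalogProperties.IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS,
        String.valueOf(TimeUnit.MINUTES.toMillis(5)),
        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES,
        String.valueOf(64L * 1024 * 1024),
        CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH,
        String.valueOf(4L * 1024 * 1024));
io.initialize(props);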

/**
* Controls whether to use caching during manifest reads or not.
*
@@ -109,7 +163,7 @@ private CatalogProperties() {}
public static final long IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT = 100 * 1024 * 1024;

/**
* Controls the maximum length of file to be considered for caching.
* Controls the maximum length of a manifest file to be considered for caching.
*
* <p>An {@link org.apache.iceberg.io.InputFile} will not be cached if the length is longer than
* this limit. Must be a positive value.
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -71,7 +71,20 @@ private SystemConfigs() {}

/**
* Maximum number of distinct {@link org.apache.iceberg.io.FileIO} that is allowed to have
* associated {@link org.apache.iceberg.io.ContentCache} in memory at a time.
* associated {@link org.apache.iceberg.io.ContentCache} in memory at a time in the table metadata
* cache.
*/
public static final ConfigEntry<Integer> IO_TABLE_METADATA_CACHE_MAX_FILEIO =
new ConfigEntry<>(
"iceberg.io.table-metadata.cache.fileio-max",
"ICEBERG_IO_TABLE_METADATA_CACHE_FILEIO_MAX",
8,
Integer::parseUnsignedInt);
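As a sketch of how this limit might be raised; the value is illustrative, and the entry is resolved from the system property or environment variable named above:

// Hypothetical override; set it before the value is first read, since a
// ConfigEntry resolves and caches its value on first access.
System.setProperty("iceberg.io.table-metadata.cache.fileio-max", "16");
// or, outside the JVM: export ICEBERG_IO_TABLE_METADATA_CACHE_FILEIO_MAX=16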

/**
* Maximum number of distinct {@link org.apache.iceberg.io.FileIO} that is allowed to have
* associated {@link org.apache.iceberg.io.ContentCache} in memory at a time in the manifest
* cache.
*/
public static final ConfigEntry<Integer> IO_MANIFEST_CACHE_MAX_FILEIO =
new ConfigEntry<>(
82 changes: 82 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -20,6 +20,8 @@

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -36,14 +38,19 @@
import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
import org.apache.iceberg.encryption.EncryptedKey;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.ContentCache;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableMetadataParser {

Expand Down Expand Up @@ -84,6 +91,8 @@ public static Codec fromFileName(String fileName) {

private TableMetadataParser() {}

private static final Logger LOG = LoggerFactory.getLogger(TableMetadataParser.class);

// visible for testing
static final String FORMAT_VERSION = "format-version";
static final String TABLE_UUID = "table-uuid";
Expand Down Expand Up @@ -115,6 +124,21 @@ private TableMetadataParser() {}
static final String NEXT_ROW_ID = "next-row-id";
static final int MIN_NULL_CURRENT_SNAPSHOT_VERSION = 3;

private static Caffeine<Object, Object> newTableMetadataCacheBuilder() {
int maxSize = SystemConfigs.IO_TABLE_METADATA_CACHE_MAX_FILEIO.value();
return Caffeine.newBuilder()
.weakKeys()
.softValues()
.maximumSize(maxSize)
.removalListener(
(io, contentCache, cause) ->
LOG.debug("Evicted {} from FileIO-level cache ({})", io, cause))
.recordStats();
}

private static final Cache<FileIO, ContentCache> CONTENT_CACHES =
newTableMetadataCacheBuilder().build();
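Design note: weakKeys() ties each entry's lifetime to its FileIO key (the entry becomes collectible once the FileIO is unreachable), softValues() lets the JVM reclaim ContentCache instances under memory pressure, and maximumSize bounds the number of FileIO-level caches rather than bytes; byte limits are enforced inside each ContentCache. A sketch of inspecting a cache through the testing hook below:

// Sketch only: contentCache(io) is package-private and @VisibleForTesting.
ContentCache cache = TableMetadataParser.contentCache(io);
LOG.debug("table metadata cache stats: {}", cache.stats());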

public static void overwrite(TableMetadata metadata, OutputFile outputFile) {
internalWrite(metadata, outputFile, true);
}
@@ -294,6 +318,15 @@ public static TableMetadata read(FileIO io, String path) {
return read(io.newInputFile(path));
}

public static TableMetadata read(FileIO io, InputFile file) {
InputFile wrapped = newInputFile(io, file);
return read(wrapped);
}

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link #read(FileIO, InputFile)} instead.
*/
@Deprecated
public static TableMetadata read(InputFile file) {
Codec codec = Codec.fromFileName(file.location());
try (InputStream is =
@@ -304,6 +337,55 @@ public static TableMetadata read(InputFile file) {
}
}

private static InputFile newInputFile(FileIO io, InputFile input) {
if (cachingEnabled(io)) {
return contentCache(io).tryCache(input);
}

return input;
}

private static boolean cachingEnabled(FileIO io) {
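// Some FileIO implementations do not support properties() and throw
// UnsupportedOperationException; treat that as caching disabled.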
try {
return PropertyUtil.propertyAsBoolean(
io.properties(),
CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED,
CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED_DEFAULT);
} catch (UnsupportedOperationException e) {
return false;
}
}

@VisibleForTesting
static ContentCache contentCache(FileIO io) {
return CONTENT_CACHES.get(
io,
fileIO ->
new ContentCache(
cacheDurationMs(fileIO), cacheTotalBytes(fileIO), cacheMaxContentLength(fileIO)));
}

private static long cacheDurationMs(FileIO io) {
return PropertyUtil.propertyAsLong(
io.properties(),
CatalogProperties.IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS,
CatalogProperties.IO_TABLE_METADATA_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
}

private static long cacheTotalBytes(FileIO io) {
return PropertyUtil.propertyAsLong(
io.properties(),
CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES,
CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES_DEFAULT);
}

private static long cacheMaxContentLength(FileIO io) {
return PropertyUtil.propertyAsLong(
io.properties(),
CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH,
CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH_DEFAULT);
}

/**
* Read TableMetadata from a JSON string.
*
113 changes: 100 additions & 13 deletions core/src/test/java/org/apache/iceberg/TestTableMetadataParser.java
@@ -24,22 +24,26 @@
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;
import org.apache.iceberg.TableMetadataParser.Codec;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.ContentCache;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types.BooleanType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@@ -55,27 +59,110 @@ private static List<Object> parameters() {

@Parameter private String codecName;

private FileIO io;
private final List<String> fileNames = new ArrayList<>();

@TestTemplate
public void testGzipCompressionProperty() throws IOException {
io = new HadoopFileIO();
String fileName = getFileName("v3");
TableMetadata metadata = createTableMetadata();
writeTableMetadata(fileName, metadata);
readAndTestTableMetadata(fileName, metadata);

ContentCache cache = TableMetadataParser.contentCache(io);
assertThat(cache.stats().loadCount()).isEqualTo(0);
assertThat(cache.stats().hitCount()).isEqualTo(0);
assertThat(cache.stats().missCount()).isEqualTo(0);
}

@TestTemplate
public void testGzipCompressionPropertyWithCache() throws IOException {
io = new HadoopFileIO();
io.initialize(ImmutableMap.of(CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true"));
String fileName = getFileName("v3");
TableMetadata metadata = createTableMetadata();
writeTableMetadata(fileName, metadata);
readAndTestTableMetadata(fileName, metadata);

ContentCache cache = TableMetadataParser.contentCache(io);
assertThat(cache.stats().loadCount()).isEqualTo(1);
assertThat(cache.stats().hitCount()).isEqualTo(0);
assertThat(cache.stats().missCount()).isEqualTo(1);

readAndTestTableMetadata(fileName, metadata);
assertThat(cache.stats().loadCount()).isEqualTo(1);
assertThat(cache.stats().hitCount()).isEqualTo(1);
assertThat(cache.stats().missCount()).isEqualTo(1);

String anotherFileName = getFileName("another-v3");
writeTableMetadata(anotherFileName, metadata);
readAndTestTableMetadata(anotherFileName, metadata);
assertThat(cache.stats().loadCount()).isEqualTo(2);
assertThat(cache.stats().hitCount()).isEqualTo(1);
assertThat(cache.stats().missCount()).isEqualTo(2);
}

@TestTemplate
public void testGzipCompressionPropertyWithSmallCache() throws IOException {
io = new HadoopFileIO();
io.initialize(ImmutableMap.of(
CatalogProperties.IO_TABLE_METADATA_CACHE_ENABLED, "true",
CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_TOTAL_BYTES, "1",
CatalogProperties.IO_TABLE_METADATA_CACHE_MAX_CONTENT_LENGTH, "1"
));
String fileName = getFileName("v3");
TableMetadata metadata = createTableMetadata();
writeTableMetadata(fileName, metadata);
readAndTestTableMetadata(fileName, metadata);

ContentCache cache = TableMetadataParser.contentCache(io);
assertThat(cache.stats().loadCount()).isEqualTo(0);
assertThat(cache.stats().hitCount()).isEqualTo(0);
assertThat(cache.stats().missCount()).isEqualTo(0);
}

@BeforeEach
public void setup() {
fileNames.clear();
}

@AfterEach
public void cleanup() throws IOException {
for (String fileName : fileNames) {
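// also delete the Hadoop LocalFileSystem checksum files (.<name>.crc)
// written beside each metadata file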
java.nio.file.Files.deleteIfExists(Paths.get(fileName));
java.nio.file.Files.deleteIfExists(Paths.get("." + fileName + ".crc"));
}
io.close();
}

private String getFileName(String prefix) {
Codec codec = Codec.fromName(codecName);
String fileExtension = getFileExtension(codec);
String fileName = "v3" + fileExtension;
OutputFile outputFile = Files.localOutput(fileName);
String fileName = prefix + fileExtension;
fileNames.add(fileName);
return fileName;
}

private TableMetadata createTableMetadata() {
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.METADATA_COMPRESSION, codecName);
String location = "file://tmp/db/table";
TableMetadata metadata = newTableMetadata(SCHEMA, unpartitioned(), location, properties);
return newTableMetadata(SCHEMA, unpartitioned(), location, properties);
}

private void writeTableMetadata(String fileName, TableMetadata metadata) {
OutputFile outputFile = io.newOutputFile(fileName);
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.METADATA_COMPRESSION, codecName);
TableMetadataParser.write(metadata, outputFile);
assertThat(isCompressed(fileName)).isEqualTo(codec == Codec.GZIP);
TableMetadata actualMetadata = TableMetadataParser.read(Files.localInput(new File(fileName)));
verifyMetadata(metadata, actualMetadata);
}

@AfterEach
public void cleanup() throws IOException {
Codec codec = Codec.fromName(codecName);
Path metadataFilePath = Paths.get("v3" + getFileExtension(codec));
java.nio.file.Files.deleteIfExists(metadataFilePath);
private void readAndTestTableMetadata(String fileName, TableMetadata metadata) throws IOException {
assertThat(isCompressed(fileName)).isEqualTo(Codec.fromName(codecName) == Codec.GZIP);
TableMetadata actualMetadata =
TableMetadataParser.read(io, io.newInputFile(fileName));
verifyMetadata(metadata, actualMetadata);
}

private void verifyMetadata(TableMetadata expected, TableMetadata actual) {