Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,60 @@ private CatalogProperties() {}
public static final long CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
public static final long CACHE_EXPIRATION_INTERVAL_MS_OFF = -1;

/**
* Controls whether to use caching during manifest reads or not.
*
* <p>Enabling manifest file caching requires the following configuration constraints to be true:
*
* <ul>
* <li>{@link #IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS} must be a non-negative value.
* <li>{@link #IO_MANIFEST_CACHE_MAX_TOTAL_BYTES} must be a positive value.
* <li>{@link #IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH} must be a positive value.
* </ul>
*/
public static final String IO_MANIFEST_CACHE_ENABLED = "io.manifest.cache-enabled";

/** Default for {@link #IO_MANIFEST_CACHE_ENABLED}: manifest caching is disabled. */
public static final boolean IO_MANIFEST_CACHE_ENABLED_DEFAULT = false;

/**
* Controls the maximum duration for which an entry stays in the manifest cache.
*
* <p>Must be a non-negative value. The specific behaviors of this config are:
*
* <ul>
* <li>Zero - Cache entries expire only if they are evicted due to memory pressure from the
* {@link #IO_MANIFEST_CACHE_MAX_TOTAL_BYTES} setting.
* <li>Positive values - Cache entries expire if not accessed via the cache after this many
* milliseconds.
* </ul>
*/
public static final String IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS =
"io.manifest.cache.expiration-interval-ms";

/** Default for {@link #IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS}: 60 seconds, in milliseconds. */
public static final long IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(60);

/**
* Controls the maximum total number of bytes to cache in the manifest cache.
*
* <p>Must be a positive value.
*/
public static final String IO_MANIFEST_CACHE_MAX_TOTAL_BYTES =
"io.manifest.cache.max-total-bytes";

/** Default for {@link #IO_MANIFEST_CACHE_MAX_TOTAL_BYTES}: 100 * 1024 * 1024 bytes (100 MiB). */
public static final long IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT = 100 * 1024 * 1024;

/**
* Controls the maximum length of a file to be considered for caching.
*
* <p>An {@link org.apache.iceberg.io.InputFile} will not be cached if its length is longer than
* this limit. Must be a positive value.
*/
public static final String IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH =
"io.manifest.cache.max-content-length";

/** Default for {@link #IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH}: 8 * 1024 * 1024 bytes (8 MiB). */
public static final long IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT = 8 * 1024 * 1024;

public static final String URI = "uri";
public static final String CLIENT_POOL_SIZE = "clients";
public static final int CLIENT_POOL_SIZE_DEFAULT = 2;
Expand Down
106 changes: 104 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,32 @@
*/
package org.apache.iceberg;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.ManifestReader.FileType;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ContentCache;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManifestFiles {
private ManifestFiles() {}

private static final Logger LOG = LoggerFactory.getLogger(ManifestFiles.class);

private static final org.apache.avro.Schema MANIFEST_AVRO_SCHEMA =
AvroSchemaUtil.convert(
ManifestFile.schema(),
Expand All @@ -44,6 +53,36 @@ private ManifestFiles() {}
ManifestFile.PARTITION_SUMMARY_TYPE,
GenericPartitionFieldSummary.class.getName()));

/**
 * Builds the FileIO-level cache that maps each {@link FileIO} to its {@link ContentCache}.
 *
 * <p>The cache is configured with Caffeine's {@code weakKeys()} and {@code softValues()}, is capped
 * at {@link #maxFileIO()} entries, records statistics, and logs every removal at debug level.
 *
 * @return a new, empty cache instance
 */
@VisibleForTesting
static Cache<FileIO, ContentCache> newManifestCache() {
  int fileIOLimit = maxFileIO();

  Caffeine<Object, Object> builder =
      Caffeine.newBuilder().weakKeys().softValues().maximumSize(fileIOLimit).recordStats();

  return builder
      .removalListener(
          (key, value, reason) ->
              LOG.debug("Evicted {} from FileIO-level cache ({})", key, reason))
      .build();
}

private static final Cache<FileIO, ContentCache> CONTENT_CACHES = newManifestCache();

/**
 * Returns the {@link ContentCache} associated with the given {@link FileIO}, creating it from the
 * FileIO's caching properties when no entry is present yet.
 *
 * @param io the FileIO whose manifest content cache is requested
 * @return the content cache registered for {@code io}
 */
@VisibleForTesting
static ContentCache contentCache(FileIO io) {
  return CONTENT_CACHES.get(
      io,
      key -> {
        // Resolve the three sizing/expiry settings from the FileIO's own properties.
        long expirationMs = cacheDurationMs(key);
        long maxTotalBytes = cacheTotalBytes(key);
        long maxContentLength = cacheMaxContentLength(key);
        return new ContentCache(expirationMs, maxTotalBytes, maxContentLength);
      });
}

/**
* Drops the manifest file cache object for a FileIO, if one exists.
*
* <p>The entry for {@code fileIO} is invalidated in the FileIO-level cache and the cache's
* {@code cleanUp()} is then invoked immediately.
*
* @param fileIO the FileIO whose manifest content cache should be discarded
*/
public static synchronized void dropCache(FileIO fileIO) {
CONTENT_CACHES.invalidate(fileIO);
// Run pending cache maintenance now instead of leaving it to a later cache operation.
CONTENT_CACHES.cleanUp();
}

/**
* Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
*
Expand Down Expand Up @@ -86,7 +125,7 @@ public static ManifestReader<DataFile> read(
manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s",
manifest);
InputFile file = io.newInputFile(manifest.path(), manifest.length());
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
}
Expand Down Expand Up @@ -140,7 +179,7 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
manifest.content() == ManifestContent.DELETES,
"Cannot read a data manifest with a DeleteManifestReader: %s",
manifest);
InputFile file = io.newInputFile(manifest.path(), manifest.length());
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
}
Expand Down Expand Up @@ -300,4 +339,67 @@ private static ManifestFile copyManifestInternal(

return writer.toManifestFile();
}

/**
 * Creates an {@link InputFile} for a manifest, going through the manifest content cache when
 * caching is enabled for the given {@link FileIO}.
 *
 * <p>If the FileIO does not support {@code properties()} (signalled by an {@link
 * UnsupportedOperationException}), caching is treated as disabled and the file is opened directly.
 *
 * @param io FileIO used to open the file and to look up the caching configuration
 * @param path location of the manifest file
 * @param length length of the manifest file
 * @return an input file, possibly backed by the content cache
 * @throws NullPointerException if no content cache could be obtained for {@code io}
 */
private static InputFile newInputFile(FileIO io, String path, long length) {
  boolean enabled;
  try {
    enabled = cachingEnabled(io);
  } catch (UnsupportedOperationException e) {
    // io.properties() is not available for this FileIO, so there is no caching configuration
    // to read. Deliberately fall back to uncached reads instead of failing the manifest read.
    enabled = false;
  }

  if (!enabled) {
    // Caching is not enabled for this io, or its properties could not be read.
    return io.newInputFile(path, length);
  }

  ContentCache cache = contentCache(io);
  Preconditions.checkNotNull(
      cache,
      "ContentCache creation failed. Check that all manifest caching configurations has valid value.");

  // This runs for every cached manifest read; only take a stats snapshot when it will be logged.
  if (LOG.isDebugEnabled()) {
    LOG.debug("FileIO-level cache stats: {}", CONTENT_CACHES.stats());
  }

  return cache.tryCache(io, path, length);
}

/**
 * Resolves the maximum number of {@link FileIO} entries kept in the FileIO-level cache.
 *
 * <p>The value is read from the {@link SystemProperties#IO_MANIFEST_CACHE_MAX_FILEIO} system
 * property. {@link SystemProperties#IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT} is returned when the
 * property is unset, is not a valid unsigned integer, or does not fit in a non-negative {@code
 * int}.
 *
 * @return a non-negative cache size limit, provided the default itself is non-negative
 */
private static int maxFileIO() {
  String value = System.getProperty(SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO);
  if (value != null) {
    try {
      int parsed = Integer.parseUnsignedInt(value);
      // parseUnsignedInt accepts values up to 2^32-1 and returns those above Integer.MAX_VALUE
      // as negative ints. A negative size would be rejected by the cache builder, so treat it
      // like any other invalid value.
      if (parsed >= 0) {
        return parsed;
      }
    } catch (NumberFormatException e) {
      // will return the default
    }
  }
  return SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT;
}

/**
 * Looks up {@link CatalogProperties#IO_MANIFEST_CACHE_ENABLED} in the properties of the given
 * {@link FileIO}, passing {@link CatalogProperties#IO_MANIFEST_CACHE_ENABLED_DEFAULT} as the
 * default.
 *
 * @param io the FileIO whose properties are consulted
 * @return whether manifest caching is enabled for {@code io}
 */
static boolean cachingEnabled(FileIO io) {
  Map<String, String> ioProperties = io.properties();
  return PropertyUtil.propertyAsBoolean(
      ioProperties,
      CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
      CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT);
}

/**
 * Looks up {@link CatalogProperties#IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS} in the properties
 * of the given {@link FileIO}, passing {@link
 * CatalogProperties#IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT} as the default.
 *
 * @param io the FileIO whose properties are consulted
 * @return the configured cache expiration interval in milliseconds
 */
static long cacheDurationMs(FileIO io) {
  Map<String, String> ioProperties = io.properties();
  return PropertyUtil.propertyAsLong(
      ioProperties,
      CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
      CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
}

/**
 * Looks up {@link CatalogProperties#IO_MANIFEST_CACHE_MAX_TOTAL_BYTES} in the properties of the
 * given {@link FileIO}, passing {@link CatalogProperties#IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT}
 * as the default.
 *
 * @param io the FileIO whose properties are consulted
 * @return the configured maximum total bytes for the manifest cache
 */
static long cacheTotalBytes(FileIO io) {
  Map<String, String> ioProperties = io.properties();
  return PropertyUtil.propertyAsLong(
      ioProperties,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT);
}

/**
 * Looks up {@link CatalogProperties#IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH} in the properties of the
 * given {@link FileIO}, passing {@link
 * CatalogProperties#IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT} as the default.
 *
 * @param io the FileIO whose properties are consulted
 * @return the configured maximum length of a file eligible for caching
 */
static long cacheMaxContentLength(FileIO io) {
  Map<String, String> ioProperties = io.properties();
  return PropertyUtil.propertyAsLong(
      ioProperties,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT);
}
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/SystemProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ private SystemProperties() {}
/** Whether to use the shared worker pool when planning table scans. */
public static final String SCAN_THREAD_POOL_ENABLED = "iceberg.scan.plan-in-worker-pool";

/**
* Maximum number of distinct {@link org.apache.iceberg.io.FileIO} instances that are allowed to
* have an associated {@link org.apache.iceberg.io.ContentCache} in memory at a time.
*/
public static final String IO_MANIFEST_CACHE_MAX_FILEIO = "iceberg.io.manifest.cache.fileio-max";

/** Default for {@link #IO_MANIFEST_CACHE_MAX_FILEIO}: 8 FileIO instances. */
public static final int IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT = 8;

static boolean getBoolean(String systemProperty, boolean defaultValue) {
String value = System.getProperty(systemProperty);
if (value != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;

public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {

private SerializableSupplier<Configuration> hadoopConf;
private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());

/**
* Constructor used for dynamic FileIO loading.
Expand All @@ -61,6 +63,11 @@ public Configuration conf() {
return hadoopConf.get();
}

/**
 * Stores the given properties on this FileIO as a {@link SerializableMap} produced by {@code
 * SerializableMap.copyOf(props)}, replacing any previously stored properties.
 *
 * @param props configuration properties for this FileIO
 */
@Override
public void initialize(Map<String, String> props) {
  SerializableMap<String, String> snapshot = SerializableMap.copyOf(props);
  this.properties = snapshot;
}

@Override
public InputFile newInputFile(String path) {
return HadoopInputFile.fromLocation(path, hadoopConf.get());
Expand Down Expand Up @@ -89,7 +96,7 @@ public void deleteFile(String path) {

@Override
public Map<String, String> properties() {
return ImmutableMap.of();
return properties.immutableMap();
}

@Override
Expand Down
Loading