6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
@@ -419,6 +419,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
HdfsCachedInputFile.java (new file)
@@ -0,0 +1,119 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import org.apache.iceberg.io.ByteBufferInputStream;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.io.IOUtil.readRemaining;

public class HdfsCachedInputFile
implements InputFile
{
private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);

private final InputFile delegate;
private final ManifestFileCacheKey cacheKey;
private final ManifestFileCache cache;

public HdfsCachedInputFile(InputFile delegate, ManifestFileCacheKey cacheKey, ManifestFileCache cache)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
this.cache = requireNonNull(cache, "cache is null");
}

@Override
public long getLength()
{
ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
if (cachedContent != null) {
return cachedContent.getLength();
}
return delegate.getLength();
}

@Override
public SeekableInputStream newStream()
{
ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
if (cachedContent != null) {
return ByteBufferInputStream.wrap(cachedContent.getData());
}

long fileLength = delegate.getLength();
if (fileLength <= cache.getMaxFileLength() && cache.isEnabled()) {
try {
ManifestFileCachedContent content = readFully(delegate, fileLength, cache.getBufferChunkSize());
cache.put(cacheKey, content);
cache.recordFileSize(content.getLength());
return ByteBufferInputStream.wrap(content.getData());
}
catch (IOException e) {
LOG.warn("Failed to cache input file. Falling back to direct read.", e);
}
}

return delegate.newStream();
}

@Override
public String location()
{
return delegate.location();
}

@Override
public boolean exists()
{
return cache.getIfPresent(cacheKey) != null || delegate.exists();
}

private static ManifestFileCachedContent readFully(InputFile input, long fileLength, long chunkSize)
throws IOException
{
try (SeekableInputStream stream = input.newStream()) {
long totalBytesToRead = fileLength;
List<ByteBuffer> buffers = new ArrayList<>(
((int) (fileLength / chunkSize)) +
(fileLength % chunkSize == 0 ? 0 : 1));

while (totalBytesToRead > 0) {
int bytesToRead = (int) Math.min(chunkSize, totalBytesToRead);
byte[] buf = new byte[bytesToRead];
int bytesRead = readRemaining(stream, buf, 0, bytesToRead);
totalBytesToRead -= bytesRead;

if (bytesRead < bytesToRead) {
throw new IOException(
String.format(
"Failed to read %d bytes: %d bytes in stream",
fileLength, fileLength - totalBytesToRead));
}
else {
buffers.add(ByteBuffer.wrap(buf));
}
}
return new ManifestFileCachedContent(buffers, fileLength);
}
}
}
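For reference, readFully above splits a manifest into ceil(fileLength / chunkSize) buffers so that no single byte array has to hold the entire file. A minimal standalone sketch of that arithmetic, not part of this change (the 5 MB file size is illustrative; the 2 MB chunk size matches the default added to IcebergConfig below):

import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch
{
    public static void main(String[] args)
    {
        long fileLength = 5L * 1024 * 1024;  // e.g. a 5 MB manifest
        long chunkSize = 2L * 1024 * 1024;   // default iceberg.io.manifest.cache.max-chunk-size

        List<Integer> chunkLengths = new ArrayList<>();
        long remaining = fileLength;
        while (remaining > 0) {
            int bytesToRead = (int) Math.min(chunkSize, remaining);
            chunkLengths.add(bytesToRead);
            remaining -= bytesToRead;
        }
        // Prints [2097152, 2097152, 1048576]: three buffers instead of one 5 MB allocation.
        System.out.println(chunkLengths);
    }
}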
HdfsFileIO.java
@@ -17,25 +17,30 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

import java.io.IOException;
import java.util.Optional;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class HdfsFileIO
implements FileIO
{
private final HdfsEnvironment environment;
private final HdfsContext context;
private final ManifestFileCache manifestFileCache;

public HdfsFileIO(HdfsEnvironment environment, HdfsContext context)
public HdfsFileIO(ManifestFileCache manifestFileCache, HdfsEnvironment environment, HdfsContext context)
{
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
}

@Override
@@ -44,6 +49,33 @@ public InputFile newInputFile(String path)
return new HdfsInputFile(new Path(path), environment, context);
}

@Override
public InputFile newInputFile(String path, long length)
{
return new HdfsInputFile(new Path(path), environment, context, Optional.of(length));
}

protected InputFile newCachedInputFile(String path)
{
InputFile inputFile = new HdfsInputFile(new Path(path), environment, context);
return manifestFileCache.isEnabled() ?
new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(path), manifestFileCache) :
inputFile;
}

@Override
public InputFile newInputFile(ManifestFile manifest)
{
checkArgument(
manifest.keyMetadata() == null,
"Cannot decrypt manifest: {} (use EncryptingFileIO)",
manifest.path());
InputFile inputFile = new HdfsInputFile(new Path(manifest.path()), environment, context, Optional.of(manifest.length()));
return manifestFileCache.isEnabled() ?
new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(manifest.path()), manifestFileCache) :
inputFile;
}

@Override
public OutputFile newOutputFile(String path)
{
HdfsInputFile.java
@@ -22,6 +22,8 @@
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;
@@ -32,25 +34,42 @@ public class HdfsInputFile
private final InputFile delegate;
private final HdfsEnvironment environment;
private final String user;
private final AtomicLong length;

public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context, Optional<Long> length)
{
requireNonNull(path, "path is null");
this.environment = requireNonNull(environment, "environment is null");
this.length = new AtomicLong(length.orElse(-1L));
requireNonNull(context, "context is null");
try {
this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
if (this.length.get() < 0) {
this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
else {
this.delegate = HadoopInputFile.fromPath(path, this.length.get(), environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create input file: " + path, e);
}
this.user = context.getIdentity().getUser();
}

public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
{
this(path, environment, context, Optional.empty());
}

@Override
public long getLength()
{
return environment.doAs(user, delegate::getLength);
return length.updateAndGet(value -> {
if (value < 0) {
return environment.doAs(user, delegate::getLength);
}
return value;
});
}

@Override
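The getLength change above memoizes the file length in an AtomicLong, using -1 as the "unknown" sentinel so the HDFS lookup normally runs only once. A self-contained sketch of the same pattern, with a hypothetical LongSupplier standing in for environment.doAs(user, delegate::getLength):

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class LazyLengthSketch
{
    // -1 means "not yet known"; any non-negative value is the cached length.
    private final AtomicLong length = new AtomicLong(-1);

    public long getLength(LongSupplier expensiveLookup)
    {
        return length.updateAndGet(value -> value < 0 ? expensiveLookup.getAsLong() : value);
    }

    public static void main(String[] args)
    {
        LazyLengthSketch sketch = new LazyLengthSketch();
        // First call pays for the lookup and caches the result.
        System.out.println(sketch.getLength(() -> {
            System.out.println("expensive length lookup");
            return 1024;
        }));
        // Second call returns the cached value without touching the supplier.
        System.out.println(sketch.getLength(() -> {
            throw new AssertionError("should not be called again");
        }));
    }
}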
HiveTableOperations.java
@@ -105,7 +105,7 @@ public class HiveTableOperations
private final String tableName;
private final Optional<String> owner;
private final Optional<String> location;
private final FileIO fileIO;
private final HdfsFileIO fileIO;
private final IcebergHiveTableOperationsConfig config;

private TableMetadata currentMetadata;
@@ -121,10 +121,11 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
ManifestFileCache manifestFileCache,
String database,
String table)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
@@ -140,12 +141,13 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
ManifestFileCache manifestFileCache,
String database,
String table,
String owner,
String location)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
@@ -156,7 +158,7 @@
}

private HiveTableOperations(
FileIO fileIO,
HdfsFileIO fileIO,
ExtendedHiveMetastore metastore,
MetastoreContext metastoreContext,
IcebergHiveTableOperationsConfig config,
@@ -409,7 +411,7 @@ private void refreshFromMetadataLocation(String newLocation)
config.getTableRefreshMaxRetryTime().toMillis(),
config.getTableRefreshBackoffScaleFactor())
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation))));
TableMetadataParser.read(fileIO, fileIO.newCachedInputFile(metadataLocation))));
}
catch (RuntimeException e) {
throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing", e);
Iceberg connector Guice module
@@ -83,6 +83,7 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.inject.Binder;
@@ -93,8 +94,11 @@

import javax.inject.Singleton;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
@@ -198,7 +202,7 @@ protected void setup(Binder binder)
@Provides
public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBeanExporter exporter)
{
Cache<StatisticsFileCacheKey, ColumnStatistics> delegate = CacheBuilder.newBuilder()
com.github.benmanes.caffeine.cache.Cache<StatisticsFileCacheKey, ColumnStatistics> delegate = Caffeine.newBuilder()
.maximumWeight(config.getMaxStatisticsFileCacheSize().toBytes())
.<StatisticsFileCacheKey, ColumnStatistics>weigher((key, entry) -> (int) entry.getEstimatedSize())
.recordStats()
@@ -208,6 +212,26 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean
return statisticsFileCache;
}

@Singleton
@Provides
public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
{
com.github.benmanes.caffeine.cache.Caffeine<ManifestFileCacheKey, ManifestFileCachedContent> delegate = Caffeine.newBuilder()
.maximumWeight(config.getMaxManifestCacheSize())
.<ManifestFileCacheKey, ManifestFileCachedContent>weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
.recordStats();
if (config.getManifestCacheExpireDuration() > 0) {
delegate.expireAfterWrite(config.getManifestCacheExpireDuration(), TimeUnit.MILLISECONDS);
}
ManifestFileCache manifestFileCache = new ManifestFileCache(
delegate.build(),
config.getManifestCachingEnabled(),
config.getManifestCacheMaxContentLength(),
config.getManifestCacheMaxChunkSize().toBytes());
exporter.export(generatedNameOf(ManifestFileCache.class, connectorId), manifestFileCache);
return manifestFileCache;
}

@ForCachingHiveMetastore
@Singleton
@Provides
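The createManifestFileCache provider above relies on Caffeine's size-based eviction: each cached manifest is weighed by the byte size of its buffers, and entries are evicted once the total weight exceeds maximumWeight. A hypothetical, self-contained illustration of that mechanism (placeholder key and value types, not the connector's classes):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class WeigherSketch
{
    public static void main(String[] args)
    {
        // Weight is the payload size in bytes; the cache holds roughly 10 MB of content.
        Cache<String, byte[]> cache = Caffeine.newBuilder()
                .maximumWeight(10 * 1024 * 1024)
                .<String, byte[]>weigher((key, value) -> value.length)
                .recordStats()
                .build();

        cache.put("manifest-1", new byte[4 * 1024 * 1024]);
        cache.put("manifest-2", new byte[4 * 1024 * 1024]);
        // A third 4 MB entry pushes total weight past the cap, so Caffeine evicts
        // (possibly asynchronously) to get back under maximumWeight.
        cache.put("manifest-3", new byte[4 * 1024 * 1024]);

        System.out.println(cache.stats());
    }
}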
IcebergConfig.java
@@ -67,10 +67,11 @@ public class IcebergConfig

private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
private boolean manifestCachingEnabled;
private boolean manifestCachingEnabled = true;
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
private DataSize manifestCacheMaxChunkSize = succinctDataSize(2, MEGABYTE);
private int splitManagerThreads = Runtime.getRuntime().availableProcessors();
private DataSize maxStatisticsFileCacheSize = succinctDataSize(256, MEGABYTE);

@@ -348,6 +349,20 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte
return this;
}

public DataSize getManifestCacheMaxChunkSize()
{
return manifestCacheMaxChunkSize;
}

@Min(1024)
@Config("iceberg.io.manifest.cache.max-chunk-size")
@ConfigDescription("Maximum length of a buffer used to cache manifest file content. Only applicable to HIVE catalog.")
public IcebergConfig setManifestCacheMaxChunkSize(DataSize manifestCacheMaxChunkSize)
{
this.manifestCacheMaxChunkSize = manifestCacheMaxChunkSize;
return this;
}

@Min(0)
public int getSplitManagerThreads()
{