Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,6 @@ Manifest File Caching
As of Iceberg version 1.1.0, Apache Iceberg provides a mechanism to cache the contents of Iceberg manifest files in memory. This feature helps
to reduce repeated reads of small Iceberg manifest files from remote storage.

.. note::

Currently, manifest file caching is supported for Hadoop and Nessie catalogs in the Presto Iceberg connector.

The following configuration properties are available:

==================================================== ============================================================= ============
Expand Down
27 changes: 26 additions & 1 deletion presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.iceberg.version>1.5.0</dep.iceberg.version>
<dep.iceberg.version>1.6.0-SNAPSHOT</dep.iceberg.version>
<dep.nessie.version>0.77.1</dep.nessie.version>
</properties>

Expand Down Expand Up @@ -243,6 +243,31 @@
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>${dep.iceberg.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-presto-bundle</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.10.0</version>
<scope>runtime</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,45 @@
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.SerializableMap;

import java.io.IOException;
import java.util.Map;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class HdfsFileIO
implements FileIO
{
private final HdfsEnvironment environment;
private final HdfsContext context;
private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());

/**
 * Creates a FileIO backed by HDFS.
 *
 * @param environment HDFS environment used to resolve file systems; must be non-null
 * @param context HDFS context (session/identity info) used for file-system access; must be non-null
 */
public HdfsFileIO(HdfsEnvironment environment, HdfsContext context)
{
    this.environment = requireNonNull(environment, "environment is null");
    this.context = requireNonNull(context, "context is null");
}

/**
 * Stores the catalog properties for this FileIO (Iceberg invokes this after construction).
 * The map is copied into a SerializableMap so the FileIO remains serializable.
 */
@Override
public void initialize(Map<String, String> props)
{
    this.properties = SerializableMap.copyOf(props);
}

/**
 * Returns an immutable view of the properties most recently passed to initialize();
 * empty until initialize() is called (see the field initializer).
 */
@Override
public Map<String, String> properties()
{
    return properties.immutableMap();
}

@Override
public InputFile newInputFile(String path)
{
Expand All @@ -61,4 +78,11 @@ public void deleteFile(String pathString)
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete file: " + path, e);
}
}

/**
 * Human-readable representation for logs and debugging.
 * The original body called toStringHelper(this).toString() with no fields added,
 * which rendered as just the class name and carried no diagnostic value; include
 * the HDFS context and the Iceberg properties this FileIO was initialized with.
 */
@Override
public String toString()
{
    return toStringHelper(this)
            .add("context", context)
            .add("properties", properties)
            .toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.isPrestoView;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.loadCachingProperties;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -100,6 +101,7 @@ public class HiveTableOperations

private final ExtendedHiveMetastore metastore;
private final MetastoreContext metastoreContext;
private final IcebergConfig icebergConfig;
private final String database;
private final String tableName;
private final Optional<String> owner;
Expand All @@ -120,13 +122,15 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
IcebergConfig icebergConfig,
String database,
String table)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
icebergConfig,
database,
table,
Optional.empty(),
Expand All @@ -139,6 +143,7 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
IcebergConfig icebergConfig,
String database,
String table,
String owner,
Expand All @@ -148,6 +153,7 @@ public HiveTableOperations(
metastore,
metastoreContext,
config,
icebergConfig,
database,
table,
Optional.of(requireNonNull(owner, "owner is null")),
Expand All @@ -159,6 +165,7 @@ private HiveTableOperations(
ExtendedHiveMetastore metastore,
MetastoreContext metastoreContext,
IcebergHiveTableOperationsConfig config,
IcebergConfig icebergConfig,
String database,
String table,
Optional<String> owner,
Expand All @@ -167,6 +174,7 @@ private HiveTableOperations(
this.fileIO = requireNonNull(fileIO, "fileIO is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.metastoreContext = requireNonNull(metastoreContext, "metastore context is null");
this.icebergConfig = requireNonNull(icebergConfig, "icebergConfig context is null");
this.database = requireNonNull(database, "database is null");
this.tableName = requireNonNull(table, "table is null");
this.owner = requireNonNull(owner, "owner is null");
Expand Down Expand Up @@ -338,6 +346,11 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
/**
 * Returns the FileIO for this table, applying manifest-cache settings when enabled.
 */
@Override
public FileIO io()
{
    if (icebergConfig.getManifestCachingEnabled()) {
        // NOTE(review): this rebuilds the property map and calls fileIO.initialize()
        // on every io() invocation, not just the first — presumably initialize() is
        // cheap and idempotent for this FileIO, but consider doing it once at
        // construction time; confirm.
        Map<String, String> properties = new HashMap<>();
        // loadCachingProperties presumably translates the IcebergConfig manifest-cache
        // settings into Iceberg catalog property keys — see IcebergUtil (body not
        // visible here).
        loadCachingProperties(properties, icebergConfig);
        fileIO.initialize(properties);
    }
    return fileIO;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ public void setup(Binder binder)
configBinder(binder).bindConfig(ParquetCacheConfig.class, connectorId);

binder.bind(ConnectorPlanOptimizerProvider.class).to(IcebergPlanOptimizerProvider.class).in(Scopes.SINGLETON);

boolean isManifestFileCachingEnabled = buildConfigObject(IcebergConfig.class).getManifestCachingEnabled();
if (isManifestFileCachingEnabled) {
binder.bind(PrestoInMemoryContentCacheManager.class).in(Scopes.SINGLETON);
binder.requestStaticInjection(PrestoInMemoryContentCacheManager.class);
// newExporter(binder).export(PrestoInMemoryContentCacheManager.class).withGeneratedName();

binder.bind(IcebergFileIOStats.class).in(Scopes.SINGLETON);
binder.requestStaticInjection(IcebergFileIOStats.class);
newExporter(binder).export(IcebergFileIOStats.class).withGeneratedName();
}
}

@ForCachingHiveMetastore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.hadoop.HadoopFileIO;

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
Expand All @@ -33,9 +32,7 @@
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET;
import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;

Expand All @@ -59,11 +56,12 @@ public class IcebergConfig
private boolean metadataDeleteAfterCommit = METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;

private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
private boolean manifestCachingEnabled;
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private String fileIOImpl = HdfsFileIO.class.getName();
private boolean manifestCachingEnabled = true;
private long maxManifestCacheSize = 1073741824; // 1 GB, default 100 MB
private long manifestCacheExpireDuration = 86400000; // 24 hrs
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
private String fileIOContentCacheManager = PrestoInMemoryContentCacheManager.class.getName();
private int splitManagerThreads = Runtime.getRuntime().availableProcessors();

@NotNull
Expand Down Expand Up @@ -340,6 +338,20 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte
return this;
}

/**
 * Returns the FileIO content cache manager implementation class name; defaults to
 * PrestoInMemoryContentCacheManager (see the field initializer). Bound to the
 * "iceberg.io.manifest.cache.content-caches-impl" config property via the setter.
 */
public String getFileIOContentCacheManager()
{
    return fileIOContentCacheManager;
}

// Config-mapped setter; returns this for the fluent style used by the other
// setters in this class. The value is a fully-qualified class name — no validation
// beyond non-null happens here.
@NotNull
@Config("iceberg.io.manifest.cache.content-caches-impl")
@ConfigDescription("Custom FileIO Content cache manager for file io to use")
public IcebergConfig setFileIOContentCacheManager(String fileIOContentCacheManager)
{
    this.fileIOContentCacheManager = fileIOContentCacheManager;
    return this;
}

@Min(0)
public int getSplitManagerThreads()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import org.weakref.jmx.Managed;

import javax.inject.Inject;

/**
 * JMX bean exposing Caffeine cache statistics for the Iceberg manifest-file content
 * caches, keyed by the fully-qualified class name of the FileIO implementation that
 * owns each cache.
 * <p>
 * The cache manager is injected statically (IcebergModule uses
 * {@code requestStaticInjection}), so the field can be null when manifest caching
 * is disabled or injection has not run yet.
 */
public class IcebergFileIOStats
{
    @Inject
    private static PrestoInMemoryContentCacheManager inMemoryContentCacheManager;

    private static final String HDFS_FILE_IO = "com.facebook.presto.iceberg.HdfsFileIO";
    private static final String HADOOP_FILE_IO = "org.apache.iceberg.hadoop.HadoopFileIO";

    @Managed
    public String getHiveCatalogIOStats()
    {
        return statsFor(HDFS_FILE_IO);
    }

    @Managed
    public String getHadoopCatalogIOStats()
    {
        return statsFor(HADOOP_FILE_IO);
    }

    // NOTE(review): Nessie and REST catalog stats both report the HadoopFileIO cache —
    // confirm those catalogs really use org.apache.iceberg.hadoop.HadoopFileIO rather
    // than a FileIO of their own.
    @Managed
    public String getNessieCatalogIOStats()
    {
        return statsFor(HADOOP_FILE_IO);
    }

    @Managed
    public String getRestCatalogIOStats()
    {
        return statsFor(HADOOP_FILE_IO);
    }

    /**
     * Returns the cache statistics for the given FileIO implementation, or an empty
     * string when the manager was never injected or no cache exists yet for that key.
     * The previous code dereferenced getIfPresent() unconditionally, which threw a
     * NullPointerException from the JMX getter until the first cache was created.
     */
    private static String statsFor(String fileIoImpl)
    {
        // getIfPresent is invoked twice so we never have to name the cache value type,
        // which is not imported in this file.
        if (inMemoryContentCacheManager == null || inMemoryContentCacheManager.getCache().getIfPresent(fileIoImpl) == null) {
            return "";
        }
        return inMemoryContentCacheManager.getCache().getIfPresent(fileIoImpl).stats().toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public class IcebergHiveMetadata
private final FilterStatsCalculatorService filterStatsCalculatorService;
private final IcebergHiveTableOperationsConfig hiveTableOeprationsConfig;

private final IcebergConfig icebergConfig;
public IcebergHiveMetadata(
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
Expand All @@ -164,13 +165,15 @@ public IcebergHiveMetadata(
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig hiveTableOeprationsConfig)
IcebergHiveTableOperationsConfig hiveTableOeprationsConfig,
IcebergConfig icebergConfig)
{
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
this.icebergConfig = requireNonNull(icebergConfig, "icebergConfig is null");
}

public ExtendedHiveMetastore getMetastore()
Expand All @@ -181,7 +184,7 @@ public ExtendedHiveMetastore getMetastore()
@Override
protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
{
return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, session, schemaTableName);
return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, icebergConfig, session, schemaTableName);
}

@Override
Expand Down Expand Up @@ -300,6 +303,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
hdfsEnvironment,
hdfsContext,
hiveTableOeprationsConfig,
icebergConfig,
schemaName,
tableName,
session.getUser(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class IcebergHiveMetadataFactory
final NodeVersion nodeVersion;
final FilterStatsCalculatorService filterStatsCalculatorService;
final IcebergHiveTableOperationsConfig operationsConfig;
final IcebergConfig icebergConfig;

@Inject
public IcebergHiveMetadataFactory(
Expand All @@ -50,7 +51,8 @@ public IcebergHiveMetadataFactory(
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig operationsConfig)
IcebergHiveTableOperationsConfig operationsConfig,
IcebergConfig icebergConfig)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
Expand All @@ -61,6 +63,7 @@ public IcebergHiveMetadataFactory(
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.operationsConfig = requireNonNull(operationsConfig, "operationsConfig is null");
this.icebergConfig = requireNonNull(icebergConfig, "icebergConfig is null");
}

public ConnectorMetadata create()
Expand All @@ -74,6 +77,7 @@ public ConnectorMetadata create()
commitTaskCodec,
nodeVersion,
filterStatsCalculatorService,
operationsConfig);
operationsConfig,
icebergConfig);
}
}
Loading