Merged
34 changes: 34 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -269,6 +269,40 @@ and a file system location of ``s3://test_bucket/test_schema/test_table``:
location = 's3://test_bucket/test_schema/test_table')
)

Caching Support
----------------

Manifest File Caching
^^^^^^^^^^^^^^^^^^^^^^

As of Iceberg version 1.1.0, Apache Iceberg provides a mechanism to cache the contents of Iceberg manifest files in memory. This feature helps
to reduce repeated reads of small Iceberg manifest files from remote storage.

.. note::

Currently, manifest file caching is supported for Hadoop and Nessie catalogs in the Presto Iceberg connector.
Member
Just a quick question (probably beyond the scope of this PR): besides native catalogs (e.g., Hadoop and Nessie), could other catalogs support manifest file caching?

Member Author
@ChunxuTang As of now, not from Presto, since manifest file caching in Iceberg is supported through HadoopFileIO, but in Presto the Hive catalog uses a custom HdfsFileIO.

Do we know why Presto uses this custom HdfsFileIO for the Iceberg Hive catalog?


The following configuration properties are available:

==================================================== ============================================================= ============
Property Name Description Default
==================================================== ============================================================= ============
``iceberg.io.manifest.cache-enabled`` Enable or disable the manifest caching feature. This feature ``false``
is only available if ``iceberg.catalog.type`` is ``hadoop``
or ``nessie``.

``iceberg.io-impl`` Custom FileIO implementation to use in a catalog. It must ``org.apache.iceberg.hadoop.HadoopFileIO``
Contributor @nastra, Dec 4, 2023
Can you elaborate on why this needs to be set for manifest caching?
I would have expected this to use whatever FileIO the underlying catalog is using. Otherwise, there's a risk that the FileIO specified here differs from the one used by the catalog.

Member Author

@nastra As I checked in Iceberg core: when we query the Hadoop catalog from Presto, HadoopCatalog.initialize() does not pass the catalog configuration on via HadoopFileIO.initialize(). But if the fileIOImpl config is set explicitly, the HadoopFileIO object is created via CatalogUtil.loadFileIO(), which also initializes the catalog configuration (including the properties passed from Presto, such as io.manifest.cache-enabled). That is why I set org.apache.iceberg.hadoop.HadoopFileIO explicitly here, since both the Hadoop and Nessie catalogs use HadoopFileIO.
Please let me know if there is a better way of handling this.
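To make the two initialization paths being discussed concrete, here is a minimal sketch using hypothetical stand-in classes (not Iceberg's real API): only the loadFileIO-style path hands the catalog properties, including the manifest-cache settings, to the FileIO instance, which is why the explicit `iceberg.io-impl` setting matters before apache/iceberg#9283.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Iceberg's FileIO / CatalogUtil, illustrating why
// creating the FileIO via a loadFileIO-style helper is what propagates the
// catalog properties (and with them the manifest-cache settings).
interface FileIO {
    Map<String, String> properties();
}

class StubHadoopFileIO implements FileIO {
    private Map<String, String> properties = new HashMap<>();

    // Called only when the instance is created via loadFileIO()
    void initialize(Map<String, String> props) {
        this.properties = props;
    }

    @Override
    public Map<String, String> properties() {
        return properties;
    }
}

public class FileIOResolution {
    // Mimics CatalogUtil.loadFileIO(): construct, then pass catalog properties.
    static FileIO loadFileIO(String impl, Map<String, String> catalogProps) {
        StubHadoopFileIO io = new StubHadoopFileIO(); // real Iceberg uses reflection on `impl`
        io.initialize(catalogProps);                  // properties reach the FileIO
        return io;
    }

    static FileIO resolve(Map<String, String> catalogProps) {
        String impl = catalogProps.get("io-impl");
        if (impl != null) {
            return loadFileIO(impl, catalogProps);    // cache settings propagate
        }
        // Default path (pre-#9283 HadoopCatalog behavior): FileIO created directly,
        // so catalog properties like io.manifest.cache-enabled never reach it.
        return new StubHadoopFileIO();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("io.manifest.cache-enabled", "true");

        FileIO defaulted = resolve(props);
        System.out.println(defaulted.properties().containsKey("io.manifest.cache-enabled")); // false

        props.put("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO");
        FileIO explicit = resolve(props);
        System.out.println(explicit.properties().get("io.manifest.cache-enabled")); // true
    }
}
```

The class and method names here are illustrative only; the real resolution lives in Iceberg's CatalogUtil and the individual catalog initialize() implementations.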

Contributor @nastra, Dec 6, 2023

Hm, I see; I was not aware of that limitation in HadoopCatalog/HiveCatalog. The initialization of FileIO should actually be done like it is in https://github.com/apache/iceberg/blob/39239a19555f55e0facc0a585086791a606a4b32/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java#L86-L88.
@agrawalreetika this might be a good point to raise a PR in Iceberg, and I'll make sure to find somebody to review it.

Member Author @agrawalreetika, Dec 6, 2023

Exactly, NessieCatalog takes care of initializing it, so it's not an issue there. Thanks for confirming. Sure, I will raise a PR for this.

Contributor
@agrawalreetika With apache/iceberg#9283 merged, do we still need to set iceberg.io-impl?

Member Author

@yingsu00 No. After apache/iceberg#9283, and once we upgrade Iceberg in Presto, we won't have to explicitly set iceberg.io-impl for native catalogs. But for HiveCatalog we may still need it, since Presto has a custom FileIO implementation.

Contributor

Is this waiting for a response from @ChunxuTang on this question? #21399 (comment)

Contributor

Is this waiting for a response from @ChunxuTang on this question? #21399 (comment)

I believe not. @agrawalreetika Can you please confirm?

Member Author @agrawalreetika, Dec 21, 2023

No, this is good to go for native catalog support.
Once we upgrade the Iceberg version in Presto to one that includes the apache/iceberg#9283 change, we will no longer have to set iceberg.io-impl explicitly for native catalogs to enable manifest file caching.

be set to enable manifest caching.

``iceberg.io.manifest.cache.max-total-bytes``        Maximum size of the manifest cache in bytes.                  ``104857600``

``iceberg.io.manifest.cache.expiration-interval-ms`` Maximum time duration in milliseconds for which an entry ``60000``
stays in the manifest cache.

``iceberg.io.manifest.cache.max-content-length`` Maximum length of a manifest file to be considered for ``8388608``
caching in bytes. Manifest files with a length exceeding
this size will not be cached.
==================================================== ============================================================= ============
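As an illustration of these properties used together, a Hadoop-catalog configuration enabling manifest caching might look like the following (a sketch only; the warehouse location and cache sizes are placeholder values, and the numbers shown are simply the documented defaults):

```properties
connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=s3://test_bucket/warehouse
iceberg.io.manifest.cache-enabled=true
iceberg.io.manifest.cache.max-total-bytes=104857600
iceberg.io.manifest.cache.expiration-interval-ms=60000
iceberg.io.manifest.cache.max-content-length=8388608
```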

Extra Hidden Metadata Tables
----------------------------
@@ -20,6 +20,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.hadoop.HadoopFileIO;

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
@@ -31,6 +32,9 @@
import static com.facebook.presto.hive.HiveCompressionCodec.GZIP;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;

public class IcebergConfig
{
Expand All @@ -48,7 +52,11 @@ public class IcebergConfig
private boolean pushdownFilterEnabled;

private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE;

private String fileIOImpl = HadoopFileIO.class.getName();
private boolean manifestCachingEnabled;
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
@NotNull
public FileFormat getFileFormat()
{
@@ -226,4 +234,73 @@ public boolean isPushdownFilterEnabled()
{
return pushdownFilterEnabled;
}

public boolean getManifestCachingEnabled()
{
return manifestCachingEnabled;
}

@Config("iceberg.io.manifest.cache-enabled")
@ConfigDescription("Enable/disable the manifest caching feature")
public IcebergConfig setManifestCachingEnabled(boolean manifestCachingEnabled)
{
this.manifestCachingEnabled = manifestCachingEnabled;
return this;
}

public String getFileIOImpl()
{
return fileIOImpl;
}

@NotNull
@Config("iceberg.io-impl")
@ConfigDescription("Custom FileIO implementation to use in a catalog")
public IcebergConfig setFileIOImpl(String fileIOImpl)
{
this.fileIOImpl = fileIOImpl;
return this;
}

public long getMaxManifestCacheSize()
{
return maxManifestCacheSize;
}

@Min(1)
@Config("iceberg.io.manifest.cache.max-total-bytes")
@ConfigDescription("Maximum total amount of bytes to cache in the manifest cache")
public IcebergConfig setMaxManifestCacheSize(long maxManifestCacheSize)
{
this.maxManifestCacheSize = maxManifestCacheSize;
return this;
}

public long getManifestCacheExpireDuration()
{
return manifestCacheExpireDuration;
}

@Min(0)
@Config("iceberg.io.manifest.cache.expiration-interval-ms")
@ConfigDescription("Maximum duration for which an entry stays in the manifest cache")
public IcebergConfig setManifestCacheExpireDuration(long manifestCacheExpireDuration)
{
this.manifestCacheExpireDuration = manifestCacheExpireDuration;
return this;
}

public long getManifestCacheMaxContentLength()
{
return manifestCacheMaxContentLength;
}

@Min(0)
@Config("iceberg.io.manifest.cache.max-content-length")
@ConfigDescription("Maximum length of a manifest file to be considered for caching in bytes")
public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxContentLength)
{
this.manifestCacheMaxContentLength = manifestCacheMaxContentLength;
return this;
}
}
@@ -17,7 +17,6 @@
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.security.ConnectorIdentity;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -37,12 +36,14 @@
import static com.facebook.presto.iceberg.CatalogType.NESSIE;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceHash;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceName;
import static com.facebook.presto.iceberg.IcebergUtil.loadCachingProperties;
import static com.facebook.presto.iceberg.nessie.AuthenticationType.BASIC;
import static com.facebook.presto.iceberg.nessie.AuthenticationType.BEARER;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.CatalogProperties.FILE_IO_IMPL;
import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION;

/**
Expand All @@ -59,11 +60,13 @@ public class IcebergResourceFactory
private final NessieConfig nessieConfig;
private final S3ConfigurationUpdater s3ConfigurationUpdater;

private final IcebergConfig icebergConfig;

@Inject
public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogName, NessieConfig nessieConfig, S3ConfigurationUpdater s3ConfigurationUpdater)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null").getCatalogName();
requireNonNull(config, "config is null");
this.icebergConfig = requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.catalogWarehouse = config.getCatalogWarehouse();
this.hadoopConfigResources = config.getHadoopConfigResources();
@@ -99,25 +102,7 @@ public SupportsNamespaces getNamespaces(ConnectorSession session)
private String getCatalogCacheKey(ConnectorSession session)
{
StringBuilder sb = new StringBuilder();
ConnectorIdentity identity = session.getIdentity();
sb.append("User:");
sb.append(identity.getUser());
if (identity.getPrincipal().isPresent()) {
sb.append(",Principle:");
sb.append(identity.getPrincipal());
}
if (identity.getRole().isPresent()) {
sb.append(",Role:");
sb.append(identity.getRole());
}
if (identity.getExtraCredentials() != null) {
identity.getExtraCredentials().forEach((key, value) -> {
sb.append(",");
sb.append(key);
sb.append(":");
sb.append(value);
});
}
sb.append(catalogName);
Contributor @yingsu00, Nov 29, 2023

This comment is for line 109.
According to https://projectnessie.org/tools/client_config/, the ref hash is just a hash of the reference string, and it is optional. Adding it to the cache key string is not necessary and would incur a perf degradation, since the cache lookup hashes the key anyway. I suggest we remove it from the key string. cc @ChunxuTang

Member Author

The ref hash is not exposed as an Iceberg catalog property, but it is not a hidden field in the session config either - https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java#L277

So the user can still change it, even though, per the Nessie docs, it is an optional field. If we take it out of the cache key, the cache value will be loaded with the default ref hash, i.e. null. If we don't keep the ref hash, should we also remove it from the connector's session properties?

Member

Is there a case where the Nessie reference name itself is not unique, so that we have to use the reference hash to differentiate references? Or, put differently, is it feasible to expect reference names to be unique in practical use cases?
Using a hash inside a key that will itself be hashed sounds a bit uncommon to me...

Member Author

I'm not really sure we have to expose the ref.hash property from the Presto side if we have reference names there. But if we decide we don't really need ref.hash, should we also take it out of the Presto session properties as an option?

Contributor

@ChunxuTang The ref hash is the hash of the reference name. If the name is not unique, the hash won't be either. Also, the hash is optional according to https://projectnessie.org/tools/client_config/.

I actually don't understand why the "nessie_reference_hash" Iceberg session property was added in the first place. It does not have a corresponding config property in NessieConfig, and it is not settable and defaults to null. There is no hashing applied to the reference name to set the "nessie_reference_hash" property, nor was there any verification that it is a valid hash value. It seems it would always be null. @nastra @ChunxuTang was there any reason it was done this way? I think we should remove it from IcebergSessionProperties.

Member

@yingsu00 I think it's probably ok to remove the nessie_reference_hash property as I'm unaware of any usage of it.
@nastra Do you happen to have any insights? Is it ok to remove it?

Contributor

Do we absolutely need to remove the ref hash as part of this PR? To me it seems that this would intertwine the manifest file caching and Nessie changes.
I believe the ref hash was needed in the cache, although I would have to check again why exactly (I haven't been working on Nessie lately, and @ajantha-bhat took that part over).

Contributor

Hi, nessie_reference_hash is optionally used to fetch the state of a table from a particular point in time (the one mapping to the hash). Without a hash, we always fetch the latest state. The old state can be useful for time travel queries.

But as pointed out above, if it is not exposed to the user or fully integrated, we can remove it for the time being and add it back when we fully want to support it.

And as @nastra suggested, it is better to keep the current PR's scope as small as possible.

Contributor

@ajantha-bhat thanks for your reply. But according to https://projectnessie.org/tools/client_config/, this ref hash is the hash of the ref name:

nessie.ref       Mandatory   Name of the Nessie reference, usually main.
nessie.ref.hash  Optional    Hash on nessie.ref, usually not specified.

What is the ref name used for? If the ref name is already part of the cache key, do we still need the ref hash?

Contributor

Nessie supports catalog-level branching and tagging capabilities; the ref name is the branch or tag name.
If the hash is not specified, we use the latest state of the table from that branch or tag.

The catalog-level hash configuration is mainly used for time-travel queries. Once we support those, we will need this config exposed.
If it is not really causing much of a problem, we can even keep this code to avoid having to add it back in the future.
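For reference, the Nessie client settings under discussion look like this in a configuration file (a sketch based on the projectnessie.org client configuration docs; the hash line is a placeholder and is usually omitted):

```properties
# Branch or tag to read from
nessie.ref=main
# Optional: pin reads to a specific commit (time travel); usually not specified
# nessie.ref.hash=<commit hash>
```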


if (catalogType == NESSIE) {
sb.append(getNessieReferenceName(session));
@@ -151,6 +136,12 @@ private Configuration getHadoopConfiguration()
public Map<String, String> getCatalogProperties(ConnectorSession session)
{
Map<String, String> properties = new HashMap<>();
if (icebergConfig.getManifestCachingEnabled()) {
loadCachingProperties(properties, icebergConfig);
}
if (icebergConfig.getFileIOImpl() != null) {
properties.put(FILE_IO_IMPL, icebergConfig.getFileIOImpl());
}
if (catalogWarehouse != null) {
properties.put(WAREHOUSE_LOCATION, catalogWarehouse);
}
@@ -146,6 +146,10 @@
import static java.util.Comparator.comparing;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_ENABLED;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES;
import static org.apache.iceberg.LocationProviders.locationsFor;
import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -712,4 +716,12 @@ public static Map<Integer, HivePartitionKey> getPartitionKeys(ContentScanTask<Da

return Collections.unmodifiableMap(partitionKeys);
}

public static void loadCachingProperties(Map<String, String> properties, IcebergConfig icebergConfig)
{
properties.put(IO_MANIFEST_CACHE_ENABLED, "true");
properties.put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(icebergConfig.getMaxManifestCacheSize()));
properties.put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(icebergConfig.getManifestCacheMaxContentLength()));
properties.put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(icebergConfig.getManifestCacheExpireDuration()));
}
}
@@ -15,6 +15,7 @@

import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.testng.annotations.Test;

import java.util.Map;
@@ -29,6 +30,9 @@
import static com.facebook.presto.iceberg.IcebergFileFormat.ORC;
import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET;
import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;

public class TestIcebergConfig
{
@@ -48,7 +52,12 @@ public void testDefaults()
.setMinimumAssignedSplitWeight(0.05)
.setParquetDereferencePushdownEnabled(true)
.setMergeOnReadModeEnabled(true)
.setPushdownFilterEnabled(false));
.setPushdownFilterEnabled(false)
.setManifestCachingEnabled(false)
.setFileIOImpl(HadoopFileIO.class.getName())
.setMaxManifestCacheSize(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
.setManifestCacheExpireDuration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setManifestCacheMaxContentLength(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT));
}

@Test
@@ -68,6 +77,11 @@ public void testExplicitPropertyMappings()
.put("iceberg.statistic-snapshot-record-difference-weight", "1.0")
.put("iceberg.hive-statistics-merge-strategy", "USE_NDV")
.put("iceberg.pushdown-filter-enabled", "true")
.put("iceberg.io.manifest.cache-enabled", "true")
.put("iceberg.io-impl", "com.facebook.presto.iceberg.HdfsFileIO")
.put("iceberg.io.manifest.cache.max-total-bytes", "1048576000")
.put("iceberg.io.manifest.cache.expiration-interval-ms", "600000")
.put("iceberg.io.manifest.cache.max-content-length", "10485760")
.build();

IcebergConfig expected = new IcebergConfig()
Expand All @@ -83,7 +97,12 @@ public void testExplicitPropertyMappings()
.setParquetDereferencePushdownEnabled(false)
.setMergeOnReadModeEnabled(false)
.setHiveStatisticsMergeStrategy(USE_NDV)
.setPushdownFilterEnabled(true);
.setPushdownFilterEnabled(true)
.setManifestCachingEnabled(true)
.setFileIOImpl("com.facebook.presto.iceberg.HdfsFileIO")
.setMaxManifestCacheSize(1048576000)
.setManifestCacheExpireDuration(600000)
.setManifestCacheMaxContentLength(10485760);

assertFullMapping(properties, expected);
}