Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -1685,3 +1685,35 @@ The connector supports redirection from Iceberg tables to Hive tables with the

The connector supports configuring and using [file system
caching](/object-storage/file-system-cache).

### Manifest File Caching

As of Iceberg version 1.1.0, Apache Iceberg provides a mechanism to cache the
contents of Iceberg manifest files in memory. This feature helps to reduce
repeated reads of small Iceberg manifest files from remote storage.

Note that currently, manifest file caching is supported for Hive Metastore catalog.

The following configuration properties are available:

:::{list-table} Manifest File Caching configuration properties
:widths: 30, 58, 12
:header-rows: 1

* - Property name
- Description
- Default
* - `iceberg.hive.manifest.cache-enabled`
- Enable or disable the manifest caching feature.
- `false`
* - `iceberg.hive.manifest.cache.max-total-size`
- Maximum size of cache size.
- `100MB`
* - `iceberg.hive.manifest.cache.expiration-interval-duration`
- Maximum time duration for which an entry stays in the manifest cache.
- `60s`
* - `iceberg.hive.manifest.cache.max-content-length`
- Maximum length of a manifest file to be considered for caching. Manifest files
with a length exceeding this size will not be cached.
- `8MB`
:::
5 changes: 5 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.plugin.iceberg;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
Expand All @@ -22,6 +24,8 @@
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
Expand Down Expand Up @@ -64,10 +68,12 @@
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types.NestedField;
Expand All @@ -89,6 +95,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -158,6 +165,10 @@
import static java.math.RoundingMode.UNNECESSARY;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_ENABLED;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
Expand Down Expand Up @@ -954,4 +965,37 @@ public static long getModificationTime(String path, TrinoFileSystem fileSystem)
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to get file modification time: " + path, e);
}
}

public static Map<String, String> loadManifestCachingProperties(
Map<String, String> properties,
DataSize maxManifestCacheSize,
Duration manifestCacheExpireDuration,
DataSize manifestCacheMaxContentLength)
{
properties.put(IO_MANIFEST_CACHE_ENABLED, "true");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is configuring org.apache.iceberg.ManifestFiles#CONTENT_CACHES, right?
the cache is keyed by FileIO and our ForwardingFileIo is typically short-lived. how effective is this caching?

cc @alexjo2144 @electrum

Copy link
Member Author

@hackeryang hackeryang Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in one of our production environments, a big sql which was planned for 12s has been reduced to 8s~

In our internal implementation, we also used a Caffeine cache with the key CatalogType and the value FileIo to make a ForwardingFileIo live longer, i can also add relevant codes to our PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 12s -> 8s improvement -- was it with or without fileio caching?

what would the numbers be if we don't do this PR and instead enable filesystem caching that we already have in trino?

Copy link
Member Author

@hackeryang hackeryang Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 12s -> 8s improvement -- was it with or without fileio caching?

what would the numbers be if we don't do this PR and instead enable filesystem caching that we already have in trino?

@findepi the 12s -> 8s improvement was with fileio/manifest caching.

Sorry our newest production environment is 423, so i couldn't test the performance about filesystem caching, but i saw that PrestoDB have manifest caching and filesystem caching both, so i thought that they may not conflict with each other~ The metadata caching in memory is also faster than metadata caching in disks in most cases.

Thank you for your advice, I have modified some codes accordingly, please review again when you have time~ The CICD error seems not related to our PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi the 12s -> 8s improvement was with fileio/manifest caching.

thanks!

before merging this PR, we should understand what's the level of improvement that it brings (in isolation).

cc @sopel39 @raunaqmorarka

properties.put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(maxManifestCacheSize.toBytes()));
properties.put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(manifestCacheExpireDuration.toMillis()));
properties.put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(manifestCacheMaxContentLength.toBytes()));
return properties;
}

public static Optional<Cache<CatalogType, FileIO>> createFileIOCache(boolean isManifestCachingEnabled, Duration manifestCacheExpireDuration)
{
Optional<Cache<CatalogType, FileIO>> fileIOCache;
if (isManifestCachingEnabled) {
fileIOCache = Optional.of(Caffeine.newBuilder()
.maximumSize(getMaxFileIO())
.expireAfterAccess(manifestCacheExpireDuration.toMillis(), TimeUnit.MILLISECONDS)
.build());
}
else {
fileIOCache = Optional.empty();
}
return fileIOCache;
}

public static int getMaxFileIO()
{
return SystemConfigs.IO_MANIFEST_CACHE_MAX_FILEIO.defaultValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,54 @@
*/
package io.trino.plugin.iceberg.catalog.hms;

import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreFactory;
import io.trino.plugin.iceberg.CatalogType;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.connector.ConnectorSession;
import org.apache.iceberg.io.FileIO;

import java.util.HashMap;
import java.util.Optional;

import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.IcebergUtil.createFileIOCache;
import static io.trino.plugin.iceberg.IcebergUtil.loadManifestCachingProperties;
import static java.util.Objects.requireNonNull;

public class HiveMetastoreTableOperationsProvider
implements IcebergTableOperationsProvider
{
private final TrinoFileSystemFactory fileSystemFactory;
private final ThriftMetastoreFactory thriftMetastoreFactory;
private final boolean isManifestCachingEnabled;
private final DataSize maxManifestCacheSize;
private final Duration manifestCacheExpireDuration;
private final DataSize manifestCacheMaxContentLength;
private final Optional<Cache<CatalogType, FileIO>> fileIOCache;

@Inject
public HiveMetastoreTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory, ThriftMetastoreFactory thriftMetastoreFactory)
public HiveMetastoreTableOperationsProvider(
TrinoFileSystemFactory fileSystemFactory,
ThriftMetastoreFactory thriftMetastoreFactory,
IcebergHiveMetastoreCatalogConfig icebergHiveMetastoreCatalogConfig)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.thriftMetastoreFactory = requireNonNull(thriftMetastoreFactory, "thriftMetastoreFactory is null");
requireNonNull(icebergHiveMetastoreCatalogConfig, "icebergHiveMetastoreCatalogConfig is null");
this.isManifestCachingEnabled = icebergHiveMetastoreCatalogConfig.isManifestCachingEnabled();
this.maxManifestCacheSize = icebergHiveMetastoreCatalogConfig.getMaxManifestCacheSize();
this.manifestCacheExpireDuration = icebergHiveMetastoreCatalogConfig.getManifestCacheExpireDuration();
this.manifestCacheMaxContentLength = icebergHiveMetastoreCatalogConfig.getManifestCacheMaxContentLength();
this.fileIOCache = createFileIOCache(isManifestCachingEnabled, manifestCacheExpireDuration);
}

@Override
Expand All @@ -48,8 +72,24 @@ public IcebergTableOperations createTableOperations(
Optional<String> owner,
Optional<String> location)
{
FileIO fileIO;
if (fileIOCache.isPresent()) {
fileIO = fileIOCache.get().get(
HIVE_METASTORE,
k -> new ForwardingFileIo(
fileSystemFactory.create(session),
loadManifestCachingProperties(
new HashMap<>(),
maxManifestCacheSize,
manifestCacheExpireDuration,
manifestCacheMaxContentLength)));
}
else {
fileIO = new ForwardingFileIo(fileSystemFactory.create(session), ImmutableMap.of());
}

return new HiveMetastoreTableOperations(
new ForwardingFileIo(fileSystemFactory.create(session)),
fileIO,
((TrinoHiveCatalog) catalog).getMetastore(),
thriftMetastoreFactory.createMetastore(Optional.of(session.getIdentity())),
session,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.hms;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import static io.airlift.units.DataSize.Unit.BYTE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;

public class IcebergHiveMetastoreCatalogConfig
{
private boolean manifestCachingEnabled;
private DataSize maxManifestCacheSize = DataSize.of(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT, BYTE);
private Duration manifestCacheExpireDuration = new Duration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT, MILLISECONDS);
private DataSize manifestCacheMaxContentLength = DataSize.of(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT, BYTE);

public boolean isManifestCachingEnabled()
{
return manifestCachingEnabled;
}

@Config("iceberg.hive.manifest.cache-enabled")
@ConfigDescription("Enable/disable the manifest caching feature in Hive Metastore catalog")
public IcebergHiveMetastoreCatalogConfig setManifestCachingEnabled(boolean manifestCachingEnabled)
{
this.manifestCachingEnabled = manifestCachingEnabled;
return this;
}

public DataSize getMaxManifestCacheSize()
{
return maxManifestCacheSize;
}

@Config("iceberg.hive.manifest.cache.max-total-size")
@ConfigDescription("Maximum total amount to cache in the manifest cache of Hive Metastore catalog")
public IcebergHiveMetastoreCatalogConfig setMaxManifestCacheSize(DataSize maxManifestCacheSize)
{
this.maxManifestCacheSize = maxManifestCacheSize;
return this;
}

public Duration getManifestCacheExpireDuration()
{
return manifestCacheExpireDuration;
}

@Config("iceberg.hive.manifest.cache.expiration-interval-duration")
@ConfigDescription("Maximum duration for which an entry stays in the manifest cache of Hive Metastore catalog")
public IcebergHiveMetastoreCatalogConfig setManifestCacheExpireDuration(Duration manifestCacheExpireDuration)
{
this.manifestCacheExpireDuration = manifestCacheExpireDuration;
return this;
}

public DataSize getManifestCacheMaxContentLength()
{
return manifestCacheMaxContentLength;
}

@Config("iceberg.hive.manifest.cache.max-content-length")
@ConfigDescription("Maximum length of a manifest file to be considered for caching in Hive Metastore catalog")
public IcebergHiveMetastoreCatalogConfig setManifestCacheMaxContentLength(DataSize manifestCacheMaxContentLength)
{
this.manifestCacheMaxContentLength = manifestCacheMaxContentLength;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class IcebergHiveMetastoreCatalogModule
protected void setup(Binder binder)
{
install(new ThriftMetastoreModule());
configBinder(binder).bindConfig(IcebergHiveMetastoreCatalogConfig.class);
binder.bind(IcebergTableOperationsProvider.class).to(HiveMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON);
binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON);
binder.bind(MetastoreValidator.class).asEagerSingleton();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.hms;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;

public class TestIcebergHiveMetastoreCatalogConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(IcebergHiveMetastoreCatalogConfig.class)
.setManifestCachingEnabled(false)
.setMaxManifestCacheSize(DataSize.of(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT, BYTE))
.setManifestCacheExpireDuration(new Duration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT, MILLISECONDS))
.setManifestCacheMaxContentLength(DataSize.of(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT, BYTE)));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("iceberg.hive.manifest.cache-enabled", "true")
.put("iceberg.hive.manifest.cache.max-total-size", "1GB")
.put("iceberg.hive.manifest.cache.expiration-interval-duration", "10m")
.put("iceberg.hive.manifest.cache.max-content-length", "10MB")
.buildOrThrow();

IcebergHiveMetastoreCatalogConfig expected = new IcebergHiveMetastoreCatalogConfig()
.setManifestCachingEnabled(true)
.setMaxManifestCacheSize(DataSize.of(1, GIGABYTE))
.setManifestCacheExpireDuration(new Duration(10, MINUTES))
.setManifestCacheMaxContentLength(DataSize.of(10, MEGABYTE));

assertFullMapping(properties, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public ThriftMetastore createMetastore(Optional<ConnectorIdentity> identity)
{
return thriftMetastore;
}
}),
}, new IcebergHiveMetastoreCatalogConfig()),
useUniqueTableLocations,
false,
false,
Expand Down