Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Guice binding annotation that qualifies injection points related to Iceberg file IO.
 * <p>
 * The annotation is retained at runtime and may be placed on fields, parameters and methods.
 * It carries no attributes; it only distinguishes the annotated binding from an
 * unqualified binding of the same type.
 */
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForFileIO
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class IcebergConfig
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;
private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2;
// Manifest caching is off unless explicitly enabled (boolean field defaults to false)
private boolean manifestCachingEnabled;
// Maximum time an entry stays in the manifest cache; defaults to 60 seconds
private Duration manifestCachingTTL = new Duration(60, SECONDS);
// Largest manifest file considered for caching; defaults to 8 MB
private DataSize manifestCachingMaxFileSize = DataSize.of(8, MEGABYTE);
// Upper bound on the total bytes held in the manifest cache; defaults to 100 MB
private DataSize manifestCachingMaxTotalSize = DataSize.of(100, MEGABYTE);

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -433,4 +437,52 @@ public boolean isStorageSchemaSetWhenHidingIsEnabled()
{
return hideMaterializedViewStorageTable && materializedViewsStorageSchema.isPresent();
}

/**
 * Returns whether in-memory caching of manifest files is enabled.
 *
 * @return the current value of the {@code iceberg.manifest-caching.enabled} setting
 */
public boolean isManifestCachingEnabled()
{
return manifestCachingEnabled;
}

/**
 * Sets whether in-memory caching of manifest files is enabled.
 * Bound to the {@code iceberg.manifest-caching.enabled} configuration property.
 *
 * @param manifestCachingEnabled {@code true} to enable manifest caching
 */
@Config("iceberg.manifest-caching.enabled")
@ConfigDescription("Enable in-memory caching for manifest files")
public void setManifestCachingEnabled(boolean manifestCachingEnabled)
{
this.manifestCachingEnabled = manifestCachingEnabled;
}

/**
 * Returns the maximum duration for which an entry stays in the manifest cache.
 *
 * @return the current value of the {@code iceberg.manifest-caching.ttl} setting
 */
public Duration getManifestCachingTTL()
{
return manifestCachingTTL;
}

/**
 * Sets the maximum duration for which an entry stays in the manifest cache.
 * Bound to the {@code iceberg.manifest-caching.ttl} configuration property.
 *
 * @param manifestCachingTTL the time-to-live to apply to manifest cache entries
 */
@Config("iceberg.manifest-caching.ttl")
@ConfigDescription("Maximum duration for which an entry stays in the manifest cache")
public void setManifestCachingTTL(Duration manifestCachingTTL)
{
this.manifestCachingTTL = manifestCachingTTL;
}

/**
 * Returns the maximum size of a manifest file to be considered for caching.
 *
 * @return the current value of the {@code iceberg.manifest-caching.max-file-size} setting
 */
public DataSize getManifestCachingMaxFileSize()
{
return manifestCachingMaxFileSize;
}

/**
 * Sets the maximum size of a manifest file to be considered for caching.
 * Bound to the {@code iceberg.manifest-caching.max-file-size} configuration property.
 *
 * @param manifestCachingMaxFileSize the per-file size limit for cached manifests
 */
@Config("iceberg.manifest-caching.max-file-size")
@ConfigDescription("Maximum size of a manifest file to be considered for caching")
public void setManifestCachingMaxFileSize(DataSize manifestCachingMaxFileSize)
{
this.manifestCachingMaxFileSize = manifestCachingMaxFileSize;
}

/**
 * Returns the maximum total amount of bytes to keep in the manifest cache.
 *
 * @return the current value of the {@code iceberg.manifest-caching.max-total-size} setting
 */
public DataSize getManifestCachingMaxTotalSize()
{
return manifestCachingMaxTotalSize;
}

/**
 * Sets the maximum total amount of bytes to keep in the manifest cache.
 * Bound to the {@code iceberg.manifest-caching.max-total-size} configuration property.
 *
 * @param manifestCachingMaxTotalSize the overall size limit of the manifest cache
 */
@Config("iceberg.manifest-caching.max-total-size")
@ConfigDescription("Maximum total amount of bytes to cache in manifest cache")
public void setManifestCachingMaxTotalSize(DataSize manifestCachingMaxTotalSize)
{
this.manifestCachingMaxTotalSize = manifestCachingMaxTotalSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static Connector createConnector(
new MBeanModule(),
new ConnectorObjectNameGeneratorModule("io.trino.plugin.iceberg", "trino.plugin.iceberg"),
new JsonModule(),
new IcebergModule(),
new IcebergModule(context.getNodeManager()),
new IcebergSecurityModule(),
icebergCatalogModule.orElse(new IcebergCatalogModule()),
new MBeanServerModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.trino.filesystem.cache.AllowFilesystemCacheOnCoordinator;
import io.trino.filesystem.cache.CacheKeyProvider;
Expand All @@ -42,6 +46,7 @@
import io.trino.plugin.iceberg.procedure.RegisterTableProcedure;
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.UnregisterTableProcedure;
import io.trino.spi.NodeManager;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
Expand All @@ -50,7 +55,9 @@
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.table.ConnectorTableFunction;
import io.trino.spi.procedure.Procedure;
import org.apache.iceberg.CatalogProperties;

import java.util.Map;
import java.util.concurrent.ExecutorService;

import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
Expand All @@ -59,12 +66,20 @@
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergModule
implements Module
{
private final NodeManager nodeManager;

/**
 * Creates the module.
 *
 * @param nodeManager node manager that {@code configure} consults to check whether
 *         the current node is the coordinator; must not be null
 * @throws NullPointerException if {@code nodeManager} is null
 */
public IcebergModule(NodeManager nodeManager)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
}

@Override
public void configure(Binder binder)
{
Expand Down Expand Up @@ -119,6 +134,13 @@ public void configure(Binder binder)
newOptionalBinder(binder, IcebergFileSystemFactory.class).setDefault().to(DefaultIcebergFileSystemFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(IcebergCacheKeyProvider.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, Key.get(boolean.class, AllowFilesystemCacheOnCoordinator.class)).setBinding().toInstance(true);

// Bind the @ForFileIO property map according to the role of the current node:
// the coordinator gets the map computed by CoordinatorFileIOPropertiesProvider
// (as a singleton), while every other node gets an empty map.
if (nodeManager.getCurrentNode().isCoordinator()) {
newOptionalBinder(binder, Key.get(new TypeLiteral<Map<String, String>>() {}, ForFileIO.class)).setBinding().toProvider(CoordinatorFileIOPropertiesProvider.class).in(Scopes.SINGLETON);
}
else {
newOptionalBinder(binder, Key.get(new TypeLiteral<Map<String, String>>() {}, ForFileIO.class)).setBinding().toInstance(Map.of());
}
}

@Provides
Expand All @@ -133,4 +155,30 @@ public ExecutorService createSplitManagerExecutor(CatalogName catalogName, Icebe
config.getSplitManagerThreads(),
daemonThreadsNamed("iceberg-split-manager-" + catalogName + "-%s"));
}

/**
 * Provider of the file IO property map that {@code IcebergModule} binds under
 * {@link ForFileIO} when the current node is the coordinator.
 * <p>
 * When manifest caching is disabled in {@link IcebergConfig} the map is empty.
 * Otherwise it contains the Iceberg manifest-cache properties (enabled flag,
 * expiration interval in milliseconds, maximum content length in bytes and
 * maximum total bytes) derived from the configuration.
 */
public static class CoordinatorFileIOPropertiesProvider
        implements Provider<Map<String, String>>
{
    private final IcebergConfig config;

    /**
     * @param icebergConfig connector configuration holding the manifest caching settings
     * @throws NullPointerException if {@code icebergConfig} is null
     */
    @Inject
    public CoordinatorFileIOPropertiesProvider(IcebergConfig icebergConfig)
    {
        this.config = requireNonNull(icebergConfig, "icebergConfig is null");
    }

    /**
     * Builds the property map from the current configuration values.
     *
     * @return an empty map when manifest caching is disabled, otherwise an immutable
     *         map holding the four manifest-cache properties
     */
    @Override
    public Map<String, String> get()
    {
        // Nothing to configure unless manifest caching was switched on
        if (!config.isManifestCachingEnabled()) {
            return Map.of();
        }

        String expirationMillis = Long.toString(config.getManifestCachingTTL().toMillis());
        String maxContentLength = Long.toString(config.getManifestCachingMaxFileSize().toBytes());
        String maxTotalBytes = Long.toString(config.getManifestCachingMaxTotalSize().toBytes());

        return ImmutableMap.of(
                CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true",
                CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, expirationMillis,
                CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, maxContentLength,
                CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, maxTotalBytes);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

import com.google.inject.Inject;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.iceberg.ForFileIO;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.connector.ConnectorSession;

import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;
Expand All @@ -30,11 +32,13 @@ public class FileMetastoreTableOperationsProvider
implements IcebergTableOperationsProvider
{
private final TrinoFileSystemFactory fileSystemFactory;
private final Map<String, String> fileIoProperties;

@Inject
public FileMetastoreTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory)
public FileMetastoreTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory, @ForFileIO Map<String, String> fileIoProperties)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null");
}

@Override
Expand All @@ -47,7 +51,7 @@ public IcebergTableOperations createTableOperations(
Optional<String> location)
{
return new FileMetastoreTableOperations(
new ForwardingFileIo(fileSystemFactory.create(session)),
new ForwardingFileIo(fileSystemFactory.create(session), fileIoProperties),
((TrinoHiveCatalog) catalog).getMetastore(),
session,
database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import com.google.inject.Inject;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats;
import io.trino.plugin.iceberg.ForFileIO;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.TypeManager;

import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;
Expand All @@ -36,20 +38,23 @@ public class GlueIcebergTableOperationsProvider
private final TrinoFileSystemFactory fileSystemFactory;
private final AWSGlueAsync glueClient;
private final GlueMetastoreStats stats;
private final Map<String, String> fileIoProperties;

@Inject
public GlueIcebergTableOperationsProvider(
TypeManager typeManager,
IcebergGlueCatalogConfig catalogConfig,
TrinoFileSystemFactory fileSystemFactory,
GlueMetastoreStats stats,
AWSGlueAsync glueClient)
AWSGlueAsync glueClient,
@ForFileIO Map<String, String> fileIoProperties)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.cacheTableMetadata = catalogConfig.isCacheTableMetadata();
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.stats = requireNonNull(stats, "stats is null");
this.glueClient = requireNonNull(glueClient, "glueClient is null");
this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null");
}

@Override
Expand All @@ -69,7 +74,7 @@ public IcebergTableOperations createTableOperations(
// Share Glue Table cache between Catalog and TableOperations so that, when doing metadata queries (e.g. information_schema.columns)
// the GetTableRequest is issued once per table.
((TrinoGlueCatalog) catalog)::getTable,
new ForwardingFileIo(fileSystemFactory.create(session)),
new ForwardingFileIo(fileSystemFactory.create(session), fileIoProperties),
session,
database,
table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.trino.plugin.hive.ViewAlreadyExistsException;
import io.trino.plugin.hive.ViewReaderUtil;
import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats;
import io.trino.plugin.iceberg.ForFileIO;
import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition;
import io.trino.plugin.iceberg.IcebergMetadata;
import io.trino.plugin.iceberg.UnknownTableTypeException;
Expand Down Expand Up @@ -182,6 +183,7 @@ public class TrinoGlueCatalog
private final GlueMetastoreStats stats;
private final boolean hideMaterializedViewStorageTable;
private final boolean isUsingSystemSecurity;
private final Map<String, String> fileIoProperties;

private final Cache<SchemaTableName, com.amazonaws.services.glue.model.Table> glueTableCache = EvictableCacheBuilder.newBuilder()
// Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables.
Expand Down Expand Up @@ -210,7 +212,8 @@ public TrinoGlueCatalog(
boolean isUsingSystemSecurity,
Optional<String> defaultSchemaLocation,
boolean useUniqueTableLocation,
boolean hideMaterializedViewStorageTable)
boolean hideMaterializedViewStorageTable,
@ForFileIO Map<String, String> fileIoProperties)
{
super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation);
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
Expand All @@ -222,6 +225,7 @@ public TrinoGlueCatalog(
this.isUsingSystemSecurity = isUsingSystemSecurity;
this.defaultSchemaLocation = requireNonNull(defaultSchemaLocation, "defaultSchemaLocation is null");
this.hideMaterializedViewStorageTable = hideMaterializedViewStorageTable;
this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null");
}

@Override
Expand Down Expand Up @@ -852,7 +856,7 @@ private Optional<com.amazonaws.services.glue.model.Table> getTableAndCacheMetada
try {
// Cache the TableMetadata while we have the Table retrieved anyway
// Note: this is racy from cache invalidation perspective, but it should not matter here
uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation));
uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session), fileIoProperties), metadataLocation));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation);
Expand Down Expand Up @@ -1381,7 +1385,7 @@ private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session,
requireNonNull(storageMetadataLocation, "storageMetadataLocation is null");
return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation);
return TableMetadataParser.read(new ForwardingFileIo(fileSystem, fileIoProperties), storageMetadataLocation);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats;
import io.trino.plugin.iceberg.ForFileIO;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergSecurityConfig;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
Expand All @@ -30,6 +31,7 @@
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;

import java.util.Map;
import java.util.Optional;

import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.SYSTEM;
Expand All @@ -50,6 +52,7 @@ public class TrinoGlueCatalogFactory
private final boolean hideMaterializedViewStorageTable;
private final GlueMetastoreStats stats;
private final boolean isUsingSystemSecurity;
private final Map<String, String> fileIoProperties;

@Inject
public TrinoGlueCatalogFactory(
Expand All @@ -63,7 +66,8 @@ public TrinoGlueCatalogFactory(
IcebergGlueCatalogConfig catalogConfig,
IcebergSecurityConfig securityConfig,
GlueMetastoreStats stats,
AWSGlueAsync glueClient)
AWSGlueAsync glueClient,
@ForFileIO Map<String, String> fileIoProperties)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
Expand All @@ -77,6 +81,7 @@ public TrinoGlueCatalogFactory(
this.hideMaterializedViewStorageTable = icebergConfig.isHideMaterializedViewStorageTable();
this.stats = requireNonNull(stats, "stats is null");
this.isUsingSystemSecurity = securityConfig.getSecuritySystem() == SYSTEM;
this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null");
}

@Managed
Expand All @@ -101,6 +106,7 @@ public TrinoCatalog create(ConnectorIdentity identity)
isUsingSystemSecurity,
defaultSchemaLocation,
isUniqueTableLocation,
hideMaterializedViewStorageTable);
hideMaterializedViewStorageTable,
fileIoProperties);
}
}
Loading