DeltaLakeConfig.java

@@ -16,14 +16,17 @@
 import com.google.common.annotations.VisibleForTesting;
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.LegacyConfig;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.plugin.hive.HiveCompressionCodec;
+import org.joda.time.DateTimeZone;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
 import java.util.Optional;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -58,6 +61,9 @@ public class DeltaLakeConfig
     private boolean tableStatisticsEnabled = true;
     private boolean extendedStatisticsEnabled = true;
     private HiveCompressionCodec compressionCodec = HiveCompressionCodec.SNAPPY;
+    private long perTransactionMetastoreCacheMaximumSize = 1000;
+    private boolean deleteSchemaLocationsFallback;
+    private String parquetTimeZone = TimeZone.getDefault().getID();
 
     public Duration getMetadataCacheTtl()
     {
@@ -311,4 +317,52 @@ public DeltaLakeConfig setCompressionCodec(HiveCompressionCodec compressionCodec
         this.compressionCodec = compressionCodec;
         return this;
     }
+
+    @Min(1)
+    public long getPerTransactionMetastoreCacheMaximumSize()
+    {
+        return perTransactionMetastoreCacheMaximumSize;
+    }
+
+    @LegacyConfig("hive.per-transaction-metastore-cache-maximum-size")
+    @Config("delta.per-transaction-metastore-cache-maximum-size")
+    public DeltaLakeConfig setPerTransactionMetastoreCacheMaximumSize(long perTransactionMetastoreCacheMaximumSize)
+    {
+        this.perTransactionMetastoreCacheMaximumSize = perTransactionMetastoreCacheMaximumSize;
+        return this;
+    }
+
+    public boolean isDeleteSchemaLocationsFallback()
+    {
+        return this.deleteSchemaLocationsFallback;
+    }
+
@LegacyConfig("hive.per-transaction-metastore-cache-maximum-size")
@Config("delta.delete-schema-locations-fallback")
@ConfigDescription("Whether schema locations should be deleted when Trino can't determine whether they contain external files.")
public DeltaLakeConfig setDeleteSchemaLocationsFallback(boolean deleteSchemaLocationsFallback)
{
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
return this;
}

public DateTimeZone getParquetDateTimeZone()
{
return DateTimeZone.forID(parquetTimeZone);
}

@NotNull
public String getParquetTimeZone()
{
return parquetTimeZone;
}

@LegacyConfig("hive.parquet.time-zone")
@Config("delta.parquet.time-zone")
@ConfigDescription("Time zone for Parquet read and write")
public DeltaLakeConfig setParquetTimeZone(String parquetTimeZone)
{
this.parquetTimeZone = parquetTimeZone;
return this;
}
}
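
Taken together, these additions give the Delta Lake connector its own delta.* namespace for the three settings it previously borrowed from HiveConfig, while the @LegacyConfig annotations keep the old hive.* spellings working as deprecated aliases. Note the @Min(1) constraint: the per-transaction metastore cache can be shrunk, but never sized to zero. A hypothetical catalog properties file using the new names (all values illustrative):

    connector.name=delta-lake
    hive.metastore.uri=thrift://example-host:9083
    delta.per-transaction-metastore-cache-maximum-size=1000
    delta.delete-schema-locations-fallback=false
    delta.parquet.time-zone=UTC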
DeltaLakeMetadataFactory.java

@@ -20,7 +20,6 @@
 import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
 import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
 import io.trino.plugin.hive.HdfsEnvironment;
-import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
 import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
 import io.trino.spi.NodeManager;
@@ -69,8 +68,7 @@ public DeltaLakeMetadataFactory(
             NodeManager nodeManager,
             CheckpointWriterManager checkpointWriterManager,
             DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
-            CachingExtendedStatisticsAccess statisticsAccess,
-            HiveConfig hiveConfig)
+            CachingExtendedStatisticsAccess statisticsAccess)
     {
         this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -89,8 +87,8 @@ public DeltaLakeMetadataFactory(
         this.unsafeWritesEnabled = deltaLakeConfig.getUnsafeWritesEnabled();
         this.checkpointWritingInterval = deltaLakeConfig.getDefaultCheckpointWritingInterval();
         this.ignoreCheckpointWriteFailures = deltaLakeConfig.isIgnoreCheckpointWriteFailures();
-        this.perTransactionMetastoreCacheMaximumSize = hiveConfig.getPerTransactionMetastoreCacheMaximumSize();
-        this.deleteSchemaLocationsFallback = hiveConfig.isDeleteSchemaLocationsFallback();
+        this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize();
+        this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
     }
 
     public DeltaLakeMetadata create(ConnectorIdentity identity)
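
DeltaLakeMetadataFactory now reads the cache size and the schema-location fallback from DeltaLakeConfig, so the HiveConfig constructor parameter can be dropped. The create method that consumes perTransactionMetastoreCacheMaximumSize is outside this diff; the sketch below shows the general pattern, assuming the memoizeMetastore helper on trino-hive's CachingHiveMetastore is what backs the per-transaction cache:

    import io.trino.plugin.hive.metastore.HiveMetastore;
    import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;

    final class PerTransactionMetastoreSketch
    {
        private PerTransactionMetastoreSketch() {}

        // Sketch only: wrap the raw metastore in a memoizing cache that lives for
        // a single transaction and holds at most `maximumSize` entries, i.e. the
        // value of delta.per-transaction-metastore-cache-maximum-size.
        static HiveMetastore perTransactionCached(HiveMetastore delegate, long maximumSize)
        {
            return CachingHiveMetastore.memoizeMetastore(delegate, maximumSize);
        }
    }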
DeltaLakeModule.java

@@ -43,7 +43,6 @@
 import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizerManager;
 import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
-import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.HiveLocationService;
 import io.trino.plugin.hive.HiveTransactionHandle;
 import io.trino.plugin.hive.HiveTransactionManager;
@@ -91,7 +90,6 @@ public void setup(Binder binder)
         Provider<CatalogName> catalogName = binder.getProvider(CatalogName.class);
 
         configBinder(binder).bindConfig(DeltaLakeConfig.class);
-        configBinder(binder).bindConfig(HiveConfig.class);
         binder.bind(Key.get(boolean.class, TranslateHiveViews.class)).toInstance(false);
         configBinder(binder).bindConfig(ParquetReaderConfig.class);
         configBinder(binder).bindConfig(ParquetWriterConfig.class);
DeltaLakePageSourceProvider.java

@@ -21,7 +21,6 @@
 import io.trino.plugin.hive.HdfsEnvironment;
 import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
 import io.trino.plugin.hive.HiveColumnHandle;
-import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.ReaderPageSource;
 import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
@@ -73,7 +72,6 @@ public DeltaLakePageSourceProvider(
             FileFormatDataSourceStats fileFormatDataSourceStats,
             ParquetReaderConfig parquetReaderConfig,
             DeltaLakeConfig deltaLakeConfig,
-            HiveConfig hiveConfig,
             ExecutorService executorService,
             TypeManager typeManager,
             JsonCodec<DeltaLakeUpdateResult> updateResultJsonCodec)
@@ -82,7 +80,7 @@ public DeltaLakePageSourceProvider(
         this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
         this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
         this.domainCompactionThreshold = requireNonNull(deltaLakeConfig, "deltaLakeConfig is null").getDomainCompactionThreshold();
-        this.parquetDateTimeZone = requireNonNull(hiveConfig, "hiveConfig is null").getParquetDateTimeZone();
+        this.parquetDateTimeZone = deltaLakeConfig.getParquetDateTimeZone();
         this.executorService = requireNonNull(executorService, "executorService is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.updateResultJsonCodec = requireNonNull(updateResultJsonCodec, "deleteResultJsonCodec is null");
TestDeltaLakeConfig.java

@@ -20,11 +20,13 @@
 import org.testng.annotations.Test;
 
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
 import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
 import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
 import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone;
 import static java.util.concurrent.TimeUnit.DAYS;
 import static java.util.concurrent.TimeUnit.HOURS;
 import static java.util.concurrent.TimeUnit.MINUTES;
@@ -54,7 +56,10 @@ public void testDefaults()
                 .setDynamicFilteringWaitTimeout(new Duration(0, SECONDS))
                 .setTableStatisticsEnabled(true)
                 .setExtendedStatisticsEnabled(true)
-                .setCompressionCodec(HiveCompressionCodec.SNAPPY));
+                .setCompressionCodec(HiveCompressionCodec.SNAPPY)
+                .setDeleteSchemaLocationsFallback(false)
+                .setParquetTimeZone(TimeZone.getDefault().getID())
+                .setPerTransactionMetastoreCacheMaximumSize(1000));
     }
 
     @Test
@@ -80,6 +85,9 @@ public void testExplicitPropertyMappings()
                 .put("delta.table-statistics-enabled", "false")
                 .put("delta.extended-statistics.enabled", "false")
                 .put("delta.compression-codec", "GZIP")
+                .put("delta.per-transaction-metastore-cache-maximum-size", "500")
+                .put("delta.delete-schema-locations-fallback", "true")
+                .put("delta.parquet.time-zone", nonDefaultTimeZone().getID())
                 .buildOrThrow();
 
         DeltaLakeConfig expected = new DeltaLakeConfig()
@@ -101,7 +109,10 @@ public void testExplicitPropertyMappings()
                 .setDynamicFilteringWaitTimeout(new Duration(30, MINUTES))
                 .setTableStatisticsEnabled(false)
                 .setExtendedStatisticsEnabled(false)
-                .setCompressionCodec(HiveCompressionCodec.GZIP);
+                .setCompressionCodec(HiveCompressionCodec.GZIP)
+                .setDeleteSchemaLocationsFallback(true)
+                .setParquetTimeZone(nonDefaultTimeZone().getID())
+                .setPerTransactionMetastoreCacheMaximumSize(500);
 
         assertFullMapping(properties, expected);
     }
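
The two tests above pin the defaults and the new delta.* spellings, but not the hive.* aliases added via @LegacyConfig. If those were to be pinned as well, airlift's ConfigAssertions also offers assertDeprecatedEquivalence; a hypothetical addition to this test class (not part of the change, and assuming that helper's Map-varargs signature):

    // requires: import static io.airlift.configuration.testing.ConfigAssertions.assertDeprecatedEquivalence;
    @Test
    public void testLegacyPropertyMappings()
    {
        assertDeprecatedEquivalence(
                DeltaLakeConfig.class,
                Map.of("delta.parquet.time-zone", "UTC"),
                Map.of("hive.parquet.time-zone", "UTC"));
    }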
@@ -151,7 +151,7 @@ public HiveMetastore createMetastore(Optional<ConnectorIdentity> identity)
         deltaLakeProperties.put("hive.metastore", "test"); // use test value so we do not get a clash with default bindings
         if (!enablePerTransactionHiveMetastoreCaching) {
             // almost disable the cache; 0 is not allowed as config property value
-            deltaLakeProperties.put("hive.per-transaction-metastore-cache-maximum-size", "1");
+            deltaLakeProperties.put("delta.per-transaction-metastore-cache-maximum-size", "1");
         }
 
         queryRunner.createCatalog(DELTA_CATALOG, "delta-lake", deltaLakeProperties.buildOrThrow());
IcebergModule.java

@@ -14,11 +14,13 @@
 package io.trino.plugin.iceberg;
 
 import com.google.inject.Binder;
+import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.Scopes;
 import com.google.inject.multibindings.Multibinder;
 import io.trino.plugin.base.session.SessionPropertiesProvider;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
+import io.trino.plugin.hive.metastore.thrift.TranslateHiveViews;
 import io.trino.plugin.hive.orc.OrcReaderConfig;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
@@ -46,6 +48,7 @@ public class IcebergModule
     public void configure(Binder binder)
     {
         binder.bind(IcebergTransactionManager.class).in(Scopes.SINGLETON);
+        binder.bind(Key.get(boolean.class, TranslateHiveViews.class)).toInstance(false);
         configBinder(binder).bindConfig(IcebergConfig.class);
 
         newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(IcebergSessionProperties.class).in(Scopes.SINGLETON);
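
IcebergModule gains the same TranslateHiveViews binding that DeltaLakeModule already carries: with HiveConfig no longer bound in these catalogs, any shared metastore component that injects the flag still needs a value, and both connectors pin it to false. A minimal sketch of the consuming side, using a hypothetical class name purely to illustrate Guice's binding-annotation pattern:

    import com.google.inject.Inject;
    import io.trino.plugin.hive.metastore.thrift.TranslateHiveViews;

    public class ViewTranslationToggleSketch
    {
        private final boolean translateHiveViews;

        @Inject
        public ViewTranslationToggleSketch(@TranslateHiveViews boolean translateHiveViews)
        {
            // Receives the constant bound via Key.get(boolean.class, TranslateHiveViews.class)
            this.translateHiveViews = translateHiveViews;
        }
    }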