diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 3b0bef045406..74f5077395bc 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -137,6 +137,9 @@ values. Typical usage does not require you to configure them. * - ``delta.parquet.time-zone`` - Time zone for Parquet read and write. - JVM default + * - ``delta.target-max-file-size`` + - Target maximum size of written files; the actual size may be larger. + - ``1GB`` The following table describes performance tuning catalog properties for the connector. diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index a59dd3d1f57a..ca9a13012c11 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -29,6 +29,7 @@ import java.util.TimeZone; import java.util.concurrent.TimeUnit; +import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -64,6 +65,7 @@ public class DeltaLakeConfig private long perTransactionMetastoreCacheMaximumSize = 1000; private boolean deleteSchemaLocationsFallback; private String parquetTimeZone = TimeZone.getDefault().getID(); + /* Target (not hard) maximum size of a written file; initialised to 1 GB. */ private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE); public Duration getMetadataCacheTtl() { @@ -364,4 +366,18 @@ public DeltaLakeConfig setParquetTimeZone(String parquetTimeZone) this.parquetTimeZone = parquetTimeZone; return this; } + + /** Returns the target maximum size of written files; 1 GB unless the setter below was called. */ @NotNull + public DataSize getTargetMaxFileSize() + { + return targetMaxFileSize; + } + + /** Sets the target maximum size of written files, bound to the {@code delta.target-max-file-size} configuration property, and returns this config object for chaining. */ @Config("delta.target-max-file-size") + @ConfigDescription("Target maximum size of written files; the 
actual size may be larger") + public DeltaLakeConfig setTargetMaxFileSize(DataSize targetMaxFileSize) + { + this.targetMaxFileSize = targetMaxFileSize; + return this; + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java index 28783bb09046..9cdff8a05150 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -114,10 +115,14 @@ public class DeltaLakePageSink private final JobConf conf; private final TypeManager typeManager; private final String trinoVersion; + private final long targetMaxFileSize; private long writtenBytes; private long memoryUsage; + private final List closedWriters = new ArrayList<>(); + private final ImmutableList.Builder dataFileInfos = ImmutableList.builder(); + public DeltaLakePageSink( List inputColumns, List originalPartitionColumns, @@ -191,6 +196,7 @@ public DeltaLakePageSink( this.conf = toJobConf(conf); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); + this.targetMaxFileSize = DeltaLakeSessionProperties.getTargetMaxFileSize(session); } @Override @@ -220,26 +226,13 @@ public CompletableFuture> finish() private ListenableFuture> doFinish() { - ImmutableList.Builder dataFileInfos = ImmutableList.builder(); - Optional commitException = Optional.empty(); for (DeltaLakeWriter writer : writers) { - writer.commit(); - try { - DataFileInfo dataFileInfo = writer.getDataFileInfo(); - 
dataFileInfos.add(wrappedBuffer(dataFileInfoCodec.toJsonBytes(dataFileInfo))); - } - catch (IOException e) { - LOG.warn("exception '%s' while finishing write on %s", e, writer); - commitException = Optional.of(e); - } - } - if (commitException.isPresent()) { - throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", commitException.get()); + closeWriter(writer); } List result = dataFileInfos.build(); - writtenBytes = writers.stream() + writtenBytes = closedWriters.stream() .mapToLong(DeltaLakeWriter::getWrittenBytes) .sum(); @@ -255,7 +248,7 @@ public void abort() private void doAbort() { Optional rollbackException = Optional.empty(); - for (DeltaLakeWriter writer : writers) { + /* also visit writers that closeWriter already committed and moved to closedWriters */ for (DeltaLakeWriter writer : Iterables.concat(writers, closedWriters)) { // writers can contain nulls if an exception is thrown when doAppend expends the writer list if (writer != null) { try { @@ -357,12 +350,21 @@ private int[] getWriterIndexes(Page page) while (writers.size() <= pageIndexer.getMaxIndex()) { writers.add(null); } - + boolean isOptimizedParquetWriter = isParquetOptimizedWriterEnabled(session); // create missing writers for (int position = 0; position < page.getPositionCount(); position++) { int writerIndex = writerIndexes[position]; - if (writers.get(writerIndex) != null) { - continue; + DeltaLakeWriter deltaLakeWriter = writers.get(writerIndex); + if (deltaLakeWriter != null) { + /* size-based roll-over is applied only when the optimized Parquet writer is enabled; otherwise an existing writer is always reused */ if (isOptimizedParquetWriter) { + if (deltaLakeWriter.getWrittenBytes() <= targetMaxFileSize) { + continue; + } + /* written bytes exceed the target: commit this writer and fall through to the writer-creation code below */ closeWriter(deltaLakeWriter); + } + else { + continue; + } } Path filePath = new Path(outputPath); @@ -379,7 +381,7 @@ private int[] getWriterIndexes(Page page) filePath = new Path(filePath, fileName); FileWriter fileWriter; - if (isParquetOptimizedWriterEnabled(session)) { + if (isOptimizedParquetWriter) { fileWriter = createParquetFileWriter(filePath); } else { @@ -409,6 +411,24 @@ private int[] getWriterIndexes(Page page) return writerIndexes; } 
+ /** Commits {@code writer}, adds the change in its written bytes and memory usage across the commit to the sink totals, appends its JSON-encoded {@link DataFileInfo} to {@code dataFileInfos}, and records it in {@code closedWriters}. If reading the file info throws {@link IOException}, a warning is logged and a {@link TrinoException} with {@code DELTA_LAKE_BAD_WRITE} is thrown before the writer is added to {@code closedWriters}. */ private void closeWriter(DeltaLakeWriter writer) + { + /* snapshot both counters before commit() so that only the difference caused by the commit is added below */ long currentWritten = writer.getWrittenBytes(); + long currentMemory = writer.getMemoryUsage(); + writer.commit(); + writtenBytes += writer.getWrittenBytes() - currentWritten; + memoryUsage += writer.getMemoryUsage() - currentMemory; + try { + DataFileInfo dataFileInfo = writer.getDataFileInfo(); + dataFileInfos.add(wrappedBuffer(dataFileInfoCodec.toJsonBytes(dataFileInfo))); + } + catch (IOException e) { + LOG.warn("exception '%s' while finishing write on %s", e, writer); + throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", e); + } + closedWriters.add(writer); + } + /** * Copy of {@link FileUtils#makePartName(List, List)} modified to preserve case of partition columns. */ diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index 700804d89c9b..3b211cfd0d0c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -47,6 +47,7 @@ public final class DeltaLakeSessionProperties private static final String PARQUET_USE_COLUMN_INDEX = "parquet_use_column_index"; private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; + private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size"; private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "experimental_parquet_optimized_writer_enabled"; // = HiveSessionProperties#PARQUET_OPTIMIZED_WRITER_ENABLED private static final String COMPRESSION_CODEC = "compression_codec"; // This property is not supported by Delta Lake and exists solely for technical reasons.
@@ -107,6 +108,11 @@ public DeltaLakeSessionProperties( "Parquet: Writer page size", parquetWriterConfig.getPageSize(), false), + dataSizeProperty( + TARGET_MAX_FILE_SIZE, + "Target maximum size of written files; the actual size may be larger", + deltaLakeConfig.getTargetMaxFileSize(), + false), booleanProperty( PARQUET_OPTIMIZED_WRITER_ENABLED, "Experimental: Enable optimized writer", @@ -193,6 +199,11 @@ public static DataSize getParquetWriterPageSize(ConnectorSession session) return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class); } + /** Returns the {@code target_max_file_size} session property of {@code session}, converted from {@link DataSize} to a byte count. */ public static long getTargetMaxFileSize(ConnectorSession session) + { + return session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class).toBytes(); + } + public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session) { return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index e5066ea5365a..3bcff620d47a 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -16,17 +16,20 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; import io.trino.Session; import io.trino.execution.QueryInfo; import io.trino.plugin.deltalake.util.DockerizedMinioDataLake; import io.trino.testing.BaseConnectorTest; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import 
io.trino.testing.sql.TestTable; import io.trino.tpch.TpchTable; +import org.intellij.lang.annotations.Language; import org.testng.SkipException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -429,6 +432,40 @@ public void testAddColumnAndVacuum() } } + /** Checks that a smaller {@code target_max_file_size} splits a CTAS of 100000 {@code lineitem} rows into more files: with the property unset the table has at most 3 active files, with a 40 kB target it has more than 10, and every reported {@code $file_size} stays below five times the target. Both runs set {@code task_writer_count} to 1 and enable the optimized Parquet writer. */ @Test + public void testTargetMaxFileSize() + { + String tableName = "test_default_max_file_size" + randomTableSuffix(); + @Language("SQL") String createTableSql = format("CREATE TABLE %s AS SELECT * FROM tpch.sf1.lineitem LIMIT 100000", tableName); + + Session session = Session.builder(getSession()) + .setSystemProperty("task_writer_count", "1") + .setCatalogSessionProperty("delta_lake", "experimental_parquet_optimized_writer_enabled", "true") + .build(); + assertUpdate(session, createTableSql, 100000); + Set initialFiles = getActiveFiles(tableName); + assertThat(initialFiles.size()).isLessThanOrEqualTo(3); + assertUpdate(format("DROP TABLE %s", tableName)); + + DataSize maxSize = DataSize.of(40, DataSize.Unit.KILOBYTE); + session = Session.builder(getSession()) + .setSystemProperty("task_writer_count", "1") + .setCatalogSessionProperty("delta_lake", "experimental_parquet_optimized_writer_enabled", "true") + .setCatalogSessionProperty("delta_lake", "target_max_file_size", maxSize.toString()) + .build(); + + assertUpdate(session, createTableSql, 100000); + assertThat(query(format("SELECT count(*) FROM %s", tableName))).matches("VALUES BIGINT '100000'"); + Set updatedFiles = getActiveFiles(tableName); + assertThat(updatedFiles.size()).isGreaterThan(10); + + MaterializedResult result = computeActual("SELECT DISTINCT \"$path\", \"$file_size\" FROM " + tableName); + for (MaterializedRow row : result) { + // allow up to a larger delta due to the very small max size and the relatively large writer chunk size + assertThat((Long) row.getField(1)).isLessThan(maxSize.toBytes() * 5); + } + /* NOTE(review): the table created by the second CTAS is not dropped anywhere in this method; confirm whether cleanup is expected */ } + @Override protected String createSchemaSql(String schemaName) { diff --git 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index cb9766d79599..248d3b437fc5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -26,6 +26,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.HOURS; @@ -59,7 +60,8 @@ public void testDefaults() .setCompressionCodec(HiveCompressionCodec.SNAPPY) .setDeleteSchemaLocationsFallback(false) .setParquetTimeZone(TimeZone.getDefault().getID()) - .setPerTransactionMetastoreCacheMaximumSize(1000)); + .setPerTransactionMetastoreCacheMaximumSize(1000) + .setTargetMaxFileSize(DataSize.of(1, GIGABYTE))); } @Test @@ -88,6 +90,7 @@ public void testExplicitPropertyMappings() .put("delta.per-transaction-metastore-cache-maximum-size", "500") .put("delta.delete-schema-locations-fallback", "true") .put("delta.parquet.time-zone", nonDefaultTimeZone().getID()) + .put("delta.target-max-file-size", "2 GB") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -97,7 +100,7 @@ public void testExplicitPropertyMappings() .setMaxOutstandingSplits(200) .setMaxSplitsPerSecond(10) .setMaxInitialSplits(5) - .setMaxInitialSplitSize(DataSize.of(1, DataSize.Unit.GIGABYTE)) + .setMaxInitialSplitSize(DataSize.of(1, GIGABYTE)) .setMaxSplitSize(DataSize.of(10, DataSize.Unit.MEGABYTE)) .setMaxPartitionsPerWriter(200) .setUnsafeWritesEnabled(true) 
@@ -112,7 +115,8 @@ public void testExplicitPropertyMappings() .setCompressionCodec(HiveCompressionCodec.GZIP) .setDeleteSchemaLocationsFallback(true) .setParquetTimeZone(nonDefaultTimeZone().getID()) - .setPerTransactionMetastoreCacheMaximumSize(500); + .setPerTransactionMetastoreCacheMaximumSize(500) + .setTargetMaxFileSize(DataSize.of(2, GIGABYTE)); assertFullMapping(properties, expected); }