Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ values. Typical usage does not require you to configure them.
* - ``delta.parquet.time-zone``
- Time zone for Parquet read and write.
- JVM default
* - ``delta.target-max-file-size``
  - Target maximum size of written files; the actual size may be larger.
  - ``1GB``

The following table describes performance tuning catalog properties for the
connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class DeltaLakeConfig
private long perTransactionMetastoreCacheMaximumSize = 1000;
private boolean deleteSchemaLocationsFallback;
private String parquetTimeZone = TimeZone.getDefault().getID();
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -364,4 +366,18 @@ public DeltaLakeConfig setParquetTimeZone(String parquetTimeZone)
this.parquetTimeZone = parquetTimeZone;
return this;
}

@NotNull
public DataSize getTargetMaxFileSize()
Comment thread
ebyhr marked this conversation as resolved.
Outdated
{
return targetMaxFileSize;
}

/**
 * Sets the target maximum size of files written by the connector
 * (config property {@code delta.target-max-file-size}); the actual
 * size of written files may be larger.
 *
 * @param targetMaxFileSize target maximum data size per written file
 * @return this config instance, for fluent chaining
 */
@Config("delta.target-max-file-size")
@ConfigDescription("Target maximum size of written files; the actual size may be larger")
public DeltaLakeConfig setTargetMaxFileSize(DataSize targetMaxFileSize)
{
this.targetMaxFileSize = targetMaxFileSize;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -114,10 +115,14 @@ public class DeltaLakePageSink
private final JobConf conf;
private final TypeManager typeManager;
private final String trinoVersion;
private final long targetMaxFileSize;

private long writtenBytes;
private long memoryUsage;

private final List<DeltaLakeWriter> closedWriters = new ArrayList<>();
private final ImmutableList.Builder<Slice> dataFileInfos = ImmutableList.builder();

public DeltaLakePageSink(
List<DeltaLakeColumnHandle> inputColumns,
List<String> originalPartitionColumns,
Expand Down Expand Up @@ -191,6 +196,7 @@ public DeltaLakePageSink(
this.conf = toJobConf(conf);
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.targetMaxFileSize = DeltaLakeSessionProperties.getTargetMaxFileSize(session);
}

@Override
Expand Down Expand Up @@ -220,26 +226,13 @@ public CompletableFuture<Collection<Slice>> finish()

private ListenableFuture<Collection<Slice>> doFinish()
{
ImmutableList.Builder<Slice> dataFileInfos = ImmutableList.builder();
Optional<Exception> commitException = Optional.empty();
for (DeltaLakeWriter writer : writers) {
writer.commit();
try {
DataFileInfo dataFileInfo = writer.getDataFileInfo();
dataFileInfos.add(wrappedBuffer(dataFileInfoCodec.toJsonBytes(dataFileInfo)));
}
catch (IOException e) {
LOG.warn("exception '%s' while finishing write on %s", e, writer);
commitException = Optional.of(e);
}
}
if (commitException.isPresent()) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", commitException.get());
closeWriter(writer);
}

List<Slice> result = dataFileInfos.build();

writtenBytes = writers.stream()
writtenBytes = closedWriters.stream()
.mapToLong(DeltaLakeWriter::getWrittenBytes)
.sum();

Expand All @@ -255,7 +248,7 @@ public void abort()
private void doAbort()
{
Optional<Exception> rollbackException = Optional.empty();
for (DeltaLakeWriter writer : writers) {
for (DeltaLakeWriter writer : Iterables.concat(writers, closedWriters)) {
// writers can contain nulls if an exception is thrown when doAppend expends the writer list
if (writer != null) {
try {
Expand Down Expand Up @@ -357,12 +350,21 @@ private int[] getWriterIndexes(Page page)
while (writers.size() <= pageIndexer.getMaxIndex()) {
writers.add(null);
}

boolean isOptimizedParquetWriter = isParquetOptimizedWriterEnabled(session);
// create missing writers
for (int position = 0; position < page.getPositionCount(); position++) {
int writerIndex = writerIndexes[position];
if (writers.get(writerIndex) != null) {
continue;
DeltaLakeWriter deltaLakeWriter = writers.get(writerIndex);
if (deltaLakeWriter != null) {
if (isOptimizedParquetWriter) {
if (deltaLakeWriter.getWrittenBytes() <= targetMaxFileSize) {
continue;
}
closeWriter(deltaLakeWriter);
}
else {
continue;
}
}

Path filePath = new Path(outputPath);
Expand All @@ -379,7 +381,7 @@ private int[] getWriterIndexes(Page page)
filePath = new Path(filePath, fileName);

FileWriter fileWriter;
if (isParquetOptimizedWriterEnabled(session)) {
if (isOptimizedParquetWriter) {
fileWriter = createParquetFileWriter(filePath);
}
else {
Expand Down Expand Up @@ -409,6 +411,24 @@ private int[] getWriterIndexes(Page page)
return writerIndexes;
}

/**
 * Commits the given writer, serializes its {@code DataFileInfo} to JSON for the
 * connector commit result, and records the writer in {@code closedWriters}.
 *
 * @throws TrinoException (DELTA_LAKE_BAD_WRITE) wrapping any {@link IOException}
 *         raised while producing the data file info
 */
private void closeWriter(DeltaLakeWriter writer)
{
// Snapshot counters before commit so only the additional bytes/memory produced
// by the commit itself are folded into the sink-wide running totals.
long currentWritten = writer.getWrittenBytes();
long currentMemory = writer.getMemoryUsage();
writer.commit();
writtenBytes += writer.getWrittenBytes() - currentWritten;
memoryUsage += writer.getMemoryUsage() - currentMemory;
try {
DataFileInfo dataFileInfo = writer.getDataFileInfo();
// Encoded as a Slice so it can be returned through ConnectorPageSink.finish()
dataFileInfos.add(wrappedBuffer(dataFileInfoCodec.toJsonBytes(dataFileInfo)));
}
catch (IOException e) {
LOG.warn("exception '%s' while finishing write on %s", e, writer);
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", e);
}
// Keep the writer around: doAbort() rolls back closed writers too,
// and doFinish() sums written bytes over closedWriters.
closedWriters.add(writer);
}

/**
* Copy of {@link FileUtils#makePartName(List, List)} modified to preserve case of partition columns.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public final class DeltaLakeSessionProperties
private static final String PARQUET_USE_COLUMN_INDEX = "parquet_use_column_index";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "experimental_parquet_optimized_writer_enabled"; // = HiveSessionProperties#PARQUET_OPTIMIZED_WRITER_ENABLED
private static final String COMPRESSION_CODEC = "compression_codec";
// This property is not supported by Delta Lake and exists solely for technical reasons.
Expand Down Expand Up @@ -107,6 +108,11 @@ public DeltaLakeSessionProperties(
"Parquet: Writer page size",
parquetWriterConfig.getPageSize(),
false),
dataSizeProperty(
TARGET_MAX_FILE_SIZE,
"Target maximum size of written files; the actual size may be larger",
deltaLakeConfig.getTargetMaxFileSize(),
false),
booleanProperty(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Experimental: Enable optimized writer",
Expand Down Expand Up @@ -193,6 +199,11 @@ public static DataSize getParquetWriterPageSize(ConnectorSession session)
return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class);
}

/**
 * Reads the {@code target_max_file_size} session property and returns it as a byte count.
 */
public static long getTargetMaxFileSize(ConnectorSession session)
{
    DataSize targetMaxFileSize = session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class);
    return targetMaxFileSize.toBytes();
}

public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
{
return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.execution.QueryInfo;
import io.trino.plugin.deltalake.util.DockerizedMinioDataLake;
import io.trino.testing.BaseConnectorTest;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
import io.trino.testing.ResultWithQueryId;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.sql.TestTable;
import io.trino.tpch.TpchTable;
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -429,6 +432,40 @@ public void testAddColumnAndVacuum()
}
}

@Test
public void testTargetMaxFileSize()
{
    String tableName = "test_default_max_file_size" + randomTableSuffix();
    @Language("SQL") String createTableSql = format("CREATE TABLE %s AS SELECT * FROM tpch.sf1.lineitem LIMIT 100000", tableName);

    // With the default target size (1GB), a single writer should produce only a few files.
    Session session = Session.builder(getSession())
            .setSystemProperty("task_writer_count", "1")
            .setCatalogSessionProperty("delta_lake", "experimental_parquet_optimized_writer_enabled", "true")
            .build();
    assertUpdate(session, createTableSql, 100000);
    Set<String> initialFiles = getActiveFiles(tableName);
    assertThat(initialFiles.size()).isLessThanOrEqualTo(3);
    assertUpdate(format("DROP TABLE %s", tableName));

    // A tiny target_max_file_size should split the same data across many more files.
    DataSize maxSize = DataSize.of(40, DataSize.Unit.KILOBYTE);
    session = Session.builder(getSession())
            .setSystemProperty("task_writer_count", "1")
            .setCatalogSessionProperty("delta_lake", "experimental_parquet_optimized_writer_enabled", "true")
            .setCatalogSessionProperty("delta_lake", "target_max_file_size", maxSize.toString())
            .build();

    assertUpdate(session, createTableSql, 100000);
    assertThat(query(format("SELECT count(*) FROM %s", tableName))).matches("VALUES BIGINT '100000'");
    Set<String> updatedFiles = getActiveFiles(tableName);
    assertThat(updatedFiles.size()).isGreaterThan(10);

    MaterializedResult result = computeActual("SELECT DISTINCT \"$path\", \"$file_size\" FROM " + tableName);
    for (MaterializedRow row : result) {
        // allow up to a larger delta due to the very small max size and the relatively large writer chunk size
        assertThat((Long) row.getField(1)).isLessThan(maxSize.toBytes() * 5);
    }

    // Drop the second table too, so no leftover state leaks into other tests in this class
    assertUpdate(format("DROP TABLE %s", tableName));
}

@Override
protected String createSchemaSql(String schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS;
Expand Down Expand Up @@ -59,7 +60,8 @@ public void testDefaults()
.setCompressionCodec(HiveCompressionCodec.SNAPPY)
.setDeleteSchemaLocationsFallback(false)
.setParquetTimeZone(TimeZone.getDefault().getID())
.setPerTransactionMetastoreCacheMaximumSize(1000));
.setPerTransactionMetastoreCacheMaximumSize(1000)
.setTargetMaxFileSize(DataSize.of(1, GIGABYTE)));
}

@Test
Expand Down Expand Up @@ -88,6 +90,7 @@ public void testExplicitPropertyMappings()
.put("delta.per-transaction-metastore-cache-maximum-size", "500")
.put("delta.delete-schema-locations-fallback", "true")
.put("delta.parquet.time-zone", nonDefaultTimeZone().getID())
.put("delta.target-max-file-size", "2 GB")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
Expand All @@ -97,7 +100,7 @@ public void testExplicitPropertyMappings()
.setMaxOutstandingSplits(200)
.setMaxSplitsPerSecond(10)
.setMaxInitialSplits(5)
.setMaxInitialSplitSize(DataSize.of(1, DataSize.Unit.GIGABYTE))
.setMaxInitialSplitSize(DataSize.of(1, GIGABYTE))
.setMaxSplitSize(DataSize.of(10, DataSize.Unit.MEGABYTE))
.setMaxPartitionsPerWriter(200)
.setUnsafeWritesEnabled(true)
Expand All @@ -112,7 +115,8 @@ public void testExplicitPropertyMappings()
.setCompressionCodec(HiveCompressionCodec.GZIP)
.setDeleteSchemaLocationsFallback(true)
.setParquetTimeZone(nonDefaultTimeZone().getID())
.setPerTransactionMetastoreCacheMaximumSize(500);
.setPerTransactionMetastoreCacheMaximumSize(500)
.setTargetMaxFileSize(DataSize.of(2, GIGABYTE));

assertFullMapping(properties, expected);
}
Expand Down