Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ connector.
can also use the corresponding catalog session property
``<catalog-name>.max_split_size``.
- ``64MB``
* - ``delta.minimum-assigned-split-weight``
- A decimal value in the range (0, 1] used as a minimum for weights assigned to each split. A low value may improve performance
on tables with small files. A higher value may improve performance for queries with highly skewed aggregations or joins.
- 0.05

The following table describes :ref:`catalog session properties
<session-properties-definition>` supported by the Delta Lake connector to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import org.joda.time.DateTimeZone;

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -50,6 +52,7 @@ public class DeltaLakeConfig
private int maxInitialSplits = 200;
private DataSize maxInitialSplitSize;
private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
private double minimumAssignedSplitWeight = 0.05;
private int maxPartitionsPerWriter = 100;
private boolean unsafeWritesEnabled;
private boolean checkpointRowStatisticsWritingEnabled = true;
Expand Down Expand Up @@ -173,6 +176,21 @@ public DeltaLakeConfig setMaxSplitSize(DataSize maxSplitSize)
return this;
}

// Airlift config setter bound to "delta.minimum-assigned-split-weight".
// Range validation lives on the corresponding getter, per airlift convention
// (constraints are checked against the getter after all setters have run).
@Config("delta.minimum-assigned-split-weight")
@ConfigDescription("Minimum weight that a split can be assigned")
public DeltaLakeConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
{
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
// Fluent return so config setters can be chained (see tests' usage).
return this;
}

// Bean-validation constraints enforce the documented range (0, 1]:
// @DecimalMin with inclusive = false excludes 0; @DecimalMax("1") includes 1.
@DecimalMax("1")
@DecimalMin(value = "0", inclusive = false)
public double getMinimumAssignedSplitWeight()
{
return minimumAssignedSplitWeight;
}

@Min(1)
public int getMaxPartitionsPerWriter()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.SizeOf;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.predicate.TupleDomain;
import org.openjdk.jol.info.ClassLayout;
Expand All @@ -44,6 +45,7 @@ public class DeltaLakeSplit
private final long fileSize;
private final long fileModifiedTime;
private final List<HostAddress> addresses;
private final SplitWeight splitWeight;
private final TupleDomain<DeltaLakeColumnHandle> statisticsPredicate;
private final Map<String, Optional<String>> partitionKeys;

Expand All @@ -55,6 +57,7 @@ public DeltaLakeSplit(
@JsonProperty("fileSize") long fileSize,
@JsonProperty("fileModifiedTime") long fileModifiedTime,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("statisticsPredicate") TupleDomain<DeltaLakeColumnHandle> statisticsPredicate,
@JsonProperty("partitionKeys") Map<String, Optional<String>> partitionKeys)
{
Expand All @@ -64,6 +67,7 @@ public DeltaLakeSplit(
this.fileSize = fileSize;
this.fileModifiedTime = fileModifiedTime;
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null");
this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null");
}
Expand All @@ -81,6 +85,13 @@ public List<HostAddress> getAddresses()
return addresses;
}

// Weight this split reports to the engine's scheduler (ConnectorSplit override).
// Also serialized by Jackson so the weight survives the coordinator-to-worker trip.
@JsonProperty
@Override
public SplitWeight getSplitWeight()
{
return splitWeight;
}

@JsonProperty
public String getPath()
{
Expand Down Expand Up @@ -132,6 +143,7 @@ public long getRetainedSizeInBytes()
return INSTANCE_SIZE
+ estimatedSizeOf(path)
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes)
+ splitWeight.getRetainedSizeInBytes()
+ statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes)
+ estimatedSizeOf(partitionKeys, SizeOf::estimatedSizeOf, value -> sizeOf(value, SizeOf::estimatedSizeOf));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class DeltaLakeSplitManager
private final int maxInitialSplits;
private final int maxSplitsPerSecond;
private final int maxOutstandingSplits;
private final double minimumAssignedSplitWeight;

@Inject
public DeltaLakeSplitManager(
Expand All @@ -85,6 +87,7 @@ public DeltaLakeSplitManager(
this.maxInitialSplits = config.getMaxInitialSplits();
this.maxSplitsPerSecond = config.getMaxSplitsPerSecond();
this.maxOutstandingSplits = config.getMaxOutstandingSplits();
this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight();
}

@Override
Expand Down Expand Up @@ -235,22 +238,22 @@ private List<DeltaLakeSplit> splitsForFile(
fileSize,
addFileEntry.getModificationTime(),
ImmutableList.of(),
SplitWeight.standard(),
statisticsPredicate,
partitionKeys));
}

ImmutableList.Builder<DeltaLakeSplit> splits = ImmutableList.builder();
long currentOffset = 0;
while (currentOffset < fileSize) {
long splitSize;
long maxSplitSize;
if (remainingInitialSplits.get() > 0 && remainingInitialSplits.getAndDecrement() > 0) {
splitSize = getMaxInitialSplitSize(session).toBytes();
maxSplitSize = getMaxInitialSplitSize(session).toBytes();
}
else {
splitSize = getMaxSplitSize(session).toBytes();
maxSplitSize = getMaxSplitSize(session).toBytes();
}

splitSize = Math.min(splitSize, fileSize - currentOffset);
long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);

splits.add(new DeltaLakeSplit(
splitPath,
Expand All @@ -259,6 +262,7 @@ private List<DeltaLakeSplit> splitsForFile(
fileSize,
addFileEntry.getModificationTime(),
ImmutableList.of(),
SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Math.min( ..., 1) is redundant per my understanding.

nit: static import min & max

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Math.min( ..., 1) is redundant per my understanding.

ie if we somehow end up creating an oversized split, it's fine to let the engine know about that.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

statisticsPredicate,
partitionKeys));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void testDefaults()
.setMaxInitialSplits(200)
.setMaxInitialSplitSize(DataSize.of(32, DataSize.Unit.MEGABYTE))
.setMaxSplitSize(DataSize.of(64, DataSize.Unit.MEGABYTE))
.setMinimumAssignedSplitWeight(0.05)
.setMaxPartitionsPerWriter(100)
.setUnsafeWritesEnabled(false)
.setDefaultCheckpointWritingInterval(10)
Expand Down Expand Up @@ -74,6 +75,7 @@ public void testExplicitPropertyMappings()
.put("delta.max-initial-splits", "5")
.put("delta.max-initial-split-size", "1 GB")
.put("delta.max-split-size", "10 MB")
.put("delta.minimum-assigned-split-weight", "0.01")
.put("delta.max-partitions-per-writer", "200")
.put("delta.enable-non-concurrent-writes", "true")
.put("delta.default-checkpoint-writing-interval", "15")
Expand All @@ -99,6 +101,7 @@ public void testExplicitPropertyMappings()
.setMaxInitialSplits(5)
.setMaxInitialSplitSize(DataSize.of(1, DataSize.Unit.GIGABYTE))
.setMaxSplitSize(DataSize.of(10, DataSize.Unit.MEGABYTE))
.setMinimumAssignedSplitWeight(0.01)
.setMaxPartitionsPerWriter(200)
.setUnsafeWritesEnabled(true)
.setDefaultCheckpointWritingInterval(15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitManager;
Expand Down Expand Up @@ -89,15 +90,16 @@ public void testInitialSplits()
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(1000)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000));
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize),
makeSplit(5_000, 5_000, fileSize),
makeSplit(10_000, 5_000, fileSize),
makeSplit(15_000, 5_000, fileSize));
makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight));

assertEquals(splits, expected);
}
Expand All @@ -112,18 +114,19 @@ public void testNonInitialSplits()
.setMaxInitialSplits(5)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000))
.setMaxSplitSize(DataSize.ofBytes(20_000));
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize),
makeSplit(5_000, 5_000, fileSize),
makeSplit(10_000, 5_000, fileSize),
makeSplit(15_000, 5_000, fileSize),
makeSplit(20_000, 5_000, fileSize),
makeSplit(25_000, 20_000, fileSize),
makeSplit(45_000, 5_000, fileSize));
makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(20_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(25_000, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(45_000, 5_000, fileSize, minimumAssignedSplitWeight));

assertEquals(splits, expected);
}
Expand All @@ -139,16 +142,17 @@ public void testSplitsFromMultipleFiles()
.setMaxInitialSplits(3)
.setMaxInitialSplitSize(DataSize.ofBytes(2_000))
.setMaxSplitSize(DataSize.ofBytes(10_000));
double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);

List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 1_000, firstFileSize),
makeSplit(0, 2_000, secondFileSize),
makeSplit(2_000, 2_000, secondFileSize),
makeSplit(4_000, 10_000, secondFileSize),
makeSplit(14_000, 6_000, secondFileSize));
makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(0, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
assertEquals(splits, expected);
}

Expand All @@ -171,9 +175,10 @@ private AddFileEntry addFileEntryOfSize(long fileSize)
return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());
}

private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize)
private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
{
return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, 0, ImmutableList.of(), TupleDomain.all(), ImmutableMap.of());
SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0));
return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
}

private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)
Expand Down