From 59cda7c0cefc7b76337abd63d8cca0c0b0b6eafb Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Thu, 9 Jun 2022 14:36:32 +0900
Subject: [PATCH] Attach weights to DeltaLakeSplits

The weight is equal to the split size divided by the target split size.
---
 docs/src/main/sphinx/connector/delta-lake.rst |  4 ++
 .../plugin/deltalake/DeltaLakeConfig.java     | 18 ++++++++
 .../plugin/deltalake/DeltaLakeSplit.java      | 12 ++++++
 .../deltalake/DeltaLakeSplitManager.java      | 14 ++++---
 .../plugin/deltalake/TestDeltaLakeConfig.java |  3 ++
 .../deltalake/TestDeltaLakeSplitManager.java  | 41 +++++++++++--------
 6 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst
index 9f3a12f7bf8f..cac40c25d96d 100644
--- a/docs/src/main/sphinx/connector/delta-lake.rst
+++ b/docs/src/main/sphinx/connector/delta-lake.rst
@@ -180,6 +180,10 @@ connector.
       can also use the corresponding catalog session property ``.max_split_size``.
     - ``64MB``
+   * - ``delta.minimum-assigned-split-weight``
+     - A decimal value in the range (0, 1] used as a minimum for weights assigned to each split. A low value may improve performance
+       on tables with small files. A higher value may improve performance for queries with highly skewed aggregations or joins.
+     - 0.05
 
 The following table describes :ref:`catalog session properties ` supported by the Delta Lake connector to
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
index 1010fb065163..a41153cbbbb4 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
@@ -22,6 +22,8 @@
 import io.trino.plugin.hive.HiveCompressionCodec;
 import org.joda.time.DateTimeZone;
 
+import javax.validation.constraints.DecimalMax;
+import javax.validation.constraints.DecimalMin;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
@@ -50,6 +52,7 @@ public class DeltaLakeConfig
     private int maxInitialSplits = 200;
     private DataSize maxInitialSplitSize;
     private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
+    private double minimumAssignedSplitWeight = 0.05;
     private int maxPartitionsPerWriter = 100;
     private boolean unsafeWritesEnabled;
     private boolean checkpointRowStatisticsWritingEnabled = true;
@@ -173,6 +176,21 @@ public DeltaLakeConfig setMaxSplitSize(DataSize maxSplitSize)
         return this;
     }
 
+    @Config("delta.minimum-assigned-split-weight")
+    @ConfigDescription("Minimum weight that a split can be assigned")
+    public DeltaLakeConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
+    {
+        this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
+        return this;
+    }
+
+    @DecimalMax("1")
+    @DecimalMin(value = "0", inclusive = false)
+    public double getMinimumAssignedSplitWeight()
+    {
+        return minimumAssignedSplitWeight;
+    }
+
     @Min(1)
     public int getMaxPartitionsPerWriter()
     {
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java
index b179d54e45b0..b1ce421df01a 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableMap;
 import io.airlift.slice.SizeOf;
 import io.trino.spi.HostAddress;
+import io.trino.spi.SplitWeight;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.predicate.TupleDomain;
 import org.openjdk.jol.info.ClassLayout;
@@ -44,6 +45,7 @@ public class DeltaLakeSplit
     private final long fileSize;
     private final long fileModifiedTime;
     private final List<HostAddress> addresses;
+    private final SplitWeight splitWeight;
     private final TupleDomain<DeltaLakeColumnHandle> statisticsPredicate;
     private final Map<String, Optional<String>> partitionKeys;
 
@@ -55,6 +57,7 @@ public DeltaLakeSplit(
             @JsonProperty("fileSize") long fileSize,
             @JsonProperty("fileModifiedTime") long fileModifiedTime,
             @JsonProperty("addresses") List<HostAddress> addresses,
+            @JsonProperty("splitWeight") SplitWeight splitWeight,
             @JsonProperty("statisticsPredicate") TupleDomain<DeltaLakeColumnHandle> statisticsPredicate,
             @JsonProperty("partitionKeys") Map<String, Optional<String>> partitionKeys)
     {
@@ -64,6 +67,7 @@ public DeltaLakeSplit(
         this.fileSize = fileSize;
         this.fileModifiedTime = fileModifiedTime;
         this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
+        this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
         this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null");
         this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null");
     }
@@ -81,6 +85,13 @@ public List<HostAddress> getAddresses()
         return addresses;
     }
 
+    @JsonProperty
+    @Override
+    public SplitWeight getSplitWeight()
+    {
+        return splitWeight;
+    }
+
     @JsonProperty
     public String getPath()
     {
@@ -132,6 +143,7 @@ public long getRetainedSizeInBytes()
         return INSTANCE_SIZE
                 + estimatedSizeOf(path)
                 + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes)
+                + splitWeight.getRetainedSizeInBytes()
                 + statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes)
                 + estimatedSizeOf(partitionKeys, SizeOf::estimatedSizeOf, value -> sizeOf(value, SizeOf::estimatedSizeOf));
     }
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
index 06d5020ae3f2..e6327a9583e5 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
@@ -19,6 +19,7 @@
 import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
 import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.SplitWeight;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
@@ -70,6 +71,7 @@ public class DeltaLakeSplitManager
     private final int maxInitialSplits;
     private final int maxSplitsPerSecond;
     private final int maxOutstandingSplits;
+    private final double minimumAssignedSplitWeight;
 
     @Inject
     public DeltaLakeSplitManager(
@@ -85,6 +87,7 @@ public DeltaLakeSplitManager(
         this.maxInitialSplits = config.getMaxInitialSplits();
         this.maxSplitsPerSecond = config.getMaxSplitsPerSecond();
         this.maxOutstandingSplits = config.getMaxOutstandingSplits();
+        this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight();
     }
 
     @Override
@@ -235,6 +238,7 @@ private List<DeltaLakeSplit> splitsForFile(
                     fileSize,
                     addFileEntry.getModificationTime(),
                     ImmutableList.of(),
+                    SplitWeight.standard(),
                     statisticsPredicate,
                     partitionKeys));
         }
@@ -242,15 +246,14 @@ private List<DeltaLakeSplit> splitsForFile(
         ImmutableList.Builder<DeltaLakeSplit> splits = ImmutableList.builder();
         long currentOffset = 0;
         while (currentOffset < fileSize) {
-            long splitSize;
+            long maxSplitSize;
             if (remainingInitialSplits.get() > 0 && remainingInitialSplits.getAndDecrement() > 0) {
-                splitSize = getMaxInitialSplitSize(session).toBytes();
+                maxSplitSize = getMaxInitialSplitSize(session).toBytes();
             }
             else {
-                splitSize = getMaxSplitSize(session).toBytes();
+                maxSplitSize = getMaxSplitSize(session).toBytes();
             }
-
-            splitSize = Math.min(splitSize, fileSize - currentOffset);
+            long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);
 
             splits.add(new DeltaLakeSplit(
                     splitPath,
@@ -259,6 +262,7 @@ private List<DeltaLakeSplit> splitsForFile(
                     fileSize,
                     addFileEntry.getModificationTime(),
                     ImmutableList.of(),
+                    SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)),
                     statisticsPredicate,
                     partitionKeys));
 
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
index cb9766d79599..71092ce64960 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
@@ -46,6 +46,7 @@ public void testDefaults()
                 .setMaxInitialSplits(200)
                 .setMaxInitialSplitSize(DataSize.of(32, DataSize.Unit.MEGABYTE))
                 .setMaxSplitSize(DataSize.of(64, DataSize.Unit.MEGABYTE))
+                .setMinimumAssignedSplitWeight(0.05)
                 .setMaxPartitionsPerWriter(100)
                 .setUnsafeWritesEnabled(false)
                 .setDefaultCheckpointWritingInterval(10)
@@ -74,6 +75,7 @@ public void testExplicitPropertyMappings()
                 .put("delta.max-initial-splits", "5")
                 .put("delta.max-initial-split-size", "1 GB")
                 .put("delta.max-split-size", "10 MB")
+                .put("delta.minimum-assigned-split-weight", "0.01")
                 .put("delta.max-partitions-per-writer", "200")
                 .put("delta.enable-non-concurrent-writes", "true")
                 .put("delta.default-checkpoint-writing-interval", "15")
@@ -99,6 +101,7 @@ public void testExplicitPropertyMappings()
                 .setMaxInitialSplits(5)
                 .setMaxInitialSplitSize(DataSize.of(1, DataSize.Unit.GIGABYTE))
                 .setMaxSplitSize(DataSize.of(10, DataSize.Unit.MEGABYTE))
+                .setMinimumAssignedSplitWeight(0.01)
                 .setMaxPartitionsPerWriter(200)
                 .setUnsafeWritesEnabled(true)
                 .setDefaultCheckpointWritingInterval(15)
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
index 2c762fe86b46..49421f987b22 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
@@ -29,6 +29,7 @@
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.spi.SplitWeight;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.connector.ConnectorSplitManager;
@@ -89,15 +90,16 @@ public void testInitialSplits()
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
                 .setMaxInitialSplits(1000)
                 .setMaxInitialSplitSize(DataSize.ofBytes(5_000));
+        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
         DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
 
         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize),
-                makeSplit(5_000, 5_000, fileSize),
-                makeSplit(10_000, 5_000, fileSize),
-                makeSplit(15_000, 5_000, fileSize));
+                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight));
 
         assertEquals(splits, expected);
     }
@@ -112,18 +114,19 @@ public void testNonInitialSplits()
                 .setMaxInitialSplits(5)
                 .setMaxInitialSplitSize(DataSize.ofBytes(5_000))
                 .setMaxSplitSize(DataSize.ofBytes(20_000));
+        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
         DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
 
         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize),
-                makeSplit(5_000, 5_000, fileSize),
-                makeSplit(10_000, 5_000, fileSize),
-                makeSplit(15_000, 5_000, fileSize),
-                makeSplit(20_000, 5_000, fileSize),
-                makeSplit(25_000, 20_000, fileSize),
-                makeSplit(45_000, 5_000, fileSize));
+                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(20_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(25_000, 20_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(45_000, 5_000, fileSize, minimumAssignedSplitWeight));
 
         assertEquals(splits, expected);
     }
@@ -139,16 +142,17 @@ public void testSplitsFromMultipleFiles()
                 .setMaxInitialSplits(3)
                 .setMaxInitialSplitSize(DataSize.ofBytes(2_000))
                 .setMaxSplitSize(DataSize.ofBytes(10_000));
+        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
         DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
 
         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 1_000, firstFileSize),
-                makeSplit(0, 2_000, secondFileSize),
-                makeSplit(2_000, 2_000, secondFileSize),
-                makeSplit(4_000, 10_000, secondFileSize),
-                makeSplit(14_000, 6_000, secondFileSize));
+                makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
+                makeSplit(0, 2_000, secondFileSize, minimumAssignedSplitWeight),
+                makeSplit(2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
+                makeSplit(4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
+                makeSplit(14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
 
         assertEquals(splits, expected);
     }
@@ -171,9 +175,10 @@ private AddFileEntry addFileEntryOfSize(long fileSize)
         return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());
     }
 
-    private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize)
+    private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
     {
-        return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, 0, ImmutableList.of(), TupleDomain.all(), ImmutableMap.of());
+        SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0));
+        return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
     }
 
     private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)
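
Note (not part of the patch): a minimal, self-contained sketch of the weighting rule this change applies, for reference. It inlines the clamp that DeltaLakeSplitManager passes to SplitWeight.fromProportion; the class name and the example numbers are illustrative and not taken from the Trino sources.

// Illustrative only: weight proportion = splitSize / targetSplitSize, clamped to
// [minimumAssignedSplitWeight, 1.0]; the connector wraps this value with
// io.trino.spi.SplitWeight.fromProportion(...).
public class SplitWeightSketch
{
    static double weightProportion(long splitSize, long targetSplitSize, double minimumAssignedSplitWeight)
    {
        return Math.min(Math.max((double) splitSize / targetSplitSize, minimumAssignedSplitWeight), 1.0);
    }

    public static void main(String[] args)
    {
        long target = 64L << 20; // 64 MB target split size (the delta.max-split-size default)
        System.out.println(weightProportion(64L << 20, target, 0.05)); // 1.0  -> full-size split
        System.out.println(weightProportion(16L << 20, target, 0.05)); // 0.25 -> proportional weight
        System.out.println(weightProportion(1L << 20, target, 0.05));  // 0.05 -> floor applies
    }
}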