diff --git a/docs/src/main/sphinx/connector/delta-lake.md b/docs/src/main/sphinx/connector/delta-lake.md
index ca80b767a332..8a1ce602ae67 100644
--- a/docs/src/main/sphinx/connector/delta-lake.md
+++ b/docs/src/main/sphinx/connector/delta-lake.md
@@ -1086,17 +1086,6 @@ keep a backup of the original values if you change them.
     results in Trino maximizing the parallelization of data access by default.
     Attempting to set it higher results in Trino not being able to start.
   - `Integer.MAX_VALUE`
-* - `delta.max-initial-splits`
-  - For each query, the coordinator assigns file sections to read first at the
-    `initial-split-size` until the `max-initial-splits` is reached. Then it
-    starts issuing reads of the `max-split-size` size.
-  - `200`
-* - `delta.max-initial-split-size`
-  - Sets the initial [](prop-type-data-size) for a single read section
-    assigned to a worker until `max-initial-splits` have been processed. You can
-    also use the corresponding catalog session property
-    `<catalog-name>.max_initial_split_size`.
-  - `32MB`
 * - `delta.max-split-size`
   - Sets the largest [](prop-type-data-size) for a single read section
     assigned to a worker after `max-initial-splits` have been processed. You can
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
index c5c983ab83c4..aded91e665bd 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
@@ -38,7 +38,10 @@
 
 @DefunctConfig({
         "delta.experimental.ignore-checkpoint-write-failures",
-        "delta.legacy-create-table-with-existing-location.enabled"})
+        "delta.legacy-create-table-with-existing-location.enabled",
+        "delta.max-initial-splits",
+        "delta.max-initial-split-size"
+})
 public class DeltaLakeConfig
 {
     public static final String EXTENDED_STATISTICS_ENABLED = "delta.extended-statistics.enabled";
@@ -56,8 +59,6 @@ public class DeltaLakeConfig
     private int domainCompactionThreshold = 1000;
     private int maxOutstandingSplits = 1_000;
     private int maxSplitsPerSecond = Integer.MAX_VALUE;
-    private int maxInitialSplits = 200;
-    private DataSize maxInitialSplitSize;
     private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
     private double minimumAssignedSplitWeight = 0.05;
     private int maxPartitionsPerWriter = 100;
@@ -176,34 +177,6 @@ public DeltaLakeConfig setMaxSplitsPerSecond(int maxSplitsPerSecond)
         return this;
     }
 
-    public int getMaxInitialSplits()
-    {
-        return maxInitialSplits;
-    }
-
-    @Config("delta.max-initial-splits")
-    public DeltaLakeConfig setMaxInitialSplits(int maxInitialSplits)
-    {
-        this.maxInitialSplits = maxInitialSplits;
-        return this;
-    }
-
-    @NotNull
-    public DataSize getMaxInitialSplitSize()
-    {
-        if (maxInitialSplitSize == null) {
-            return DataSize.ofBytes(maxSplitSize.toBytes() / 2).to(maxSplitSize.getUnit());
-        }
-        return maxInitialSplitSize;
-    }
-
-    @Config("delta.max-initial-split-size")
-    public DeltaLakeConfig setMaxInitialSplitSize(DataSize maxInitialSplitSize)
-    {
-        this.maxInitialSplitSize = maxInitialSplitSize;
-        return this;
-    }
-
     @NotNull
     public DataSize getMaxSplitSize()
     {
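Note for reviewers: the two removed properties are listed in `@DefunctConfig` rather than silently dropped, so a deployment that still sets them fails fast at startup instead of carrying a no-op setting. A minimal sketch of the behavior this buys (illustrative only; the real enforcement lives in airlift's configuration bootstrap, not in this class):

```java
import java.util.Map;
import java.util.Set;

final class DefunctPropertyCheck
{
    // The two properties removed in this PR; still recognized, but only to reject them.
    private static final Set<String> DEFUNCT = Set.of(
            "delta.max-initial-splits",
            "delta.max-initial-split-size");

    static void validate(Map<String, String> catalogProperties)
    {
        for (String name : DEFUNCT) {
            if (catalogProperties.containsKey(name)) {
                // Fail fast so operators notice the removal instead of the setting being ignored
                throw new IllegalArgumentException("Defunct property: '" + name + "'");
            }
        }
    }
}
```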
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
index 61a0f8b59b92..bfc247eceec7 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java
@@ -51,7 +51,6 @@ public final class DeltaLakeSessionProperties
         implements SessionPropertiesProvider
 {
     public static final String MAX_SPLIT_SIZE = "max_split_size";
-    public static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
     public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
     private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
     private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
@@ -90,11 +89,6 @@ public DeltaLakeSessionProperties(
                         "Max split size",
                         deltaLakeConfig.getMaxSplitSize(),
                         true),
-                dataSizeProperty(
-                        MAX_INITIAL_SPLIT_SIZE,
-                        "Max initial split size",
-                        deltaLakeConfig.getMaxInitialSplitSize(),
-                        true),
                 durationProperty(
                         VACUUM_MIN_RETENTION,
                         "Minimal retention period for vacuum procedure",
@@ -244,11 +238,6 @@ public static DataSize getMaxSplitSize(ConnectorSession session)
     {
         return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
     }
 
-    public static DataSize getMaxInitialSplitSize(ConnectorSession session)
-    {
-        return session.getProperty(MAX_INITIAL_SPLIT_SIZE, DataSize.class);
-    }
-
     public static Duration getVacuumMinRetention(ConnectorSession session)
     {
         return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
index 2c24cdc7f045..369a5e34bd02 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
@@ -52,7 +52,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -64,7 +63,6 @@
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
-import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxInitialSplitSize;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
@@ -79,7 +77,6 @@ public class DeltaLakeSplitManager
     private final TypeManager typeManager;
     private final TransactionLogAccess transactionLogAccess;
     private final ExecutorService executor;
-    private final int maxInitialSplits;
     private final int maxSplitsPerSecond;
     private final int maxOutstandingSplits;
     private final double minimumAssignedSplitWeight;
@@ -100,7 +97,6 @@ public DeltaLakeSplitManager(
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
         this.executor = requireNonNull(executor, "executor is null");
-        this.maxInitialSplits = config.getMaxInitialSplits();
         this.maxSplitsPerSecond = config.getMaxSplitsPerSecond();
         this.maxOutstandingSplits = config.getMaxOutstandingSplits();
         this.minimumAssignedSplitWeight = config.getMinimumAssignedSplitWeight();
@@ -174,7 +170,6 @@ private Stream<DeltaLakeSplit> getSplits(
                 tableHandle.getWriteType().isEmpty() &&
                 // When only partitioning columns projected, there is no point splitting the files
                 mayAnyDataColumnProjected(tableHandle);
-        AtomicInteger remainingInitialSplits = new AtomicInteger(maxInitialSplits);
         Optional<Instant> filesModifiedAfter = tableHandle.getAnalyzeHandle().flatMap(AnalyzeHandle::getFilesModifiedAfter);
         Optional<Long> maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes);
 
@@ -248,8 +243,7 @@ private Stream<DeltaLakeSplit> getSplits(
                             splitPath,
                             addAction.getCanonicalPartitionValues(),
                             statisticsPredicate,
-                            splittable,
-                            remainingInitialSplits)
+                            splittable)
                             .stream();
                 });
     }
@@ -317,8 +311,7 @@ private List<DeltaLakeSplit> splitsForFile(
             String splitPath,
             Map<String, Optional<String>> partitionKeys,
             TupleDomain<DeltaLakeColumnHandle> statisticsPredicate,
-            boolean splittable,
-            AtomicInteger remainingInitialSplits)
+            boolean splittable)
     {
         long fileSize = addFileEntry.getSize();
 
@@ -341,13 +334,7 @@ private List<DeltaLakeSplit> splitsForFile(
         ImmutableList.Builder<DeltaLakeSplit> splits = ImmutableList.builder();
         long currentOffset = 0;
         while (currentOffset < fileSize) {
-            long maxSplitSize;
-            if (remainingInitialSplits.get() > 0 && remainingInitialSplits.getAndDecrement() > 0) {
-                maxSplitSize = getMaxInitialSplitSize(session).toBytes();
-            }
-            else {
-                maxSplitSize = getMaxSplitSize(session).toBytes();
-            }
+            long maxSplitSize = getMaxSplitSize(session).toBytes();
             long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);
 
             splits.add(new DeltaLakeSplit(
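The behavioral core of the change is the simplified loop in `splitsForFile`: every section of a splittable file is now capped by `max-split-size` alone, instead of the first `max-initial-splits` sections being capped at the smaller initial size (by default 32MB, half of the 64MB `max-split-size`). A self-contained sketch of the resulting carving, with illustrative names rather than the connector's API:

```java
import java.util.ArrayList;
import java.util.List;

final class SplitCarving
{
    // Carve a file into (offset, length) sections of at most maxSplitSize bytes,
    // mirroring the simplified while-loop in splitsForFile above.
    static List<long[]> carve(long fileSize, long maxSplitSize)
    {
        List<long[]> splits = new ArrayList<>();
        long offset = 0;
        while (offset < fileSize) {
            long length = Math.min(maxSplitSize, fileSize - offset);
            splits.add(new long[] {offset, length});
            offset += length;
        }
        return splits;
    }

    public static void main(String[] args)
    {
        // A 50_000-byte file with a 20_000-byte cap yields exactly the three
        // sections asserted in testSplitSizes below:
        // (0, 20_000), (20_000, 20_000), (40_000, 10_000).
        for (long[] split : carve(50_000, 20_000)) {
            System.out.printf("offset=%d length=%d%n", split[0], split[1]);
        }
    }
}
```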
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
index 6cc5eccd1312..ba9cef997bc6 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
@@ -47,8 +47,6 @@ public void testDefaults()
                 .setDomainCompactionThreshold(1000)
                 .setMaxSplitsPerSecond(Integer.MAX_VALUE)
                 .setMaxOutstandingSplits(1_000)
-                .setMaxInitialSplits(200)
-                .setMaxInitialSplitSize(DataSize.of(32, DataSize.Unit.MEGABYTE))
                 .setMaxSplitSize(DataSize.of(64, DataSize.Unit.MEGABYTE))
                 .setMinimumAssignedSplitWeight(0.05)
                 .setMaxPartitionsPerWriter(100)
@@ -85,8 +83,6 @@ public void testExplicitPropertyMappings()
                 .put("delta.domain-compaction-threshold", "500")
                 .put("delta.max-outstanding-splits", "200")
                 .put("delta.max-splits-per-second", "10")
-                .put("delta.max-initial-splits", "5")
-                .put("delta.max-initial-split-size", "1 GB")
                 .put("delta.max-split-size", "10 MB")
                 .put("delta.minimum-assigned-split-weight", "0.01")
                 .put("delta.max-partitions-per-writer", "200")
@@ -120,8 +116,6 @@ public void testExplicitPropertyMappings()
                 .setDomainCompactionThreshold(500)
                 .setMaxOutstandingSplits(200)
                 .setMaxSplitsPerSecond(10)
-                .setMaxInitialSplits(5)
-                .setMaxInitialSplitSize(DataSize.of(1, GIGABYTE))
                 .setMaxSplitSize(DataSize.of(10, DataSize.Unit.MEGABYTE))
                 .setMinimumAssignedSplitWeight(0.01)
                 .setMaxPartitionsPerWriter(200)
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
index 55d3c7239c84..a1b68dded2ef 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
@@ -324,7 +324,6 @@ public void testReadWholePartitionSplittableFile()
 
         Session session = Session.builder(getSession())
                 .setCatalogSessionProperty(catalog, DeltaLakeSessionProperties.MAX_SPLIT_SIZE, "1kB")
-                .setCatalogSessionProperty(catalog, DeltaLakeSessionProperties.MAX_INITIAL_SPLIT_SIZE, "1kB")
                 .build();
 
         // Read partition column only
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
index dfbd879526d8..d0e04904bde0 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
@@ -102,37 +102,12 @@ public class TestDeltaLakeSplitManager
     private final HiveTransactionHandle transactionHandle = new HiveTransactionHandle(true);
 
     @Test
-    public void testInitialSplits()
-            throws ExecutionException, InterruptedException
-    {
-        long fileSize = 20_000;
-        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
-        DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
-                .setMaxInitialSplits(1000)
-                .setMaxInitialSplitSize(DataSize.ofBytes(5_000));
-        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
-
-        DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
-        List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
-
-        List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight));
-
-        assertThat(splits).isEqualTo(expected);
-    }
-
-    @Test
-    public void testNonInitialSplits()
+    public void testSplitSizes()
             throws ExecutionException, InterruptedException
     {
         long fileSize = 50_000;
         List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
-                .setMaxInitialSplits(5)
-                .setMaxInitialSplitSize(DataSize.ofBytes(5_000))
                 .setMaxSplitSize(DataSize.ofBytes(20_000));
         double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
 
@@ -140,13 +115,9 @@ public void testNonInitialSplits()
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
 
         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(20_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(25_000, 20_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(45_000, 5_000, fileSize, minimumAssignedSplitWeight));
+                makeSplit(0, 20_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(20_000, 20_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(40_000, 10_000, fileSize, minimumAssignedSplitWeight));
 
         assertThat(splits).isEqualTo(expected);
     }
@@ -159,8 +130,6 @@ public void testSplitsFromMultipleFiles()
         long secondFileSize = 20_000;
         List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
-                .setMaxInitialSplits(3)
-                .setMaxInitialSplitSize(DataSize.ofBytes(2_000))
                 .setMaxSplitSize(DataSize.ofBytes(10_000));
         double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
 
@@ -169,10 +138,8 @@ public void testSplitsFromMultipleFiles()
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
         List<DeltaLakeSplit> expected = ImmutableList.of(
                 makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
-                makeSplit(0, 2_000, secondFileSize, minimumAssignedSplitWeight),
-                makeSplit(2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
-                makeSplit(4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
-                makeSplit(14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
+                makeSplit(0, 10_000, secondFileSize, minimumAssignedSplitWeight),
+                makeSplit(10_000, 10_000, secondFileSize, minimumAssignedSplitWeight));
 
         assertThat(splits).isEqualTo(expected);
     }
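For completeness, the updated `testSplitsFromMultipleFiles` expectations follow from the same uniform rule applied per file; a throwaway check using the same illustrative carving as the earlier sketch:

```java
final class MultiFileSplitCheck
{
    public static void main(String[] args)
    {
        long maxSplitSize = 10_000;
        // First file: 1_000 bytes -> one (0, 1_000) split.
        // Second file: 20_000 bytes -> (0, 10_000) and (10_000, 10_000).
        for (long fileSize : new long[] {1_000, 20_000}) {
            for (long offset = 0; offset < fileSize; offset += maxSplitSize) {
                System.out.printf("file=%d offset=%d length=%d%n",
                        fileSize, offset, Math.min(maxSplitSize, fileSize - offset));
            }
        }
    }
}
```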