Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -1162,8 +1162,7 @@ keep a backup of the original values if you change them.
- `Integer.MAX_VALUE`
* - `delta.max-split-size`
- Sets the largest [](prop-type-data-size) for a single read section
assigned to a worker after `max-initial-splits` have been processed. You can
also use the corresponding catalog session property
assigned to a worker. You can also use the corresponding catalog session property
`<catalog-name>.max_split_size`.
- `64MB`
* - `delta.minimum-assigned-split-weight`
Expand Down
10 changes: 0 additions & 10 deletions docs/src/main/sphinx/connector/hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -1293,16 +1293,6 @@ cause instability and performance degradation.
be used to reduce the load on the storage system. By default, there is no
limit, which results in Trino maximizing the parallelization of data access.
-
* - `hive.max-initial-splits`
- For each table scan, the coordinator first assigns file sections of up to
`max-initial-split-size`. After `max-initial-splits` have been assigned,
`max-split-size` is used for the remaining splits.
- `200`
* - `hive.max-initial-split-size`
- The size of a single file section assigned to a worker until
`max-initial-splits` have been assigned. Smaller splits results in more
parallelism, which gives a boost to smaller queries.
- `32 MB`
* - `hive.max-split-size`
- The largest size of a single file section assigned to a worker. Smaller
splits result in more parallelism and thus can decrease latency, but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.isForceLocalScheduling;
import static io.trino.plugin.hive.HiveSessionProperties.isValidateBucketing;
import static io.trino.plugin.hive.HiveStorageFormat.TEXTFILE;
Expand Down Expand Up @@ -427,7 +427,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
partition.getHiveColumnCoercions(),
Optional.empty(),
Optional.empty(),
getMaxInitialSplitSize(session),
getMaxSplitSize(session),
isForceLocalScheduling(session),
maxSplitFileSize);

Expand Down Expand Up @@ -479,7 +479,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
partition.getHiveColumnCoercions(),
bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty(),
bucketValidation,
getMaxInitialSplitSize(session),
getMaxSplitSize(session),
isForceLocalScheduling(session),
maxSplitFileSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
"hive.s3select-pushdown.enabled",
"hive.s3select-pushdown.experimental-textfile-pushdown-enabled",
"hive.s3select-pushdown.max-connections",
"hive.max-initial-splits",
"hive.max-initial-split-size"
})
public class HiveConfig
{
Expand All @@ -78,10 +80,8 @@ public class HiveConfig
private int maxSplitIteratorThreads = 1_000;
private int minPartitionBatchSize = 10;
private int maxPartitionBatchSize = 100;
private int maxInitialSplits = 200;
private int splitLoaderConcurrency = 64;
private Integer maxSplitsPerSecond;
private DataSize maxInitialSplitSize;
private int domainCompactionThreshold = 1000;
private boolean forceLocalScheduling;
private boolean recursiveDirWalkerEnabled;
Expand Down Expand Up @@ -189,33 +189,6 @@ public HiveConfig setSingleStatementWritesOnly(boolean singleStatementWritesOnly
return this;
}

public int getMaxInitialSplits()
{
return maxInitialSplits;
}

@Config("hive.max-initial-splits")
public HiveConfig setMaxInitialSplits(int maxInitialSplits)
{
this.maxInitialSplits = maxInitialSplits;
return this;
}

public DataSize getMaxInitialSplitSize()
{
if (maxInitialSplitSize == null) {
return DataSize.ofBytes(maxSplitSize.toBytes() / 2).to(maxSplitSize.getUnit());
}
return maxInitialSplitSize;
}

@Config("hive.max-initial-split-size")
public HiveConfig setMaxInitialSplitSize(DataSize maxInitialSplitSize)
{
this.maxInitialSplitSize = maxInitialSplitSize;
return this;
}

@Min(1)
public int getSplitLoaderConcurrency()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public final class HiveSessionProperties
private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size";
private static final String PARQUET_OPTIMIZED_WRITER_VALIDATION_PERCENTAGE = "parquet_optimized_writer_validation_percentage";
private static final String MAX_SPLIT_SIZE = "max_split_size";
private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
private static final String RCFILE_OPTIMIZED_WRITER_VALIDATE = "rcfile_optimized_writer_validate";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String PROPAGATE_TABLE_SCAN_SORTING_PROPERTIES = "propagate_table_scan_sorting_properties";
Expand Down Expand Up @@ -420,11 +419,6 @@ public HiveSessionProperties(
"Max split size",
hiveConfig.getMaxSplitSize(),
true),
dataSizeProperty(
MAX_INITIAL_SPLIT_SIZE,
"Max initial split size",
hiveConfig.getMaxInitialSplitSize(),
true),
booleanProperty(
RCFILE_OPTIMIZED_WRITER_VALIDATE,
"RCFile: Validate writer files",
Expand Down Expand Up @@ -786,11 +780,6 @@ public static DataSize getMaxSplitSize(ConnectorSession session)
return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
}

public static DataSize getMaxInitialSplitSize(ConnectorSession session)
{
return session.getProperty(MAX_INITIAL_SPLIT_SIZE, DataSize.class);
}

public static boolean isRcfileOptimizedWriterValidate(ConnectorSession session)
{
return session.getProperty(RCFILE_OPTIMIZED_WRITER_VALIDATE, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public class HiveSplitManager
private final DataSize maxOutstandingSplitsSize;
private final int minPartitionBatchSize;
private final int maxPartitionBatchSize;
private final int maxInitialSplits;
private final int splitLoaderConcurrency;
private final int maxSplitsPerSecond;
private final boolean recursiveDfsWalkerEnabled;
Expand Down Expand Up @@ -143,7 +142,6 @@ public HiveSplitManager(
hiveConfig.getMaxOutstandingSplitsSize(),
hiveConfig.getMinPartitionBatchSize(),
hiveConfig.getMaxPartitionBatchSize(),
hiveConfig.getMaxInitialSplits(),
hiveConfig.getSplitLoaderConcurrency(),
hiveConfig.getMaxSplitsPerSecond(),
hiveConfig.getRecursiveDirWalkerEnabled(),
Expand All @@ -162,7 +160,6 @@ public HiveSplitManager(
DataSize maxOutstandingSplitsSize,
int minPartitionBatchSize,
int maxPartitionBatchSize,
int maxInitialSplits,
int splitLoaderConcurrency,
@Nullable Integer maxSplitsPerSecond,
boolean recursiveDfsWalkerEnabled,
Expand All @@ -180,7 +177,6 @@ public HiveSplitManager(
this.maxOutstandingSplitsSize = maxOutstandingSplitsSize;
this.minPartitionBatchSize = minPartitionBatchSize;
this.maxPartitionBatchSize = maxPartitionBatchSize;
this.maxInitialSplits = maxInitialSplits;
this.splitLoaderConcurrency = splitLoaderConcurrency;
this.maxSplitsPerSecond = firstNonNull(maxSplitsPerSecond, Integer.MAX_VALUE);
this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled;
Expand Down Expand Up @@ -278,7 +274,6 @@ public ConnectorSplitSource getSplits(
session,
table.getDatabaseName(),
table.getTableName(),
maxInitialSplits,
maxOutstandingSplits,
maxOutstandingSplitsSize,
maxSplitsPerSecond,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;
import static io.trino.plugin.hive.HiveSessionProperties.isSizeBasedSplitWeightsEnabled;
Expand All @@ -73,9 +72,7 @@ class HiveSplitSource
private final AtomicInteger bufferedInternalSplitCount = new AtomicInteger();
private final long maxOutstandingSplitsBytes;

private final DataSize maxSplitSize;
private final DataSize maxInitialSplitSize;
private final AtomicInteger remainingInitialSplits;
private final long maxSplitBytes;

private final HiveSplitLoader splitLoader;
private final AtomicReference<State> stateReference;
Expand All @@ -95,7 +92,6 @@ private HiveSplitSource(
String databaseName,
String tableName,
PerBucket queues,
int maxInitialSplits,
DataSize maxOutstandingSplitsSize,
HiveSplitLoader splitLoader,
AtomicReference<State> stateReference,
Expand All @@ -113,10 +109,8 @@ private HiveSplitSource(
this.stateReference = requireNonNull(stateReference, "stateReference is null");
this.highMemorySplitSourceCounter = requireNonNull(highMemorySplitSourceCounter, "highMemorySplitSourceCounter is null");

this.maxSplitSize = getMaxSplitSize(session);
this.maxInitialSplitSize = getMaxInitialSplitSize(session);
this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
this.maxSplitBytes = getMaxSplitSize(session).toBytes();
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), getMaxSplitSize(session)) : HiveSplitWeightProvider.uniformStandardWeightProvider();
this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null");
this.recordScannedFiles = recordScannedFiles;
}
Expand All @@ -125,7 +119,6 @@ public static HiveSplitSource allAtOnce(
ConnectorSession session,
String databaseName,
String tableName,
int maxInitialSplits,
int maxOutstandingSplits,
DataSize maxOutstandingSplitsSize,
int maxSplitsPerSecond,
Expand Down Expand Up @@ -168,7 +161,6 @@ public boolean isFinished()
return queue.isFinished();
}
},
maxInitialSplits,
maxOutstandingSplitsSize,
splitLoader,
stateReference,
Expand Down Expand Up @@ -277,12 +269,6 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
continue;
}

long maxSplitBytes = maxSplitSize.toBytes();
if (remainingInitialSplits.get() > 0) {
if (remainingInitialSplits.getAndDecrement() > 0) {
maxSplitBytes = maxInitialSplitSize.toBytes();
}
}
InternalHiveBlock block = internalSplit.currentBlock();
long splitBytes;
if (internalSplit.isSplittable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class InternalHiveSplitFactory
private final BooleanSupplier partitionMatchSupplier;
private final Optional<BucketConversion> bucketConversion;
private final Optional<HiveSplit.BucketValidation> bucketValidation;
private final long minimumTargetSplitSizeInBytes;
private final long maxSplitSizeInBytes;
private final Optional<Long> maxSplitFileSize;
private final boolean forceLocalScheduling;

Expand All @@ -72,7 +72,7 @@ public InternalHiveSplitFactory(
Map<Integer, HiveTypeName> hiveColumnCoercions,
Optional<BucketConversion> bucketConversion,
Optional<HiveSplit.BucketValidation> bucketValidation,
DataSize minimumTargetSplitSize,
DataSize maxSplitSize,
boolean forceLocalScheduling,
Optional<Long> maxSplitFileSize)
{
Expand All @@ -86,9 +86,9 @@ public InternalHiveSplitFactory(
this.bucketConversion = requireNonNull(bucketConversion, "bucketConversion is null");
this.bucketValidation = requireNonNull(bucketValidation, "bucketValidation is null");
this.forceLocalScheduling = forceLocalScheduling;
this.minimumTargetSplitSizeInBytes = minimumTargetSplitSize.toBytes();
this.maxSplitSizeInBytes = maxSplitSize.toBytes();
this.maxSplitFileSize = requireNonNull(maxSplitFileSize, "maxSplitFileSize is null");
checkArgument(minimumTargetSplitSizeInBytes > 0, "minimumTargetSplitSize must be > 0, found: %s", minimumTargetSplitSize);
checkArgument(this.maxSplitSizeInBytes > 0, "maxTargetSplitSize must be > 0, found: %s", maxSplitSize);
}

private static Map<String, String> stripUnnecessaryProperties(Map<String, String> schema)
Expand All @@ -108,7 +108,11 @@ public String getPartitionName()
public Optional<InternalHiveSplit> createInternalHiveSplit(TrinoFileStatus status, OptionalInt readBucketNumber, OptionalInt tableBucketNumber, boolean splittable, Optional<AcidInfo> acidInfo)
{
splittable = splittable &&
status.getLength() > minimumTargetSplitSizeInBytes &&
// For some input formats, the isSplittable check is non-trivial and can
// add a significant amount of time to split generation when handling a
// large number of very small files. If file is smaller or equal to the
// maxSplitSizeInBytes, then this check can be skipped.
status.getLength() > maxSplitSizeInBytes &&
storageFormat.isSplittable(status.getPath());
return createInternalHiveSplit(
status.getPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ public DistributedQueryRunner build()
if (tpchBucketedCatalogEnabled) {
Map<String, String> hiveBucketedProperties = ImmutableMap.<String, String>builder()
.putAll(hiveProperties)
.put("hive.max-initial-split-size", "10kB") // so that each bucket has multiple splits
.put("hive.max-split-size", "10kB") // so that each bucket has multiple splits
.put("hive.storage-format", "TEXTFILE") // so that there's no minimum split size for the file
.buildOrThrow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public void testCsv()
throws Exception
{
FileEntry file = new FileEntry(LOCATION, DataSize.of(2, GIGABYTE).toBytes(), Instant.now(), Optional.empty());
assertCsvSplitCount(file, Map.of(), 33);
assertCsvSplitCount(file, Map.of(HEADER_COUNT, "1"), 33);
assertCsvSplitCount(file, Map.of(), 32);
assertCsvSplitCount(file, Map.of(HEADER_COUNT, "1"), 32);
assertCsvSplitCount(file, Map.of(HEADER_COUNT, "2"), 1);
assertCsvSplitCount(file, Map.of(FOOTER_COUNT, "1"), 1);
assertCsvSplitCount(file, Map.of(HEADER_COUNT, "1", FOOTER_COUNT, "1"), 1);
Expand Down Expand Up @@ -517,7 +517,7 @@ public void testMultipleSplitsPerBucket()
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);

assertThat(drainSplits(hiveSplitSource).size()).isEqualTo(17);
assertThat(drainSplits(hiveSplitSource).size()).isEqualTo(16);
}

@Test
Expand Down Expand Up @@ -1280,7 +1280,6 @@ private HiveSplitSource hiveSplitSource(HiveSplitLoader hiveSplitLoader)
SIMPLE_TABLE.getDatabaseName(),
SIMPLE_TABLE.getTableName(),
1,
1,
DataSize.of(32, MEGABYTE),
Integer.MAX_VALUE,
hiveSplitLoader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public void testDefaults()
.setPerTransactionMetastoreCacheMaximumSize(1000)
.setMinPartitionBatchSize(10)
.setMaxPartitionBatchSize(100)
.setMaxInitialSplits(200)
.setMaxInitialSplitSize(DataSize.of(32, Unit.MEGABYTE))
.setSplitLoaderConcurrency(64)
.setMaxSplitsPerSecond(null)
.setDomainCompactionThreshold(1000)
Expand Down Expand Up @@ -135,8 +133,6 @@ public void testExplicitPropertyMappings()
.put("hive.per-transaction-metastore-cache-maximum-size", "500")
.put("hive.metastore.partition-batch-size.min", "1")
.put("hive.metastore.partition-batch-size.max", "1000")
.put("hive.max-initial-splits", "10")
.put("hive.max-initial-split-size", "16MB")
.put("hive.split-loader-concurrency", "1")
.put("hive.max-splits-per-second", "1")
.put("hive.domain-compaction-threshold", "42")
Expand Down Expand Up @@ -217,8 +213,6 @@ public void testExplicitPropertyMappings()
.setPerTransactionMetastoreCacheMaximumSize(500)
.setMinPartitionBatchSize(1)
.setMaxPartitionBatchSize(1000)
.setMaxInitialSplits(10)
.setMaxInitialSplitSize(DataSize.of(16, Unit.MEGABYTE))
.setSplitLoaderConcurrency(1)
.setMaxSplitsPerSecond(1)
.setDomainCompactionThreshold(42)
Expand Down
Loading