Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
Expand Down Expand Up @@ -115,7 +114,6 @@ public final class HiveSessionProperties
private static final String ICEBERG_CATALOG_NAME = "iceberg_catalog_name";
public static final String DELTA_LAKE_CATALOG_NAME = "delta_lake_catalog_name";
public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled";
public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
public static final String NON_TRANSACTIONAL_OPTIMIZE_ENABLED = "non_transactional_optimize_enabled";

private final List<PropertyMetadata<?>> sessionProperties;
Expand Down Expand Up @@ -478,16 +476,6 @@ public HiveSessionProperties(
"Enable estimating split weights based on size in bytes",
hiveConfig.isSizeBasedSplitWeightsEnabled(),
false),
doubleProperty(
MINIMUM_ASSIGNED_SPLIT_WEIGHT,
"Minimum assigned split weight when size based split weighting is enabled",
hiveConfig.getMinimumAssignedSplitWeight(),
value -> {
if (!Double.isFinite(value) || value <= 0 || value > 1) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value));
}
},
false),
booleanProperty(
NON_TRANSACTIONAL_OPTIMIZE_ENABLED,
"Enable OPTIMIZE table procedure",
Expand Down Expand Up @@ -817,11 +805,6 @@ public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session)
return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class);
}

/**
 * Reads the {@code minimum_assigned_split_weight} session property.
 *
 * @param session connector session to read the property from
 * @return the property value, looked up as a {@code Double} and unboxed
 */
public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
    Double minimumWeight = session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
    return minimumWeight;
}

public static boolean isNonTransactionalOptimizeEnabled(ConnectorSession session)
{
return session.getProperty(NON_TRANSACTIONAL_OPTIMIZE_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class HiveSplitManager
private final Executor executor;
private final int maxOutstandingSplits;
private final DataSize maxOutstandingSplitsSize;
private final double minimumAssignedSplitWeight;
private final int minPartitionBatchSize;
private final int maxPartitionBatchSize;
private final int maxInitialSplits;
Expand Down Expand Up @@ -129,6 +130,7 @@ public HiveSplitManager(
new CounterStat(),
hiveConfig.getMaxOutstandingSplits(),
hiveConfig.getMaxOutstandingSplitsSize(),
hiveConfig.getMinimumAssignedSplitWeight(),
hiveConfig.getMinPartitionBatchSize(),
hiveConfig.getMaxPartitionBatchSize(),
hiveConfig.getMaxInitialSplits(),
Expand All @@ -147,6 +149,7 @@ public HiveSplitManager(
CounterStat highMemorySplitSourceCounter,
int maxOutstandingSplits,
DataSize maxOutstandingSplitsSize,
double minimumAssignedSplitWeight,
int minPartitionBatchSize,
int maxPartitionBatchSize,
int maxInitialSplits,
Expand All @@ -164,6 +167,7 @@ public HiveSplitManager(
checkArgument(maxOutstandingSplits >= 1, "maxOutstandingSplits must be at least 1");
this.maxOutstandingSplits = maxOutstandingSplits;
this.maxOutstandingSplitsSize = maxOutstandingSplitsSize;
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
this.minPartitionBatchSize = minPartitionBatchSize;
this.maxPartitionBatchSize = maxPartitionBatchSize;
this.maxInitialSplits = maxInitialSplits;
Expand Down Expand Up @@ -263,6 +267,7 @@ public ConnectorSplitSource getSplits(
maxInitialSplits,
maxOutstandingSplits,
maxOutstandingSplitsSize,
minimumAssignedSplitWeight,
maxSplitsPerSecond,
hiveSplitLoader,
executor,
Expand All @@ -277,6 +282,7 @@ public ConnectorSplitSource getSplits(
maxInitialSplits,
maxOutstandingSplits,
maxOutstandingSplitsSize,
minimumAssignedSplitWeight,
maxSplitsPerSecond,
hiveSplitLoader,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;
import static io.trino.plugin.hive.HiveSessionProperties.isSizeBasedSplitWeightsEnabled;
import static io.trino.plugin.hive.HiveSplitSource.StateKind.CLOSED;
import static io.trino.plugin.hive.HiveSplitSource.StateKind.FAILED;
Expand Down Expand Up @@ -104,6 +103,7 @@ private HiveSplitSource(
PerBucket queues,
int maxInitialSplits,
DataSize maxOutstandingSplitsSize,
double minimumAssignedSplitWeight,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide to remove the session property, you could just pass a HiveSplitWeightProvider directly here instead of the double value.

HiveSplitLoader splitLoader,
AtomicReference<State> stateReference,
CounterStat highMemorySplitSourceCounter,
Expand All @@ -123,7 +123,7 @@ private HiveSplitSource(
this.maxInitialSplitSize = getMaxInitialSplitSize(session);
this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
this.numberOfProcessedSplits = new AtomicLong(0);
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(minimumAssignedSplitWeight, maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
this.recordScannedFiles = recordScannedFiles;
}

Expand All @@ -134,6 +134,7 @@ public static HiveSplitSource allAtOnce(
int maxInitialSplits,
int maxOutstandingSplits,
DataSize maxOutstandingSplitsSize,
double minimumAssignedSplitWeight,
int maxSplitsPerSecond,
HiveSplitLoader splitLoader,
Executor executor,
Expand Down Expand Up @@ -178,6 +179,7 @@ public boolean isFinished(OptionalInt bucketNumber)
},
maxInitialSplits,
maxOutstandingSplitsSize,
minimumAssignedSplitWeight,
splitLoader,
stateReference,
highMemorySplitSourceCounter,
Expand All @@ -191,6 +193,7 @@ public static HiveSplitSource bucketed(
int estimatedOutstandingSplitsPerBucket,
int maxInitialSplits,
DataSize maxOutstandingSplitsSize,
double minimumAssignedSplitWeight,
int maxSplitsPerSecond,
HiveSplitLoader splitLoader,
Executor executor,
Expand Down Expand Up @@ -255,6 +258,7 @@ public AsyncQueue<InternalHiveSplit> queueFor(OptionalInt bucketNumber)
},
maxInitialSplits,
maxOutstandingSplitsSize,
minimumAssignedSplitWeight,
splitLoader,
stateReference,
highMemorySplitSourceCounter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Connect
new CounterStat(),
100,
hiveConfig.getMaxOutstandingSplitsSize(),
hiveConfig.getMinimumAssignedSplitWeight(),
hiveConfig.getMinPartitionBatchSize(),
hiveConfig.getMaxPartitionBatchSize(),
hiveConfig.getMaxInitialSplits(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
new CounterStat(),
config.getMaxOutstandingSplits(),
config.getMaxOutstandingSplitsSize(),
config.getMinimumAssignedSplitWeight(),
config.getMinPartitionBatchSize(),
config.getMaxPartitionBatchSize(),
config.getMaxInitialSplits(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,7 @@ private HiveSplitSource hiveSplitSource(HiveSplitLoader hiveSplitLoader)
1,
1,
DataSize.of(32, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
hiveSplitLoader,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void testOutstandingSplitCount()
10,
10,
DataSize.of(1, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
Expand Down Expand Up @@ -90,6 +91,7 @@ public void testDynamicPartitionPruning()
10,
10,
DataSize.of(1, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
Expand All @@ -116,6 +118,7 @@ public void testCorrectlyGeneratingInitialRowId()
10,
10,
DataSize.of(1, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
Expand Down Expand Up @@ -145,6 +148,7 @@ public void testEvenlySizedSplitRemainder()
10,
10,
DataSize.of(1, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newSingleThreadExecutor(),
Expand Down Expand Up @@ -173,6 +177,7 @@ public void testFail()
10,
10,
DataSize.of(1, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
Expand Down Expand Up @@ -224,6 +229,7 @@ public void testReaderWaitsForSplits()
10,
10,
DataSize.of(1, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
Expand Down Expand Up @@ -279,6 +285,7 @@ public void testOutstandingSplitSize()
10,
10000,
maxOutstandingSplitsSize,
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
Expand Down Expand Up @@ -313,6 +320,7 @@ public void testEmptyBucket()
10,
10,
DataSize.of(1, MEGABYTE),
new HiveConfig().getMinimumAssignedSplitWeight(),
Integer.MAX_VALUE,
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public final class IcebergSessionProperties
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "expire_snapshots_min_retention";
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention";

Expand Down Expand Up @@ -230,11 +229,6 @@ public IcebergSessionProperties(
// Session-level redirections configuration does not work well with views, as view body is analyzed in context
// of a session with properties stripped off. Thus, this property is more of a test-only, or at most POC usefulness.
true))
.add(doubleProperty(
MINIMUM_ASSIGNED_SPLIT_WEIGHT,
"Minimum assigned split weight",
icebergConfig.getMinimumAssignedSplitWeight(),
false))
.add(durationProperty(
EXPIRE_SNAPSHOTS_MIN_RETENTION,
"Minimal retention period for expire_snapshot procedure",
Expand Down Expand Up @@ -400,9 +394,4 @@ public static Duration getRemoveOrphanFilesMinRetention(ConnectorSession session
{
return session.getProperty(REMOVE_ORPHAN_FILES_MIN_RETENTION, Duration.class);
}

/**
 * Reads the {@code minimum_assigned_split_weight} session property.
 *
 * @param session connector session to read the property from
 * @return the property value, looked up as a {@code Double} and unboxed
 */
public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
    Double minimumWeight = session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
    return minimumWeight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,22 @@
import javax.inject.Inject;

import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
implements ConnectorSplitManager
{
public static final int ICEBERG_DOMAIN_COMPACTION_THRESHOLD = 1000;

private final double minimumAssignedSplitWeight;
private final IcebergTransactionManager transactionManager;
private final TypeManager typeManager;

@Inject
public IcebergSplitManager(IcebergTransactionManager transactionManager, TypeManager typeManager)
public IcebergSplitManager(IcebergConfig icebergConfig, IcebergTransactionManager transactionManager, TypeManager typeManager)
{
requireNonNull(icebergConfig, "icebergConfig is null");
this.minimumAssignedSplitWeight = icebergConfig.getMinimumAssignedSplitWeight();
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}
Expand Down Expand Up @@ -81,7 +83,7 @@ public ConnectorSplitSource getSplits(
constraint,
typeManager,
table.isRecordScannedFiles(),
getMinimumAssignedSplitWeight(session));
minimumAssignedSplitWeight);

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}
Expand Down