diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java index 8ad731e93025..45f613554fa0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java @@ -42,7 +42,6 @@ import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY; import static io.trino.spi.session.PropertyMetadata.booleanProperty; -import static io.trino.spi.session.PropertyMetadata.doubleProperty; import static io.trino.spi.session.PropertyMetadata.enumProperty; import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; @@ -115,7 +114,6 @@ public final class HiveSessionProperties private static final String ICEBERG_CATALOG_NAME = "iceberg_catalog_name"; public static final String DELTA_LAKE_CATALOG_NAME = "delta_lake_catalog_name"; public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled"; - public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; public static final String NON_TRANSACTIONAL_OPTIMIZE_ENABLED = "non_transactional_optimize_enabled"; private final List> sessionProperties; @@ -478,16 +476,6 @@ public HiveSessionProperties( "Enable estimating split weights based on size in bytes", hiveConfig.isSizeBasedSplitWeightsEnabled(), false), - doubleProperty( - MINIMUM_ASSIGNED_SPLIT_WEIGHT, - "Minimum assigned split weight when size based split weighting is enabled", - hiveConfig.getMinimumAssignedSplitWeight(), - value -> { - if (!Double.isFinite(value) || value <= 0 || value > 1) { - throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value)); - } - }, - false), booleanProperty( NON_TRANSACTIONAL_OPTIMIZE_ENABLED, "Enable OPTIMIZE table procedure", @@ -817,11 +805,6 @@ public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session) return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class); } - public static double getMinimumAssignedSplitWeight(ConnectorSession session) - { - return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class); - } - public static boolean isNonTransactionalOptimizeEnabled(ConnectorSession session) { return session.getProperty(NON_TRANSACTIONAL_OPTIMIZE_ENABLED, Boolean.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index a1d9d8909c84..ff68c886e528 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -100,6 +100,7 @@ public class HiveSplitManager private final Executor executor; private final int maxOutstandingSplits; private final DataSize maxOutstandingSplitsSize; + private final double minimumAssignedSplitWeight; private final int minPartitionBatchSize; private final int maxPartitionBatchSize; private final int maxInitialSplits; @@ -129,6 +130,7 @@ public HiveSplitManager( new CounterStat(), hiveConfig.getMaxOutstandingSplits(), hiveConfig.getMaxOutstandingSplitsSize(), + hiveConfig.getMinimumAssignedSplitWeight(), hiveConfig.getMinPartitionBatchSize(), hiveConfig.getMaxPartitionBatchSize(), hiveConfig.getMaxInitialSplits(), @@ -147,6 +149,7 @@ public HiveSplitManager( CounterStat highMemorySplitSourceCounter, int maxOutstandingSplits, DataSize maxOutstandingSplitsSize, + double minimumAssignedSplitWeight, int minPartitionBatchSize, int maxPartitionBatchSize, int maxInitialSplits, @@ -164,6 +167,7 @@ public HiveSplitManager( checkArgument(maxOutstandingSplits >= 1, "maxOutstandingSplits must be at least 1"); this.maxOutstandingSplits = maxOutstandingSplits; this.maxOutstandingSplitsSize = maxOutstandingSplitsSize; + this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; this.minPartitionBatchSize = minPartitionBatchSize; this.maxPartitionBatchSize = maxPartitionBatchSize; this.maxInitialSplits = maxInitialSplits; @@ -263,6 +267,7 @@ public ConnectorSplitSource getSplits( maxInitialSplits, maxOutstandingSplits, maxOutstandingSplitsSize, + minimumAssignedSplitWeight, maxSplitsPerSecond, hiveSplitLoader, executor, @@ -277,6 +282,7 @@ public ConnectorSplitSource getSplits( maxInitialSplits, maxOutstandingSplits, maxOutstandingSplitsSize, + minimumAssignedSplitWeight, maxSplitsPerSecond, hiveSplitLoader, executor, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java index 5ab32345f6f7..6fa169bbd50d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java @@ -58,7 +58,6 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize; import static io.trino.plugin.hive.HiveSessionProperties.getMaxSplitSize; -import static io.trino.plugin.hive.HiveSessionProperties.getMinimumAssignedSplitWeight; import static io.trino.plugin.hive.HiveSessionProperties.isSizeBasedSplitWeightsEnabled; import static io.trino.plugin.hive.HiveSplitSource.StateKind.CLOSED; import static io.trino.plugin.hive.HiveSplitSource.StateKind.FAILED; @@ -104,6 +103,7 @@ private HiveSplitSource( PerBucket queues, int maxInitialSplits, DataSize maxOutstandingSplitsSize, + double minimumAssignedSplitWeight, HiveSplitLoader splitLoader, AtomicReference stateReference, CounterStat highMemorySplitSourceCounter, @@ -123,7 +123,7 @@ private HiveSplitSource( this.maxInitialSplitSize = getMaxInitialSplitSize(session); this.remainingInitialSplits = new AtomicInteger(maxInitialSplits); this.numberOfProcessedSplits = new AtomicLong(0); - this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider(); + this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(minimumAssignedSplitWeight, maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider(); this.recordScannedFiles = recordScannedFiles; } @@ -134,6 +134,7 @@ public static HiveSplitSource allAtOnce( int maxInitialSplits, int maxOutstandingSplits, DataSize maxOutstandingSplitsSize, + double minimumAssignedSplitWeight, int maxSplitsPerSecond, HiveSplitLoader splitLoader, Executor executor, @@ -178,6 +179,7 @@ public boolean isFinished(OptionalInt bucketNumber) }, maxInitialSplits, maxOutstandingSplitsSize, + minimumAssignedSplitWeight, splitLoader, stateReference, highMemorySplitSourceCounter, @@ -191,6 +193,7 @@ public static HiveSplitSource bucketed( int estimatedOutstandingSplitsPerBucket, int maxInitialSplits, DataSize maxOutstandingSplitsSize, + double minimumAssignedSplitWeight, int maxSplitsPerSecond, HiveSplitLoader splitLoader, Executor executor, @@ -255,6 +258,7 @@ public AsyncQueue queueFor(OptionalInt bucketNumber) }, maxInitialSplits, maxOutstandingSplitsSize, + minimumAssignedSplitWeight, splitLoader, stateReference, highMemorySplitSourceCounter, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 31484474f1a4..2fa805bfe863 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -889,6 +889,7 @@ public Optional getMaterializedView(Connect new CounterStat(), 100, hiveConfig.getMaxOutstandingSplitsSize(), + hiveConfig.getMinimumAssignedSplitWeight(), hiveConfig.getMinPartitionBatchSize(), hiveConfig.getMaxPartitionBatchSize(), hiveConfig.getMaxInitialSplits(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index e9e9356fde86..fb8dc766144c 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -231,6 +231,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec new CounterStat(), config.getMaxOutstandingSplits(), config.getMaxOutstandingSplitsSize(), + config.getMinimumAssignedSplitWeight(), config.getMinPartitionBatchSize(), config.getMaxPartitionBatchSize(), config.getMaxInitialSplits(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 18e5c163e45a..281a0284dadc 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -1188,6 +1188,7 @@ private HiveSplitSource hiveSplitSource(HiveSplitLoader hiveSplitLoader) 1, 1, DataSize.of(32, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, hiveSplitLoader, executor, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java index 1f1a393fbbd3..840b9a987e0a 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java @@ -55,6 +55,7 @@ public void testOutstandingSplitCount() 10, 10, DataSize.of(1, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), @@ -90,6 +91,7 @@ public void testDynamicPartitionPruning() 10, 10, DataSize.of(1, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), @@ -116,6 +118,7 @@ public void testCorrectlyGeneratingInitialRowId() 10, 10, DataSize.of(1, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), @@ -145,6 +148,7 @@ public void testEvenlySizedSplitRemainder() 10, 10, DataSize.of(1, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newSingleThreadExecutor(), @@ -173,6 +177,7 @@ public void testFail() 10, 10, DataSize.of(1, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), @@ -224,6 +229,7 @@ public void testReaderWaitsForSplits() 10, 10, DataSize.of(1, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), @@ -279,6 +285,7 @@ public void testOutstandingSplitSize() 10, 10000, maxOutstandingSplitsSize, + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), @@ -313,6 +320,7 @@ public void testEmptyBucket() 10, 10, DataSize.of(1, MEGABYTE), + new HiveConfig().getMinimumAssignedSplitWeight(), Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index 1adac3626ddb..39923d934a1f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -73,7 +73,6 @@ public final class IcebergSessionProperties private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size"; private static final String HIVE_CATALOG_NAME = "hive_catalog_name"; - private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "expire_snapshots_min_retention"; public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention"; @@ -230,11 +229,6 @@ public IcebergSessionProperties( // Session-level redirections configuration does not work well with views, as view body is analyzed in context // of a session with properties stripped off. Thus, this property is more of a test-only, or at most POC usefulness. true)) - .add(doubleProperty( - MINIMUM_ASSIGNED_SPLIT_WEIGHT, - "Minimum assigned split weight", - icebergConfig.getMinimumAssignedSplitWeight(), - false)) .add(durationProperty( EXPIRE_SNAPSHOTS_MIN_RETENTION, "Minimal retention period for expire_snapshot procedure", @@ -400,9 +394,4 @@ public static Duration getRemoveOrphanFilesMinRetention(ConnectorSession session { return session.getProperty(REMOVE_ORPHAN_FILES_MIN_RETENTION, Duration.class); } - - public static double getMinimumAssignedSplitWeight(ConnectorSession session) - { - return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class); - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java index b0af1965c900..064a36290a4e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java @@ -31,7 +31,6 @@ import javax.inject.Inject; import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout; -import static io.trino.plugin.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; import static java.util.Objects.requireNonNull; public class IcebergSplitManager @@ -39,12 +38,15 @@ public class IcebergSplitManager { public static final int ICEBERG_DOMAIN_COMPACTION_THRESHOLD = 1000; + private final double minimumAssignedSplitWeight; private final IcebergTransactionManager transactionManager; private final TypeManager typeManager; @Inject - public IcebergSplitManager(IcebergTransactionManager transactionManager, TypeManager typeManager) + public IcebergSplitManager(IcebergConfig icebergConfig, IcebergTransactionManager transactionManager, TypeManager typeManager) { + requireNonNull(icebergConfig, "icebergConfig is null"); + this.minimumAssignedSplitWeight = icebergConfig.getMinimumAssignedSplitWeight(); this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @@ -81,7 +83,7 @@ public ConnectorSplitSource getSplits( constraint, typeManager, table.isRecordScannedFiles(), - getMinimumAssignedSplitWeight(session)); + minimumAssignedSplitWeight); return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader()); }