diff --git a/presto-docs/src/main/sphinx/cache/local.rst b/presto-docs/src/main/sphinx/cache/local.rst index c10332a311658..7d6cc21515e9e 100644 --- a/presto-docs/src/main/sphinx/cache/local.rst +++ b/presto-docs/src/main/sphinx/cache/local.rst @@ -39,8 +39,8 @@ and restart the Presto coordinator and workers: In the above example configuration, * ``hive.node-selection-strategy=SOFT_AFFINITY`` instructs Presto scheduler to take data affinity - into consideration when secheduling tasks to workers that enables meaningful data caching effectiveness. - This configuration property is defaul to ``NO_PREFERENCE`` and SDK cache is only enabled when set to ``SOFT_AFFINITY``. + into consideration when scheduling tasks to workers that enables meaningful data caching effectiveness. + This configuration property defaults to ``NO_PREFERENCE`` and SDK cache is only enabled when set to ``SOFT_AFFINITY``. Other configuration on coordinator that can impact data affinity includes ``node-scheduler.max-pending-splits-per-task`` (the max pending splits per task) and ``node-scheduler.max-splits-per-node`` (the max splits per node). @@ -48,6 +48,17 @@ In the above example configuration, * ``cache.alluxio.max-cache-size=500GB`` sets storage space to be 500GB. * ``cache.base-directory=/tmp/alluxio-cache`` specifies a local directory ``/tmp/alluxio-cache``. Note that this Presto server must have both read and write permission to access this local directory. +When affinity scheduling is enabled, a set of preferred nodes is assigned to a certain file section. The default file section size is ``256MB``. +For example, if the file size is 512MB, two different affinity preferences will be assigned: + - ``[0MB..256MB] -> NodeA, NodeB`` + - ``[256MB+1B..512MB] -> NodeC, NodeD`` + +The section is selected based on the split start offset. +A split that has its first byte in the first section is preferred to be scheduled on ``NodeA`` or ``NodeB``. + +Change the size of the section by setting the ``hive.affinity-scheduling-file-section-size`` configuration property +or the ``affinity_scheduling_file_section_size`` session property. + Monitoring ---------- diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 1844c3b3a1a6a..53f9d72cfea28 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -216,6 +216,7 @@ public class HiveClientConfig private Duration parquetQuickStatsFileMetadataFetchTimeout = new Duration(60, TimeUnit.SECONDS); private int parquetQuickStatsMaxConcurrentCalls = 500; private int quickStatsMaxConcurrentCalls = 100; + private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE); @Min(0) public int getMaxInitialSplits() @@ -1802,4 +1803,17 @@ public int getMaxParallelParsingConcurrency() { return this.maxParallelParsingConcurrency; } + + @NotNull + public DataSize getAffinitySchedulingFileSectionSize() + { + return affinitySchedulingFileSectionSize; + } + + @Config("hive.affinity-scheduling-file-section-size") + public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize) + { + this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize; + return this; + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveFileSplit.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveFileSplit.java index ed13d0819f84d..aef38956316a4 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveFileSplit.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveFileSplit.java @@ -38,6 +38,7 @@ public class HiveFileSplit private final long fileModifiedTime; private final Optional extraFileInfo; private final Map customSplitInfo; + private final long affinitySchedulingFileSectionIndex; /** * @param path the absolute path to the file that contains the split @@ -56,7 +57,8 @@ public HiveFileSplit( @JsonProperty("fileSize") long fileSize, @JsonProperty("fileModifiedTime") long fileModifiedTime, @JsonProperty("extraFileInfo") Optional extraFileInfo, - @JsonProperty("customSplitInfo") Map customSplitInfo) + @JsonProperty("customSplitInfo") Map customSplitInfo, + @JsonProperty("affinitySchedulingFileSectionIndex") long affinitySchedulingFileSectionIndex) { checkArgument(start >= 0, "start must be non-negative"); checkArgument(length >= 0, "length must be non-negative"); @@ -73,6 +75,7 @@ public HiveFileSplit( this.fileModifiedTime = fileModifiedTime; this.extraFileInfo = extraFileInfo; this.customSplitInfo = ImmutableMap.copyOf(customSplitInfo); + this.affinitySchedulingFileSectionIndex = affinitySchedulingFileSectionIndex; } @JsonProperty @@ -117,6 +120,12 @@ public Map getCustomSplitInfo() return customSplitInfo; } + @JsonProperty + public long getAffinitySchedulingFileSectionIndex() + { + return affinitySchedulingFileSectionIndex; + } + @Override public boolean equals(Object o) { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 2c3ef7ebdaa4b..a170040b61321 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -130,6 +130,7 @@ public final class HiveSessionProperties public static final String QUICK_STATS_INLINE_BUILD_TIMEOUT = "quick_stats_inline_build_timeout"; public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout"; public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled"; + public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size"; private final List> sessionProperties; @@ -629,7 +630,12 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon hiveClientConfig.getQuickStatsBackgroundBuildTimeout(), false, value -> Duration.valueOf((String) value), - Duration::toString)); + Duration::toString), + dataSizeSessionProperty( + AFFINITY_SCHEDULING_FILE_SECTION_SIZE, + "Size of file section for affinity scheduling", + hiveClientConfig.getAffinitySchedulingFileSectionSize(), + false)); } public List> getSessionProperties() @@ -1092,4 +1098,9 @@ public static Duration getQuickStatsBackgroundBuildTimeout(ConnectorSession sess { return session.getProperty(QUICK_STATS_BACKGROUND_BUILD_TIMEOUT, Duration.class); } + + public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session) + { + return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class); + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java index d1ecbc4726263..5ec45f2280089 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java @@ -167,7 +167,7 @@ public List getAddresses() public List getPreferredNodes(NodeProvider nodeProvider) { if (getNodeSelectionStrategy() == SOFT_AFFINITY) { - return nodeProvider.get(fileSplit.getPath(), 2); + return nodeProvider.get(fileSplit.getPath() + "#" + fileSplit.getAffinitySchedulingFileSectionIndex(), 2); } return addresses; } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java index c5ee19a28b947..2eff6cb294076 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java @@ -54,6 +54,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND; import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; +import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize; import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight; @@ -101,6 +102,7 @@ class HiveSplitSource private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean(); private final HiveSplitWeightProvider splitWeightProvider; private final double splitScanRatio; + private final long affinitySchedulingFileSectionSizeInBytes; private HiveSplitSource( ConnectorSession session, @@ -140,6 +142,7 @@ private HiveSplitSource( splitScanRatio = 1.0; } this.splitScanRatio = max(min(splitScanRatio, 1.0), 0.1); + affinitySchedulingFileSectionSizeInBytes = getAffinitySchedulingFileSectionSize(session).toBytes(); } public static HiveSplitSource allAtOnce( @@ -517,7 +520,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { internalSplit.getFileSize(), internalSplit.getFileModifiedTime(), internalSplit.getExtraFileInfo(), - internalSplit.getCustomSplitInfo()); + internalSplit.getCustomSplitInfo(), + internalSplit.getStart() / affinitySchedulingFileSectionSizeInBytes); resultBuilder.add(new HiveSplit( fileSplit, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java index 49601aae77e31..6890a7e3c316c 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java @@ -133,7 +133,8 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle outputFile.length(), Instant.now().toEpochMilli(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); HiveSplit split = new HiveSplit( fileSplit, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestGenericHiveRecordCursorProvider.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestGenericHiveRecordCursorProvider.java index 70cb117aa2950..44229433a98f9 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestGenericHiveRecordCursorProvider.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestGenericHiveRecordCursorProvider.java @@ -138,7 +138,8 @@ private static HiveFileSplit getHiveFileSplit(Class fileSpl CUSTOM_FILE_SPLIT_CLASS_KEY, HoodieRealtimeFileSplit.class.getName(), HUDI_DELTA_FILEPATHS_KEY, "", HUDI_BASEPATH_KEY, getTableBasePath(TABLE_NAME), - HUDI_MAX_COMMIT_TIME_KEY, "20210524095413")); + HUDI_MAX_COMMIT_TIME_KEY, "20210524095413"), + 0); case "org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit": ImmutableMap.Builder customSplitInfo = new ImmutableMap.Builder<>(); customSplitInfo.put(CUSTOM_FILE_SPLIT_CLASS_KEY, HoodieRealtimeBootstrapBaseFileSplit.class.getName()); @@ -149,13 +150,14 @@ HUDI_BASEPATH_KEY, getTableBasePath(TABLE_NAME), customSplitInfo.put(BOOTSTRAP_FILE_SPLIT_START, "0"); customSplitInfo.put(BOOTSTRAP_FILE_SPLIT_LEN, "435165"); return new HiveFileSplit( - getTableBasePath(TABLE_NAME) + "/testPartition/" + FILE_NAME, - 0, - 435165, - 435165, - 1621850079, - Optional.empty(), - customSplitInfo.build()); + getTableBasePath(TABLE_NAME) + "/testPartition/" + FILE_NAME, + 0, + 435165, + 435165, + 1621850079, + Optional.empty(), + customSplitInfo.build(), + 0); default: throw new IllegalArgumentException("Unknown file split class " + fileSplitClass.getName()); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index 64c36a8b4c2c9..4f4947f2188ef 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -38,6 +38,7 @@ import static com.facebook.presto.hive.HiveStorageFormat.ORC; import static com.facebook.presto.hive.TestHiveUtil.nonDefaultTimeZone; import static io.airlift.units.DataSize.Unit.BYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; public class TestHiveClientConfig { @@ -162,7 +163,8 @@ public void testDefaults() .setQuickStatsReaperExpiry(new Duration(5, TimeUnit.MINUTES)) .setParquetQuickStatsFileMetadataFetchTimeout(new Duration(60, TimeUnit.SECONDS)) .setMaxConcurrentQuickStatsCalls(100) - .setMaxConcurrentParquetQuickStatsCalls(500)); + .setMaxConcurrentParquetQuickStatsCalls(500) + .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE))); } @Test @@ -287,6 +289,7 @@ public void testExplicitPropertyMappings() .put("hive.quick-stats.parquet.file-metadata-fetch-timeout", "30s") .put("hive.quick-stats.parquet.max-concurrent-calls", "399") .put("hive.quick-stats.max-concurrent-calls", "101") + .put("hive.affinity-scheduling-file-section-size", "512MB") .build(); HiveClientConfig expected = new HiveClientConfig() @@ -406,7 +409,8 @@ public void testExplicitPropertyMappings() .setQuickStatsReaperExpiry(new Duration(15, TimeUnit.MINUTES)) .setParquetQuickStatsFileMetadataFetchTimeout(new Duration(30, TimeUnit.SECONDS)) .setMaxConcurrentParquetQuickStatsCalls(399) - .setMaxConcurrentQuickStatsCalls(101); + .setMaxConcurrentQuickStatsCalls(101) + .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE)); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java index e5ce8f07dd952..9d7fe39b46482 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java @@ -933,7 +933,8 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider, split.getLength(), Instant.now().toEpochMilli(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); Configuration configuration = new Configuration(); configuration.set("io.compression.codecs", LzoCodec.class.getName() + "," + LzopCodec.class.getName()); @@ -1005,7 +1006,8 @@ private void testPageSourceFactory(HiveBatchPageSourceFactory sourceFactory, split.getLength(), Instant.now().toEpochMilli(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); Optional pageSource = HivePageSourceProvider.createHivePageSource( ImmutableSet.of(), diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileSplit.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileSplit.java index dbffec51d76aa..6bdc6fa7afc66 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileSplit.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileSplit.java @@ -26,7 +26,7 @@ public class TestHiveFileSplit @Test public void testGetters() { - HiveFileSplit split = new HiveFileSplit("path", 0, 200, 3, 400, Optional.of(new byte[21]), Collections.emptyMap()); + HiveFileSplit split = new HiveFileSplit("path", 0, 200, 3, 400, Optional.of(new byte[21]), Collections.emptyMap(), 0); assertEquals(split.getPath(), "path"); assertEquals(split.getLength(), 200L); assertEquals(split.getStart(), 0L); @@ -39,6 +39,6 @@ public void testGetters() @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "start must be non-negative") public void testNegativeStart() { - new HiveFileSplit("path", -1, 200, 3, 400, Optional.of(new byte[21]), Collections.emptyMap()); + new HiveFileSplit("path", -1, 200, 3, 400, Optional.of(new byte[21]), Collections.emptyMap(), 0); } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java index bcbce794cf852..a9aba039ed4a2 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java @@ -241,7 +241,8 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa outputFile.length(), outputFile.lastModified(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); HiveSplit split = new HiveSplit( fileSplit, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java index 6ee023b569942..b6f0789b5815d 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java @@ -135,7 +135,8 @@ public void testGenerateCacheQuota() 10, Instant.now().toEpochMilli(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); HiveSplit split = new HiveSplit( fileSplit, SCHEMA_NAME, @@ -220,7 +221,8 @@ public void testUseRecordReaderWithInputFormatAnnotationAndCustomSplit() 200, Instant.now().toEpochMilli(), Optional.empty(), - customSplitInfo); + customSplitInfo, + 0); Optional pageSource = HivePageSourceProvider.createHivePageSource( ImmutableSet.of(recordCursorProvider), ImmutableSet.of(hiveBatchPageSourceFactory), @@ -272,7 +274,8 @@ public void testNotUseRecordReaderWithInputFormatAnnotationWithoutCustomSplit() 200, Instant.now().toEpochMilli(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); Optional pageSource = HivePageSourceProvider.createHivePageSource( ImmutableSet.of(recordCursorProvider), @@ -448,7 +451,8 @@ private static HiveSplit getHiveSplit(HiveStorageFormat hiveStorageFormat) 10, Instant.now().toEpochMilli(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); return new HiveSplit( fileSplit, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java index cb45379bd7218..b5d7f6b3c8f43 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java @@ -87,7 +87,8 @@ public void testJsonRoundTrip() 88, Instant.now().toEpochMilli(), Optional.empty(), - customSplitInfo); + customSplitInfo, + 0); byte[] rowIdPartitionComponent = {(byte) 76, (byte) 58}; HiveSplit expected = new HiveSplit( diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java index 63984d7fa95ad..da834a85d6141 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -28,30 +29,36 @@ import org.testng.annotations.Test; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; import static com.facebook.airlift.testing.Assertions.assertContains; import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL; import static com.facebook.presto.hive.CacheQuotaScope.PARTITION; import static com.facebook.presto.hive.CacheQuotaScope.TABLE; +import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize; import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static com.facebook.presto.hive.HiveTestUtils.SESSION; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; +import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.SOFT_AFFINITY; import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.lang.Math.toIntExact; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -124,6 +131,50 @@ public void testEvenlySizedSplitRemainder() assertEquals(second.getFileSplit().getLength(), fileSize.toBytes() - halfOfSize); } + @Test + public void testAffinitySchedulingKey() + { + DataSize sectionSize = getAffinitySchedulingFileSectionSize(SESSION); + HiveSplitSource hiveSplitSource = HiveSplitSource.allAtOnce( + SESSION, + "database", + "table", + new CacheQuotaRequirement(TABLE, DEFAULT_QUOTA_SIZE), + 10, + 10, + new DataSize(1, MEGABYTE), + new TestingHiveSplitLoader(), + EXECUTOR, + new CounterStat(), + 1); + + // larger than the section size + DataSize fileSize = new DataSize(sectionSize.toBytes() * 3, BYTE); + hiveSplitSource.addToQueue(new TestSplit("test-relative-path", 1, OptionalInt.empty(), fileSize, SOFT_AFFINITY)); + hiveSplitSource.noMoreSplits(); + + List splits = new ArrayList<>(); + while (!hiveSplitSource.isFinished()) { + for (ConnectorSplit split : getSplits(hiveSplitSource, 10)) { + splits.add((HiveSplit) split); + } + } + assertThat(splits).isNotEmpty(); + assertEquals(getAffinitySchedulingKey(splits.get(0)), "path/test-relative-path#0"); + assertEquals(getAffinitySchedulingKey(splits.get(splits.size() - 1)), "path/test-relative-path#2"); + } + + private static String getAffinitySchedulingKey(HiveSplit split) + { + AtomicReference reference = new AtomicReference<>(); + split.getPreferredNodes((key, count) -> { + reference.set(key); + return ImmutableList.of(); + }); + assertNotNull(reference.get()); + return reference.get(); + } + @Test public void testSplitCacheQuota() { @@ -532,9 +583,14 @@ private TestSplit(int id, OptionalInt bucketNumber) } private TestSplit(int id, OptionalInt bucketNumber, DataSize fileSize) + { + this("path", id, bucketNumber, fileSize, NO_PREFERENCE); + } + + private TestSplit(String path, int id, OptionalInt bucketNumber, DataSize fileSize, NodeSelectionStrategy nodeSelectionStrategy) { super( - "path", + path, 0, fileSize.toBytes(), fileSize.toBytes(), @@ -543,7 +599,7 @@ private TestSplit(int id, OptionalInt bucketNumber, DataSize fileSize) bucketNumber, bucketNumber, true, - NO_PREFERENCE, + nodeSelectionStrategy, false, new HiveSplitPartitionInfo( new Storage( diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java index a73805ae65136..fb7d52b43bcc0 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java @@ -451,7 +451,8 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec fileSplit.getLength(), Instant.now().toEpochMilli(), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); OrcBatchPageSourceFactory orcPageSourceFactory = new OrcBatchPageSourceFactory( FUNCTION_AND_TYPE_MANAGER, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java index 0132da2783759..19d49b471e07c 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java @@ -434,7 +434,8 @@ public static ConnectorPageSource createPageSource( targetFile.length(), 0, Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); RecordCursor recordCursor = cursorProvider .createRecordCursor( @@ -477,7 +478,8 @@ public static ConnectorPageSource createPageSource( targetFile.length(), 0, Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); return pageSourceFactory .createPageSource( diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectRecordCursorProvider.java b/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectRecordCursorProvider.java index 390bfb1e66e93..28fa065d69f89 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectRecordCursorProvider.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectRecordCursorProvider.java @@ -127,7 +127,8 @@ private static Optional getRecordCursor(TupleDomain