diff --git a/presto-main/src/main/java/com/facebook/presto/operator/HashBuilderOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/HashBuilderOperator.java index f02faf857545b..f202d3787808c 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/HashBuilderOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/HashBuilderOperator.java @@ -483,15 +483,6 @@ private void finishInput() return; } - // reserve memory for the lookup source - long estimatedSizeAfterBuild = index.getEstimatedSize().toBytes() + PagesHash.getEstimatedAdditionalSize(index.getPositionCount()); - if (spillEnabled) { - localRevocableMemoryContext.setBytes(estimatedSizeAfterBuild); - } - else { - localUserMemoryContext.setBytes(estimatedSizeAfterBuild); - } - LookupSourceSupplier partition = buildLookupSource(); if (spillEnabled) { localRevocableMemoryContext.setBytes(partition.get().getInMemorySizeInBytes()); @@ -570,10 +561,6 @@ private void finishLookupSourceUnspilling() localUserMemoryContext.setBytes(memoryRetainedByRemainingPages + index.getEstimatedSize().toBytes()); } - // reserve memory for the lookup source - long estimatedSizeAfterBuild = index.getEstimatedSize().toBytes() + PagesHash.getEstimatedAdditionalSize(index.getPositionCount()); - localUserMemoryContext.setBytes(estimatedSizeAfterBuild); - LookupSourceSupplier partition = buildLookupSource(); lookupSourceChecksum.ifPresent(checksum -> checkState(partition.checksum() == checksum, "Unspilled lookupSource checksum does not match original one")); diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PagesHash.java b/presto-main/src/main/java/com/facebook/presto/operator/PagesHash.java index 6d84da737e995..cc88e927b29fa 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PagesHash.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PagesHash.java @@ -26,8 +26,6 @@ import static com.facebook.presto.operator.SyntheticAddress.decodeSliceIndex; import static com.facebook.presto.util.HashCollisionsEstimator.estimateNumberOfHashCollisions; import static io.airlift.slice.SizeOf.sizeOf; -import static io.airlift.slice.SizeOf.sizeOfByteArray; -import static io.airlift.slice.SizeOf.sizeOfIntArray; import static io.airlift.units.DataSize.Unit.KILOBYTE; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; @@ -62,7 +60,7 @@ public PagesHash( this.channelCount = pagesHashStrategy.getChannelCount(); // reserve memory for the arrays - int hashSize = getHashTableSize(addresses.size()); + int hashSize = HashCommon.arraySize(addresses.size(), 0.75f); mask = hashSize - 1; key = new int[hashSize]; @@ -119,6 +117,7 @@ public PagesHash( key[pos] = realPosition; } } + size = sizeOf(addresses.elements()) + pagesHashStrategy.getSizeInBytes() + sizeOf(key) + sizeOf(positionToHashes); hashCollisions = hashCollisionsLocal; @@ -239,24 +238,4 @@ private static int getHashPosition(long rawHash, long mask) return (int) (rawHash & mask); } - - /** - * @return Given a number of positions, how much more memory would a pagesHash use if we constructed it from a PagesIndex with this number of positions? - * Keep in mind that this is a *Delta*: The actual pagesHash would be larger than this -- add this number to the existing size of the PagesIndex - */ - public static long getEstimatedAdditionalSize(int positionCount) - { - int hashSize = getHashTableSize(positionCount); - - long predictedKeySize = sizeOfIntArray(hashSize); - long predictedPositionToHashesSize = sizeOfByteArray(positionCount); - long predictedPositionLinksSize = sizeOfIntArray(positionCount); - - return predictedKeySize + predictedPositionToHashesSize + predictedPositionLinksSize; - } - - private static int getHashTableSize(int positionCount) - { - return HashCommon.arraySize(positionCount, 0.75f); - } } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestHashBuilderOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestHashBuilderOperator.java deleted file mode 100644 index 799c623a255e1..0000000000000 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestHashBuilderOperator.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.operator; - -import com.facebook.presto.RowPagesBuilder; -import com.facebook.presto.operator.HashBuilderOperator.HashBuilderOperatorFactory; -import com.facebook.presto.spi.Page; -import com.facebook.presto.spi.plan.PlanNodeId; -import com.facebook.presto.spiller.SingleStreamSpillerFactory; -import com.facebook.presto.testing.TestingTaskContext; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.primitives.Ints; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.stream.IntStream; - -import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; -import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder; -import static com.facebook.presto.SessionTestUtils.TEST_SESSION; -import static com.facebook.presto.operator.HashBuilderOperator.State.CONSUMING_INPUT; -import static com.facebook.presto.operator.HashBuilderOperator.State.LOOKUP_SOURCE_BUILT; -import static com.facebook.presto.operator.TestHashJoinOperator.DummySpillerFactory; -import static com.facebook.presto.spi.type.BigintType.BIGINT; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newScheduledThreadPool; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -@Test(singleThreaded = true) -public class TestHashBuilderOperator -{ - private static final SingleStreamSpillerFactory SINGLE_STREAM_SPILLER_FACTORY = new DummySpillerFactory(); - private static final int NUM_PAGES = 1000; - private static final int PAGE_SIZE = 1024; - - private ExecutorService executor; - private ScheduledExecutorService scheduledExecutor; - - @BeforeMethod - public void setUp() - { - // Before/AfterMethod is chosen here because the executor needs to be shutdown - // after every single test case to terminate outstanding threads, if any. - - // The line below is the same as newCachedThreadPool(daemonThreadsNamed(...)) except RejectionExecutionHandler. - // RejectionExecutionHandler is set to DiscardPolicy (instead of the default AbortPolicy) here. - // Otherwise, a large number of RejectedExecutionException will flood logging, resulting in Travis failure. - executor = new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, - 60L, - SECONDS, - new SynchronousQueue(), - daemonThreadsNamed("test-executor-%s"), - new ThreadPoolExecutor.DiscardPolicy()); - scheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed("test-scheduledExecutor-%s")); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() - throws Exception - { - executor.shutdownNow(); - scheduledExecutor.shutdownNow(); - assertTrue(executor.awaitTermination(10, SECONDS)); - assertTrue(scheduledExecutor.awaitTermination(10, SECONDS)); - } - - @DataProvider(name = "hashBuildTestValues") - public static Object[][] hashJoinTestValuesProvider() - { - return new Object[][] { - {true, true}, - {false, true}, - {true, false}, - {false, false}, - }; - } - - @Test(dataProvider = "hashBuildTestValues") - public void testMemoryTracking(boolean buildHashEnabled, boolean spillEnabled) - { - // Build the resources you need - RowPagesBuilder pagesBuilder = rowPagesBuilder(buildHashEnabled, Ints.asList(0), ImmutableList.of(BIGINT, BIGINT, BIGINT)); - - int totalRows = NUM_PAGES * PAGE_SIZE; - List someNumbers = rangeList(totalRows + 1); - TaskContext taskContext = TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION); - - for (int i = 0; i < NUM_PAGES; i++) { - pagesBuilder.addSequencePage(PAGE_SIZE, 1, someNumbers.get(i), someNumbers.get((i * i) % totalRows)); - } - List inputPages = pagesBuilder.build(); - - HashBuilderOperator buildOperator = createTestOperator(pagesBuilder, spillEnabled, taskContext); - assertEquals(buildOperator.getState(), CONSUMING_INPUT); - inputPages.stream().forEach(buildOperator::addInput); - - // Check our memory usage before we build - long estimatedFootprintBefore; - if (spillEnabled) { - estimatedFootprintBefore = taskContext.getTaskMemoryContext().getRevocableMemory(); - } - else { - estimatedFootprintBefore = taskContext.getMemoryReservation().toBytes(); - } - - // Build - buildOperator.finish(); - - // Check our memory usage after we build - long estimatedFootprintAfter; - if (spillEnabled) { - estimatedFootprintAfter = taskContext.getTaskMemoryContext().getRevocableMemory(); - } - else { - estimatedFootprintAfter = taskContext.getMemoryReservation().toBytes(); - } - - // Are we using the memory we should be using? - assertEquals(buildOperator.getState(), LOOKUP_SOURCE_BUILT); - if (buildHashEnabled) { - assertEquals(estimatedFootprintBefore, 46147728); - assertEquals(estimatedFootprintAfter, 59640024); - } - else { - assertEquals(estimatedFootprintBefore, 37887616); - assertEquals(estimatedFootprintAfter, 51384016); - } - } - - private HashBuilderOperator createTestOperator(RowPagesBuilder buildPages, boolean spillEnabled, TaskContext taskContext) - { - List hashChannels = Ints.asList(0); - - JoinBridgeManager lookupSourceFactoryManager = JoinBridgeManager.lookupAllAtOnce(new PartitionedLookupSourceFactory( - buildPages.getTypes(), - rangeList(buildPages.getTypes().size()).stream() - .map(buildPages.getTypes()::get) - .collect(toImmutableList()), - hashChannels.stream() - .map(buildPages.getTypes()::get) - .collect(toImmutableList()), - 1, - requireNonNull(ImmutableMap.of(), "layout is null"), - false)); - - HashBuilderOperatorFactory buildOperatorFactory = new HashBuilderOperatorFactory( - 1, - new PlanNodeId("build"), - lookupSourceFactoryManager, - rangeList(buildPages.getTypes().size()), - hashChannels, - buildPages.getHashChannel() - .map(OptionalInt::of).orElse(OptionalInt.empty()), - Optional.empty(), - Optional.empty(), - ImmutableList.of(), - 100, - new PagesIndex.TestingFactory(false), - spillEnabled, - SINGLE_STREAM_SPILLER_FACTORY); - - PipelineContext buildPipeline = taskContext.addPipelineContext(0, true, true, false); - DriverContext buildDriverContext = buildPipeline.addDriverContext(); - HashBuilderOperator buildOperator = buildOperatorFactory.createOperator(buildDriverContext); - return buildOperator; - } - - private static List rangeList(int endExclusive) - { - return IntStream.range(0, endExclusive) - .boxed() - .collect(toImmutableList()); - } -} diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java index 963f9b56791d7..374fe7c3c2a7e 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java @@ -1527,7 +1527,7 @@ public boolean filter(int leftPosition, Page leftPage, int rightPosition, Page r } } - public static class DummySpillerFactory + private static class DummySpillerFactory implements SingleStreamSpillerFactory { private volatile boolean failSpill;