diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java
index 02e202e1eded8..a6c750e3766b5 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java
@@ -55,7 +55,7 @@ public class TaskManagerConfig
     private int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 2;
     private Integer minDrivers;
     private Integer initialSplitsPerNode;
-    private int minDriversPerTask = 3;
+    private int minDriversPerTask = 1;
     private int maxDriversPerTask = Integer.MAX_VALUE;
     private int maxTasksPerStage = Integer.MAX_VALUE;
     private Duration splitConcurrencyAdjustmentInterval = new Duration(100, TimeUnit.MILLISECONDS);
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java
index e93113e846b26..760d83a7219f4 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java
@@ -300,8 +300,9 @@ public TaskExecutor(
     public synchronized void start()
     {
         checkState(!closed, "TaskExecutor is closed");
         for (int i = 0; i < runnerThreads; i++) {
             addRunnerThread();
+            log.info("Adding new runner thread");
         }
         if (interruptRunawaySplitsTimeout != null) {
             long interval = (long) interruptSplitInterval.getValue(SECONDS);
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/GroupedTopNBuilder.java b/presto-main/src/main/java/com/facebook/presto/operator/GroupedTopNBuilder.java
index b4a3056d64f2a..e08ea1d4cd1b7 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/GroupedTopNBuilder.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/GroupedTopNBuilder.java
@@ -14,485 +14,23 @@
 package com.facebook.presto.operator;
 
 import com.facebook.presto.common.Page;
-import com.facebook.presto.common.PageBuilder;
-import com.facebook.presto.common.array.ObjectBigArray;
-import com.facebook.presto.common.type.Type;
-import com.facebook.presto.spi.function.aggregation.GroupByIdBlock;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableList;
-import it.unimi.dsi.fastutil.ints.IntArrayFIFOQueue;
-import it.unimi.dsi.fastutil.ints.IntIterator;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
-import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
-import org.openjdk.jol.info.ClassLayout;
+import com.google.common.util.concurrent.ListenableFuture;
 
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
-import java.util.stream.IntStream;
 
-import static com.facebook.presto.common.type.BigintType.BIGINT;
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.airlift.slice.SizeOf.sizeOf;
-import static java.util.Objects.requireNonNull;
-
-/**
- * This class
finds the top N rows defined by {@param comparator} for each group specified by {@param groupByHash}. - */ -public class GroupedTopNBuilder +public interface GroupedTopNBuilder { - private static final long INSTANCE_SIZE = ClassLayout.parseClass(GroupedTopNBuilder.class).instanceSize(); - // compact a page when 50% of its positions are unreferenced - private static final int COMPACT_THRESHOLD = 2; - - private final Type[] sourceTypes; - private final int topN; - private final boolean produceRowNumber; - private final GroupByHash groupByHash; - - // a map of heaps, each of which records the top N rows - private final ObjectBigArray groupedRows = new ObjectBigArray<>(); - // a list of input pages, each of which has information of which row in which heap references which position - private final ObjectBigArray pageReferences = new ObjectBigArray<>(); - // for heap element comparison - private final PageWithPositionComparator pageWithPositionComparator; - private final Comparator rowHeapComparator; - // when there is no row referenced in a page, it will be removed instead of compacted; use a list to record those empty slots to reuse them - private final IntFIFOQueue emptyPageReferenceSlots; - - // keeps track sizes of input pages and heaps - private long memorySizeInBytes; - private int currentPageCount; - - public GroupedTopNBuilder( - List sourceTypes, - PageWithPositionComparator comparator, - int topN, - boolean produceRowNumber, - GroupByHash groupByHash) - { - this.sourceTypes = requireNonNull(sourceTypes, "sourceTypes is null").toArray(new Type[0]); - checkArgument(topN > 0, "topN must be > 0"); - this.topN = topN; - this.produceRowNumber = produceRowNumber; - this.groupByHash = requireNonNull(groupByHash, "groupByHash is not null"); - - this.pageWithPositionComparator = requireNonNull(comparator, "comparator is null"); - // Note: this is comparator intentionally swaps left and right arguments form a "reverse order" comparator - this.rowHeapComparator = (right, 
left) -> this.pageWithPositionComparator.compareTo( - pageReferences.get(left.getPageId()).getPage(), - left.getPosition(), - pageReferences.get(right.getPageId()).getPage(), - right.getPosition()); - this.emptyPageReferenceSlots = new IntFIFOQueue(); - } - - public Work processPage(Page page) - { - return new TransformWork<>( - groupByHash.getGroupIds(page), - groupIds -> { - processPage(page, groupIds); - return null; - }); - } - - public Iterator buildResult() - { - return new ResultIterator(); - } - - public long getEstimatedSizeInBytes() - { - return INSTANCE_SIZE + - memorySizeInBytes + - groupByHash.getEstimatedSize() + - groupedRows.sizeOf() + - pageReferences.sizeOf() + - emptyPageReferenceSlots.getEstimatedSizeInBytes(); - } - - @VisibleForTesting - List getBufferedPages() - { - return IntStream.range(0, currentPageCount) - .filter(i -> pageReferences.get(i) != null) - .mapToObj(i -> pageReferences.get(i).getPage()) - .collect(toImmutableList()); - } - - private void processPage(Page newPage, GroupByIdBlock groupIds) - { - checkArgument(newPage != null); - checkArgument(groupIds != null); - - int firstPositionToInsert = findFirstPositionToInsert(newPage, groupIds); - if (firstPositionToInsert < 0) { - // no insertions required - return; - } - - PageReference newPageReference = new PageReference(newPage); - memorySizeInBytes += newPageReference.getEstimatedSizeInBytes(); - int newPageId; - if (emptyPageReferenceSlots.isEmpty()) { - // all the previous slots are full; create a new one - pageReferences.ensureCapacity(currentPageCount + 1); - newPageId = currentPageCount; - currentPageCount++; - } - else { - // reuse a previously removed page's slot - newPageId = emptyPageReferenceSlots.dequeueInt(); - } - verify(pageReferences.setIfNull(newPageId, newPageReference), "should not overwrite a non-empty slot"); - - // ensure sufficient group capacity outside of the loop - groupedRows.ensureCapacity(groupIds.getGroupCount()); - // update the affected heaps and 
record candidate pages that need compaction - IntSet pagesToCompact = new IntOpenHashSet(); - for (int position = firstPositionToInsert; position < newPage.getPositionCount(); position++) { - long groupId = groupIds.getGroupId(position); - RowHeap rows = groupedRows.get(groupId); - if (rows == null) { - // a new group - rows = new RowHeap(rowHeapComparator); - groupedRows.set(groupId, rows); - } - else { - // update an existing group; - // remove the memory usage for this group for now; add it back after update - memorySizeInBytes -= rows.getEstimatedSizeInBytes(); - } - - if (rows.size() < topN) { - Row row = new Row(newPageId, position); - newPageReference.reference(row); - rows.enqueue(row); - } - else { - // may compare with the topN-th element with in the heap to decide if update is necessary - Row previousRow = rows.first(); - PageReference previousPageReference = pageReferences.get(previousRow.getPageId()); - if (pageWithPositionComparator.compareTo(newPage, position, previousPageReference.getPage(), previousRow.getPosition()) < 0) { - // update reference and the heap - rows.dequeue(); - previousPageReference.dereference(previousRow.getPosition()); - - Row newRow = new Row(newPageId, position); - newPageReference.reference(newRow); - rows.enqueue(newRow); - - // compact a page if it is not the current input page and the reference count is below the threshold - if (previousPageReference.getPage() != newPage && - previousPageReference.getUsedPositionCount() * COMPACT_THRESHOLD < previousPageReference.getPage().getPositionCount()) { - pagesToCompact.add(previousRow.getPageId()); - } - } - } - - memorySizeInBytes += rows.getEstimatedSizeInBytes(); - } - - // may compact the new page as well - if (newPageReference.getUsedPositionCount() * COMPACT_THRESHOLD < newPage.getPositionCount()) { - verify(pagesToCompact.add(newPageId)); - } - - // compact pages - IntIterator iterator = pagesToCompact.iterator(); - while (iterator.hasNext()) { - int pageId = 
iterator.nextInt(); - PageReference pageReference = pageReferences.get(pageId); - if (pageReference.getUsedPositionCount() == 0) { - pageReferences.set(pageId, null); - emptyPageReferenceSlots.enqueue(pageId); - memorySizeInBytes -= pageReference.getEstimatedSizeInBytes(); - } - else { - memorySizeInBytes -= pageReference.getEstimatedSizeInBytes(); - pageReference.compact(); - memorySizeInBytes += pageReference.getEstimatedSizeInBytes(); - } - } - } - - private int findFirstPositionToInsert(Page newPage, GroupByIdBlock groupIds) - { - for (int position = 0; position < newPage.getPositionCount(); position++) { - long groupId = groupIds.getGroupId(position); - if (groupedRows.getCapacity() <= groupId) { - return position; - } - - RowHeap rows = groupedRows.get(groupId); - if (rows == null || rows.size() < topN) { - return position; - } - // check against current minimum - Row previousRow = rows.first(); - PageReference pageReference = pageReferences.get(previousRow.getPageId()); - if (pageWithPositionComparator.compareTo(newPage, position, pageReference.getPage(), previousRow.getPosition()) < 0) { - return position; - } - } - // no positions to insert - return -1; - } - - /** - * The class is a pointer to a row in a page. - * The actual position in the page is mutable because as pages are compacted, the position will change. 
- */ - private static class Row - { - private final int pageId; - private int position; - - private Row(int pageId, int position) - { - this.pageId = pageId; - reset(position); - } - - public void reset(int position) - { - this.position = position; - } - - public int getPageId() - { - return pageId; - } - - public int getPosition() - { - return position; - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("pageId", pageId) - .add("position", position) - .toString(); - } - } - - private static class PageReference - { - private static final long INSTANCE_SIZE = ClassLayout.parseClass(PageReference.class).instanceSize(); - - private Page page; - private Row[] reference; - - private int usedPositionCount; - - public PageReference(Page page) - { - this.page = requireNonNull(page, "page is null"); - this.reference = new Row[page.getPositionCount()]; - } - - public void reference(Row row) - { - reference[row.getPosition()] = row; - usedPositionCount++; - } - - public boolean dereference(int position) - { - checkArgument(reference[position] != null && usedPositionCount > 0); - reference[position] = null; - return (--usedPositionCount) == 0; - } - - public int getUsedPositionCount() - { - return usedPositionCount; - } - - public void compact() - { - checkState(usedPositionCount > 0); - - if (usedPositionCount == page.getPositionCount()) { - return; - } - - // re-assign reference - Row[] newReference = new Row[usedPositionCount]; - int[] positions = new int[usedPositionCount]; - int index = 0; - // update all the elements in the heaps that reference the current page - // this does not change the elements in the heap; - // it only updates the value of the elements; while keeping the same order - for (int i = 0; i < reference.length && index < usedPositionCount; i++) { - Row value = reference[i]; - if (value != null) { - value.reset(index); - newReference[index] = value; - positions[index] = i; - index++; - } - } - verify(index == 
usedPositionCount); - - // compact page - page = page.copyPositions(positions, 0, usedPositionCount); - reference = newReference; - } - - public Page getPage() - { - return page; - } - - public long getEstimatedSizeInBytes() - { - return page.getRetainedSizeInBytes() + sizeOf(reference) + INSTANCE_SIZE; - } - } - - // this class is for precise memory tracking - private static class IntFIFOQueue - extends IntArrayFIFOQueue - { - private static final long INSTANCE_SIZE = ClassLayout.parseClass(IntFIFOQueue.class).instanceSize(); - - private long getEstimatedSizeInBytes() - { - return INSTANCE_SIZE + sizeOf(array); - } - } - - // this class is for precise memory tracking - private static class RowHeap - extends ObjectHeapPriorityQueue - { - private static final long INSTANCE_SIZE = ClassLayout.parseClass(RowHeap.class).instanceSize(); - private static final long ROW_ENTRY_SIZE = ClassLayout.parseClass(Row.class).instanceSize(); - - private RowHeap(Comparator comparator) - { - super(1, comparator); - } - - private long getEstimatedSizeInBytes() - { - return INSTANCE_SIZE + sizeOf(heap) + size() * ROW_ENTRY_SIZE; - } - } - - private class ResultIterator - extends AbstractIterator - { - // ObjectBigArray capacity is always at least 1024, so discarding "small" BigArrays even if you don't need the entire space is wasteful - private static final int UNUSED_CAPACITY_DISPOSAL_THRESHOLD = 4096; - - private final PageBuilder pageBuilder; - // we may have 0 groups if there is no input page processed - private final int groupCount = groupByHash.getGroupCount(); - - private int currentGroupNumber; - private long currentGroupSizeInBytes; - - // the row number of the current position in the group - private int currentGroupPosition; - // number of rows in the group - private int currentGroupSize; - - private ObjectBigArray currentRows; - - ResultIterator() - { - if (produceRowNumber) { - pageBuilder = new PageBuilder(new ImmutableList.Builder().add(sourceTypes).add(BIGINT).build()); 
- } - else { - pageBuilder = new PageBuilder(ImmutableList.copyOf(sourceTypes)); - } - // Populate the first group - currentRows = new ObjectBigArray<>(); - nextGroupedRows(); - } - - @Override - protected Page computeNext() - { - pageBuilder.reset(); - while (!pageBuilder.isFull()) { - if (currentRows == null) { - // no more groups - break; - } - if (currentGroupPosition == currentGroupSize) { - // the current group has produced all its rows - memorySizeInBytes -= currentGroupSizeInBytes; - currentGroupPosition = 0; - nextGroupedRows(); - continue; - } + Work processPage(Page page); - // Clear the reference to the Row after access to make it reclaimable by GC - Row row = currentRows.getAndSet(currentGroupPosition, null); - PageReference pageReference = pageReferences.get(row.getPageId()); - Page page = pageReference.getPage(); - int position = row.getPosition(); - for (int i = 0; i < sourceTypes.length; i++) { - sourceTypes[i].appendTo(page.getBlock(i), position, pageBuilder.getBlockBuilder(i)); - } + Iterator buildResult(); - if (produceRowNumber) { - BIGINT.writeLong(pageBuilder.getBlockBuilder(sourceTypes.length), currentGroupPosition + 1); - } - pageBuilder.declarePosition(); - currentGroupPosition++; + ListenableFuture startMemoryRevoke(); - // deference the row; no need to compact the pages but remove them if completely unused - if (pageReference.dereference(position)) { - pageReferences.set(row.getPageId(), null); - memorySizeInBytes -= pageReference.getEstimatedSizeInBytes(); - } - } + void finishMemoryRevoke(); - if (pageBuilder.isEmpty()) { - return endOfData(); - } - return pageBuilder.build(); - } + long getEstimatedSizeInBytes(); - private void nextGroupedRows() - { - if (currentGroupNumber < groupCount) { - RowHeap rows = groupedRows.getAndSet(currentGroupNumber, null); - verify(rows != null && !rows.isEmpty(), "impossible to have inserted a group without a witness row"); - currentGroupSizeInBytes = rows.getEstimatedSizeInBytes(); - 
currentGroupNumber++; - currentGroupSize = rows.size(); + long getGroupIdsSortingSize(); - // sort output rows in a big array in case there are too many rows - checkState(currentRows != null, "currentRows already observed the final group"); - if (currentRows.getCapacity() > UNUSED_CAPACITY_DISPOSAL_THRESHOLD && currentRows.getCapacity() > currentGroupSize * 2L) { - // Discard over-sized big array to avoid unnecessary waste - currentRows = new ObjectBigArray<>(); - } - currentRows.ensureCapacity(currentGroupSize); - for (int index = currentGroupSize - 1; index >= 0; index--) { - currentRows.set(index, rows.dequeue()); - } - } - else { - currentRows = null; - currentGroupSize = 0; - } - } - } + GroupByHash getGroupByHash(); } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/InMemoryGroupedTopNBuilder.java b/presto-main/src/main/java/com/facebook/presto/operator/InMemoryGroupedTopNBuilder.java new file mode 100644 index 0000000000000..3994e8b79fdc1 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/InMemoryGroupedTopNBuilder.java @@ -0,0 +1,587 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.operator; + +import com.facebook.presto.common.Page; +import com.facebook.presto.common.PageBuilder; +import com.facebook.presto.common.array.ObjectBigArray; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.memory.context.LocalMemoryContext; +import com.facebook.presto.spi.function.aggregation.GroupByIdBlock; +import com.facebook.presto.sql.gen.JoinCompiler; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ListenableFuture; +import it.unimi.dsi.fastutil.ints.IntArrayFIFOQueue; +import it.unimi.dsi.fastutil.ints.IntIterator; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.PrimitiveIterator; +import java.util.stream.IntStream; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.operator.GroupByHash.createGroupByHash; +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.SizeOf.sizeOf; +import static java.util.Objects.requireNonNull; + +/** + * This class finds the top N rows defined by {@code comparator} for each group specified by {@code groupByHash}. 
+ */
+public class InMemoryGroupedTopNBuilder
+        implements GroupedTopNBuilder
+{
+    private static final long INSTANCE_SIZE = ClassLayout.parseClass(InMemoryGroupedTopNBuilder.class).instanceSize();
+    // compact a page when 50% of its positions are unreferenced
+    private static final int COMPACT_THRESHOLD = 2;
+
+    private final Type[] sourceTypes;
+    private final int topN;
+    private final boolean produceRowNumber;
+    private final GroupByHash groupByHash;
+    // may be null; only consulted lazily by updateMemory(long)
+    private final OperatorContext operatorContext;
+    // a map of heaps, each of which records the top N rows
+    private final ObjectBigArray<RowHeap> groupedRows = new ObjectBigArray<>();
+    // a list of input pages, each of which has information of which row in which heap references which position
+    private final ObjectBigArray<PageReference> pageReferences = new ObjectBigArray<>();
+    // for heap element comparison
+    private final PageWithPositionComparator pageWithPositionComparator;
+    private final Comparator<Row> rowHeapComparator;
+    // when there is no row referenced in a page, it will be removed instead of compacted; use a list to record those empty slots to reuse them
+    private final IntFIFOQueue emptyPageReferenceSlots;
+
+    // keeps track of the sizes of input pages and heaps
+    private long memorySizeInBytes;
+    private int currentPageCount;
+    // resolved from operatorContext on first use, see updateMemory(long)
+    private LocalMemoryContext localUserMemoryContext;
+
+    /**
+     * Creates a builder that passes {@link UpdateMemory#NOOP} as the memory callback of the group-by hash.
+     * All other arguments are forwarded unchanged to the full constructor.
+     */
+    public InMemoryGroupedTopNBuilder(
+            OperatorContext operatorContext,
+            List<Type> sourceTypes,
+            List<Type> partitionTypes,
+            List<Integer> partitionChannels,
+            Optional<Integer> hashChannel,
+            int expectedPositions,
+            boolean isDictionaryAggregationEnabled,
+            JoinCompiler joinCompiler,
+            PageWithPositionComparator comparator,
+            int topN,
+            boolean produceRowNumber)
+    {
+        this(
+                operatorContext,
+                sourceTypes,
+                partitionTypes,
+                partitionChannels,
+                hashChannel,
+                expectedPositions,
+                isDictionaryAggregationEnabled,
+                joinCompiler,
+                comparator,
+                topN,
+                produceRowNumber,
+                UpdateMemory.NOOP);
+    }
+
+    /**
+     * Creates a builder that keeps the top {@code topN} rows (by {@code comparator}) of every group.
+     * Groups are formed by a {@link GroupByHash} over {@code partitionChannels}; when no partition
+     * channels are given a {@link NoChannelGroupByHash} is used instead.
+     *
+     * @param operatorContext context of the owning operator; only used to look up the user memory context in {@code updateMemory}
+     * @param sourceTypes types of the input (and output) channels
+     * @param partitionTypes types handed to the group-by hash
+     * @param partitionChannels channels handed to the group-by hash; empty means no partitioning
+     * @param hashChannel optional hash channel handed to the group-by hash
+     * @param expectedPositions expected size handed to the group-by hash; must be positive when partition channels are present
+     * @param topN number of rows kept per group; must be positive
+     * @param produceRowNumber whether a trailing BIGINT row-number channel is appended to the output
+     * @param updateMemory memory callback handed to the group-by hash
+     * @throws NullPointerException if {@code sourceTypes}, {@code comparator} or {@code partitionChannels} is null
+     * @throws IllegalArgumentException if {@code topN} or (with partition channels) {@code expectedPositions} is not positive
+     */
+    public InMemoryGroupedTopNBuilder(
+            OperatorContext operatorContext,
+            List<Type> sourceTypes,
+            List<Type> partitionTypes,
+            List<Integer> partitionChannels,
+            Optional<Integer> hashChannel,
+            int expectedPositions,
+            boolean isDictionaryAggregationEnabled,
+            JoinCompiler joinCompiler,
+            PageWithPositionComparator comparator,
+            int topN,
+            boolean produceRowNumber,
+            UpdateMemory updateMemory)
+    {
+        this.operatorContext = operatorContext;
+        this.sourceTypes = requireNonNull(sourceTypes, "sourceTypes is null").toArray(new Type[0]);
+        checkArgument(topN > 0, "topN must be > 0");
+        this.topN = topN;
+        this.produceRowNumber = produceRowNumber;
+
+        this.pageWithPositionComparator = requireNonNull(comparator, "comparator is null");
+        // Note: this comparator intentionally swaps the left and right arguments to form a "reverse order" comparator
+        this.rowHeapComparator = (right, left) -> this.pageWithPositionComparator.compareTo(
+                pageReferences.get(left.getPageId()).getPage(),
+                left.getPosition(),
+                pageReferences.get(right.getPageId()).getPage(),
+                right.getPosition());
+        this.emptyPageReferenceSlots = new IntFIFOQueue();
+
+        // a null list already failed here with a bare NullPointerException; give it a message like the other arguments
+        requireNonNull(partitionChannels, "partitionChannels is null");
+        if (!partitionChannels.isEmpty()) {
+            checkArgument(expectedPositions > 0, "expectedPositions must be > 0");
+            this.groupByHash = createGroupByHash(
+                    partitionTypes,
+                    Ints.toArray(partitionChannels),
+                    hashChannel,
+                    expectedPositions,
+                    isDictionaryAggregationEnabled,
+                    joinCompiler,
+                    updateMemory);
+        }
+        else {
+            this.groupByHash = new NoChannelGroupByHash();
+        }
+    }
+
+    /**
+     * Returns a {@link Work} that first obtains the group ids of {@code page} from the group-by hash
+     * and then inserts the page's rows into the per-group heaps.
+     */
+    @Override
+    public Work<?> processPage(Page page)
+    {
+        return new TransformWork<>(
+                groupByHash.getGroupIds(page),
+                groupIds -> {
+                    processPage(page, groupIds);
+                    return null;
+                });
+    }
+
+    /**
+     * Returns an iterator over the output pages, visiting the groups in ascending group id order.
+     * Iterating consumes the buffered rows.
+     */
+    @Override
+    public Iterator<Page> buildResult()
+    {
+        return new ResultIterator(IntStream.range(0, groupByHash.getGroupCount()).iterator());
+    }
+
+    /**
+     * Not supported by the in-memory builder.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ListenableFuture<?> startMemoryRevoke()
+    {
+        throw new UnsupportedOperationException("InMemoryGroupedTopNBuilder does not support startMemoryRevoke");
+    }
+
+    /**
+     * Not supported by the in-memory builder.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void finishMemoryRevoke()
+    {
+        throw new UnsupportedOperationException("InMemoryGroupedTopNBuilder does not support finishMemoryRevoke");
+    }
+
+    /**
+     * Sets the user memory context of the owning operator to {@code memorySize} bytes.
+     */
+    private void updateMemory(long memorySize)
+    {
+        if (localUserMemoryContext == null) {
+            // the field used to stay unassigned forever, which made this method throw unconditionally;
+            // resolve it lazily so that the constructor keeps accepting whatever it accepted before
+            localUserMemoryContext = requireNonNull(operatorContext, "operatorContext is null").localUserMemoryContext();
+        }
+        localUserMemoryContext.setBytes(memorySize);
+    }
+
+    /**
+     * Returns the instance size plus the tracked size of pages and heaps, the group-by hash,
+     * the two big arrays and the queue of free page slots.
+     */
+    @Override
+    public long getEstimatedSizeInBytes()
+    {
+        return INSTANCE_SIZE +
+                memorySizeInBytes +
+                groupByHash.getEstimatedSize() +
+                groupedRows.sizeOf() +
+                pageReferences.sizeOf() +
+                emptyPageReferenceSlots.getEstimatedSizeInBytes();
+    }
+
+    /**
+     * Returns the pages currently held in non-empty page slots, in slot order.
+     */
+    @VisibleForTesting
+    List<Page> getBufferedPages()
+    {
+        return IntStream.range(0, currentPageCount)
+                .filter(i -> pageReferences.get(i) != null)
+                .mapToObj(i -> pageReferences.get(i).getPage())
+                .collect(toImmutableList());
+    }
+
+    /**
+     * Inserts the rows of {@code newPage} into the heaps of their groups, evicting the current
+     * topN-th row of a full heap whenever the new row compares lower, and afterwards compacts or
+     * drops pages whose referenced positions fell below the compaction threshold.
+     */
+    private void processPage(Page newPage, GroupByIdBlock groupIds)
+    {
+        checkArgument(newPage != null);
+        checkArgument(groupIds != null);
+
+        int firstPositionToInsert = findFirstPositionToInsert(newPage, groupIds);
+        if (firstPositionToInsert < 0) {
+            // no insertions required
+            return;
+        }
+
+        PageReference newPageReference = new PageReference(newPage);
+        memorySizeInBytes += newPageReference.getEstimatedSizeInBytes();
+        int newPageId;
+        if (emptyPageReferenceSlots.isEmpty()) {
+            // all the previous slots are full; create a new one
+            pageReferences.ensureCapacity(currentPageCount + 1);
+            newPageId = currentPageCount;
+            currentPageCount++;
+        }
+        else {
+            // reuse a previously removed page's slot
+            newPageId = emptyPageReferenceSlots.dequeueInt();
+        }
+        verify(pageReferences.setIfNull(newPageId, newPageReference), "should not overwrite a non-empty slot");
+
+        // ensure sufficient group capacity outside of the loop
+        groupedRows.ensureCapacity(groupIds.getGroupCount());
+        // update the affected heaps and record candidate pages that need compaction
+        IntSet pagesToCompact = new IntOpenHashSet();
+        for (int position = firstPositionToInsert; position < newPage.getPositionCount(); position++) {
+            long groupId = groupIds.getGroupId(position);
+            RowHeap rows = groupedRows.get(groupId);
+            if (rows == null) {
+                // a new group
+                rows = new RowHeap(rowHeapComparator);
+                groupedRows.set(groupId, rows);
+            }
+            else {
+                // update an existing group;
+                // remove the memory usage for this group for now; add it back after update
+                memorySizeInBytes -= rows.getEstimatedSizeInBytes();
+            }
+
+            if (rows.size() < topN) {
+                Row row = new Row(newPageId, position);
+                newPageReference.reference(row);
+                rows.enqueue(row);
+            }
+            else {
+                // compare with the topN-th element within the heap to decide if an update is necessary
+                Row previousRow = rows.first();
+                PageReference previousPageReference = pageReferences.get(previousRow.getPageId());
+                if (pageWithPositionComparator.compareTo(newPage, position, previousPageReference.getPage(), previousRow.getPosition()) < 0) {
+                    // update reference and the heap
+                    rows.dequeue();
+                    previousPageReference.dereference(previousRow.getPosition());
+
+                    Row newRow = new Row(newPageId, position);
+                    newPageReference.reference(newRow);
+                    rows.enqueue(newRow);
+
+                    // compact a page if it is not the current input page and the reference count is below the threshold
+                    if (previousPageReference.getPage() != newPage &&
+                            previousPageReference.getUsedPositionCount() * COMPACT_THRESHOLD < previousPageReference.getPage().getPositionCount()) {
+                        pagesToCompact.add(previousRow.getPageId());
+                    }
+                }
+            }
+
+            memorySizeInBytes += rows.getEstimatedSizeInBytes();
+        }
+
+        // may compact the new page as well
+        if (newPageReference.getUsedPositionCount() * COMPACT_THRESHOLD < newPage.getPositionCount()) {
+            verify(pagesToCompact.add(newPageId));
+        }
+
+        // compact pages that are still referenced; drop the ones that are not and recycle their slots
+        IntIterator iterator = pagesToCompact.iterator();
+        while (iterator.hasNext()) {
+            int pageId = iterator.nextInt();
+            PageReference pageReference = pageReferences.get(pageId);
+            if (pageReference.getUsedPositionCount() == 0) {
+                pageReferences.set(pageId, null);
+                emptyPageReferenceSlots.enqueue(pageId);
+                memorySizeInBytes -= pageReference.getEstimatedSizeInBytes();
+            }
+            else {
+                memorySizeInBytes -= pageReference.getEstimatedSizeInBytes();
+                pageReference.compact();
+                memorySizeInBytes += pageReference.getEstimatedSizeInBytes();
+            }
+        }
+    }
+
+    /**
+     * Returns the first position of {@code newPage} that has to be inserted into a heap (its group
+     * is new, not yet full, or the row compares lower than the group's topN-th row), or -1 if no
+     * position qualifies.
+     */
+    private int findFirstPositionToInsert(Page newPage, GroupByIdBlock groupIds)
+    {
+        for (int position = 0; position < newPage.getPositionCount(); position++) {
+            long groupId = groupIds.getGroupId(position);
+            if (groupedRows.getCapacity() <= groupId) {
+                return position;
+            }
+
+            RowHeap rows = groupedRows.get(groupId);
+            if (rows == null || rows.size() < topN) {
+                return position;
+            }
+            // check against current minimum
+            Row previousRow = rows.first();
+            PageReference pageReference = pageReferences.get(previousRow.getPageId());
+            if (pageWithPositionComparator.compareTo(newPage, position, pageReference.getPage(), previousRow.getPosition()) < 0) {
+                return position;
+            }
+        }
+        // no positions to insert
+        return -1;
+    }
+
+    /**
+     * The class is a pointer to a row in a page.
+     * The actual position in the page is mutable because as pages are compacted, the position will change.
+     */
+    private static class Row
+    {
+        private final int pageId;
+        private int position;
+
+        private Row(int pageId, int position)
+        {
+            this.pageId = pageId;
+            reset(position);
+        }
+
+        public void reset(int position)
+        {
+            this.position = position;
+        }
+
+        public int getPageId()
+        {
+            return pageId;
+        }
+
+        public int getPosition()
+        {
+            return position;
+        }
+
+        @Override
+        public String toString()
+        {
+            return toStringHelper(this)
+                    .add("pageId", pageId)
+                    .add("position", position)
+                    .toString();
+        }
+    }
+
+    /**
+     * A buffered page together with, for every position, the {@link Row} that points at it (or null).
+     */
+    private static class PageReference
+    {
+        private static final long INSTANCE_SIZE = ClassLayout.parseClass(PageReference.class).instanceSize();
+
+        private Page page;
+        private Row[] reference;
+
+        private int usedPositionCount;
+
+        public PageReference(Page page)
+        {
+            this.page = requireNonNull(page, "page is null");
+            this.reference = new Row[page.getPositionCount()];
+        }
+
+        public void reference(Row row)
+        {
+            reference[row.getPosition()] = row;
+            usedPositionCount++;
+        }
+
+        /**
+         * Clears the reference at {@code position}; returns true when no position is referenced any more.
+         */
+        public boolean dereference(int position)
+        {
+            checkArgument(reference[position] != null && usedPositionCount > 0);
+            reference[position] = null;
+            return (--usedPositionCount) == 0;
+        }
+
+        public int getUsedPositionCount()
+        {
+            return usedPositionCount;
+        }
+
+        /**
+         * Copies the referenced positions into a smaller page and re-points the rows at their new positions.
+         */
+        public void compact()
+        {
+            checkState(usedPositionCount > 0);
+
+            if (usedPositionCount == page.getPositionCount()) {
+                return;
+            }
+
+            // re-assign reference
+            Row[] newReference = new Row[usedPositionCount];
+            int[] positions = new int[usedPositionCount];
+            int index = 0;
+            // update all the elements in the heaps that reference the current page
+            // this does not change the elements in the heap;
+            // it only updates the value of the elements; while keeping the same order
+            for (int i = 0; i < reference.length && index < usedPositionCount; i++) {
+                Row value = reference[i];
+                if (value != null) {
+                    value.reset(index);
+                    newReference[index] = value;
+                    positions[index] = i;
+                    index++;
+                }
+            }
+            verify(index == usedPositionCount);
+
+            // compact page
+            page = page.copyPositions(positions, 0, usedPositionCount);
+            reference = newReference;
+        }
+
+        public Page getPage()
+        {
+            return page;
+        }
+
+        public long getEstimatedSizeInBytes()
+        {
+            return page.getRetainedSizeInBytes() + sizeOf(reference) + INSTANCE_SIZE;
+        }
+    }
+
+    // this class is for precise memory tracking
+    private static class IntFIFOQueue
+            extends IntArrayFIFOQueue
+    {
+        private static final long INSTANCE_SIZE = ClassLayout.parseClass(IntFIFOQueue.class).instanceSize();
+
+        private long getEstimatedSizeInBytes()
+        {
+            return INSTANCE_SIZE + sizeOf(array);
+        }
+    }
+
+    // this class is for precise memory tracking
+    private static class RowHeap
+            extends ObjectHeapPriorityQueue<Row>
+    {
+        private static final long INSTANCE_SIZE = ClassLayout.parseClass(RowHeap.class).instanceSize();
+        private static final long ROW_ENTRY_SIZE = ClassLayout.parseClass(Row.class).instanceSize();
+
+        private RowHeap(Comparator<Row> comparator)
+        {
+            super(1, comparator);
+        }
+
+        private long getEstimatedSizeInBytes()
+        {
+            return INSTANCE_SIZE + sizeOf(heap) + size() * ROW_ENTRY_SIZE;
+        }
+    }
+
+    /**
+     * Emits the buffered rows group by group, in the order given by the supplied group ids.
+     * The tracked memory size is reduced as groups and pages are consumed.
+     */
+    private class ResultIterator
+            extends AbstractIterator<Page>
+    {
+        // ObjectBigArray capacity is always at least 1024, so discarding "small" BigArrays even if you don't need the entire space is wasteful
+        private static final int UNUSED_CAPACITY_DISPOSAL_THRESHOLD = 4096;
+
+        private final PageBuilder pageBuilder;
+        private final PrimitiveIterator.OfInt groupIds;
+
+        private long currentGroupSizeInBytes;
+
+        // the row number of the current position in the group
+        private int currentGroupPosition;
+        // number of rows in the group
+        private int currentGroupSize;
+
+        private ObjectBigArray<Row> currentRows;
+
+        ResultIterator(PrimitiveIterator.OfInt groupIds)
+        {
+            if (produceRowNumber) {
+                pageBuilder = new PageBuilder(new ImmutableList.Builder<Type>().add(sourceTypes).add(BIGINT).build());
+            }
+            else {
+                pageBuilder = new PageBuilder(ImmutableList.copyOf(sourceTypes));
+            }
+            // Populate the first group
+            currentRows = new ObjectBigArray<>();
+            this.groupIds = groupIds;
+            nextGroupedRows();
+        }
+
+        @Override
+        protected Page computeNext()
+        {
+            pageBuilder.reset();
+            while (!pageBuilder.isFull()) {
+                if (currentRows == null) {
+                    // no more groups
+                    break;
+                }
+                if (currentGroupPosition == currentGroupSize) {
+                    // the current group has produced all its rows
+                    memorySizeInBytes -= currentGroupSizeInBytes;
+                    currentGroupPosition = 0;
+                    nextGroupedRows();
+                    continue;
+                }
+
+                // Clear the reference to the Row after access to make it reclaimable by GC
+                Row row = currentRows.getAndSet(currentGroupPosition, null);
+                PageReference pageReference = pageReferences.get(row.getPageId());
+                Page page = pageReference.getPage();
+                int position = row.getPosition();
+                for (int i = 0; i < sourceTypes.length; i++) {
+                    sourceTypes[i].appendTo(page.getBlock(i), position, pageBuilder.getBlockBuilder(i));
+                }
+
+                if (produceRowNumber) {
+                    BIGINT.writeLong(pageBuilder.getBlockBuilder(sourceTypes.length), currentGroupPosition + 1);
+                }
+                pageBuilder.declarePosition();
+                currentGroupPosition++;
+
+                // dereference the row; no need to compact the pages but remove them if completely unused
+                if (pageReference.dereference(position)) {
+                    pageReferences.set(row.getPageId(), null);
+                    memorySizeInBytes -= pageReference.getEstimatedSizeInBytes();
+                }
+            }
+
+            if (pageBuilder.isEmpty()) {
+                return endOfData();
+            }
+            return pageBuilder.build();
+        }
+
+        /**
+         * Drains the heap of the next group id into {@code currentRows}, or marks the end of the
+         * output by setting {@code currentRows} to null when no group id is left.
+         */
+        private void nextGroupedRows()
+        {
+            if (this.groupIds.hasNext()) {
+                int groupId = this.groupIds.nextInt();
+                RowHeap rows = groupedRows.getAndSet(groupId, null);
+                verify(rows != null && !rows.isEmpty(), "impossible to have inserted a group without a witness row. rows=%s for %s", rows, this);
+                currentGroupSizeInBytes = rows.getEstimatedSizeInBytes();
+                currentGroupSize = rows.size();
+
+                // sort output rows in a big array in case there are too many rows
+                checkState(currentRows != null, "currentRows already observed the final group");
+                if (currentRows.getCapacity() > UNUSED_CAPACITY_DISPOSAL_THRESHOLD && currentRows.getCapacity() > currentGroupSize * 2L) {
+                    // Discard over-sized big array to avoid unnecessary waste
+                    currentRows = new ObjectBigArray<>();
+                }
+                currentRows.ensureCapacity(currentGroupSize);
+                // the heap is in reverse order, so fill the array from the back
+                for (int index = currentGroupSize - 1; index >= 0; index--) {
+                    currentRows.set(index, rows.dequeue());
+                }
+            }
+            else {
+                currentRows = null;
+                currentGroupSize = 0;
+            }
+        }
+    }
+
+    /**
+     * Returns the number of groups multiplied by {@link Integer#BYTES}.
+     */
+    @Override
+    public long getGroupIdsSortingSize()
+    {
+        return (long) groupByHash.getGroupCount() * Integer.BYTES;
+    }
+
+    @Override
+    public GroupByHash getGroupByHash()
+    {
+        return groupByHash;
+    }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TopNOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TopNOperator.java
index 29ca992bfc59d..77ce8366a957b 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/TopNOperator.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/TopNOperator.java
@@ -27,6 +27,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static java.util.Collections.emptyIterator;
+import static java.util.Collections.emptyList;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -91,7 +92,7 @@ public OperatorFactory duplicate()
     private final OperatorContext operatorContext;
     private final LocalMemoryContext localUserMemoryContext;
 
-    private GroupedTopNBuilder topNBuilder;
+    private InMemoryGroupedTopNBuilder topNBuilder;
     private boolean finishing;
 
     private Iterator outputIterator;
@@ -106,18 +107,23 @@ public TopNOperator(
         this.operatorContext = requireNonNull(operatorContext,
"operatorContext is null"); this.localUserMemoryContext = operatorContext.localUserMemoryContext(); checkArgument(n >= 0, "n must be positive"); - if (n == 0) { finishing = true; outputIterator = emptyIterator(); } else { - topNBuilder = new GroupedTopNBuilder( + topNBuilder = new InMemoryGroupedTopNBuilder( + operatorContext, types, + emptyList(), + emptyList(), + null, + 0, + false, + null, new SimplePageWithPositionComparator(types, sortChannels, sortOrders), n, - false, - new NoChannelGroupByHash()); + false); } } @@ -152,7 +158,6 @@ public void addInput(Page page) boolean done = topNBuilder.processPage(requireNonNull(page, "page is null")).process(); // there is no grouping so work will always be done verify(done); - updateMemoryReservation(); } @Override @@ -174,15 +179,9 @@ public Page getOutput() else { outputIterator = emptyIterator(); } - updateMemoryReservation(); return output; } - private void updateMemoryReservation() - { - localUserMemoryContext.setBytes(topNBuilder.getEstimatedSizeInBytes()); - } - private boolean noMoreOutput() { return outputIterator != null && !outputIterator.hasNext(); diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TopNRowNumberOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TopNRowNumberOperator.java index b0b23aa666806..ccb75134686b7 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TopNRowNumberOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TopNRowNumberOperator.java @@ -29,7 +29,6 @@ import static com.facebook.presto.SystemSessionProperties.isDictionaryAggregationEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; -import static com.facebook.presto.operator.GroupByHash.createGroupByHash; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -130,8 +129,7 @@ public OperatorFactory duplicate() 
private final int[] outputChannels; - private final GroupByHash groupByHash; - private final GroupedTopNBuilder groupedTopNBuilder; + private final InMemoryGroupedTopNBuilder inMemoryGroupedTopNBuilder; private boolean finishing; private Work unfinishedWork; @@ -165,28 +163,21 @@ public TopNRowNumberOperator( checkArgument(maxRowCountPerPartition > 0, "maxRowCountPerPartition must be > 0"); - if (!partitionChannels.isEmpty()) { - checkArgument(expectedPositions > 0, "expectedPositions must be > 0"); - groupByHash = createGroupByHash( - partitionTypes, - Ints.toArray(partitionChannels), - hashChannel, - expectedPositions, - isDictionaryAggregationEnabled(operatorContext.getSession()), - joinCompiler, - this::updateMemoryReservation); - } - else { - groupByHash = new NoChannelGroupByHash(); - } - List types = toTypes(sourceTypes, outputChannels, generateRowNumber); - this.groupedTopNBuilder = new GroupedTopNBuilder( + + this.inMemoryGroupedTopNBuilder = new InMemoryGroupedTopNBuilder( + operatorContext, ImmutableList.copyOf(sourceTypes), + partitionTypes, + partitionChannels, + hashChannel, + expectedPositions, + isDictionaryAggregationEnabled(operatorContext.getSession()), + joinCompiler, new SimplePageWithPositionComparator(types, sortChannels, sortOrders), maxRowCountPerPartition, generateRowNumber, - groupByHash); + this::updateMemoryReservation); } @Override @@ -222,11 +213,10 @@ public void addInput(Page page) checkState(unfinishedWork == null, "Cannot add input with the operator when unfinished work is not empty"); checkState(outputIterator == null, "Cannot add input with the operator when flushing"); requireNonNull(page, "page is null"); - unfinishedWork = groupedTopNBuilder.processPage(page); + unfinishedWork = inMemoryGroupedTopNBuilder.processPage(page); if (unfinishedWork.process()) { unfinishedWork = null; } - updateMemoryReservation(); } @Override @@ -234,7 +224,6 @@ public Page getOutput() { if (unfinishedWork != null) { boolean finished = 
unfinishedWork.process(); - updateMemoryReservation(); if (!finished) { return null; } @@ -247,20 +236,20 @@ public Page getOutput() if (outputIterator == null) { // start flushing - outputIterator = groupedTopNBuilder.buildResult(); + outputIterator = inMemoryGroupedTopNBuilder.buildResult(); } Page output = null; if (outputIterator.hasNext()) { output = outputIterator.next().extractChannels(outputChannels); } - updateMemoryReservation(); return output; } @VisibleForTesting public int getCapacity() { + GroupByHash groupByHash = inMemoryGroupedTopNBuilder.getGroupByHash(); checkState(groupByHash != null); return groupByHash.getCapacity(); } @@ -268,7 +257,7 @@ public int getCapacity() private boolean updateMemoryReservation() { // TODO: may need to use trySetMemoryReservation with a compaction to free memory (but that may cause GC pressure) - localUserMemoryContext.setBytes(groupedTopNBuilder.getEstimatedSizeInBytes()); + localUserMemoryContext.setBytes(inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); return operatorContext.isWaitingForMemory().isDone(); } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkGroupedTopNBuilder.java b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkInMemoryGroupedTopNBuilder.java similarity index 81% rename from presto-main/src/test/java/com/facebook/presto/operator/BenchmarkGroupedTopNBuilder.java rename to presto-main/src/test/java/com/facebook/presto/operator/BenchmarkInMemoryGroupedTopNBuilder.java index bccfd7a71efcd..e1ea77d13d8ba 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkGroupedTopNBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkInMemoryGroupedTopNBuilder.java @@ -38,8 +38,10 @@ import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.concurrent.TimeUnit; 
@@ -56,7 +58,7 @@ @Fork(4) @Warmup(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) @Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) -public class BenchmarkGroupedTopNBuilder +public class BenchmarkInMemoryGroupedTopNBuilder { private static final int HASH_GROUP = 0; private static final int EXTENDED_PRICE = 1; @@ -88,7 +90,7 @@ public static class BenchmarkData private int groupCount = 10; private List page; - private GroupedTopNBuilder topNBuilder; + private InMemoryGroupedTopNBuilder topNBuilder; @Setup public void setup() @@ -97,14 +99,38 @@ public void setup() GroupByHash groupByHash; if (groupCount > 1) { groupByHash = new BigintGroupByHash(HASH_GROUP, true, groupCount, UpdateMemory.NOOP); + topNBuilder = new InMemoryGroupedTopNBuilder( + null, + types, + ImmutableList.of(types.get(HASH_GROUP)), + ImmutableList.of(HASH_GROUP), + Optional.of(HASH_GROUP), + groupCount, + false, + null, + comparator, + topN, + false, + UpdateMemory.NOOP); } else { - groupByHash = new NoChannelGroupByHash(); + topNBuilder = new InMemoryGroupedTopNBuilder( + null, + types, + Collections.emptyList(), + Collections.emptyList(), + null, + groupCount, + false, + null, + comparator, + topN, + false, + UpdateMemory.NOOP); } - topNBuilder = new GroupedTopNBuilder(types, comparator, topN, false, groupByHash); } - public GroupedTopNBuilder getTopNBuilder() + public InMemoryGroupedTopNBuilder getTopNBuilder() { return topNBuilder; } @@ -118,7 +144,7 @@ public List getPages() @Benchmark public void topN(BenchmarkData data, Blackhole blackhole) { - GroupedTopNBuilder topNBuilder = data.getTopNBuilder(); + InMemoryGroupedTopNBuilder topNBuilder = data.getTopNBuilder(); for (Page page : data.getPages()) { Work work = topNBuilder.processPage(page); boolean finished; @@ -135,7 +161,7 @@ public void topN(BenchmarkData data, Blackhole blackhole) public List topNToList(BenchmarkData data) { - GroupedTopNBuilder topNBuilder = data.getTopNBuilder(); + 
InMemoryGroupedTopNBuilder topNBuilder = data.getTopNBuilder(); for (Page page : data.getPages()) { Work work = topNBuilder.processPage(page); boolean finished; @@ -159,7 +185,7 @@ public static void main(String[] args) { Options options = new OptionsBuilder() .parent(new CommandLineOptions(args)) - .include(".*" + BenchmarkGroupedTopNBuilder.class.getSimpleName() + ".*") + .include(".*" + BenchmarkInMemoryGroupedTopNBuilder.class.getSimpleName() + ".*") .build(); new Runner(options).run(); diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestGroupedTopNBuilder.java b/presto-main/src/test/java/com/facebook/presto/operator/TestInMemoryGroupedTopNBuilder.java similarity index 72% rename from presto-main/src/test/java/com/facebook/presto/operator/TestGroupedTopNBuilder.java rename to presto-main/src/test/java/com/facebook/presto/operator/TestInMemoryGroupedTopNBuilder.java index 945e32ab361ec..80ab883ae8bb5 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestGroupedTopNBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestInMemoryGroupedTopNBuilder.java @@ -40,16 +40,15 @@ import static com.facebook.presto.common.type.DoubleType.DOUBLE; import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; import static com.facebook.presto.operator.PageAssertions.assertPageEquals; -import static com.facebook.presto.operator.UpdateMemory.NOOP; import static io.airlift.slice.SizeOf.sizeOf; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; -public class TestGroupedTopNBuilder +public class TestInMemoryGroupedTopNBuilder { - private static final long INSTANCE_SIZE = ClassLayout.parseClass(GroupedTopNBuilder.class).instanceSize(); + private static final long INSTANCE_SIZE = ClassLayout.parseClass(InMemoryGroupedTopNBuilder.class).instanceSize(); private static 
final long INT_FIFO_QUEUE_SIZE = ClassLayout.parseClass(IntArrayFIFOQueue.class).instanceSize(); private static final long OBJECT_OVERHEAD = ClassLayout.parseClass(Object.class).instanceSize(); private static final long PAGE_REFERENCE_INSTANCE_SIZE = ClassLayout.parseClass(TestPageReference.class).instanceSize(); @@ -70,15 +69,22 @@ public static Object[][] pageRowCounts() @Test public void testEmptyInput() { - GroupedTopNBuilder groupedTopNBuilder = new GroupedTopNBuilder( + InMemoryGroupedTopNBuilder inMemoryGroupedTopNBuilder = new InMemoryGroupedTopNBuilder( + null, ImmutableList.of(BIGINT), + Collections.emptyList(), + Collections.emptyList(), + null, + 0, + false, + null, (left, leftPosition, right, rightPosition) -> { throw new UnsupportedOperationException(); }, 5, - false, - new NoChannelGroupByHash()); - assertFalse(groupedTopNBuilder.buildResult().hasNext()); + false); + + assertFalse(inMemoryGroupedTopNBuilder.buildResult().hasNext()); } @Test(dataProvider = "produceRowNumbers") @@ -106,32 +112,39 @@ public void testMultiGroupTopN(boolean produceRowNumbers) page.compact(); } - GroupByHash groupByHash = createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), NOOP); - GroupedTopNBuilder groupedTopNBuilder = new GroupedTopNBuilder( + InMemoryGroupedTopNBuilder inMemoryGroupedTopNBuilder = new InMemoryGroupedTopNBuilder( + null, types, + ImmutableList.of(types.get(0)), + ImmutableList.of(0), + Optional.empty(), + 2, + false, + null, new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)), 2, - produceRowNumbers, - groupByHash); - assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes()); + produceRowNumbers); + + GroupByHash groupByHash = inMemoryGroupedTopNBuilder.getGroupByHash(); + assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // add 4 rows for 
the first page and created three heaps with 1, 1, 2 rows respectively - assertTrue(groupedTopNBuilder.processPage(input.get(0)).process()); - assertBuilderSize(groupByHash, types, ImmutableList.of(4), ImmutableList.of(1, 1, 2), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(0)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4), ImmutableList.of(1, 1, 2), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // add 1 row for the second page and the three heaps become 2, 1, 2 rows respectively - assertTrue(groupedTopNBuilder.processPage(input.get(1)).process()); - assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1), ImmutableList.of(2, 1, 2), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(1)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1), ImmutableList.of(2, 1, 2), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // add 2 new rows for the third page (which will be compacted into two rows only) and we have four heaps with 2, 2, 2, 1 rows respectively - assertTrue(groupedTopNBuilder.processPage(input.get(2)).process()); - assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 2), ImmutableList.of(2, 2, 2, 1), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(2)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 2), ImmutableList.of(2, 2, 2, 1), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // the last page will be discarded - assertTrue(groupedTopNBuilder.processPage(input.get(3)).process()); - assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 2, 0), ImmutableList.of(2, 2, 2, 1), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(3)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 2, 0), 
ImmutableList.of(2, 2, 2, 1), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); - List output = ImmutableList.copyOf(groupedTopNBuilder.buildResult()); + List output = ImmutableList.copyOf(inMemoryGroupedTopNBuilder.buildResult()); assertEquals(output.size(), 1); Page expected = rowPagesBuilder(BIGINT, DOUBLE, BIGINT) @@ -151,7 +164,7 @@ public void testMultiGroupTopN(boolean produceRowNumbers) assertPageEquals(types, output.get(0), new Page(expected.getBlock(0), expected.getBlock(1))); } - assertBuilderSize(groupByHash, types, ImmutableList.of(0, 0, 0, 0), ImmutableList.of(0, 0, 0, 0), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertBuilderSize(groupByHash, types, ImmutableList.of(0, 0, 0, 0), ImmutableList.of(0, 0, 0, 0), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); } @Test(dataProvider = "produceRowNumbers") @@ -179,31 +192,39 @@ public void testSingleGroupTopN(boolean produceRowNumbers) page.compact(); } - GroupedTopNBuilder groupedTopNBuilder = new GroupedTopNBuilder( + InMemoryGroupedTopNBuilder inMemoryGroupedTopNBuilder = new InMemoryGroupedTopNBuilder( + null, types, + ImmutableList.of(), + ImmutableList.of(), + Optional.empty(), + 0, + false, + null, new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)), 5, - produceRowNumbers, - new NoChannelGroupByHash()); - assertBuilderSize(new NoChannelGroupByHash(), types, ImmutableList.of(), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes()); + produceRowNumbers); + + GroupByHash groupByHash = inMemoryGroupedTopNBuilder.getGroupByHash(); + assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // add 4 rows for the first page and created a single heap with 4 rows - assertTrue(groupedTopNBuilder.processPage(input.get(0)).process()); - assertBuilderSize(new NoChannelGroupByHash(), types, ImmutableList.of(4), ImmutableList.of(4), 
groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(0)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4), ImmutableList.of(4), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // add 1 row for the second page and the heap is with 5 rows - assertTrue(groupedTopNBuilder.processPage(input.get(1)).process()); - assertBuilderSize(new NoChannelGroupByHash(), types, ImmutableList.of(4, 1), ImmutableList.of(5), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(1)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1), ImmutableList.of(5), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // update 1 new row from the third page (which will be compacted into a single row only) - assertTrue(groupedTopNBuilder.processPage(input.get(2)).process()); - assertBuilderSize(new NoChannelGroupByHash(), types, ImmutableList.of(4, 1, 1), ImmutableList.of(5), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(2)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 1), ImmutableList.of(5), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); // the last page will be discarded - assertTrue(groupedTopNBuilder.processPage(input.get(3)).process()); - assertBuilderSize(new NoChannelGroupByHash(), types, ImmutableList.of(4, 1, 1), ImmutableList.of(5), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(3)).process()); + assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 1), ImmutableList.of(5), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); - List output = ImmutableList.copyOf(groupedTopNBuilder.buildResult()); + List output = ImmutableList.copyOf(inMemoryGroupedTopNBuilder.buildResult()); assertEquals(output.size(), 1); Page expected = rowPagesBuilder(BIGINT, DOUBLE, 
BIGINT) @@ -221,7 +242,7 @@ public void testSingleGroupTopN(boolean produceRowNumbers) assertPageEquals(types, output.get(0), new Page(expected.getBlock(0), expected.getBlock(1))); } - assertBuilderSize(new NoChannelGroupByHash(), types, ImmutableList.of(0, 0, 0), ImmutableList.of(0), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertBuilderSize(new NoChannelGroupByHash(), types, ImmutableList.of(0, 0, 0), ImmutableList.of(0), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); } @Test @@ -231,39 +252,49 @@ public void testYield() Page input = rowPagesBuilder(types) .row(1L, 0.3) .row(1L, 0.2) - .row(1L, 0.9) - .row(1L, 0.1) + .row(2L, 0.9) + .row(3L, 0.1) .build() .get(0); input.compact(); AtomicBoolean unblock = new AtomicBoolean(); - GroupByHash groupByHash = createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), unblock::get); - GroupedTopNBuilder groupedTopNBuilder = new GroupedTopNBuilder( + InMemoryGroupedTopNBuilder inMemoryGroupedTopNBuilder = new InMemoryGroupedTopNBuilder( + null, types, + ImmutableList.of(types.get(0)), + ImmutableList.of(0), + Optional.empty(), + 1, + false, + null, new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)), 5, false, - groupByHash); - assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes()); + () -> { + return unblock.get(); + }); + + GroupByHash groupByHash = inMemoryGroupedTopNBuilder.getGroupByHash(); + assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); - Work work = groupedTopNBuilder.processPage(input); + Work work = inMemoryGroupedTopNBuilder.processPage(input); assertFalse(work.process()); assertFalse(work.process()); unblock.set(true); assertTrue(work.process()); - List output = ImmutableList.copyOf(groupedTopNBuilder.buildResult()); + List output = 
ImmutableList.copyOf(inMemoryGroupedTopNBuilder.buildResult()); assertEquals(output.size(), 1); Page expected = rowPagesBuilder(types) - .row(1L, 0.1) .row(1L, 0.2) .row(1L, 0.3) - .row(1L, 0.9) + .row(2L, 0.9) + .row(3L, 0.1) .build() .get(0); assertPageEquals(types, output.get(0), expected); - assertBuilderSize(groupByHash, types, ImmutableList.of(0), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertBuilderSize(groupByHash, types, ImmutableList.of(0), ImmutableList.of(), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); } @Test @@ -289,18 +320,24 @@ public void testAutoCompact() .row(1L, 0.6) .build(); - GroupedTopNBuilder groupedTopNBuilder = new GroupedTopNBuilder( + InMemoryGroupedTopNBuilder inMemoryGroupedTopNBuilder = new InMemoryGroupedTopNBuilder( + null, types, - new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)), + ImmutableList.of(types.get(0)), + ImmutableList.of(0), + Optional.empty(), 1, false, - createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), NOOP)); + null, + new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)), + 1, + false); // page 1: // the first page will be compacted - assertTrue(groupedTopNBuilder.processPage(input.get(0)).process()); - assertEquals(groupedTopNBuilder.getBufferedPages().size(), 1); - Page firstCompactPage = groupedTopNBuilder.getBufferedPages().get(0); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(0)).process()); + assertEquals(inMemoryGroupedTopNBuilder.getBufferedPages().size(), 1); + Page firstCompactPage = inMemoryGroupedTopNBuilder.getBufferedPages().get(0); Page expected = rowPagesBuilder(types) .row(1L, 0.8) .row(2L, 0.7) @@ -311,15 +348,15 @@ public void testAutoCompact() // page 2: // the second page will be removed - assertTrue(groupedTopNBuilder.processPage(input.get(1)).process()); - assertEquals(groupedTopNBuilder.getBufferedPages().size(), 1); + 
assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(1)).process()); + assertEquals(inMemoryGroupedTopNBuilder.getBufferedPages().size(), 1); // assert the first page is not affected - assertEquals(firstCompactPage, groupedTopNBuilder.getBufferedPages().get(0)); + assertEquals(firstCompactPage, inMemoryGroupedTopNBuilder.getBufferedPages().get(0)); // page 3: // the third page will trigger another compaction of the first page - assertTrue(groupedTopNBuilder.processPage(input.get(2)).process()); - List bufferedPages = groupedTopNBuilder.getBufferedPages(); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(2)).process()); + List bufferedPages = inMemoryGroupedTopNBuilder.getBufferedPages(); assertEquals(bufferedPages.size(), 2); // assert the previously compacted first page no longer exists assertNotEquals(firstCompactPage, bufferedPages.get(0)); @@ -336,8 +373,8 @@ public void testAutoCompact() // page 4: // the fourth page will remove the first page; also it leaves it with an empty slot - assertTrue(groupedTopNBuilder.processPage(input.get(3)).process()); - bufferedPages = groupedTopNBuilder.getBufferedPages(); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(3)).process()); + bufferedPages = inMemoryGroupedTopNBuilder.getBufferedPages(); assertEquals(bufferedPages.size(), 2); expectedPages = rowPagesBuilder(types) @@ -351,8 +388,8 @@ public void testAutoCompact() // page 5: // the fifth page will remove the fourth page and it will take the empty slot from the first page - assertTrue(groupedTopNBuilder.processPage(input.get(4)).process()); - bufferedPages = groupedTopNBuilder.getBufferedPages(); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(4)).process()); + bufferedPages = inMemoryGroupedTopNBuilder.getBufferedPages(); assertEquals(bufferedPages.size(), 2); // assert the fifth page indeed takes the first empty slot @@ -382,24 +419,30 @@ public void testLargePagesMemoryTracking(int pageCount, int rowCount) } List 
input = rowPagesBuilder.build(); - GroupByHash groupByHash = createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), NOOP); - GroupedTopNBuilder groupedTopNBuilder = new GroupedTopNBuilder( + InMemoryGroupedTopNBuilder inMemoryGroupedTopNBuilder = new InMemoryGroupedTopNBuilder( + null, types, + ImmutableList.of(types.get(0)), + ImmutableList.of(0), + Optional.empty(), + 1, + false, + new JoinCompiler(createTestMetadataManager(), new FeaturesConfig()), new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)), pageCount * rowCount, - false, - groupByHash); + false); // Assert memory usage gradually goes up + GroupByHash groupByHash = inMemoryGroupedTopNBuilder.getGroupByHash(); for (int i = 0; i < pageCount; i++) { - assertTrue(groupedTopNBuilder.processPage(input.get(i)).process()); - assertBuilderSize(groupByHash, types, Collections.nCopies(i + 1, rowCount), Collections.nCopies(rowCount, i + 1), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertTrue(inMemoryGroupedTopNBuilder.processPage(input.get(i)).process()); + assertBuilderSize(groupByHash, types, Collections.nCopies(i + 1, rowCount), Collections.nCopies(rowCount, i + 1), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); } // Assert memory usage gradually goes down (i.e., proportional to the number of rows/pages we have produced) int outputPageCount = 0; int remainingRows = pageCount * rowCount; - Iterator output = groupedTopNBuilder.buildResult(); + Iterator output = inMemoryGroupedTopNBuilder.buildResult(); while (output.hasNext()) { remainingRows -= output.next().getPositionCount(); assertBuilderSize( @@ -410,12 +453,12 @@ public void testLargePagesMemoryTracking(int pageCount, int rowCount) .addAll(Collections.nCopies((remainingRows + pageCount - 1) / pageCount, pageCount)) .addAll(Collections.nCopies(rowCount - (remainingRows + pageCount - 1) / pageCount, 0)) .build(), - groupedTopNBuilder.getEstimatedSizeInBytes()); + 
inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); outputPageCount++; } assertEquals(remainingRows, 0); assertGreaterThan(outputPageCount, 3); - assertBuilderSize(groupByHash, types, Collections.nCopies(pageCount, 0), Collections.nCopies(rowCount, 0), groupedTopNBuilder.getEstimatedSizeInBytes()); + assertBuilderSize(groupByHash, types, Collections.nCopies(pageCount, 0), Collections.nCopies(rowCount, 0), inMemoryGroupedTopNBuilder.getEstimatedSizeInBytes()); } private static GroupByHash createGroupByHash(List partitionTypes, List partitionChannels, UpdateMemory updateMemory)