Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,25 @@ public long getRegionLogicalSizeInBytes(int positionOffset, int length)
}

long sizeInBytes = 0;
long[] seenSizes = new long[dictionary.getPositionCount()];
Arrays.fill(seenSizes, -1L);
for (int i = positionOffset; i < positionOffset + length; i++) {
int position = getId(i);
if (seenSizes[position] < 0) {
seenSizes[position] = dictionary.getRegionLogicalSizeInBytes(position, 1);
// Dictionary Block may contain large number of keys and small region length may be requested.
// If the length is less than keys the cache is likely to be not used.
if (length > dictionary.getPositionCount()) {
Copy link
Copy Markdown
Collaborator

@sdruzkin sdruzkin Mar 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tl;dr: so the idea is that it doesn't make sense to create a seenSizes cache if the cached seenSize value is not going to be reused, right?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is another way of looking at it, and that comment probably makes this more simpler.

I was looking at it from the opinion, that there is a cost to call getRegionLogicalSizeInBytes on a block. There is a cost to do the cache maintenance.

// Cache code path.
long[] seenSizes = new long[dictionary.getPositionCount()];
Arrays.fill(seenSizes, -1L);
for (int i = positionOffset; i < positionOffset + length; i++) {
int position = getId(i);
if (seenSizes[position] < 0) {
seenSizes[position] = dictionary.getRegionLogicalSizeInBytes(position, 1);
}
sizeInBytes += seenSizes[position];
Copy link
Copy Markdown
Collaborator

@sdruzkin sdruzkin Mar 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q regarding the original implementation. This is a dictionary, do we really need to multiply the entry size by the number of times this entry appears?

If the answer is no, we can try to collect used dictionary positions into a IntOpenHashSet from fastutil.

If yes, instead of a regular hash map you can use Int2IntOpenHashMap. It would really depends on the relation between the dict size and the length, and still likely be more expensive.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logical size, wants to get the underlying size of the data. That will involve multiplying the entry size, by number of times it appears. The logical size is used in few places, where the data is converted from dictionary encoding to direct encoding. One such place is abandoning dictionary encoding and switching to direct encoding.

I agree that fastutil's collections are better than java.utils in terms of CPU/memory. But my opinion, is the code is simple when the likelihood of hit is very low, we can have a non cached path and avoid the cache completely.

}
}
else {
// In-place code path.
for (int i = positionOffset; i < positionOffset + length; i++) {
sizeInBytes += dictionary.getRegionLogicalSizeInBytes(getId(i), 1);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getRegionLogicalSizeInBytes is a very expensive call. How likely/useful it would be to have a light-weighted hashmap:

int[] seenIds = new int[length];
long[] seenSize = new long[length];
Arrays.fill(seenIds, -1);
for (int i = positionOffset; i < positionOffset + length; i++) {
    int id = getId(i);
    int index = id % length;
    if (seenIds[index] == id) {
        sizeInBytes += seenSize[index];
    }
    else if (seenIds[index] == -1) {
        seenIds[index] = id;
        seenSize[index] = dictionary.getRegionLogicalSizeInBytes(id, 1);
        sizeInBytes += seenSize[index];
    }
    else {
        sizeInBytes += dictionary.getRegionLogicalSizeInBytes(getId(i), 1);
    }
}

Frankly, I don't know if this approach is going to be better or worth. Maybe worth profiling?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The seenIds, need to be a regular HashMap. The dictionary index could be 1,4,7 with length 3, in which case all of them will collide and produce wrong results. Adding the actual hash map will make any implementation much worser.

I looked at few different implementations of the getRegionLogicalSizeInBytes (VariableWidthBlock is cheap, but MapBlock is costly). So it is very hard to come up with cost model and even profiling.

What I have in mind is, instead of doing the in-place code path when length <= dictionarySize, we can do the in place code path, when length * 2 <= dictionarySize, this assumes that a hit in the cache is twice cheaper than calling the actual method. I am ok changing the condition to length * 5 <= dictionarySize, so the in place code path is triggered when the cache is not going to yield any benefits.

Or other approach we can do is, using a regular hashMap in the inplace code path and let it grow dynamically.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other approach I coded and discarded is, I can make changes to the dictionary writer to not call the getRegionLogicalSizeInBytes and can do inplace evaluation as the use case is well understood. But I believe the fix here is better as it will improve the performance of Presto. I coded up the dictionary Writer fix and discarded it. I will push that change as well, just in case.

}
sizeInBytes += seenSizes[position];
}

if (positionOffset == 0 && length == getPositionCount()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import io.airlift.slice.Slice;
import org.testng.annotations.Test;

import java.util.Arrays;

import static com.facebook.presto.block.BlockAssertions.createRLEBlock;
import static com.facebook.presto.block.BlockAssertions.createRandomDictionaryBlock;
import static com.facebook.presto.block.BlockAssertions.createRandomLongsBlock;
import static com.facebook.presto.block.BlockAssertions.createSlicesBlock;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static io.airlift.slice.SizeOf.SIZE_OF_INT;
import static io.airlift.slice.Slices.utf8Slice;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
Expand All @@ -44,6 +48,44 @@ public void testSizeInBytes()
assertEquals(dictionaryBlock.getSizeInBytes(), dictionaryBlock.getDictionary().getSizeInBytes() + (100 * SIZE_OF_INT));
}

@Test
public void testNonCachedLogicalBytes()
{
int numEntries = 10;
BlockBuilder blockBuilder = VARCHAR.createBlockBuilder(null, numEntries);

// Over allocate dictionary indexes but only use the required limit.
int[] dictionaryIndexes = new int[numEntries + 10];
Arrays.fill(dictionaryIndexes, 1);
blockBuilder.appendNull();
dictionaryIndexes[0] = 0;

String string = "";
for (int i = 1; i < numEntries; i++) {
string += "a";
VARCHAR.writeSlice(blockBuilder, utf8Slice(string));
dictionaryIndexes[i] = numEntries - i;
}

// A dictionary block of size 10, 1st element -> null, 2nd element size -> 9....9th element size -> 1
// Pass different maxChunkSize and different offset and verify if it computes the chunk lengths correctly.
Block elementBlock = blockBuilder.build();
DictionaryBlock block = new DictionaryBlock(numEntries, elementBlock, dictionaryIndexes);
int elementSize = Integer.BYTES + Byte.BYTES;

long size = block.getRegionLogicalSizeInBytes(0, 1);
assertEquals(size, 0 + 1 * elementSize);

size = block.getRegionLogicalSizeInBytes(0, numEntries);
assertEquals(size, 45 + numEntries * elementSize);

size = block.getRegionLogicalSizeInBytes(1, 2);
assertEquals(size, 9 + 8 + 2 * elementSize);

size = block.getRegionLogicalSizeInBytes(9, 1);
assertEquals(size, 1 + 1 * elementSize);
}

@Test
public void testLogicalSizeInBytes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.DictionaryBlock;
import com.facebook.presto.common.block.DictionaryId;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.DwrfDataEncryptor;
import com.facebook.presto.orc.OrcEncoding;
Expand All @@ -29,6 +30,7 @@
import com.facebook.presto.orc.stream.LongOutputStream;
import com.facebook.presto.orc.stream.PresentOutputStream;
import com.facebook.presto.orc.stream.StreamDataOutput;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -92,37 +94,44 @@ public int getDictionaryEntries()
return dictionary.getEntryCount();
}

@VisibleForTesting
static int getChunkLength(int offset, int[] dictionaryIndexes, int positionCount, Block elementBlock, int maxChunkSize)
{
int endOffset = offset;
long size = elementBlock.getSliceLength(dictionaryIndexes[endOffset++]);
while (endOffset < positionCount) {
// getSliceLength does not include the nulls and length array. But this
// is a heuristic to avoid too much memory allocation, so this is fine.
size += elementBlock.getSliceLength(dictionaryIndexes[endOffset]);
if (size > maxChunkSize) {
break;
}
endOffset++;
}
return endOffset - offset;
}

@Override
protected boolean tryConvertToDirect(int dictionaryIndexCount, IntBigArray dictionaryIndexes, int maxDirectBytes)
{
int[][] segments = dictionaryIndexes.getSegments();
for (int i = 0; dictionaryIndexCount > 0 && i < segments.length; i++) {
int[] segment = segments[i];
int positionCount = Math.min(dictionaryIndexCount, segment.length);
Block block = new DictionaryBlock(positionCount, dictionary.getElementBlock(), segment);

while (block != null) {
int chunkPositionCount = block.getPositionCount();
Block chunk = block.getRegion(0, chunkPositionCount);

// avoid chunk with huge logical size
while (chunkPositionCount > 1 && chunk.getLogicalSizeInBytes() > DIRECT_CONVERSION_CHUNK_MAX_LOGICAL_BYTES) {
chunkPositionCount /= 2;
chunk = chunk.getRegion(0, chunkPositionCount);
}

Block elementBlock = dictionary.getElementBlock();
DictionaryId dictionaryId = DictionaryId.randomDictionaryId();

int offset = 0;
while (offset < positionCount) {
// Dictionary can contain large values that are repeated. In such a case, the conversion will be abandoned
// due to maxDirectBytes. To avoid allocating too much memory on those cases, process the dictionary in chunks.
int length = getChunkLength(offset, segment, positionCount, elementBlock, DIRECT_CONVERSION_CHUNK_MAX_LOGICAL_BYTES);
Block chunk = new DictionaryBlock(offset, length, elementBlock, segment, false, dictionaryId);
offset += length;
directColumnWriter.writeBlock(chunk);
if (directColumnWriter.getBufferedBytes() > maxDirectBytes) {
return false;
}

// slice block to only unconverted rows
if (chunkPositionCount < block.getPositionCount()) {
block = block.getRegion(chunkPositionCount, block.getPositionCount() - chunkPositionCount);
}
else {
block = null;
}
}

dictionaryIndexCount -= positionCount;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc.writer;

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import org.testng.annotations.Test;

import java.util.Arrays;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static io.airlift.slice.Slices.utf8Slice;
import static org.testng.Assert.assertEquals;

public class TestSliceDictionaryColumnWriter
{
@Test
public void testChunkLength()
{
int numEntries = 10;
BlockBuilder blockBuilder = VARCHAR.createBlockBuilder(null, numEntries);

// Over allocate dictionary indexes but only use the required limit.
int[] dictionaryIndexes = new int[numEntries + 10];
Arrays.fill(dictionaryIndexes, 1);
blockBuilder.appendNull();
dictionaryIndexes[0] = 0;

String string = "";
for (int i = 1; i < numEntries; i++) {
string += "a";
VARCHAR.writeSlice(blockBuilder, utf8Slice(string));
dictionaryIndexes[i] = numEntries - i;
}

// A dictionary block of size 10, 1st element -> null, 2nd element size -> 9....9th element size -> 1
// Pass different maxChunkSize and different offset and verify if it computes the chunk lengths correctly.
Block elementBlock = blockBuilder.build();
int length = SliceDictionaryColumnWriter.getChunkLength(0, dictionaryIndexes, numEntries, elementBlock, 10);
assertEquals(length, 2);

length = SliceDictionaryColumnWriter.getChunkLength(0, dictionaryIndexes, numEntries, elementBlock, 1_000_000);
assertEquals(length, numEntries);

length = SliceDictionaryColumnWriter.getChunkLength(0, dictionaryIndexes, numEntries, elementBlock, 20);
assertEquals(length, 3);

length = SliceDictionaryColumnWriter.getChunkLength(1, dictionaryIndexes, numEntries, elementBlock, 9 + 8 + 7);
assertEquals(length, 3);

length = SliceDictionaryColumnWriter.getChunkLength(2, dictionaryIndexes, numEntries, elementBlock, 0);
assertEquals(length, 1);

length = SliceDictionaryColumnWriter.getChunkLength(9, dictionaryIndexes, numEntries, elementBlock, 0);
assertEquals(length, 1);
}
}