Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/main/java/com/facebook/presto/DictionarySerde.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package com.facebook.presto;

import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableBiMap;

import java.util.Iterator;
import java.util.Map;

public class DictionarySerde
{
private final long maxCardinality;
// TODO: we may be able to determine and adjust this value dynamically with a smarter implementation
private final int reqBitSpace;

public DictionarySerde(long maxCardinality) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should be an int

this.maxCardinality = maxCardinality;
reqBitSpace = Long.SIZE - Long.numberOfLeadingZeros(maxCardinality - 1);
}

public DictionarySerde() {
this(Long.MAX_VALUE);
}

public void serialize(final Iterable<Slice> slices, SliceOutput sliceOutput)
{
final BiMap<Slice, Long> idMap = HashBiMap.create();

PackedLongSerde packedLongSerde = new PackedLongSerde(reqBitSpace);
packedLongSerde.serialize(
new Iterable<Long>() {
@Override
public Iterator<Long> iterator() {
return new AbstractIterator<Long>() {
Iterator<Slice> sliceIterator = slices.iterator();
// Start ID at the smallest possible value to fully utilize available bit space
long nextId = -1L << (reqBitSpace - 1);

@Override
protected Long computeNext() {
if (!sliceIterator.hasNext()) {
return endOfData();
}

Slice slice = sliceIterator.next();

Long id = idMap.get(slice);
if (id == null) {
id = nextId;
nextId++;
idMap.put(slice, id);
}
return id;
}
};
}
},
sliceOutput
);

// Serialize Footer
int footerBytes = new Footer(idMap.inverse()).serialize(sliceOutput);

// Write length of Footer
sliceOutput.writeInt(footerBytes);
}

public static Iterable<Slice> deserialize(final SliceInput sliceInput) {
// Get map serialized byte length from tail and reset to beginning
int totalBytes = sliceInput.available();
sliceInput.skipBytes(totalBytes - SizeOf.SIZE_OF_INT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code will be much easier if you take a Slice instead of a SliceInput.

This line becomes something like

total size = slice.getInt(slice.size() - 4)

Then :

footerSlice = slice.slice(totalBytes - idMapByteLength - SIZE_OF_INT, slice.size - SIZE_OF_INT)

int idMapByteLength = sliceInput.readInt();

// Slice out Footer data and extract it
sliceInput.setPosition(totalBytes - idMapByteLength - SizeOf.SIZE_OF_INT);
Footer footer = Footer.deserialize(sliceInput.readSlice(idMapByteLength).input());

final Map<Long, Slice> idMap = footer.getIdMap();

sliceInput.setPosition(0);
final SliceInput paylodSliceInput =
sliceInput.readSlice(totalBytes - idMapByteLength - SizeOf.SIZE_OF_INT)
.input();
return new Iterable<Slice>() {
@Override
public Iterator<Slice> iterator() {
return new AbstractIterator<Slice>() {
Iterator<Long> iterator = PackedLongSerde.deserialize(paylodSliceInput).iterator();

@Override
protected Slice computeNext() {
if (!iterator.hasNext()) {
return endOfData();
}
Slice slice = idMap.get(iterator.next());
Preconditions.checkNotNull(slice, "Missing entry in dictionary");
return slice;
}
};
}
};
}

// TODO: this encoding can be made more compact if we leverage sorted order of the map
private static class Footer
{
Map<Long, Slice> idMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final


private Footer(Map<Long, Slice> idMap)
{
this.idMap = idMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.idMap = ImmutableMap.copyOf(idMap);

}

/**
* Serialize this Footer to the specified SliceOutput
*
* @param sliceOutput
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java doc comment should say something interesting or should be removed.

* @return bytes written to sliceOutput
*/
private int serialize(SliceOutput sliceOutput)
{
int startBytesWriteable = sliceOutput.writableBytes();
for (Map.Entry<Long, Slice> entry : idMap.entrySet()) {
// Write ID number
sliceOutput.writeLong(entry.getKey());
// Write Slice length
sliceOutput.writeInt(entry.getValue().length());
// Write Slice
sliceOutput.writeBytes(entry.getValue());
}
int endBytesWriteable = sliceOutput.writableBytes();
return startBytesWriteable - endBytesWriteable;
}

private static Footer deserialize(SliceInput sliceInput)
{
ImmutableBiMap.Builder<Long, Slice> builder = ImmutableBiMap.builder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you just want ImmutableMap


while (sliceInput.isReadable()) {
// Read Slice ID number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rephrase these as:

read value id
read value length
read value

The term slice is a bit to generic because it could mean the "slice" we are reading from.

long id = sliceInput.readLong();
// Read Slice Length
int sliceLength = sliceInput.readInt();
// Read Slice
Slice slice = sliceInput.readSlice(sliceLength);

builder.put(id, slice);
}

return new Footer(builder.build());
}

public Map<Long, Slice> getIdMap()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move this getter to the top of the file, just after the constructor. I normally put helper methods at the bottom because I rarely scroll to the bottom of the file.

{
return idMap;
}
}
}
154 changes: 154 additions & 0 deletions src/main/java/com/facebook/presto/PackedLongSerde.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.facebook.presto;

import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Range;
import com.google.common.collect.Ranges;

import java.util.Iterator;

public class PackedLongSerde
{
private final byte bitWidth;
private final Range<Long> allowedRange;

public PackedLongSerde(int bitWidth)
{
Preconditions.checkArgument(bitWidth > 0 && bitWidth <= Long.SIZE);
this.bitWidth = (byte) bitWidth;
this.allowedRange = Ranges.closed(-1L << (bitWidth - 1), ~(-1L << (bitWidth - 1)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea what this does.

}

public void serialize(Iterable<Long> items, SliceOutput sliceOutput)
{
int packCapacity = Long.SIZE / bitWidth;
long mask = (~0L) >>> (Long.SIZE - bitWidth);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use -1 instead of ~0 or even better 0xFFFF_FFFF_FFFF_FFFFL


// Write the packed longs
int itemCount = 0;
Iterator<Long> iter = items.iterator();
while (iter.hasNext()) {
long pack = 0;
boolean packUsed = false;
for (int idx = 0; idx < packCapacity; idx++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

index instead of idx

if (!iter.hasNext()) {
break;
}

long rawValue = iter.next();
Preconditions.checkArgument(
allowedRange.contains(rawValue),
"Provided value does not fit into bitspace"
);
long maskedValue = rawValue & mask;
itemCount++;

pack |= maskedValue << (bitWidth * idx);
packUsed = true;
}
if (packUsed) {
sliceOutput.writeLong(pack);
}
}

// Write the Footer
new Footer(itemCount, bitWidth).serialize(sliceOutput);
}

public static Iterable<Long> deserialize(final SliceInput sliceInput)
{
Preconditions.checkArgument(
sliceInput.available() >= Footer.BYTE_SIZE,
"sliceInput not large enough to read a footer"
);
Preconditions.checkArgument(
(sliceInput.available() - Footer.BYTE_SIZE) % (SizeOf.SIZE_OF_LONG) == 0,
"sliceInput byte alignment incorrect"
);

// Extract Footer and then reset slice cursor
int totalBytes = sliceInput.available();
sliceInput.skipBytes(totalBytes - Footer.BYTE_SIZE);
final Footer footer = Footer.deserialize(sliceInput.readSlice(Footer.BYTE_SIZE).input());
sliceInput.setPosition(0);

final int packCapacity = Long.SIZE / footer.getBitWidth();

return new Iterable<Long>()
{
@Override
public Iterator<Long> iterator()
{
return new AbstractIterator<Long>()
{
int itemIdx = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

int packInternalIdx = 0;
long packValue = 0;

@Override
protected Long computeNext()
{
if (itemIdx >= footer.getItemCount()) {
return endOfData();
}
if (packInternalIdx == 0) {
packValue = sliceInput.readLong();
}
// TODO: replace with something more efficient (but needs sign extend)
long value = (packValue << (Long.SIZE - ((packInternalIdx + 1) * footer.getBitWidth()))) >> (Long.SIZE - footer.getBitWidth());

itemIdx++;
packInternalIdx = (packInternalIdx + 1) % packCapacity;
return value;
}
};
}
};
}

private static class Footer
{
private static final int BYTE_SIZE = SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_BYTE;

private final int itemCount;
private final byte bitWidth;

private Footer(int itemCount, byte bitWidth)
{
Preconditions.checkArgument(itemCount >= 0, "itemCount must be non-negative");
Preconditions.checkArgument(bitWidth > 0, "bitWidth must be greater than zero");
this.itemCount = itemCount;
this.bitWidth = bitWidth;
}

/**
* Serialize this Header into the specified SliceOutput
*
* @param sliceOutput
* @return bytes written to sliceOutput
*/
public int serialize(SliceOutput sliceOutput)
{
sliceOutput.writeInt(itemCount);
sliceOutput.writeByte(bitWidth);
return BYTE_SIZE;
}

public static Footer deserialize(SliceInput sliceInput)
{
int itemCount = sliceInput.readInt();
byte bitWidth = sliceInput.readByte();
return new Footer(itemCount, bitWidth);
}

public int getItemCount()
{
return itemCount;
}

public byte getBitWidth()
{
return bitWidth;
}
}
}
Loading