Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -52,6 +53,7 @@
import static io.trino.execution.buffer.BufferState.OPEN;
import static io.trino.execution.buffer.OutputBuffers.BufferType.ARBITRARY;
import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount;
import static io.trino.execution.buffer.SerializedPageReference.dereferencePages;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -207,7 +209,7 @@ public ListenableFuture<Void> isFull()
}

@Override
public void enqueue(List<SerializedPage> pages)
public void enqueue(List<Slice> pages)
{
checkState(!Thread.holdsLock(this), "Cannot enqueue pages while holding a lock on this");
requireNonNull(pages, "pages is null");
Expand All @@ -221,11 +223,12 @@ public void enqueue(List<SerializedPage> pages)
ImmutableList.Builder<SerializedPageReference> references = ImmutableList.builderWithExpectedSize(pages.size());
long bytesAdded = 0;
long rowCount = 0;
for (SerializedPage page : pages) {
bytesAdded += page.getRetainedSizeInBytes();
rowCount += page.getPositionCount();
for (Slice page : pages) {
bytesAdded += page.getRetainedSize();
int positionCount = getSerializedPagePositionCount(page);
rowCount += positionCount;
// create page reference counts with an initial single reference
references.add(new SerializedPageReference(page, 1));
references.add(new SerializedPageReference(page, positionCount, 1));
}
List<SerializedPageReference> serializedPageReferences = references.build();

Expand Down Expand Up @@ -258,7 +261,7 @@ public void enqueue(List<SerializedPage> pages)
}

@Override
public void enqueue(int partition, List<SerializedPage> pages)
public void enqueue(int partition, List<Slice> pages)
{
checkState(partition == 0, "Expected partition number to be zero");
enqueue(pages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -48,6 +49,7 @@
import static io.trino.execution.buffer.BufferState.NO_MORE_PAGES;
import static io.trino.execution.buffer.BufferState.OPEN;
import static io.trino.execution.buffer.OutputBuffers.BufferType.BROADCAST;
import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount;
import static io.trino.execution.buffer.SerializedPageReference.dereferencePages;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -198,7 +200,7 @@ public ListenableFuture<Void> isFull()
}

@Override
public void enqueue(List<SerializedPage> pages)
public void enqueue(List<Slice> pages)
{
checkState(!Thread.holdsLock(this), "Cannot enqueue pages while holding a lock on this");
requireNonNull(pages, "pages is null");
Expand All @@ -212,11 +214,12 @@ public void enqueue(List<SerializedPage> pages)
ImmutableList.Builder<SerializedPageReference> references = ImmutableList.builderWithExpectedSize(pages.size());
long bytesAdded = 0;
long rowCount = 0;
for (SerializedPage page : pages) {
bytesAdded += page.getRetainedSizeInBytes();
rowCount += page.getPositionCount();
for (Slice page : pages) {
bytesAdded += page.getRetainedSize();
int positionCount = getSerializedPagePositionCount(page);
rowCount += positionCount;
// create page reference counts with an initial single reference
references.add(new SerializedPageReference(page, 1));
references.add(new SerializedPageReference(page, positionCount, 1));
}
List<SerializedPageReference> serializedPageReferences = references.build();

Expand Down Expand Up @@ -257,7 +260,7 @@ public void enqueue(List<SerializedPage> pages)
}

@Override
public void enqueue(int partitionNumber, List<SerializedPage> pages)
public void enqueue(int partitionNumber, List<Slice> pages)
{
checkState(partitionNumber == 0, "Expected partition number to be zero");
enqueue(pages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.execution.buffer;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;

import java.util.List;
import java.util.Objects;
Expand All @@ -34,9 +35,9 @@ public static BufferResult emptyResults(String taskInstanceId, long token, boole
private final long token;
private final long nextToken;
private final boolean bufferComplete;
private final List<SerializedPage> serializedPages;
private final List<Slice> serializedPages;

public BufferResult(String taskInstanceId, long token, long nextToken, boolean bufferComplete, List<SerializedPage> serializedPages)
public BufferResult(String taskInstanceId, long token, long nextToken, boolean bufferComplete, List<Slice> serializedPages)
{
checkArgument(!isNullOrEmpty(taskInstanceId), "taskInstanceId is null");

Expand All @@ -62,7 +63,7 @@ public boolean isBufferComplete()
return bufferComplete;
}

public List<SerializedPage> getSerializedPages()
public List<Slice> getSerializedPages()
{
return serializedPages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.buffer.OutputBuffers.OutputBufferId;
import io.trino.execution.buffer.SerializedPageReference.PagesReleasedListener;
Expand Down Expand Up @@ -357,7 +358,7 @@ private synchronized BufferResult processRead(long sequenceId, DataSize maxSize)

// read the new pages
long maxBytes = maxSize.toBytes();
List<SerializedPage> result = new ArrayList<>();
List<Slice> result = new ArrayList<>();
long bytes = 0;

for (SerializedPageReference page : pages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.ExtendedSettableFuture;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -244,14 +245,14 @@ public ListenableFuture<Void> isFull()
}

@Override
public void enqueue(List<SerializedPage> pages)
public void enqueue(List<Slice> pages)
{
OutputBuffer outputBuffer = getDelegateOutputBufferOrFail();
outputBuffer.enqueue(pages);
}

@Override
public void enqueue(int partition, List<SerializedPage> pages)
public void enqueue(int partition, List<Slice> pages)
{
OutputBuffer outputBuffer = getDelegateOutputBufferOrFail();
outputBuffer.enqueue(partition, pages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.execution.buffer;

import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.buffer.OutputBuffers.OutputBufferId;
Expand Down Expand Up @@ -85,13 +86,13 @@ public interface OutputBuffer
* Adds a split-up page to an unpartitioned buffer. If no-more-pages has been set, the enqueue
* page call is ignored. This can happen with limit queries.
*/
void enqueue(List<SerializedPage> pages);
void enqueue(List<Slice> pages);

/**
* Adds a split-up page to a specific partition. If no-more-pages has been set, the enqueue
* page call is ignored. This can happen with limit queries.
*/
void enqueue(int partition, List<SerializedPage> pages);
void enqueue(int partition, List<Slice> pages);

/**
* Notify buffer that no more pages will be added. Any future calls to enqueue a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import static com.google.common.base.Preconditions.checkArgument;

/**
* Encodes boolean properties for {@link SerializedPage} by using a bitmasking strategy, allowing
* Encodes boolean properties for serialized page by using a bitmasking strategy, allowing
* up to 8 such properties to be stored in a single byte
*/
public enum PageCodecMarker
enum PageCodecMarker
{
COMPRESSED(1),
ENCRYPTED(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.airlift.compress.Decompressor;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.trino.execution.buffer.PageCodecMarker.MarkerSet;
import io.trino.spi.Page;
Expand All @@ -41,6 +43,13 @@
public class PagesSerde
{
private static final double MINIMUM_COMPRESSION_RATIO = 0.8;
private static final int SERIALIZED_PAGE_HEADER_SIZE = /*positionCount*/ Integer.BYTES +
// pageCodecMarkers
Byte.BYTES +
// uncompressedSizeInBytes
Integer.BYTES +
// sizeInBytes
Integer.BYTES;

private final BlockEncodingSerde blockEncodingSerde;
private final Optional<Compressor> compressor;
Expand All @@ -61,7 +70,7 @@ public PagesSerdeContext newContext()
return new PagesSerdeContext();
}

public SerializedPage serialize(PagesSerdeContext context, Page page)
public Slice serialize(PagesSerdeContext context, Page page)
{
DynamicSliceOutput serializationBuffer = context.acquireSliceOutput(toIntExact(page.getSizeInBytes() + Integer.BYTES)); // block length is an int
byte[] inUseTempBuffer = null;
Expand Down Expand Up @@ -109,8 +118,15 @@ public SerializedPage serialize(PagesSerdeContext context, Page page)
}
inUseTempBuffer = encrypted;
}
// Resulting slice *must* be copied to ensure the shared buffers aren't referenced after method exit
return new SerializedPage(Slices.copyOf(slice), markers, page.getPositionCount(), uncompressedSize);

SliceOutput output = Slices.allocate(SERIALIZED_PAGE_HEADER_SIZE + slice.length()).getOutput();
output.writeInt(page.getPositionCount());
output.writeByte(markers.byteValue());
output.writeInt(uncompressedSize);
output.writeInt(slice.length());
output.writeBytes(slice);

return output.getUnderlyingSlice();
}
finally {
context.releaseSliceOutput(serializationBuffer);
Expand All @@ -120,22 +136,48 @@ public SerializedPage serialize(PagesSerdeContext context, Page page)
}
}

public Page deserialize(SerializedPage serializedPage)
/**
 * Reads the position (row) count of a serialized page without deserializing it.
 * The serialized page header begins with the 4-byte position count at offset 0
 * (see SERIALIZED_PAGE_HEADER_SIZE and the write order in serialize()).
 */
public static int getSerializedPagePositionCount(Slice serializedPage)
{
return serializedPage.getInt(0);
}

/**
 * Returns true if the ENCRYPTED codec marker is set in the given
 * serialized page's header byte.
 */
public static boolean isSerializedPageEncrypted(Slice serializedPage)
{
    MarkerSet markers = getSerializedPageMarkerSet(serializedPage);
    return markers.contains(ENCRYPTED);
}

/**
 * Returns true if the COMPRESSED codec marker is set in the given
 * serialized page's header byte.
 */
public static boolean isSerializedPageCompressed(Slice serializedPage)
{
    MarkerSet markers = getSerializedPageMarkerSet(serializedPage);
    return markers.contains(COMPRESSED);
}

/**
 * Decodes the codec-marker byte of a serialized page. The single marker byte
 * sits immediately after the 4-byte position count, i.e. at offset Integer.BYTES
 * (see SERIALIZED_PAGE_HEADER_SIZE and the write order in serialize()).
 */
private static MarkerSet getSerializedPageMarkerSet(Slice serializedPage)
{
return MarkerSet.fromByteValue(serializedPage.getByte(Integer.BYTES));
}

/**
 * Convenience overload that deserializes a page using a short-lived
 * PagesSerdeContext, which is closed (releasing its pooled buffers)
 * before this method returns.
 */
public Page deserialize(Slice serializedPage)
{
try (PagesSerdeContext context = newContext()) {
return deserialize(context, serializedPage);
}
}

public Page deserialize(PagesSerdeContext context, SerializedPage serializedPage)
public Page deserialize(PagesSerdeContext context, Slice serializedPage)
{
checkArgument(serializedPage != null, "serializedPage is null");

Slice slice = serializedPage.getSlice();
SliceInput input = serializedPage.getInput();
int positionCount = input.readInt();
MarkerSet markers = MarkerSet.fromByteValue(input.readByte());
int uncompressedSize = input.readInt();
int compressedSize = input.readInt();
Slice slice = input.readSlice(compressedSize);

// This buffer *must not* be released at the end, since block decoding might create references to the buffer but
// *can* be released for reuse if used for decryption and later released after decompression
byte[] inUseTempBuffer = null;
if (serializedPage.isEncrypted()) {
if (markers.contains(ENCRYPTED)) {
checkState(spillCipher.isPresent(), "Page is encrypted, but spill cipher is missing");

byte[] decrypted = context.acquireBuffer(spillCipher.get().decryptedMaxLength(slice.length()));
Expand All @@ -150,10 +192,9 @@ public Page deserialize(PagesSerdeContext context, SerializedPage serializedPage
inUseTempBuffer = decrypted;
}

if (serializedPage.isCompressed()) {
if (markers.contains(COMPRESSED)) {
checkState(decompressor.isPresent(), "Page is compressed, but decompressor is missing");

int uncompressedSize = serializedPage.getUncompressedSizeInBytes();
byte[] decompressed = context.acquireBuffer(uncompressedSize);
checkState(decompressor.get().decompress(
slice.byteArray(),
Expand All @@ -170,7 +211,26 @@ public Page deserialize(PagesSerdeContext context, SerializedPage serializedPage
}
}

return readRawPage(serializedPage.getPositionCount(), slice.getInput(), blockEncodingSerde);
return readRawPage(positionCount, slice.getInput(), blockEncodingSerde);
}

/**
 * Reads one serialized page from the input stream into a single self-contained
 * Slice containing both the header and the payload.
 *
 * Header layout (must match SERIALIZED_PAGE_HEADER_SIZE and serialize()):
 * positionCount (int), codec markers (byte), uncompressedSizeInBytes (int),
 * sizeInBytes (int), followed by sizeInBytes payload bytes.
 */
public static Slice readSerializedPage(SliceInput input)
{
// Read the four fixed header fields in wire order.
int positionCount = input.readInt();
byte marker = input.readByte();
int uncompressedSize = input.readInt();
int compressedSize = input.readInt();

// Allocate the result sized for header + payload and re-write the header into it.
SliceOutput output = Slices.allocate(SERIALIZED_PAGE_HEADER_SIZE + compressedSize).getOutput();
output.writeInt(positionCount);
output.writeByte(marker);
output.writeInt(uncompressedSize);
output.writeInt(compressedSize);

// Copy the payload directly from the input into the result, just past the header.
Slice result = output.getUnderlyingSlice();
input.readBytes(result, SERIALIZED_PAGE_HEADER_SIZE, compressedSize);

return result;
}

public static final class PagesSerdeContext
Expand Down
Loading