diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java index 594ddc14db26..ef92c7d85903 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java @@ -18,6 +18,7 @@ import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.StateMachine; import io.trino.execution.StateMachine.StateChangeListener; @@ -52,6 +53,7 @@ import static io.trino.execution.buffer.BufferState.OPEN; import static io.trino.execution.buffer.OutputBuffers.BufferType.ARBITRARY; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; +import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount; import static io.trino.execution.buffer.SerializedPageReference.dereferencePages; import static java.util.Objects.requireNonNull; @@ -207,7 +209,7 @@ public ListenableFuture isFull() } @Override - public void enqueue(List pages) + public void enqueue(List pages) { checkState(!Thread.holdsLock(this), "Cannot enqueue pages while holding a lock on this"); requireNonNull(pages, "pages is null"); @@ -221,11 +223,12 @@ public void enqueue(List pages) ImmutableList.Builder references = ImmutableList.builderWithExpectedSize(pages.size()); long bytesAdded = 0; long rowCount = 0; - for (SerializedPage page : pages) { - bytesAdded += page.getRetainedSizeInBytes(); - rowCount += page.getPositionCount(); + for (Slice page : pages) { + bytesAdded += page.getRetainedSize(); + int positionCount = getSerializedPagePositionCount(page); + rowCount += positionCount; // create page reference counts with an initial single reference - references.add(new SerializedPageReference(page, 1)); + references.add(new SerializedPageReference(page, positionCount, 1)); } List serializedPageReferences = references.build(); @@ -258,7 +261,7 @@ public void enqueue(List pages) } @Override - public void enqueue(int partition, List pages) + public void enqueue(int partition, List pages) { checkState(partition == 0, "Expected partition number to be zero"); enqueue(pages); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java index 15a07ad2bbe4..1c0f9e5329e0 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java @@ -18,6 +18,7 @@ import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.StateMachine; import io.trino.execution.StateMachine.StateChangeListener; @@ -48,6 +49,7 @@ import static io.trino.execution.buffer.BufferState.NO_MORE_PAGES; import static io.trino.execution.buffer.BufferState.OPEN; import static io.trino.execution.buffer.OutputBuffers.BufferType.BROADCAST; +import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount; import static io.trino.execution.buffer.SerializedPageReference.dereferencePages; import static java.util.Objects.requireNonNull; @@ -198,7 +200,7 @@ public ListenableFuture isFull() } @Override - public void enqueue(List pages) + public void enqueue(List pages) { checkState(!Thread.holdsLock(this), "Cannot enqueue pages while holding a lock on this"); requireNonNull(pages, "pages is null"); @@ -212,11 +214,12 @@ public void enqueue(List pages) ImmutableList.Builder references = ImmutableList.builderWithExpectedSize(pages.size()); long bytesAdded = 0; long rowCount = 0; - for (SerializedPage page : pages) { - bytesAdded += page.getRetainedSizeInBytes(); - rowCount += page.getPositionCount(); + for (Slice page : pages) { + bytesAdded += page.getRetainedSize(); + int positionCount = getSerializedPagePositionCount(page); + rowCount += positionCount; // create page reference counts with an initial single reference - references.add(new SerializedPageReference(page, 1)); + references.add(new SerializedPageReference(page, positionCount, 1)); } List serializedPageReferences = references.build(); @@ -257,7 +260,7 @@ public void enqueue(List pages) } @Override - public void enqueue(int partitionNumber, List pages) + public void enqueue(int partitionNumber, List pages) { checkState(partitionNumber == 0, "Expected partition number to be zero"); enqueue(pages); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/BufferResult.java b/core/trino-main/src/main/java/io/trino/execution/buffer/BufferResult.java index 451368ac16f3..9768dcd2f921 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/BufferResult.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/BufferResult.java @@ -14,6 +14,7 @@ package io.trino.execution.buffer; import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; import java.util.List; import java.util.Objects; @@ -34,9 +35,9 @@ public static BufferResult emptyResults(String taskInstanceId, long token, boole private final long token; private final long nextToken; private final boolean bufferComplete; - private final List serializedPages; + private final List serializedPages; - public BufferResult(String taskInstanceId, long token, long nextToken, boolean bufferComplete, List serializedPages) + public BufferResult(String taskInstanceId, long token, long nextToken, boolean bufferComplete, List serializedPages) { checkArgument(!isNullOrEmpty(taskInstanceId), "taskInstanceId is null"); @@ -62,7 +63,7 @@ public boolean isBufferComplete() return bufferComplete; } - public List getSerializedPages() + public List getSerializedPages() { return serializedPages; } diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/ClientBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/ClientBuffer.java index 2ae1332338fe..a3f7e37e5b4a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/ClientBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/ClientBuffer.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.buffer.OutputBuffers.OutputBufferId; import io.trino.execution.buffer.SerializedPageReference.PagesReleasedListener; @@ -357,7 +358,7 @@ private synchronized BufferResult processRead(long sequenceId, DataSize maxSize) // read the new pages long maxBytes = maxSize.toBytes(); - List result = new ArrayList<>(); + List result = new ArrayList<>(); long bytes = 0; for (SerializedPageReference page : pages) { diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java index c24fb48a7b86..bf71642d4523 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.concurrent.ExtendedSettableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.StateMachine; import io.trino.execution.StateMachine.StateChangeListener; @@ -244,14 +245,14 @@ public ListenableFuture isFull() } @Override - public void enqueue(List pages) + public void enqueue(List pages) { OutputBuffer outputBuffer = getDelegateOutputBufferOrFail(); outputBuffer.enqueue(pages); } @Override - public void enqueue(int partition, List pages) + public void enqueue(int partition, List pages) { OutputBuffer outputBuffer = getDelegateOutputBufferOrFail(); outputBuffer.enqueue(partition, pages); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java index b501460798be..c49a13cdbfbf 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java @@ -14,6 +14,7 @@ package io.trino.execution.buffer; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.StateMachine.StateChangeListener; import io.trino.execution.buffer.OutputBuffers.OutputBufferId; @@ -85,13 +86,13 @@ public interface OutputBuffer * Adds a split-up page to an unpartitioned buffer. If no-more-pages has been set, the enqueue * page call is ignored. This can happen with limit queries. */ - void enqueue(List pages); + void enqueue(List pages); /** * Adds a split-up page to a specific partition. If no-more-pages has been set, the enqueue * page call is ignored. This can happen with limit queries. */ - void enqueue(int partition, List pages); + void enqueue(int partition, List pages); /** * Notify buffer that no more pages will be added. Any future calls to enqueue a diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PageCodecMarker.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PageCodecMarker.java index 176df5734333..972e0fd5ba9a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PageCodecMarker.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PageCodecMarker.java @@ -20,10 +20,10 @@ import static com.google.common.base.Preconditions.checkArgument; /** - * Encodes boolean properties for {@link SerializedPage} by using a bitmasking strategy, allowing + * Encodes boolean properties for serialized page by using a bitmasking strategy, allowing * up to 8 such properties to be stored in a single byte */ -public enum PageCodecMarker +enum PageCodecMarker { COMPRESSED(1), ENCRYPTED(2); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerde.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerde.java index 79c126e7e286..94f9b6868700 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerde.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerde.java @@ -17,6 +17,8 @@ import io.airlift.compress.Decompressor; import io.airlift.slice.DynamicSliceOutput; import io.airlift.slice.Slice; +import io.airlift.slice.SliceInput; +import io.airlift.slice.SliceOutput; import io.airlift.slice.Slices; import io.trino.execution.buffer.PageCodecMarker.MarkerSet; import io.trino.spi.Page; @@ -41,6 +43,13 @@ public class PagesSerde { private static final double MINIMUM_COMPRESSION_RATIO = 0.8; + private static final int SERIALIZED_PAGE_HEADER_SIZE = /*positionCount*/ Integer.BYTES + + // pageCodecMarkers + Byte.BYTES + + // uncompressedSizeInBytes + Integer.BYTES + + // sizeInBytes + Integer.BYTES; private final BlockEncodingSerde blockEncodingSerde; private final Optional compressor; @@ -61,7 +70,7 @@ public PagesSerdeContext newContext() return new PagesSerdeContext(); } - public SerializedPage serialize(PagesSerdeContext context, Page page) + public Slice serialize(PagesSerdeContext context, Page page) { DynamicSliceOutput serializationBuffer = context.acquireSliceOutput(toIntExact(page.getSizeInBytes() + Integer.BYTES)); // block length is an int byte[] inUseTempBuffer = null; @@ -109,8 +118,15 @@ public SerializedPage serialize(PagesSerdeContext context, Page page) } inUseTempBuffer = encrypted; } - // Resulting slice *must* be copied to ensure the shared buffers aren't referenced after method exit - return new SerializedPage(Slices.copyOf(slice), markers, page.getPositionCount(), uncompressedSize); + + SliceOutput output = Slices.allocate(SERIALIZED_PAGE_HEADER_SIZE + slice.length()).getOutput(); + output.writeInt(page.getPositionCount()); + output.writeByte(markers.byteValue()); + output.writeInt(uncompressedSize); + output.writeInt(slice.length()); + output.writeBytes(slice); + + return output.getUnderlyingSlice(); } finally { context.releaseSliceOutput(serializationBuffer); @@ -120,22 +136,48 @@ public SerializedPage serialize(PagesSerdeContext context, Page page) } } - public Page deserialize(SerializedPage serializedPage) + public static int getSerializedPagePositionCount(Slice serializedPage) + { + return serializedPage.getInt(0); + } + + public static boolean isSerializedPageEncrypted(Slice serializedPage) + { + return getSerializedPageMarkerSet(serializedPage).contains(ENCRYPTED); + } + + public static boolean isSerializedPageCompressed(Slice serializedPage) + { + return getSerializedPageMarkerSet(serializedPage).contains(COMPRESSED); + } + + private static MarkerSet getSerializedPageMarkerSet(Slice serializedPage) + { + return MarkerSet.fromByteValue(serializedPage.getByte(Integer.BYTES)); + } + + public Page deserialize(Slice serializedPage) { try (PagesSerdeContext context = newContext()) { return deserialize(context, serializedPage); } } - public Page deserialize(PagesSerdeContext context, SerializedPage serializedPage) + public Page deserialize(PagesSerdeContext context, Slice serializedPage) { checkArgument(serializedPage != null, "serializedPage is null"); - Slice slice = serializedPage.getSlice(); + SliceInput input = serializedPage.getInput(); + int positionCount = input.readInt(); + MarkerSet markers = MarkerSet.fromByteValue(input.readByte()); + int uncompressedSize = input.readInt(); + int compressedSize = input.readInt(); + Slice slice = input.readSlice(compressedSize); + // This buffer *must not* be released at the end, since block decoding might create references to the buffer but // *can* be released for reuse if used for decryption and later released after decompression byte[] inUseTempBuffer = null; - if (serializedPage.isEncrypted()) { + if (markers.contains(ENCRYPTED)) { checkState(spillCipher.isPresent(), "Page is encrypted, but spill cipher is missing"); byte[] decrypted = context.acquireBuffer(spillCipher.get().decryptedMaxLength(slice.length())); @@ -150,10 +192,9 @@ public Page deserialize(PagesSerdeContext context, SerializedPage serializedPage inUseTempBuffer = decrypted; } - if (serializedPage.isCompressed()) { + if (markers.contains(COMPRESSED)) { checkState(decompressor.isPresent(), "Page is compressed, but decompressor is missing"); - int uncompressedSize = serializedPage.getUncompressedSizeInBytes(); byte[] decompressed = context.acquireBuffer(uncompressedSize); checkState(decompressor.get().decompress( slice.byteArray(), @@ -170,7 +211,26 @@ public Page deserialize(PagesSerdeContext context, SerializedPage serializedPage } } - return readRawPage(serializedPage.getPositionCount(), slice.getInput(), blockEncodingSerde); + return readRawPage(positionCount, slice.getInput(), blockEncodingSerde); + } + + public static Slice readSerializedPage(SliceInput input) + { + int positionCount = input.readInt(); + byte marker = input.readByte(); + int uncompressedSize = input.readInt(); + int compressedSize = input.readInt(); + + SliceOutput output = Slices.allocate(SERIALIZED_PAGE_HEADER_SIZE + compressedSize).getOutput(); + output.writeInt(positionCount); + output.writeByte(marker); + output.writeInt(uncompressedSize); + output.writeInt(compressedSize); + + Slice result = output.getUnderlyingSlice(); + input.readBytes(result, SERIALIZED_PAGE_HEADER_SIZE, compressedSize); + + return result; } public static final class PagesSerdeContext diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeUtil.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeUtil.java index 1076211cc9e2..e40a27e68e64 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeUtil.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeUtil.java @@ -17,7 +17,6 @@ import io.airlift.slice.Slice; import io.airlift.slice.SliceInput; import io.airlift.slice.SliceOutput; -import io.airlift.slice.Slices; import io.airlift.slice.XxHash64; import io.trino.spi.Page; import io.trino.spi.block.Block; @@ -28,6 +27,7 @@ import static io.trino.block.BlockSerdeUtil.readBlock; import static io.trino.block.BlockSerdeUtil.writeBlock; +import static io.trino.execution.buffer.PagesSerde.readSerializedPage; import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; @@ -61,53 +61,11 @@ static Page readRawPage(int positionCount, SliceInput input, BlockEncodingSerde return new Page(positionCount, blocks); } - public static void writeSerializedPage(SliceOutput output, SerializedPage page) - { - // Every new field being written here must be added in updateChecksum() too. - output.writeInt(page.getPositionCount()); - output.writeByte(page.getPageCodecMarkers()); - output.writeInt(page.getUncompressedSizeInBytes()); - output.writeInt(page.getSizeInBytes()); - output.writeBytes(page.getSlice()); - } - - private static void updateChecksum(XxHash64 hash, SerializedPage page) - { - hash.update(Slices.wrappedIntArray( - page.getPositionCount(), - page.getPageCodecMarkers(), - page.getUncompressedSizeInBytes(), - page.getSizeInBytes())); - hash.update(page.getSlice()); - } - - private static SerializedPage readSerializedPage(SliceInput sliceInput) - { - int positionCount = sliceInput.readInt(); - PageCodecMarker.MarkerSet markers = PageCodecMarker.MarkerSet.fromByteValue(sliceInput.readByte()); - int uncompressedSizeInBytes = sliceInput.readInt(); - int sizeInBytes = sliceInput.readInt(); - Slice slice = sliceInput.readSlice(sizeInBytes); - return new SerializedPage(slice, markers, positionCount, uncompressedSizeInBytes); - } - - public static long writeSerializedPages(SliceOutput sliceOutput, Iterable pages) - { - Iterator pageIterator = pages.iterator(); - long size = 0; - while (pageIterator.hasNext()) { - SerializedPage page = pageIterator.next(); - writeSerializedPage(sliceOutput, page); - size += page.getSizeInBytes(); - } - return size; - } - - public static long calculateChecksum(List pages) + public static long calculateChecksum(List pages) { XxHash64 hash = new XxHash64(); - for (SerializedPage page : pages) { - updateChecksum(hash, page); + for (Slice page : pages) { + hash.update(page); } long checksum = hash.hash(); // Since NO_CHECKSUM is assigned a special meaning, it is not a valid checksum. @@ -128,7 +86,7 @@ public static long writePages(PagesSerde serde, SliceOutput sliceOutput, Iterato try (PagesSerde.PagesSerdeContext context = serde.newContext()) { while (pages.hasNext()) { Page page = pages.next(); - writeSerializedPage(sliceOutput, serde.serialize(context, page)); + sliceOutput.writeBytes(serde.serialize(context, page)); size += page.getSizeInBytes(); } } @@ -166,13 +124,13 @@ protected Page computeNext() } } - public static Iterator readSerializedPages(SliceInput sliceInput) + public static Iterator readSerializedPages(SliceInput sliceInput) { return new SerializedPageReader(sliceInput); } private static class SerializedPageReader - extends AbstractIterator + extends AbstractIterator { private final SliceInput input; @@ -182,7 +140,7 @@ private static class SerializedPageReader } @Override - protected SerializedPage computeNext() + protected Slice computeNext() { if (!input.isReadable()) { return endOfData(); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java index 989cfb346452..e307fb8582ac 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java @@ -16,6 +16,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.StateMachine; import io.trino.execution.StateMachine.StateChangeListener; @@ -37,6 +38,7 @@ import static io.trino.execution.buffer.BufferState.NO_MORE_PAGES; import static io.trino.execution.buffer.BufferState.OPEN; import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED; +import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount; import static io.trino.execution.buffer.SerializedPageReference.dereferencePages; import static java.util.Objects.requireNonNull; @@ -161,14 +163,14 @@ public ListenableFuture isFull() } @Override - public void enqueue(List pages) + public void enqueue(List pages) { checkState(partitions.size() == 1, "Expected exactly one partition"); enqueue(0, pages); } @Override - public void enqueue(int partitionNumber, List pages) + public void enqueue(int partitionNumber, List pages) { requireNonNull(pages, "pages is null"); @@ -181,11 +183,12 @@ public void enqueue(int partitionNumber, List pages) ImmutableList.Builder references = ImmutableList.builderWithExpectedSize(pages.size()); long bytesAdded = 0; long rowCount = 0; - for (SerializedPage page : pages) { - bytesAdded += page.getRetainedSizeInBytes(); - rowCount += page.getPositionCount(); + for (Slice page : pages) { + bytesAdded += page.getRetainedSize(); + int positionCount = getSerializedPagePositionCount(page); + rowCount += positionCount; // create page reference counts with an initial single reference - references.add(new SerializedPageReference(page, 1)); + references.add(new SerializedPageReference(page, positionCount, 1)); } List serializedPageReferences = references.build(); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SerializedPage.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SerializedPage.java deleted file mode 100644 index e9ee100d8e9c..000000000000 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SerializedPage.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.execution.buffer; - -import io.airlift.slice.Slice; -import org.openjdk.jol.info.ClassLayout; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.base.Preconditions.checkArgument; -import static io.trino.execution.buffer.PageCodecMarker.COMPRESSED; -import static io.trino.execution.buffer.PageCodecMarker.ENCRYPTED; -import static java.util.Objects.requireNonNull; - -public class SerializedPage -{ - private static final int INSTANCE_SIZE = ClassLayout.parseClass(SerializedPage.class).instanceSize(); - - private final Slice slice; - private final int positionCount; - private final int uncompressedSizeInBytes; - private final byte pageCodecMarkers; - - public SerializedPage(Slice slice, PageCodecMarker.MarkerSet markers, int positionCount, int uncompressedSizeInBytes) - { - this.slice = requireNonNull(slice, "slice is null"); - this.positionCount = positionCount; - checkArgument(uncompressedSizeInBytes >= 0, "uncompressedSizeInBytes is negative"); - this.uncompressedSizeInBytes = uncompressedSizeInBytes; - this.pageCodecMarkers = requireNonNull(markers, "markers is null").byteValue(); - // Encrypted pages may include arbitrary overhead from ciphers, sanity checks skipped - if (!markers.contains(ENCRYPTED)) { - if (markers.contains(COMPRESSED)) { - checkArgument(uncompressedSizeInBytes > slice.length(), "compressed size must be smaller than uncompressed size when compressed"); - } - else { - checkArgument(uncompressedSizeInBytes == slice.length(), "uncompressed size must be equal to slice length when uncompressed"); - } - } - } - - public int getSizeInBytes() - { - return slice.length(); - } - - public int getUncompressedSizeInBytes() - { - return uncompressedSizeInBytes; - } - - public long getRetainedSizeInBytes() - { - return INSTANCE_SIZE + slice.getRetainedSize(); - } - - public int getPositionCount() - { - return positionCount; - } - - public Slice getSlice() - { - return slice; - } - - public byte getPageCodecMarkers() - { - return pageCodecMarkers; - } - - public boolean isCompressed() - { - return COMPRESSED.isSet(pageCodecMarkers); - } - - public boolean isEncrypted() - { - return ENCRYPTED.isSet(pageCodecMarkers); - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("positionCount", positionCount) - .add("pageCodecMarkers", PageCodecMarker.toSummaryString(pageCodecMarkers)) - .add("sizeInBytes", slice.length()) - .add("uncompressedSizeInBytes", uncompressedSizeInBytes) - .toString(); - } -} diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SerializedPageReference.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SerializedPageReference.java index 068c9ebca96f..51d7cc939f31 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SerializedPageReference.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/SerializedPageReference.java @@ -13,6 +13,8 @@ */ package io.trino.execution.buffer; +import io.airlift.slice.Slice; + import javax.annotation.concurrent.ThreadSafe; import java.util.List; @@ -28,13 +30,15 @@ final class SerializedPageReference { private static final AtomicIntegerFieldUpdater REFERENCE_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SerializedPageReference.class, "referenceCount"); - private final SerializedPage serializedPage; + private final Slice serializedPage; + private final int positionCount; private volatile int referenceCount; - public SerializedPageReference(SerializedPage serializedPage, int referenceCount) + public SerializedPageReference(Slice serializedPage, int positionCount, int referenceCount) { this.serializedPage = requireNonNull(serializedPage, "serializedPage is null"); checkArgument(referenceCount > 0, "referenceCount must be at least 1"); + this.positionCount = positionCount; this.referenceCount = referenceCount; } @@ -44,19 +48,19 @@ public void addReference() checkState(oldReferences > 0, "Page has already been dereferenced"); } - public SerializedPage getSerializedPage() + public Slice getSerializedPage() { return serializedPage; } public int getPositionCount() { - return serializedPage.getPositionCount(); + return positionCount; } public long getRetainedSizeInBytes() { - return serializedPage.getRetainedSizeInBytes(); + return serializedPage.getRetainedSize(); } private boolean dereferencePage() diff --git a/core/trino-main/src/main/java/io/trino/operator/DeduplicationExchangeClientBuffer.java b/core/trino-main/src/main/java/io/trino/operator/DeduplicationExchangeClientBuffer.java index 116bfc63b4f0..87a7ad9f8420 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DeduplicationExchangeClientBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/DeduplicationExchangeClientBuffer.java @@ -17,9 +17,9 @@ import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.TaskId; -import io.trino.execution.buffer.SerializedPage; import io.trino.spi.TrinoException; import javax.annotation.concurrent.GuardedBy; @@ -66,9 +66,9 @@ public class DeduplicationExchangeClientBuffer private Throwable failure; @GuardedBy("this") - private final ListMultimap pageBuffer = LinkedListMultimap.create(); + private final ListMultimap pageBuffer = LinkedListMultimap.create(); @GuardedBy("this") - private Iterator pagesIterator; + private Iterator pagesIterator; @GuardedBy("this") private volatile long bufferRetainedSizeInBytes; @GuardedBy("this") @@ -95,7 +95,7 @@ public ListenableFuture isBlocked() } @Override - public synchronized SerializedPage pollPage() + public synchronized Slice pollPage() { throwIfFailed(); @@ -115,9 +115,9 @@ public synchronized SerializedPage pollPage() return null; } - SerializedPage page = pagesIterator.next(); + Slice page = pagesIterator.next(); pagesIterator.remove(); - bufferRetainedSizeInBytes -= page.getRetainedSizeInBytes(); + bufferRetainedSizeInBytes -= page.getRetainedSize(); return page; } @@ -142,7 +142,7 @@ public synchronized void addTask(TaskId taskId) } @Override - public synchronized void addPages(TaskId taskId, List pages) + public synchronized void addPages(TaskId taskId, List pages) { if (closed) { return; @@ -161,8 +161,8 @@ public synchronized void addPages(TaskId taskId, List pages) } long pagesRetainedSizeInBytes = 0; - for (SerializedPage page : pages) { - pagesRetainedSizeInBytes += page.getRetainedSizeInBytes(); + for (Slice page : pages) { + pagesRetainedSizeInBytes += page.getRetainedSize(); } bufferRetainedSizeInBytes += pagesRetainedSizeInBytes; if (bufferRetainedSizeInBytes > bufferCapacityInBytes) { @@ -286,11 +286,11 @@ private synchronized void removePagesForPreviousAttempts(int currentAttemptId) { // wipe previous attempt pages long pagesRetainedSizeInBytes = 0; - Iterator> iterator = pageBuffer.entries().iterator(); + Iterator> iterator = pageBuffer.entries().iterator(); while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); + Map.Entry entry = iterator.next(); if (entry.getKey().getAttemptId() < currentAttemptId) { - pagesRetainedSizeInBytes += entry.getValue().getRetainedSizeInBytes(); + pagesRetainedSizeInBytes += entry.getValue().getRetainedSize(); iterator.remove(); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/ExchangeClient.java b/core/trino-main/src/main/java/io/trino/operator/ExchangeClient.java index bffcc3c103f9..028f2afe841e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ExchangeClient.java +++ b/core/trino-main/src/main/java/io/trino/operator/ExchangeClient.java @@ -16,12 +16,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.http.client.HttpClient; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.FeaturesConfig.DataIntegrityVerification; import io.trino.execution.TaskFailureListener; import io.trino.execution.TaskId; -import io.trino.execution.buffer.SerializedPage; import io.trino.memory.context.LocalMemoryContext; import io.trino.operator.HttpPageBufferClient.ClientCallback; import io.trino.operator.WorkProcessor.ProcessState; @@ -175,10 +175,10 @@ public synchronized void noMoreLocations() scheduleRequestIfNecessary(); } - public WorkProcessor pages() + public WorkProcessor pages() { return WorkProcessor.create(() -> { - SerializedPage page = pollPage(); + Slice page = pollPage(); if (page == null) { if (isFinished()) { return ProcessState.finished(); @@ -203,7 +203,7 @@ private void assertNotHoldsLock() } @Nullable - public SerializedPage pollPage() + public Slice pollPage() { assertNotHoldsLock(); @@ -211,7 +211,7 @@ public SerializedPage pollPage() return null; } - SerializedPage page = buffer.pollPage(); + Slice page = buffer.pollPage(); if (page == null) { return null; @@ -274,13 +274,13 @@ public ListenableFuture isBlocked() return buffer.isBlocked(); } - private boolean addPages(HttpPageBufferClient client, List pages) + private boolean addPages(HttpPageBufferClient client, List pages) { checkState(!completedClients.contains(client), "client is already marked as completed"); // Compute stats before acquiring the lock long responseSize = 0; - for (SerializedPage page : pages) { - responseSize += page.getSizeInBytes(); + for (Slice page : pages) { + responseSize += page.length(); } synchronized (this) { @@ -334,7 +334,7 @@ private class ExchangeClientCallback implements ClientCallback { @Override - public boolean addPages(HttpPageBufferClient client, List pages) + public boolean addPages(HttpPageBufferClient client, List pages) { requireNonNull(client, "client is null"); requireNonNull(pages, "pages is null"); diff --git a/core/trino-main/src/main/java/io/trino/operator/ExchangeClientBuffer.java b/core/trino-main/src/main/java/io/trino/operator/ExchangeClientBuffer.java index 7181e7dfe145..c4985b23d4e3 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ExchangeClientBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/ExchangeClientBuffer.java @@ -14,8 +14,8 @@ package io.trino.operator; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.trino.execution.TaskId; -import io.trino.execution.buffer.SerializedPage; import java.io.Closeable; import java.util.List; @@ -30,11 +30,11 @@ public interface ExchangeClientBuffer */ ListenableFuture isBlocked(); - SerializedPage pollPage(); + Slice pollPage(); void addTask(TaskId taskId); - void addPages(TaskId taskId, List pages); + void addPages(TaskId taskId, List pages); void taskFinished(TaskId taskId); diff --git a/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java b/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java index 33987fcca719..5370a5ceedda 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java @@ -14,10 +14,10 @@ package io.trino.operator; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.trino.connector.CatalogName; import io.trino.execution.buffer.PagesSerde; import io.trino.execution.buffer.PagesSerdeFactory; -import io.trino.execution.buffer.SerializedPage; import io.trino.metadata.Split; import io.trino.spi.Page; import io.trino.spi.connector.UpdatablePageSource; @@ -182,15 +182,14 @@ public void addInput(Page page) @Override public Page getOutput() { - SerializedPage page = exchangeClient.pollPage(); + Slice page = exchangeClient.pollPage(); if (page == null) { return null; } - operatorContext.recordNetworkInput(page.getSizeInBytes(), page.getPositionCount()); - Page deserializedPage = serde.deserialize(page); - operatorContext.recordProcessedInput(deserializedPage.getSizeInBytes(), page.getPositionCount()); + operatorContext.recordNetworkInput(page.length(), deserializedPage.getPositionCount()); + operatorContext.recordProcessedInput(deserializedPage.getSizeInBytes(), deserializedPage.getPositionCount()); return deserializedPage; } diff --git a/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java b/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java index 34cdcafa324c..1429c5649a20 100644 --- a/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java +++ b/core/trino-main/src/main/java/io/trino/operator/HttpPageBufferClient.java @@ -28,12 +28,13 @@ import io.airlift.http.client.ResponseTooLargeException; import io.airlift.log.Logger; import io.airlift.slice.InputStreamSliceInput; +import io.airlift.slice.Slice; import io.airlift.slice.SliceInput; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.FeaturesConfig.DataIntegrityVerification; import io.trino.execution.TaskId; -import io.trino.execution.buffer.SerializedPage; +import io.trino.execution.buffer.PagesSerde; import io.trino.server.remotetask.Backoff; import io.trino.spi.TrinoException; import io.trino.spi.TrinoTransportException; @@ -110,7 +111,7 @@ public final class HttpPageBufferClient */ public interface ClientCallback { - boolean addPages(HttpPageBufferClient client, List pages); + boolean addPages(HttpPageBufferClient client, List pages); void requestComplete(HttpPageBufferClient client); @@ -352,7 +353,7 @@ public void onSuccess(PagesResponse result) backoff.success(); - List pages; + List pages; try { if (result.isTaskFailed()) { throw new TrinoException(REMOTE_TASK_FAILED, format("Remote task failed: %s", remoteTaskId)); @@ -414,11 +415,11 @@ public Void handle(Request request, Response response) // frequency or buffer size. if (clientCallback.addPages(HttpPageBufferClient.this, pages)) { pagesReceived.addAndGet(pages.size()); - rowsReceived.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum()); + rowsReceived.addAndGet(pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum()); } else { pagesRejected.addAndGet(pages.size()); - rowsRejected.addAndGet(pages.stream().mapToLong(SerializedPage::getPositionCount).sum()); + rowsRejected.addAndGet(pages.stream().mapToLong(PagesSerde::getSerializedPagePositionCount).sum()); } } catch (TrinoException e) { @@ -682,7 +683,7 @@ public PagesResponse handle(Request request, Response response) } long checksum = input.readLong(); int pagesCount = input.readInt(); - List pages = ImmutableList.copyOf(readSerializedPages(input)); + List pages = ImmutableList.copyOf(readSerializedPages(input)); verifyChecksum(checksum, pages); checkState(pages.size() == pagesCount, "Wrong number of pages, expected %s, but read %s", pagesCount, pages.size()); return createPagesResponse(taskInstanceId, token, nextToken, pages, complete, remoteTaskFailed); @@ -696,7 +697,7 @@ public PagesResponse handle(Request request, Response response) } } - private void verifyChecksum(long readChecksum, List pages) + private void verifyChecksum(long readChecksum, List pages) { if (dataIntegrityVerificationEnabled) { long calculatedChecksum = calculateChecksum(pages); @@ -769,7 +770,7 @@ private static boolean mediaTypeMatches(String value, MediaType range) public static class PagesResponse { - public static PagesResponse createPagesResponse(String taskInstanceId, long token, long nextToken, Iterable pages, boolean complete, boolean taskFailed) + public static PagesResponse createPagesResponse(String taskInstanceId, long token, long nextToken, Iterable pages, boolean complete, boolean taskFailed) { return new PagesResponse(taskInstanceId, token, nextToken, pages, complete, taskFailed); } @@ -782,11 +783,11 @@ public static PagesResponse createEmptyPagesResponse(String taskInstanceId, long private final String taskInstanceId; private final long token; private final long nextToken; - private final List pages; + private final List pages; private final boolean clientComplete; private final boolean taskFailed; - private PagesResponse(String taskInstanceId, long token, long nextToken, Iterable pages, boolean clientComplete, boolean taskFailed) + private PagesResponse(String taskInstanceId, long token, long nextToken, Iterable pages, boolean clientComplete, boolean taskFailed) { this.taskInstanceId = taskInstanceId; this.token = token; @@ -806,7 +807,7 @@ public long getNextToken() return nextToken; } - public List getPages() + public List getPages() { return pages; } diff --git a/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java b/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java index 46e1b2edbd2e..91e8cd7740f3 100644 --- a/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java @@ -165,8 +165,9 @@ public Supplier> addSplit(Split split) exchangeClient.noMoreLocations(); pageProducers.add(exchangeClient.pages() .map(serializedPage -> { - operatorContext.recordNetworkInput(serializedPage.getSizeInBytes(), serializedPage.getPositionCount()); - return pagesSerde.deserialize(serializedPage); + Page page = pagesSerde.deserialize(serializedPage); + operatorContext.recordNetworkInput(serializedPage.length(), page.getPositionCount()); + return page; })); return Optional::empty; diff --git a/core/trino-main/src/main/java/io/trino/operator/StreamingExchangeClientBuffer.java b/core/trino-main/src/main/java/io/trino/operator/StreamingExchangeClientBuffer.java index 61a0b315f0a7..a75def8e3440 100644 --- a/core/trino-main/src/main/java/io/trino/operator/StreamingExchangeClientBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/StreamingExchangeClientBuffer.java @@ -15,9 +15,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.TaskId; -import io.trino.execution.buffer.SerializedPage; import io.trino.spi.TrinoException; import javax.annotation.concurrent.GuardedBy; @@ -43,7 +43,7 @@ public class StreamingExchangeClientBuffer private final long bufferCapacityInBytes; @GuardedBy("this") - private final Queue bufferedPages = new ArrayDeque<>(); + private final Queue bufferedPages = new ArrayDeque<>(); @GuardedBy("this") private volatile long bufferRetainedSizeInBytes; @GuardedBy("this") @@ -72,16 +72,16 @@ public ListenableFuture isBlocked() } @Override - public synchronized SerializedPage pollPage() + public synchronized Slice pollPage() { throwIfFailed(); if (closed) { return null; } - SerializedPage page = bufferedPages.poll(); + Slice page = bufferedPages.poll(); if (page != null) { - bufferRetainedSizeInBytes -= page.getRetainedSizeInBytes(); + bufferRetainedSizeInBytes -= page.getRetainedSize(); checkState(bufferRetainedSizeInBytes >= 0, "unexpected bufferRetainedSizeInBytes: %s", bufferRetainedSizeInBytes); } // if buffer is empty block future calls @@ -102,11 +102,11 @@ public synchronized void addTask(TaskId taskId) } @Override - public void addPages(TaskId taskId, List pages) + public void addPages(TaskId taskId, List pages) { long pagesRetainedSizeInBytes = 0; - for (SerializedPage page : pages) { - pagesRetainedSizeInBytes += page.getRetainedSizeInBytes(); + for (Slice page : pages) { + pagesRetainedSizeInBytes += page.getRetainedSize(); } synchronized (this) { if (closed) { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/DefaultPagePartitioner.java b/core/trino-main/src/main/java/io/trino/operator/output/DefaultPagePartitioner.java index 1bcfca98cc66..465c15e932dc 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/DefaultPagePartitioner.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/DefaultPagePartitioner.java @@ -16,11 +16,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.buffer.OutputBuffer; import io.trino.execution.buffer.PagesSerde; import io.trino.execution.buffer.PagesSerdeFactory; -import io.trino.execution.buffer.SerializedPage; import io.trino.operator.OperatorContext; import io.trino.operator.PartitionFunction; import io.trino.operator.output.PartitionedOutputOperator.PartitionedOutputInfo; @@ -259,10 +259,10 @@ public void flush(boolean force) } } - private List splitAndSerializePage(PagesSerde.PagesSerdeContext context, Page page) + private List splitAndSerializePage(PagesSerde.PagesSerdeContext context, Page page) { List split = splitPage(page, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); - ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(split.size()); + ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(split.size()); for (Page p : split) { builder.add(serde.serialize(context, p)); } diff --git a/core/trino-main/src/main/java/io/trino/operator/output/TaskOutputOperator.java b/core/trino-main/src/main/java/io/trino/operator/output/TaskOutputOperator.java index 86f4fbcc6154..3ca7b69e1783 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/TaskOutputOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/TaskOutputOperator.java @@ -15,10 +15,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.trino.execution.buffer.OutputBuffer; import io.trino.execution.buffer.PagesSerde; import io.trino.execution.buffer.PagesSerdeFactory; -import io.trino.execution.buffer.SerializedPage; import io.trino.operator.DriverContext; import io.trino.operator.Operator; import io.trino.operator.OperatorContext; @@ -158,10 +158,10 @@ public void addInput(Page page) operatorContext.recordOutput(page.getSizeInBytes(), page.getPositionCount()); } - private List splitAndSerializePage(Page page) + private List splitAndSerializePage(Page page) { List split = splitPage(page, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); - ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(split.size()); + ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(split.size()); try (PagesSerde.PagesSerdeContext context = serde.newContext()) { for (Page p : split) { builder.add(serde.serialize(context, p)); diff --git a/core/trino-main/src/main/java/io/trino/server/PagesResponseWriter.java b/core/trino-main/src/main/java/io/trino/server/PagesResponseWriter.java index c628a3bd99b7..b03824fae5c0 100644 --- a/core/trino-main/src/main/java/io/trino/server/PagesResponseWriter.java +++ b/core/trino-main/src/main/java/io/trino/server/PagesResponseWriter.java @@ -15,10 +15,10 @@ import com.google.common.reflect.TypeToken; import io.airlift.slice.OutputStreamSliceOutput; +import io.airlift.slice.Slice; import io.airlift.slice.SliceOutput; import io.trino.FeaturesConfig; import io.trino.FeaturesConfig.DataIntegrityVerification; -import io.trino.execution.buffer.SerializedPage; import javax.inject.Inject; import javax.ws.rs.Produces; @@ -39,13 +39,12 @@ import static io.trino.TrinoMediaTypes.TRINO_PAGES; import static io.trino.execution.buffer.PagesSerdeUtil.NO_CHECKSUM; import static io.trino.execution.buffer.PagesSerdeUtil.calculateChecksum; -import static io.trino.execution.buffer.PagesSerdeUtil.writeSerializedPages; import static java.util.Objects.requireNonNull; @Provider @Produces(TRINO_PAGES) public class PagesResponseWriter - implements MessageBodyWriter> + implements MessageBodyWriter> { public static final int SERIALIZED_PAGES_MAGIC = 0xfea4f001; @@ -74,18 +73,19 @@ public PagesResponseWriter(FeaturesConfig featuresConfig) public boolean isWriteable(Class type, Type genericType, Annotation[] annotations, MediaType mediaType) { return List.class.isAssignableFrom(type) && - TypeToken.of(genericType).resolveType(LIST_GENERIC_TOKEN).getRawType().equals(SerializedPage.class) && + TypeToken.of(genericType).resolveType(LIST_GENERIC_TOKEN).getRawType().equals(Slice.class) && mediaType.isCompatible(TRINO_PAGES_TYPE); } @Override - public long getSize(List serializedPages, Class type, Type genericType, Annotation[] annotations, MediaType mediaType) + public long getSize(List serializedPages, Class type, Type genericType, Annotation[] annotations, MediaType mediaType) { return -1; } @Override - public void writeTo(List serializedPages, + public void writeTo( + List serializedPages, Class type, Type genericType, Annotation[] annotations, @@ -99,7 +99,9 @@ public void writeTo(List serializedPages, sliceOutput.writeInt(SERIALIZED_PAGES_MAGIC); sliceOutput.writeLong(dataIntegrityVerificationEnabled ? calculateChecksum(serializedPages) : NO_CHECKSUM); sliceOutput.writeInt(serializedPages.size()); - writeSerializedPages(sliceOutput, serializedPages); + for (Slice page : serializedPages) { + sliceOutput.writeBytes(page); + } // We use flush instead of close, because the underlying stream would be closed and that is not allowed. sliceOutput.flush(); } diff --git a/core/trino-main/src/main/java/io/trino/server/TaskResource.java b/core/trino-main/src/main/java/io/trino/server/TaskResource.java index c932bf35876b..2c779a993318 100644 --- a/core/trino-main/src/main/java/io/trino/server/TaskResource.java +++ b/core/trino-main/src/main/java/io/trino/server/TaskResource.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.concurrent.BoundedExecutor; import io.airlift.log.Logger; +import io.airlift.slice.Slice; import io.airlift.stats.TimeStat; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -32,7 +33,6 @@ import io.trino.execution.TaskStatus; import io.trino.execution.buffer.BufferResult; import io.trino.execution.buffer.OutputBuffers.OutputBufferId; -import io.trino.execution.buffer.SerializedPage; import io.trino.metadata.SessionPropertyManager; import io.trino.server.security.ResourceSecurity; import org.weakref.jmx.Managed; @@ -320,7 +320,7 @@ public void getResults( timeoutExecutor); ListenableFuture responseFuture = Futures.transform(bufferResultFuture, result -> { - List serializedPages = result.getSerializedPages(); + List serializedPages = result.getSerializedPages(); GenericEntity entity = null; Status status; @@ -328,7 +328,7 @@ public void getResults( status = Status.NO_CONTENT; } else { - entity = new GenericEntity<>(serializedPages, new TypeToken>() {}.getType()); + entity = new GenericEntity<>(serializedPages, new TypeToken>() {}.getType()); status = Status.OK; } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java index 9e6c53d70a96..0fe5c004d128 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.log.Logger; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.Session; @@ -47,7 +48,6 @@ import io.trino.execution.TaskInfo; import io.trino.execution.buffer.PagesSerde; import io.trino.execution.buffer.PagesSerdeFactory; -import io.trino.execution.buffer.SerializedPage; import io.trino.memory.context.SimpleLocalMemoryContext; import io.trino.operator.ExchangeClient; import io.trino.operator.ExchangeClientSupplier; @@ -523,7 +523,7 @@ private synchronized QueryResultRows removePagesFromExchange(QueryInfo queryInfo try (PagesSerde.PagesSerdeContext context = serde.newContext()) { long bytes = 0; while (bytes < targetResultBytes) { - SerializedPage serializedPage = exchangeClient.pollPage(); + Slice serializedPage = exchangeClient.pollPage(); if (serializedPage == null) { break; } diff --git a/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java b/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java index 263cf897d8b8..31106cee5787 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java +++ b/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java @@ -22,10 +22,10 @@ import com.google.common.util.concurrent.ListeningExecutorService; import io.airlift.slice.InputStreamSliceInput; import io.airlift.slice.OutputStreamSliceOutput; +import io.airlift.slice.Slice; import io.airlift.slice.SliceOutput; import io.trino.execution.buffer.PagesSerde; import io.trino.execution.buffer.PagesSerdeUtil; -import io.trino.execution.buffer.SerializedPage; import io.trino.memory.context.LocalMemoryContext; import io.trino.operator.SpillContext; import io.trino.spi.Page; @@ -45,7 +45,6 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; -import static io.trino.execution.buffer.PagesSerdeUtil.writeSerializedPage; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_PREFIX; import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_SUFFIX; @@ -149,11 +148,11 @@ private void writePages(Iterator pageIterator) while (pageIterator.hasNext()) { Page page = pageIterator.next(); spilledPagesInMemorySize += page.getSizeInBytes(); - SerializedPage serializedPage = serde.serialize(context, page); - long pageSize = serializedPage.getSizeInBytes(); + Slice serializedPage = serde.serialize(context, page); + long pageSize = serializedPage.length(); localSpillContext.updateBytes(pageSize); spillerStats.addToTotalSpilledBytes(pageSize); - writeSerializedPage(output, serializedPage); + output.writeBytes(serializedPage); } } catch (UncheckedIOException | IOException e) { diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java index 8be142f4bb00..0d2d859bc4f0 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java @@ -60,6 +60,7 @@ import static io.trino.execution.TaskTestUtils.updateTask; import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; +import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.testing.TestingSession.testSessionBuilder; import static java.util.concurrent.Executors.newScheduledThreadPool; @@ -163,7 +164,7 @@ public void testSimpleQuery() BufferResult results = sqlTask.getTaskResults(OUT, 0, DataSize.of(1, MEGABYTE)).get(); assertFalse(results.isBufferComplete()); assertEquals(results.getSerializedPages().size(), 1); - assertEquals(results.getSerializedPages().get(0).getPositionCount(), 1); + assertEquals(getSerializedPagePositionCount(results.getSerializedPages().get(0)), 1); for (boolean moreResults = true; moreResults; moreResults = !results.isBufferComplete()) { results = sqlTask.getTaskResults(OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, MEGABYTE)).get(); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java index ce6351779c2f..cf8313b676dd 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.airlift.slice.Slice; import io.airlift.stats.TestingGcMonitor; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -31,7 +32,6 @@ import io.trino.execution.buffer.OutputBuffers.OutputBufferId; import io.trino.execution.buffer.PagesSerdeFactory; import io.trino.execution.buffer.PartitionedOutputBuffer; -import io.trino.execution.buffer.SerializedPage; import io.trino.execution.executor.TaskExecutor; import io.trino.memory.MemoryPool; import io.trino.memory.QueryContext; @@ -99,6 +99,7 @@ import static io.trino.execution.buffer.BufferState.TERMINAL_BUFFER_STATES; import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; +import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.operator.PipelineExecutionStrategy.GROUPED_EXECUTION; import static io.trino.operator.PipelineExecutionStrategy.UNGROUPED_EXECUTION; @@ -661,8 +662,8 @@ public void consume(int positions, Duration timeout) assertFalse(bufferComplete, "bufferComplete is set before enough positions are consumed"); BufferResult results = outputBuffer.get(outputBufferId, sequenceId, DataSize.of(1, MEGABYTE)).get(nanoUntil - System.nanoTime(), TimeUnit.NANOSECONDS); bufferComplete = results.isBufferComplete(); - for (SerializedPage serializedPage : results.getSerializedPages()) { - surplusPositions += serializedPage.getPositionCount(); + for (Slice serializedPage : results.getSerializedPages()) { + surplusPositions += getSerializedPagePositionCount(serializedPage); } sequenceId += results.getSerializedPages().size(); } @@ -676,8 +677,8 @@ public void assertBufferComplete(Duration timeout) while (!bufferComplete) { BufferResult results = outputBuffer.get(outputBufferId, sequenceId, DataSize.of(1, MEGABYTE)).get(nanoUntil - System.nanoTime(), TimeUnit.NANOSECONDS); bufferComplete = results.isBufferComplete(); - for (SerializedPage serializedPage : results.getSerializedPages()) { - assertEquals(serializedPage.getPositionCount(), 0); + for (Slice serializedPage : results.getSerializedPages()) { + assertEquals(getSerializedPagePositionCount(serializedPage), 0); } sequenceId += results.getSerializedPages().size(); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java index 05b684583ae5..b6529bdfbf50 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java @@ -57,6 +57,7 @@ import static io.trino.execution.TaskTestUtils.createTestingPlanner; import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; +import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount; import static io.trino.testing.TestingSession.testSessionBuilder; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -124,7 +125,7 @@ public void testSimpleQuery() BufferResult results = sqlTaskManager.getTaskResults(taskId, OUT, 0, DataSize.of(1, Unit.MEGABYTE)).get(); assertFalse(results.isBufferComplete()); assertEquals(results.getSerializedPages().size(), 1); - assertEquals(results.getSerializedPages().get(0).getPositionCount(), 1); + assertEquals(getSerializedPagePositionCount(results.getSerializedPages().get(0)), 1); for (boolean moreResults = true; moreResults; moreResults = !results.isBufferComplete()) { results = sqlTaskManager.getTaskResults(taskId, OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, Unit.MEGABYTE)).get(); diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java index a08d99c58919..81430da3c64a 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java @@ -176,7 +176,7 @@ public Object deserializeLineitem(LineitemBenchmarkData data) return ImmutableList.copyOf(readPages(data.getPagesSerde(), new BasicSliceInput(data.getDataSource()))); } - private static List serializePages(BenchmarkData data) + private static List serializePages(BenchmarkData data) { PagesSerdeContext context = new PagesSerdeContext(); return data.getPages().stream() diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java index 9a42272b0ff1..7b98ab057e31 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java @@ -14,6 +14,7 @@ package io.trino.execution.buffer; import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.block.BlockBuilder; @@ -71,7 +72,7 @@ public void serialize(BenchmarkData data, Blackhole blackhole) @Benchmark public void deserialize(BenchmarkData data, Blackhole blackhole) { - SerializedPage[] serializedPages = data.serializedPages; + Slice[] serializedPages = data.serializedPages; PagesSerde serde = data.serde; try (PagesSerde.PagesSerdeContext context = serde.newContext()) { for (int i = 0; i < serializedPages.length; i++) { @@ -86,7 +87,7 @@ public void testBenchmarkData() BenchmarkData data = new BenchmarkData(); data.compressed = true; data.initialize(); - SerializedPage[] serializedPages = data.serializedPages; + Slice[] serializedPages = data.serializedPages; PagesSerde serde = data.serde; try (PagesSerde.PagesSerdeContext context = serde.newContext()) { // Sanity test by deserializing and checking against the original pages @@ -110,7 +111,7 @@ public static class BenchmarkData private PagesSerde serde; private Page[] dataPages; - private SerializedPage[] serializedPages; + private Slice[] serializedPages; @Setup public void initialize() @@ -131,9 +132,9 @@ private PagesSerde createPagesSerde() return encrypted ? serdeFactory.createPagesSerdeForSpill(Optional.of(new AesSpillCipher())) : serdeFactory.createPagesSerde(); } - private SerializedPage[] createSerializedPages() + private Slice[] createSerializedPages() { - SerializedPage[] result = new SerializedPage[dataPages.length]; + Slice[] result = new Slice[dataPages.length]; try (PagesSerde.PagesSerdeContext context = serde.newContext()) { for (int i = 0; i < result.length; i++) { result[i] = serde.serialize(context, dataPages[i]); @@ -221,7 +222,7 @@ public static void main(String[] args) System.out.println("Page Size Max: " + Arrays.stream(data.dataPages).mapToLong(Page::getSizeInBytes).max().getAsLong()); System.out.println("Page Size Sum: " + Arrays.stream(data.dataPages).mapToLong(Page::getSizeInBytes).sum()); System.out.println("Page count: " + data.dataPages.length); - System.out.println("Compressed: " + Arrays.stream(data.serializedPages).filter(SerializedPage::isCompressed).count()); + System.out.println("Compressed: " + Arrays.stream(data.serializedPages).filter(PagesSerde::isSerializedPageCompressed).count()); benchmark(BenchmarkPagesSerde.class) .withOptions(optionsBuilder -> optionsBuilder.jvmArgs("-Xms4g", "-Xmx4g")) diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/BufferTestUtils.java b/core/trino-main/src/test/java/io/trino/execution/buffer/BufferTestUtils.java index 8337a67c674d..e9b06924874e 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/BufferTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/BufferTestUtils.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.block.BlockAssertions; @@ -43,7 +44,7 @@ private BufferTestUtils() {} private static final PagesSerde PAGES_SERDE = testingPagesSerde(); static final Duration NO_WAIT = new Duration(0, MILLISECONDS); static final Duration MAX_WAIT = new Duration(1, SECONDS); - private static final DataSize BUFFERED_PAGE_SIZE = DataSize.ofBytes(serializePage(createPage(42)).getRetainedSizeInBytes()); + private static final DataSize BUFFERED_PAGE_SIZE = DataSize.ofBytes(serializePage(createPage(42)).getRetainedSize()); static BufferResult getFuture(ListenableFuture future, Duration maxWait) { @@ -68,7 +69,7 @@ static void assertBufferResultEquals(List types, BufferResult ac static BufferResult createBufferResult(String bufferId, long token, List pages) { checkArgument(!pages.isEmpty(), "pages is empty"); - ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(pages.size()); + ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(pages.size()); try (PagesSerde.PagesSerdeContext context = PAGES_SERDE.newContext()) { for (Page p : pages) { builder.add(PAGES_SERDE.serialize(context, p)); @@ -87,7 +88,7 @@ public static Page createPage(int i) return new Page(BlockAssertions.createLongsBlock(i)); } - static SerializedPage serializePage(Page page) + static Slice serializePage(Page page) { try (PagesSerde.PagesSerdeContext context = PAGES_SERDE.newContext()) { return PAGES_SERDE.serialize(context, page); diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestBroadcastOutputBuffer.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestBroadcastOutputBuffer.java index 90f013ce1737..bbe5bea964d5 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestBroadcastOutputBuffer.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestBroadcastOutputBuffer.java @@ -1010,7 +1010,7 @@ public void testSharedBufferBlocking() AggregatedMemoryContext memoryContext = newRootAggregatedMemoryContext(reservationHandler, 0L); Page page = createPage(1); - long pageSize = serializePage(page).getRetainedSizeInBytes(); + long pageSize = serializePage(page).getRetainedSize(); // create a buffer that can only hold two pages BroadcastOutputBuffer buffer = createBroadcastBuffer(createInitialEmptyOutputBuffers(BROADCAST), DataSize.ofBytes(pageSize * 2), memoryContext, directExecutor()); @@ -1041,7 +1041,7 @@ public void testSharedBufferBlocking2() AggregatedMemoryContext memoryContext = newRootAggregatedMemoryContext(reservationHandler, 0L); Page page = createPage(1); - long pageSize = serializePage(page).getRetainedSizeInBytes(); + long pageSize = serializePage(page).getRetainedSize(); // create a buffer that can only hold two pages BroadcastOutputBuffer buffer = createBroadcastBuffer(createInitialEmptyOutputBuffers(BROADCAST), DataSize.ofBytes(pageSize * 2), memoryContext, directExecutor()); @@ -1087,7 +1087,7 @@ public void testSharedBufferBlockingNoBlockOnFull() AggregatedMemoryContext memoryContext = newRootAggregatedMemoryContext(reservationHandler, 0L); Page page = createPage(1); - long pageSize = serializePage(page).getRetainedSizeInBytes(); + long pageSize = serializePage(page).getRetainedSize(); // create a buffer that can only hold two pages BroadcastOutputBuffer buffer = createBroadcastBuffer(createInitialEmptyOutputBuffers(BROADCAST), DataSize.ofBytes(pageSize * 2), memoryContext, directExecutor()); diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestClientBuffer.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestClientBuffer.java index 5e2ca85149f1..21913a818be6 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestClientBuffer.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestClientBuffer.java @@ -427,7 +427,7 @@ private static void addPage(ClientBuffer buffer, Page page) private static void addPage(ClientBuffer buffer, Page page, PagesReleasedListener onPagesReleased) { - SerializedPageReference serializedPageReference = new SerializedPageReference(serializePage(page), 1); + SerializedPageReference serializedPageReference = new SerializedPageReference(serializePage(page), page.getPositionCount(), 1); buffer.enqueuePages(ImmutableList.of(serializedPageReference)); dereferencePages(ImmutableList.of(serializedPageReference), onPagesReleased); } @@ -499,7 +499,7 @@ public synchronized void addPage(Page page) { requireNonNull(page, "page is null"); checkState(!noMorePages); - buffer.add(new SerializedPageReference(serializePage(page), 1)); + buffer.add(new SerializedPageReference(serializePage(page), page.getPositionCount(), 1)); } @Override diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestPartitionedOutputBuffer.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestPartitionedOutputBuffer.java index 8396c4fda1bd..dc9f5c713bc6 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestPartitionedOutputBuffer.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestPartitionedOutputBuffer.java @@ -799,7 +799,7 @@ public void testBufferPeakMemoryUsage() .withNoMoreBufferIds(), sizeOfPages(5)); Page page = createPage(1); - long serializePageSize = serializePage(page).getRetainedSizeInBytes(); + long serializePageSize = serializePage(page).getRetainedSize(); for (int i = 0; i < 5; i++) { addPage(buffer, page, 0); assertEquals(buffer.getPeakMemoryUsage(), (i + 1) * serializePageSize); diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java index abc360e90f10..f276f29b6e4d 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java @@ -17,6 +17,7 @@ import io.airlift.compress.Decompressor; import io.airlift.compress.lz4.Lz4Compressor; import io.airlift.compress.lz4.Lz4Decompressor; +import io.airlift.slice.Slice; import io.trino.metadata.BlockEncodingManager; import io.trino.metadata.InternalBlockEncodingSerde; import io.trino.spi.Page; @@ -56,19 +57,19 @@ public SynchronizedPagesSerde(BlockEncodingSerde blockEncodingSerde, Optional serializedPages = new LinkedBlockingQueue<>(); + private final BlockingQueue serializedPages = new LinkedBlockingQueue<>(); private final AtomicReference failure = new AtomicReference<>(); private MockBuffer(URI location) @@ -177,7 +178,7 @@ public void setCompleted() completed.set(true); } - public synchronized void addPage(SerializedPage page) + public synchronized void addPage(Slice page) { checkState(completed.get() != Boolean.TRUE, "Location %s is complete", location); serializedPages.add(page); @@ -211,7 +212,7 @@ public BufferResult getPages(long sequenceId, DataSize maxSize) assertEquals(sequenceId, token.get(), "token"); // wait for a single page to arrive - SerializedPage serializedPage = null; + Slice serializedPage = null; try { serializedPage = serializedPages.poll(10, TimeUnit.MILLISECONDS); } @@ -225,16 +226,16 @@ public BufferResult getPages(long sequenceId, DataSize maxSize) } // add serializedPages up to the size limit - List responsePages = new ArrayList<>(); + List responsePages = new ArrayList<>(); responsePages.add(serializedPage); - long responseSize = serializedPage.getSizeInBytes(); + long responseSize = serializedPage.length(); while (responseSize < maxSize.toBytes()) { serializedPage = serializedPages.poll(); if (serializedPage == null) { break; } responsePages.add(serializedPage); - responseSize += serializedPage.getSizeInBytes(); + responseSize += serializedPage.length(); } // update sequence id diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicationExchangeClientBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicationExchangeClientBuffer.java index 90afee652007..8c30ffb2d068 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicationExchangeClientBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicationExchangeClientBuffer.java @@ -23,8 +23,6 @@ import io.airlift.units.DataSize; import io.trino.execution.StageId; import io.trino.execution.TaskId; -import io.trino.execution.buffer.PageCodecMarker; -import io.trino.execution.buffer.SerializedPage; import io.trino.spi.TrinoException; import org.testng.annotations.Test; @@ -149,31 +147,31 @@ public void testPollPage() { testPollPages(ImmutableListMultimap.of(), ImmutableMap.of(), ImmutableList.of()); testPollPages( - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0")) + ImmutableListMultimap.builder() + .put(createTaskId(0, 0), utf8Slice("p0a0v0")) .build(), ImmutableMap.of(), ImmutableList.of("p0a0v0")); testPollPages( - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0")) - .put(createTaskId(0, 1), createPage("p0a1v0")) + ImmutableListMultimap.builder() + .put(createTaskId(0, 0), utf8Slice("p0a0v0")) + .put(createTaskId(0, 1), utf8Slice("p0a1v0")) .build(), ImmutableMap.of(), ImmutableList.of("p0a1v0")); testPollPages( - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0")) - .put(createTaskId(1, 0), createPage("p1a0v0")) - .put(createTaskId(0, 1), createPage("p0a1v0")) + ImmutableListMultimap.builder() + .put(createTaskId(0, 0), utf8Slice("p0a0v0")) + .put(createTaskId(1, 0), utf8Slice("p1a0v0")) + .put(createTaskId(0, 1), utf8Slice("p0a1v0")) .build(), ImmutableMap.of(), ImmutableList.of("p0a1v0")); testPollPages( - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0")) - .put(createTaskId(1, 0), createPage("p1a0v0")) - .put(createTaskId(0, 1), createPage("p0a1v0")) + ImmutableListMultimap.builder() + .put(createTaskId(0, 0), utf8Slice("p0a0v0")) + .put(createTaskId(1, 0), utf8Slice("p1a0v0")) + .put(createTaskId(0, 1), utf8Slice("p0a1v0")) .build(), ImmutableMap.of( createTaskId(2, 0), @@ -181,20 +179,20 @@ public void testPollPage() ImmutableList.of("p0a1v0")); RuntimeException error = new RuntimeException("error"); testPollPagesFailure( - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0")) - .put(createTaskId(1, 0), createPage("p1a0v0")) - .put(createTaskId(0, 1), createPage("p0a1v0")) + ImmutableListMultimap.builder() + .put(createTaskId(0, 0), utf8Slice("p0a0v0")) + .put(createTaskId(1, 0), utf8Slice("p1a0v0")) + .put(createTaskId(0, 1), utf8Slice("p0a1v0")) .build(), ImmutableMap.of( createTaskId(2, 2), error), error); testPollPagesFailure( - ImmutableListMultimap.builder() - .put(createTaskId(0, 0), createPage("p0a0v0")) - .put(createTaskId(1, 0), createPage("p1a0v0")) - .put(createTaskId(0, 1), createPage("p0a1v0")) + ImmutableListMultimap.builder() + .put(createTaskId(0, 0), utf8Slice("p0a0v0")) + .put(createTaskId(1, 0), utf8Slice("p1a0v0")) + .put(createTaskId(0, 1), utf8Slice("p0a1v0")) .build(), ImmutableMap.of( createTaskId(0, 1), @@ -202,28 +200,27 @@ public void testPollPage() error); } - private static void testPollPages(Multimap pages, Map failures, List expectedValues) + private static void testPollPages(Multimap pages, Map failures, List expectedValues) { - List actualPages = pollPages(pages, failures); + List actualPages = pollPages(pages, failures); List actualValues = actualPages.stream() - .map(SerializedPage::getSlice) .map(Slice::toStringUtf8) .collect(toImmutableList()); assertThat(actualValues).containsExactlyInAnyOrderElementsOf(expectedValues); } - private static void testPollPagesFailure(Multimap pages, Map failures, Throwable expectedFailure) + private static void testPollPagesFailure(Multimap pages, Map failures, Throwable expectedFailure) { assertThatThrownBy(() -> pollPages(pages, failures)).isEqualTo(expectedFailure); } - private static List pollPages(Multimap pages, Map failures) + private static List pollPages(Multimap pages, Map failures) { try (ExchangeClientBuffer buffer = new DeduplicationExchangeClientBuffer(directExecutor(), ONE_KB, RetryPolicy.QUERY)) { for (TaskId taskId : Sets.union(pages.keySet(), failures.keySet())) { buffer.addTask(taskId); } - for (Map.Entry page : pages.entries()) { + for (Map.Entry page : pages.entries()) { buffer.addPages(page.getKey(), ImmutableList.of(page.getValue())); } for (Map.Entry failure : failures.entrySet()) { @@ -234,9 +231,9 @@ private static List pollPages(Multimap p } buffer.noMoreTasks(); - ImmutableList.Builder result = ImmutableList.builder(); + ImmutableList.Builder result = ImmutableList.builder(); while (true) { - SerializedPage page = buffer.pollPage(); + Slice page = buffer.pollPage(); if (page == null) { break; } @@ -257,9 +254,9 @@ public void testRemovePagesForPreviousAttempts() TaskId partition1Attempt0 = createTaskId(1, 0); TaskId partition0Attempt1 = createTaskId(0, 1); - SerializedPage page1 = createPage("textofrandomlength"); - SerializedPage page2 = createPage("textwithdifferentlength"); - SerializedPage page3 = createPage("smalltext"); + Slice page1 = utf8Slice("textofrandomlength"); + Slice page2 = utf8Slice("textwithdifferentlength"); + Slice page3 = utf8Slice("smalltext"); buffer.addTask(partition0Attempt0); buffer.addPages(partition0Attempt0, ImmutableList.of(page1)); @@ -267,13 +264,13 @@ public void testRemovePagesForPreviousAttempts() buffer.addPages(partition1Attempt0, ImmutableList.of(page2)); assertThat(buffer.getRetainedSizeInBytes()).isGreaterThan(0); - assertEquals(buffer.getRetainedSizeInBytes(), page1.getRetainedSizeInBytes() + page2.getRetainedSizeInBytes()); + assertEquals(buffer.getRetainedSizeInBytes(), page1.getRetainedSize() + page2.getRetainedSize()); buffer.addTask(partition0Attempt1); assertEquals(buffer.getRetainedSizeInBytes(), 0); buffer.addPages(partition0Attempt1, ImmutableList.of(page3)); - assertEquals(buffer.getRetainedSizeInBytes(), page3.getRetainedSizeInBytes()); + assertEquals(buffer.getRetainedSizeInBytes(), page3.getRetainedSize()); } } @@ -283,17 +280,17 @@ public void testBufferOverflow() try (ExchangeClientBuffer buffer = new DeduplicationExchangeClientBuffer(directExecutor(), DataSize.of(100, BYTE), RetryPolicy.QUERY)) { TaskId task = createTaskId(0, 0); - SerializedPage page1 = createPage("1234"); - SerializedPage page2 = createPage("123456789"); - assertThat(page1.getRetainedSizeInBytes()).isLessThanOrEqualTo(100); - assertThat(page1.getRetainedSizeInBytes() + page2.getRetainedSizeInBytes()).isGreaterThan(100); + Slice page1 = utf8Slice("1234"); + Slice page2 = utf8Slice("123456789"); + assertThat(page1.getRetainedSize()).isLessThanOrEqualTo(100); + assertThat(page1.getRetainedSize() + page2.getRetainedSize()).isGreaterThan(100); buffer.addTask(task); buffer.addPages(task, ImmutableList.of(page1)); assertFalse(buffer.isFinished()); assertBlocked(buffer.isBlocked()); - assertEquals(buffer.getRetainedSizeInBytes(), page1.getRetainedSizeInBytes()); + assertEquals(buffer.getRetainedSizeInBytes(), page1.getRetainedSize()); buffer.addPages(task, ImmutableList.of(page2)); assertFalse(buffer.isFinished()); @@ -393,7 +390,7 @@ public void testIsFinished() TaskId taskId = createTaskId(0, 0); buffer.addTask(taskId); - buffer.addPages(taskId, ImmutableList.of(createPage("page"))); + buffer.addPages(taskId, ImmutableList.of(utf8Slice("page"))); assertFalse(buffer.isFinished()); buffer.noMoreTasks(); @@ -410,7 +407,7 @@ public void testIsFinished() TaskId taskId = createTaskId(0, 0); buffer.addTask(taskId); - buffer.addPages(taskId, ImmutableList.of(createPage("page"))); + buffer.addPages(taskId, ImmutableList.of(utf8Slice("page"))); assertFalse(buffer.isFinished()); buffer.noMoreTasks(); @@ -429,7 +426,7 @@ public void testIsFinished() TaskId taskId = createTaskId(0, 0); buffer.addTask(taskId); - buffer.addPages(taskId, ImmutableList.of(createPage("page"))); + buffer.addPages(taskId, ImmutableList.of(utf8Slice("page"))); assertFalse(buffer.isFinished()); buffer.taskFinished(taskId); @@ -451,10 +448,10 @@ public void testRemainingBufferCapacity() TaskId taskId = createTaskId(0, 0); buffer.addTask(taskId); - SerializedPage page = createPage("page"); + Slice page = utf8Slice("page"); buffer.addPages(taskId, ImmutableList.of(page)); - assertEquals(buffer.getRemainingCapacityInBytes(), ONE_KB.toBytes() - page.getRetainedSizeInBytes()); + assertEquals(buffer.getRemainingCapacityInBytes(), ONE_KB.toBytes() - page.getRetainedSize()); } } @@ -463,11 +460,6 @@ private static TaskId createTaskId(int partition, int attempt) return new TaskId(new StageId("query", 0), partition, attempt); } - private static SerializedPage createPage(String value) - { - return new SerializedPage(utf8Slice(value), PageCodecMarker.MarkerSet.empty(), 1, value.length()); - } - private static void assertNotBlocked(ListenableFuture blocked) { assertTrue(blocked.isDone()); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestExchangeClient.java b/core/trino-main/src/test/java/io/trino/operator/TestExchangeClient.java index db3c51a835db..36b10c9a0b78 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestExchangeClient.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestExchangeClient.java @@ -24,6 +24,7 @@ import io.airlift.http.client.Response; import io.airlift.http.client.testing.TestingHttpClient; import io.airlift.http.client.testing.TestingResponse; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; import io.airlift.units.Duration; @@ -31,9 +32,7 @@ import io.trino.block.BlockAssertions; import io.trino.execution.StageId; import io.trino.execution.TaskId; -import io.trino.execution.buffer.PageCodecMarker; import io.trino.execution.buffer.PagesSerde; -import io.trino.execution.buffer.SerializedPage; import io.trino.memory.context.SimpleLocalMemoryContext; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -57,6 +56,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Maps.uniqueIndex; import static com.google.common.collect.Sets.newConcurrentHashSet; import static com.google.common.io.ByteStreams.toByteArray; @@ -64,8 +64,8 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static io.airlift.concurrent.MoreFutures.tryGetFutureValue; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.slice.Slices.utf8Slice; import static io.airlift.testing.Assertions.assertLessThan; +import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount; import static io.trino.execution.buffer.TestingPagesSerdeFactory.testingPagesSerde; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -117,7 +117,7 @@ public void testHappyPath() DataSize maxResponseSize = DataSize.of(10, Unit.MEGABYTE); MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize); - List pages = ImmutableList.of(createSerializedPage("val1"), createSerializedPage("value2"), createSerializedPage("valllue3")); + List pages = ImmutableList.of(createSerializedPage(1), createSerializedPage(2), createSerializedPage(3)); URI location = URI.create("http://localhost:8080"); pages.forEach(page -> processor.addPage(location, page)); @@ -239,8 +239,8 @@ public void testAddLocation() URI location2 = URI.create("http://localhost:8080/2"); URI location3 = URI.create("http://localhost:8080/3"); - processor.addPage(location1, createSerializedPage("location-1-page-1")); - processor.addPage(location1, createSerializedPage("location-1-page-2")); + processor.addPage(location1, createSerializedPage(1)); + processor.addPage(location1, createSerializedPage(2)); TestingExchangeClientBuffer buffer = new TestingExchangeClientBuffer(DataSize.of(1, Unit.MEGABYTE)); @@ -513,9 +513,9 @@ public void testDeduplication() URI locationP1A0 = URI.create("http://localhost:8080/2"); URI locationP0A1 = URI.create("http://localhost:8080/3"); - processor.addPage(locationP1A0, createSerializedPage("location-1-page-1")); - processor.addPage(locationP0A1, createSerializedPage("location-2-page-1")); - processor.addPage(locationP0A1, createSerializedPage("location-2-page-2")); + processor.addPage(locationP1A0, createSerializedPage(1)); + processor.addPage(locationP0A1, createSerializedPage(2)); + processor.addPage(locationP0A1, createSerializedPage(3)); @SuppressWarnings("resource") ExchangeClient exchangeClient = new ExchangeClient( @@ -548,16 +548,17 @@ public void testDeduplication() exchangeClient.noMoreLocations(); exchangeClient.isBlocked().get(10, SECONDS); - List pageValues = new ArrayList<>(); + List pages = new ArrayList<>(); while (!exchangeClient.isFinished()) { - SerializedPage page = exchangeClient.pollPage(); + Slice page = exchangeClient.pollPage(); if (page == null) { break; } - pageValues.add(page.getSlice().toStringUtf8()); + pages.add(PAGES_SERDE.deserialize(page)); } - assertThat(pageValues).containsExactlyInAnyOrder("location-2-page-1", "location-2-page-2"); + assertThat(pages).hasSize(2); + assertThat(pages.stream().map(Page::getPositionCount).collect(toImmutableSet())).containsAll(ImmutableList.of(2, 3)); assertEventually(() -> assertTrue(exchangeClient.isFinished())); assertEventually(() -> { @@ -586,9 +587,9 @@ public void testTaskFailure() URI location3 = URI.create("http://localhost:8080/3"); URI location4 = URI.create("http://localhost:8080/4"); - processor.addPage(location1, createSerializedPage("location-1-page-1")); - processor.addPage(location4, createSerializedPage("location-4-page-1")); - processor.addPage(location4, createSerializedPage("location-4-page-2")); + processor.addPage(location1, createSerializedPage(1)); + processor.addPage(location4, createSerializedPage(2)); + processor.addPage(location4, createSerializedPage(3)); TestingExchangeClientBuffer buffer = new TestingExchangeClientBuffer(DataSize.of(1, Unit.MEGABYTE)); @@ -787,7 +788,7 @@ public void testStreamingAbortOnDataCorruption() assertThatThrownBy(() -> getNextPage(exchangeClient)) .isInstanceOf(TrinoException.class) - .hasMessageMatching("Checksum verification failure on localhost when reading from http://localhost:8080/0: Data corruption, read checksum: 0xf91cfe5d2bc6e1c2, calculated checksum: 0x3c51297c7b78052f"); + .hasMessageMatching("Checksum verification failure on localhost when reading from http://localhost:8080/0: Data corruption, read checksum: 0xdd450d930a94ddde, calculated checksum: 0x9bdc9de3ce57c972"); exchangeClient.close(); } @@ -932,21 +933,21 @@ private static Page createPage(int size) return new Page(BlockAssertions.createLongSequenceBlock(0, size)); } - private static SerializedPage createSerializedPage(String value) + private static Slice createSerializedPage(int size) { - return new SerializedPage(utf8Slice(value), PageCodecMarker.MarkerSet.empty(), 1, value.length()); + return PAGES_SERDE.serialize(PAGES_SERDE.newContext(), createPage(size)); } - private static SerializedPage getNextPage(ExchangeClient exchangeClient) + private static Slice getNextPage(ExchangeClient exchangeClient) { - ListenableFuture futurePage = Futures.transform(exchangeClient.isBlocked(), ignored -> exchangeClient.isFinished() ? null : exchangeClient.pollPage(), directExecutor()); + ListenableFuture futurePage = Futures.transform(exchangeClient.isBlocked(), ignored -> exchangeClient.isFinished() ? null : exchangeClient.pollPage(), directExecutor()); return tryGetFutureValue(futurePage, 100, TimeUnit.SECONDS).orElse(null); } - private static void assertPageEquals(SerializedPage actualPage, Page expectedPage) + private static void assertPageEquals(Slice actualPage, Page expectedPage) { assertNotNull(actualPage); - assertEquals(actualPage.getPositionCount(), expectedPage.getPositionCount()); + assertEquals(getSerializedPagePositionCount(actualPage), expectedPage.getPositionCount()); assertEquals(PAGES_SERDE.deserialize(actualPage).getChannelCount(), expectedPage.getChannelCount()); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestHttpPageBufferClient.java b/core/trino-main/src/test/java/io/trino/operator/TestHttpPageBufferClient.java index ff0dcecba006..fc732a1871e6 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestHttpPageBufferClient.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestHttpPageBufferClient.java @@ -19,6 +19,7 @@ import io.airlift.http.client.Response; import io.airlift.http.client.testing.TestingHttpClient; import io.airlift.http.client.testing.TestingResponse; +import io.airlift.slice.Slice; import io.airlift.testing.TestingTicker; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; @@ -27,7 +28,6 @@ import io.trino.execution.StageId; import io.trino.execution.TaskId; import io.trino.execution.buffer.PagesSerde; -import io.trino.execution.buffer.SerializedPage; import io.trino.operator.HttpPageBufferClient.ClientCallback; import io.trino.spi.HostAddress; import io.trino.spi.Page; @@ -452,7 +452,7 @@ public void testMemoryExceededInAddPages() TestingClientCallback callback = new TestingClientCallback(requestComplete) { @Override - public boolean addPages(HttpPageBufferClient client, List pages) + public boolean addPages(HttpPageBufferClient client, List pages) { addPagesCalled.set(true); throw expectedException; @@ -517,7 +517,7 @@ private static class TestingClientCallback implements ClientCallback { private final CyclicBarrier done; - private final List pages = Collections.synchronizedList(new ArrayList<>()); + private final List pages = Collections.synchronizedList(new ArrayList<>()); private final AtomicInteger completedRequests = new AtomicInteger(); private final AtomicInteger finishedBuffers = new AtomicInteger(); private final AtomicInteger failedBuffers = new AtomicInteger(); @@ -556,7 +556,7 @@ public Throwable getFailure() } @Override - public boolean addPages(HttpPageBufferClient client, List pages) + public boolean addPages(HttpPageBufferClient client, List pages) { this.pages.addAll(pages); return true; diff --git a/core/trino-main/src/test/java/io/trino/operator/TestStreamingExchangeClientBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestStreamingExchangeClientBuffer.java index 25e1bcf6f160..e20e5d41b598 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestStreamingExchangeClientBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestStreamingExchangeClientBuffer.java @@ -15,11 +15,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.StageId; import io.trino.execution.TaskId; -import io.trino.execution.buffer.PageCodecMarker; -import io.trino.execution.buffer.SerializedPage; import io.trino.spi.QueryId; import org.testng.annotations.Test; @@ -37,9 +36,9 @@ public class TestStreamingExchangeClientBuffer private static final StageId STAGE_ID = new StageId(new QueryId("query"), 0); private static final TaskId TASK_0 = new TaskId(STAGE_ID, 0, 0); private static final TaskId TASK_1 = new TaskId(STAGE_ID, 1, 0); - private static final SerializedPage PAGE_0 = createPage("page0"); - private static final SerializedPage PAGE_1 = createPage("page-1"); - private static final SerializedPage PAGE_2 = createPage("page-_2"); + private static final Slice PAGE_0 = utf8Slice("page0"); + private static final Slice PAGE_1 = utf8Slice("page-1"); + private static final Slice PAGE_2 = utf8Slice("page-_2"); @Test public void testHappyPath() @@ -66,14 +65,14 @@ public void testHappyPath() buffer.addPages(TASK_0, ImmutableList.of(PAGE_0)); assertEquals(buffer.getBufferedPageCount(), 1); - assertEquals(buffer.getRetainedSizeInBytes(), PAGE_0.getRetainedSizeInBytes()); - assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSizeInBytes()); - assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes() - PAGE_0.getRetainedSizeInBytes()); + assertEquals(buffer.getRetainedSizeInBytes(), PAGE_0.getRetainedSize()); + assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize()); + assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes() - PAGE_0.getRetainedSize()); assertFalse(buffer.isFinished()); assertTrue(buffer.isBlocked().isDone()); - assertPageEquals(buffer.pollPage(), PAGE_0); + assertEquals(buffer.pollPage(), PAGE_0); assertEquals(buffer.getRetainedSizeInBytes(), 0); - assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSizeInBytes()); + assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize()); assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes()); assertFalse(buffer.isFinished()); assertFalse(buffer.isBlocked().isDone()); @@ -84,17 +83,17 @@ public void testHappyPath() buffer.addPages(TASK_1, ImmutableList.of(PAGE_1, PAGE_2)); assertEquals(buffer.getBufferedPageCount(), 2); - assertEquals(buffer.getRetainedSizeInBytes(), PAGE_1.getRetainedSizeInBytes() + PAGE_2.getRetainedSizeInBytes()); - assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_1.getRetainedSizeInBytes() + PAGE_2.getRetainedSizeInBytes()); - assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes() - PAGE_1.getRetainedSizeInBytes() - PAGE_2.getRetainedSizeInBytes()); + assertEquals(buffer.getRetainedSizeInBytes(), PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize()); + assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize()); + assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes() - PAGE_1.getRetainedSize() - PAGE_2.getRetainedSize()); assertFalse(buffer.isFinished()); assertTrue(buffer.isBlocked().isDone()); - assertPageEquals(buffer.pollPage(), PAGE_1); - assertPageEquals(buffer.pollPage(), PAGE_2); + assertEquals(buffer.pollPage(), PAGE_1); + assertEquals(buffer.pollPage(), PAGE_2); assertFalse(buffer.isFinished()); assertFalse(buffer.isBlocked().isDone()); assertEquals(buffer.getRetainedSizeInBytes(), 0); - assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_1.getRetainedSizeInBytes() + PAGE_2.getRetainedSizeInBytes()); + assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize()); assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes()); buffer.taskFinished(TASK_1); @@ -197,17 +196,4 @@ public void testFutureCancellationDoesNotAffectOtherFutures() assertTrue(blocked2.isDone()); } } - - private static SerializedPage createPage(String value) - { - return new SerializedPage(utf8Slice(value), PageCodecMarker.MarkerSet.empty(), 1, value.length()); - } - - private static void assertPageEquals(SerializedPage actual, SerializedPage expected) - { - assertEquals(actual.getPositionCount(), expected.getPositionCount()); - assertEquals(actual.getUncompressedSizeInBytes(), expected.getUncompressedSizeInBytes()); - assertEquals(actual.getPageCodecMarkers(), expected.getPageCodecMarkers()); - assertEquals(actual.getSlice(), expected.getSlice()); - } } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestingExchangeClientBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestingExchangeClientBuffer.java index a6d8450c3a03..497e4909f9ed 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestingExchangeClientBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestingExchangeClientBuffer.java @@ -19,9 +19,9 @@ import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.execution.TaskId; -import io.trino.execution.buffer.SerializedPage; import java.util.HashMap; import java.util.HashSet; @@ -38,7 +38,7 @@ public class TestingExchangeClientBuffer { private ListenableFuture blocked = immediateVoidFuture(); private final Set allTasks = new HashSet<>(); - private final ListMultimap pages = ArrayListMultimap.create(); + private final ListMultimap pages = ArrayListMultimap.create(); private final Set finishedTasks = new HashSet<>(); private final ListMultimap failedTasks = ArrayListMultimap.create(); private boolean noMoreTasks; @@ -65,7 +65,7 @@ public synchronized void setBlocked(ListenableFuture blocked) } @Override - public synchronized SerializedPage pollPage() + public synchronized Slice pollPage() { return null; } @@ -82,13 +82,13 @@ public synchronized Set getAllTasks() } @Override - public synchronized void addPages(TaskId taskId, List pages) + public synchronized void addPages(TaskId taskId, List pages) { checkState(allTasks.contains(taskId), "task is expected to be present: %s", taskId); this.pages.putAll(taskId, pages); } - public synchronized ListMultimap getPages() + public synchronized ListMultimap getPages() { return ImmutableListMultimap.copyOf(pages); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestingExchangeHttpClientHandler.java b/core/trino-main/src/test/java/io/trino/operator/TestingExchangeHttpClientHandler.java index 8d3a8605f856..a800874367da 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestingExchangeHttpClientHandler.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestingExchangeHttpClientHandler.java @@ -23,14 +23,13 @@ import io.airlift.http.client.testing.TestingHttpClient; import io.airlift.http.client.testing.TestingResponse; import io.airlift.slice.DynamicSliceOutput; +import io.airlift.slice.Slice; import io.trino.execution.TaskId; import io.trino.execution.buffer.PagesSerde; -import io.trino.execution.buffer.SerializedPage; import io.trino.spi.Page; import static io.trino.TrinoMediaTypes.TRINO_PAGES; import static io.trino.execution.buffer.PagesSerdeUtil.calculateChecksum; -import static io.trino.execution.buffer.PagesSerdeUtil.writeSerializedPage; import static io.trino.execution.buffer.TestingPagesSerdeFactory.testingPagesSerde; import static io.trino.server.InternalHeaders.TRINO_BUFFER_COMPLETE; import static io.trino.server.InternalHeaders.TRINO_PAGE_NEXT_TOKEN; @@ -78,7 +77,7 @@ public Response handle(Request request) if (page != null) { headers.put(TRINO_PAGE_NEXT_TOKEN, String.valueOf(pageToken + 1)); headers.put(TRINO_BUFFER_COMPLETE, String.valueOf(false)); - SerializedPage serializedPage; + Slice serializedPage; try (PagesSerde.PagesSerdeContext context = PAGES_SERDE.newContext()) { serializedPage = PAGES_SERDE.serialize(context, page); } @@ -86,7 +85,7 @@ public Response handle(Request request) output.writeInt(SERIALIZED_PAGES_MAGIC); output.writeLong(calculateChecksum(ImmutableList.of(serializedPage))); output.writeInt(1); - writeSerializedPage(output, serializedPage); + output.writeBytes(serializedPage); return new TestingResponse(HttpStatus.OK, headers.build(), output.slice().getInput()); } else if (taskBuffer.isFinished()) { diff --git a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java index 44c091112d8d..3c1a96e50481 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java @@ -14,6 +14,7 @@ package io.trino.operator.output; import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.Session; import io.trino.execution.StateMachine; @@ -21,7 +22,6 @@ import io.trino.execution.buffer.OutputBuffers; import io.trino.execution.buffer.PagesSerdeFactory; import io.trino.execution.buffer.PartitionedOutputBuffer; -import io.trino.execution.buffer.SerializedPage; import io.trino.jmh.Benchmarks; import io.trino.memory.context.LocalMemoryContext; import io.trino.memory.context.SimpleLocalMemoryContext; @@ -489,7 +489,7 @@ public TestingPartitionedOutputBuffer( // Use a dummy enqueue method to avoid OutOfMemory error @Override - public void enqueue(int partitionNumber, List pages) + public void enqueue(int partitionNumber, List pages) { // The blackhole will be null only for not benchmark runs (test and profile pollution). // For the benchmarks, the instance will be provided by jmh infra via setup method. diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java index 49701ea057ad..9a25fc63703c 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.trino.Session; import io.trino.execution.StateMachine; @@ -27,7 +28,6 @@ import io.trino.execution.buffer.OutputBuffers; import io.trino.execution.buffer.PagesSerde; import io.trino.execution.buffer.PagesSerdeFactory; -import io.trino.execution.buffer.SerializedPage; import io.trino.operator.BucketPartitionFunction; import io.trino.operator.DriverContext; import io.trino.operator.OperatorContext; @@ -560,14 +560,14 @@ public PartitionedOutputOperator build() private static class TestOutputBuffer implements OutputBuffer { - private final Multimap enqueued = ArrayListMultimap.create(); + private final Multimap enqueued = ArrayListMultimap.create(); public Stream getEnqueuedDeserialized() { return getEnqueued().stream().map(PAGES_SERDE::deserialize); } - public List getEnqueued() + public List getEnqueued() { return ImmutableList.copyOf(enqueued.values()); } @@ -577,14 +577,14 @@ public Stream getEnqueuedDeserialized(int partition) return getEnqueued(partition).stream().map(PAGES_SERDE::deserialize); } - public List getEnqueued(int partition) + public List getEnqueued(int partition) { - Collection serializedPages = enqueued.get(partition); + Collection serializedPages = enqueued.get(partition); return serializedPages == null ? ImmutableList.of() : ImmutableList.copyOf(serializedPages); } @Override - public void enqueue(int partition, List pages) + public void enqueue(int partition, List pages) { enqueued.putAll(partition, pages); } @@ -646,7 +646,7 @@ public ListenableFuture isFull() } @Override - public void enqueue(List pages) + public void enqueue(List pages) { } diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java b/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java index 56ef63fab66a..b0ef0eb86b07 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java @@ -149,7 +149,7 @@ private final void testSpiller(List types, Spiller spiller, List... try (PagesSerde.PagesSerdeContext context = pagesSerde.newContext()) { for (List spill : spills) { spilledBytes += spill.stream() - .mapToLong(page -> pagesSerde.serialize(context, page).getSizeInBytes()) + .mapToLong(page -> pagesSerde.serialize(context, page).length()) .sum(); spiller.spill(spill.iterator()).get(); } diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java index 298552364837..f04bb9d172a7 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java @@ -17,9 +17,8 @@ import com.google.common.collect.Iterators; import com.google.common.util.concurrent.ListeningExecutorService; import io.airlift.slice.InputStreamSliceInput; -import io.trino.execution.buffer.PageCodecMarker; +import io.airlift.slice.Slice; import io.trino.execution.buffer.PagesSerdeUtil; -import io.trino.execution.buffer.SerializedPage; import io.trino.memory.context.LocalMemoryContext; import io.trino.operator.PageAssertions; import io.trino.spi.Page; @@ -41,6 +40,8 @@ import static com.google.common.io.MoreFiles.listFiles; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static io.trino.execution.buffer.PagesSerde.isSerializedPageCompressed; +import static io.trino.execution.buffer.PagesSerde.isSerializedPageEncrypted; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DoubleType.DOUBLE; @@ -128,11 +129,11 @@ private void assertSpill(boolean compression, boolean encryption) // Assert the spill codec flags match the expected configuration try (InputStream is = newInputStream(listFiles(spillPath.toPath()).get(0))) { - Iterator serializedPages = PagesSerdeUtil.readSerializedPages(new InputStreamSliceInput(is)); + Iterator serializedPages = PagesSerdeUtil.readSerializedPages(new InputStreamSliceInput(is)); assertTrue(serializedPages.hasNext(), "at least one page should be successfully read back"); - byte markers = serializedPages.next().getPageCodecMarkers(); - assertEquals(PageCodecMarker.COMPRESSED.isSet(markers), compression); - assertEquals(PageCodecMarker.ENCRYPTED.isSet(markers), encryption); + Slice serializedPage = serializedPages.next(); + assertEquals(isSerializedPageCompressed(serializedPage), compression); + assertEquals(isSerializedPageEncrypted(serializedPage), encryption); } // The spillers release their memory reservations when they are closed, therefore at this point diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestSpillCipherPagesSerde.java b/core/trino-main/src/test/java/io/trino/spiller/TestSpillCipherPagesSerde.java index be1c759b0891..caa9d14cd055 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestSpillCipherPagesSerde.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestSpillCipherPagesSerde.java @@ -14,8 +14,8 @@ package io.trino.spiller; import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; import io.trino.execution.buffer.PagesSerde; -import io.trino.execution.buffer.SerializedPage; import io.trino.execution.buffer.TestingPagesSerdeFactory; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; +import static io.trino.execution.buffer.PagesSerde.isSerializedPageEncrypted; import static io.trino.operator.PageAssertions.assertPageEquals; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -51,9 +52,9 @@ public void test() VARCHAR.writeString(blockBuilder, "world"); Page helloWorldPage = new Page(blockBuilder.build()); - SerializedPage serialized = serde.serialize(context, helloWorldPage); + Slice serialized = serde.serialize(context, helloWorldPage); assertPageEquals(types, serde.deserialize(serialized), helloWorldPage); - assertTrue(serialized.isEncrypted(), "page should be encrypted"); + assertTrue(isSerializedPageEncrypted(serialized), "page should be encrypted"); cipher.close(); diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java index 008345af8a47..56b698e1fde3 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java @@ -164,7 +164,7 @@ public void testSplitsForNormalQuery() // Deterministic statistics assertEquals(statistics.getPhysicalInputBytes(), 0); assertEquals(statistics.getPhysicalInputRows(), expectedCompletedPositions); - assertEquals(statistics.getInternalNetworkBytes(), 366); + assertEquals(statistics.getInternalNetworkBytes(), 405); assertEquals(statistics.getInternalNetworkRows(), 3); assertEquals(statistics.getTotalBytes(), 0); assertEquals(statistics.getOutputBytes(), 9);