Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
Expand Down Expand Up @@ -579,7 +580,7 @@ private void writeToSink(TaskId taskId, List<Slice> pages)
writeBuffer.writeInt(taskId.getPartitionId());
writeBuffer.writeInt(taskId.getAttemptId());
writeBuffer.writeBytes(page);
exchangeSink.add(0, writeBuffer.slice());
exchangeSink.add(0, Slices.copyOf(writeBuffer.slice()));
writeBuffer.reset();
spilledBytes += page.length();
spilledPageCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public interface ExchangeSink

/**
* Appends arbitrary {@code data} to a partition specified by {@code partitionId}.
* The engine is free to reuse the {@code data} buffer.
* The implementation is expected to copy the buffer as it may be invalidated and recycled.
* With method call the {@code data} buffer ownership is passed from caller to callee.
* This method is guaranteed not to be invoked after {@link #finish()}.
* This method can be invoked after {@link #abort()}.
* If this method is invoked after {@link #abort()} the invocation should be ignored.
Expand Down