Implement write path zero copy#8526
Conversation
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
|
@calvinjia @gpang PTAL. |
|
|
||
| @Override | ||
| public void readBytes(OutputStream outputStream, int length) { | ||
| throw new UnsupportedOperationException("ByteArrayDataBuffer#readBytes is not implemented."); |
There was a problem hiding this comment.
Why are these not supported?
Can we distinguish the 2 calls from the exception messages?
There was a problem hiding this comment.
This class is only used in unit tests and only one main function is implemented. It's not used in write path so I didn't spend any time completing this class.
There was a problem hiding this comment.
If this is just a test class, then can we move it to a test directory? It is not obvious that this is only used for tests.
| /** | ||
| * Appends buffer.readableBytes() bytes to the end of this block writer from the given buffer. | ||
| * | ||
| * @param buffer the byte buffer to hold the data |
There was a problem hiding this comment.
is this the buffer that holds the data currently? to hold the data makes is sound like this is the buffer that is the output?
There was a problem hiding this comment.
Yeah I copied the javadoc from an overloaded method. Updated.
| return null; | ||
| } | ||
| DataBuffer buffer = mMarshaller.pollBuffer(response); | ||
| DataBuffer buffer = mResponseMarshaller.pollBuffer(response); |
There was a problem hiding this comment.
mResponseMarshaller can be null?
There was a problem hiding this comment.
User should not call this method if they do not have a response marshaller. I will add a check.
| new Builder(Name.WORKER_NETWORK_WRITER_BUFFER_SIZE_MESSAGES) | ||
| .setDefaultValue(8) | ||
| .setDescription("When a client writes to a remote worker, the maximum number of " | ||
| + "data messages to buffer by the server.") |
There was a problem hiding this comment.
this is what the worker buffers, per client?
There was a problem hiding this comment.
Updated the description.
| * Gets a Netty buffer directly from a gRPC ReadableBuffer. | ||
| * | ||
| * @param buffer the input buffer | ||
| * @return the raw ByteBuf |
| DataMessageMarshaller<WriteRequest> marshaller = | ||
| ((DataMessageMarshallerProvider<WriteRequest, WriteResponse>) responseObserver) | ||
| .getRequestMarshaller(); | ||
| Preconditions.checkNotNull(marshaller); |
There was a problem hiding this comment.
Doesn't a DataMessageMarshallerProvider provide null sometimes?
There was a problem hiding this comment.
DataMessageMarshallerProvider is a generic holder of request marshaller and response marshaller. Depending on the situation, the caller most likely only provide one of the marshallers. In this writeBlock method, it is required that the user should provide the request marshaller for zero copy to work correctly, hence the check.
If a user does not want to use zero copy at all, they should not pass in a DataMessageMarshallerProvider.
| ((DataMessageMarshallerProvider<ReadResponse>) responseObserver).getMarshaller(); | ||
| ((DataMessageMarshallerProvider<ReadRequest, ReadResponse>) responseObserver) | ||
| .getResponseMarshaller(); | ||
| Preconditions.checkNotNull(marshaller); |
There was a problem hiding this comment.
can't this be null sometimes?
There was a problem hiding this comment.
Similar to above, if user does not want to use zero copy for reading blocks, they should not pass DataMessageMarshallerProvider instead of giving a null response marshaller.
| chunkBuffer = new NettyDataBuffer( | ||
| Unpooled.wrappedBuffer(message.getChunk().getData().asReadOnlyByteBuffer())); | ||
| } | ||
| int size = message.getSerializedSize() - chunkBuffer.readableBytes(); |
| private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteHandler.class); | ||
|
|
||
| private final StreamObserver<WriteResponse> mResponseObserver; | ||
| private final SerializingExecutor mSerializingExecutor; |
There was a problem hiding this comment.
can you add comments on these fields? I don't fully understand them.
|
@gpang Thanks for the review. I updated the PR to address your comments. PTAL. |
|
Merged build finished. Test FAILed. |
|
Test FAILed. |
|
Jenkins, test this please |
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
| */ | ||
| AbstractWriteHandler(StreamObserver<WriteResponse> responseObserver) { | ||
| mResponseObserver = responseObserver; | ||
| mSerializingExecutor = new SerializingExecutor(GrpcExecutors.BLOCK_WRITER_EXECUTOR); |
There was a problem hiding this comment.
the serializing executor only runs 1 task at a time, in order. Why does it need a large threadpool?
There was a problem hiding this comment.
It only serialize tasks within the executor for a single request. There can be multiple concurrent requests which requires more threads for processing messages concurrently.
| Preconditions.checkState(buffer.readableBytes() > 0, | ||
| "invalid data size from write request message"); | ||
| try { | ||
| mSemaphore.acquire(); |
There was a problem hiding this comment.
maybe there should be a helper method that does this acquire?
| Thread.currentThread().interrupt(); | ||
| return; | ||
| } | ||
| mSerializingExecutor.execute(() -> { |
There was a problem hiding this comment.
I feel like every serializing executor should follow the model:
try {
...
} finally {
sem.release()
}
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
|
Thanks @gpang! I updated the PR. PTAL. |
|
@gpang I updated the PR to address your comment. PTAL. |
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
| if (!tryAcquireSemaphore()) { | ||
| return; | ||
| } | ||
| mSerializingExecutor.execute(() -> { |
There was a problem hiding this comment.
What will happen if client tries zero copy but the server does not support it?
There was a problem hiding this comment.
The zero copy code path serialize and deserialize messages in the same format as the protobuf marshaller, therefore the client and the server can switch between zero copy or protobuf marshaller without breaking compatibility with the other side.
Implemented write path zero copy to improve performance on remote and domain socket writes. Also made write handler write to disk asynchronously to avoid blocking network traffic.