-
Notifications
You must be signed in to change notification settings - Fork 3k
Implement zero copy for gRPC read path #8353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@calvinjia @gpang PTAL. |
|
Merged build finished. Test FAILed. |
|
Test FAILed. |
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
| @Override | ||
| public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) { | ||
| if (mObserver instanceof ClientResponseObserver) { | ||
| ((ClientResponseObserver<ReqT, RespT>) mObserver).beforeStart(requestStream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is beforeStart called selectively?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just to make sure the observer provided actually implemented the interface for beforeStart(). I can put a warning message if it is not implemented.
| return mStreamingAsyncStub.readBlock(responseObserver); | ||
| public StreamObserver<ReadRequest> readBlock( | ||
| StreamObserver<ReadResponse> responseObserver) { | ||
| if (responseObserver instanceof DataMessageClientResponseObserver) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to have a new interface with just getMarshaller, and just check/cast to that interface when getting the marshaller?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| private final WorkerNetAddress mAddress; | ||
|
|
||
| private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream; | ||
| private final ReadResponseMarshaller mMarhshaller; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spelling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
| public static final PropertyKey WORKER_NETWORK_ZEROCOPY_ENABLED = | ||
| new Builder(Name.WORKER_NETWORK_ZEROCOPY_ENABLED) | ||
| .setDefaultValue(true) | ||
| .setDescription("Whether zero copy is enabled on worker when processing data streams.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this for reading data from streams or writing data to streams?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a general switch for all worker side zero-copy code path. For now only read path is implemented.
| public static final PropertyKey USER_NETWORK_ZEROCOPY_ENABLED = | ||
| new Builder(Name.USER_NETWORK_ZEROCOPY_ENABLED) | ||
| .setDefaultValue(true) | ||
| .setDescription("Whether zero copy is enabled on client when processing data streams.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this for reading data from streams or writing data to streams?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a general switch for all client side zero-copy code path. For now only read path is implemented.
|
|
||
| try { | ||
| sNettyWritableBufferConstruct = getPrivateConstructor( | ||
| "io.grpc.netty.NettyWritableBuffer", ByteBuf.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can these strings be constants?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
| if (mObserver instanceof ClientResponseObserver) { | ||
| ((ClientResponseObserver<ReqT, RespT>) mObserver).beforeStart(requestStream); | ||
| } else { | ||
| LOG.warn("observer does not implement ClientResponseObserver:beforeStart"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to print out the class for the mObserver?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 comments.
- will this be spammy?
- add the
mObserverto the log message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not be spammy unless someone pass in the wrong observer. In most use cases the DataMessageClientResponseObserver is a wrapper of the original stream observer user implements, so user should only call method on the wrapper that the original observer actually implements.
| public StreamObserver<ReadRequest> readBlock(StreamObserver<ReadResponse> responseObserver) { | ||
| return mStreamingAsyncStub.readBlock(responseObserver); | ||
| public StreamObserver<ReadRequest> readBlock( | ||
| StreamObserver<ReadResponse> responseObserver) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be on one line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reformatted.
| if (mObserver instanceof ClientResponseObserver) { | ||
| ((ClientResponseObserver<ReqT, RespT>) mObserver).beforeStart(requestStream); | ||
| } else { | ||
| LOG.warn("observer does not implement ClientResponseObserver:beforeStart"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 comments.
- will this be spammy?
- add the
mObserverto the log message.
| */ | ||
| public abstract T combineData(DataMessage<T, DataBuffer> message); | ||
|
|
||
| protected abstract ByteBuf[] extractMessageBuffer(T message) throws IOException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you javadoc these methods? I'm not sure what they are supposed to do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed the methods and added some java docs. Hopefully now they are easier to understand.
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
|
@calvinjia @gpang Thanks for the feedback. I updated the PR to address your comments. PTAL. |
gpang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Added APIs for intercepting gRPC client and server with custom data serialization/deserialization.
Also made the worker send read response asynchronously to improve throughput for single read request.