HDDS-10338. Implement a Client Datanode API to stream a block #6613
Conversation
> This is for avoiding the deadlock, but not for performance.
So, we have to start 32k threads for each client? It does not sound right. (Sorry that I missed your reply previously.) Also, it probably won't work well for YARN or S3 Gateway, since they need to start a client for each user and have to support thousands of users.

The thread overhead is on the datanode side, as streaming a block to a client ties up the server handler thread. So if you have lots of blocks being read slowly, it will use up the handlers. I don't think there is an issue with the number of threads needed on the client side. For any given block read, it needs one extra thread while the block is being read.

I have been primarily looking at (2), which I believe covers the most common Spark/Hive-type workloads, where blocks are largely read completely. (1) is more like an HBase use case, where it will scatter positional reads all over the file based on which part of the block its index tells it to read.
```java
public class StreamingReader implements StreamingReaderSpi {

  private final BlockingQueue<ContainerProtos.ReadBlockResponseProto> responseQueue =
      new LinkedBlockingQueue<>(1);
```
> The thread overhead is on the datanode side, as streaming a block to a client ties up the server handler thread. ...
@sodonnel, this BlockingQueue responseQueue is on the client side, so the thread-blocking problem is at the client side. Server-side gRPC threads won't block in the current implementation, since the server-side onNext() implementation does not block. We may print some debug messages to verify. It would be much better if the blocking-thread problem were at the server side.
Added "client side" to #6613 (comment):

> ... the implementation uses a BlockingQueue responseQueue. When the responses are not removed from the queue, the client-side gRPC thread is blocked in StreamingReader.onNext(..) and the gRPC streaming mechanism will throttle and slow down the server side.
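To make the quoted back-pressure mechanism concrete, here is a minimal sketch of a client-side observer built around a capacity-1 queue. Only the responseQueue declaration comes from the diff above; the class skeleton, imports, and method bodies are assumptions for illustration, not the PR's exact code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import io.grpc.stub.StreamObserver;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

// Hypothetical skeleton of the client-side observer.
public class StreamingReaderSketch
    implements StreamObserver<ContainerProtos.ReadBlockResponseProto> {

  // Capacity 1: the gRPC event thread blocks in onNext() until the reading
  // thread drains the previous response, which makes gRPC stop pulling more
  // messages off the wire and so throttles the server.
  private final BlockingQueue<ContainerProtos.ReadBlockResponseProto> responseQueue =
      new LinkedBlockingQueue<>(1);

  @Override
  public void onNext(ContainerProtos.ReadBlockResponseProto response) {
    try {
      responseQueue.put(response); // blocks while the queue is full
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void onError(Throwable t) {
    // Propagate the failure to the reading thread (omitted in this sketch).
  }

  @Override
  public void onCompleted() {
    // Signal end-of-stream to the reading thread (omitted in this sketch).
  }
}
```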
```java
ReadBlockResponseProto readBlock;
try {
  readBlock = responseQueue.poll(30, TimeUnit.SECONDS);
```
Can we add a parameter for this timeout value?
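For illustration, a minimal sketch of how the 30-second value could be made configurable. The key name is hypothetical, and it assumes the usual getTimeDuration accessor on Ozone's ConfigurationSource:

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.ConfigurationSource;

final class StreamReadTimeoutSketch {
  // Hypothetical key; the real key would be defined alongside the other
  // client configuration properties.
  static final String KEY = "ozone.client.stream.read.timeout";
  static final long DEFAULT_SECONDS = 30;

  static long readTimeoutSeconds(ConfigurationSource conf) {
    return conf.getTimeDuration(KEY, DEFAULT_SECONDS, TimeUnit.SECONDS);
  }
}
```

The poll above would then become `responseQueue.poll(readTimeoutSeconds, TimeUnit.SECONDS)`.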
```java
  return new ECBlockInputStreamProxy((ECReplicationConfig) repConfig,
      blockInfo, xceiverFactory, refreshFunction,
      ecBlockStreamFactory, config);
} else if (config.isStreamReadBlock() && allDataNodesSupportStreamBlock(pipeline)) {
```
Can we do streaming for EC?
```java
    adjustedOffset += bytesPerChecksum;
  }
}
// TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
```
Can we add metrics to see how it performs vs. the non-streamed implementation?
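As a sketch of what such metrics might look like using Hadoop metrics2; the names here are hypothetical, and the TODO in the diff suggests the existing incContainerBytesStats counter is the intended hook:

```java
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;

// Hypothetical metrics for comparing streamed vs. non-streamed reads.
public class StreamReadMetricsSketch {
  private final MetricsRegistry registry = new MetricsRegistry("StreamBlockRead");
  private final MutableCounterLong streamReadBytes =
      registry.newCounter("StreamReadBytes", "Bytes served via streaming reads", 0L);
  private final MutableRate streamReadChunkTime =
      registry.newRate("StreamReadChunkTime", "Time to send each streamed piece", false);

  public void onPieceSent(long bytes, long elapsedNanos) {
    streamReadBytes.incr(bytes);
    streamReadChunkTime.add(elapsedNanos);
  }
}
```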
```java
private static final Logger LOG = LoggerFactory.getLogger(
    KeyValueHandler.class);
private static final int STREAMING_BYTES_PER_CHUNK = 1024 * 64;
```
Can we have a configuration parameter for this size?
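A sketch of how the constant could be promoted to a configuration property, following Ozone's @Config annotation pattern; the key name, prefix, and tags are assumptions:

```java
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;

@ConfigGroup(prefix = "hdds.container")
public class StreamingChunkConfigSketch {
  // Hypothetical key mirroring how other sizes are declared in Ozone.
  @Config(key = "read.stream.bytes-per-chunk",
      defaultValue = "64KB",
      type = ConfigType.SIZE,
      description = "Size of each piece sent when streaming a block to a client.",
      tags = {ConfigTag.DATANODE})
  private long streamingBytesPerChunk = 64 * 1024;

  public long getStreamingBytesPerChunk() {
    return streamingBytesPerChunk;
  }
}
```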
```java
@Override
public synchronized int read() throws IOException {
  checkOpen();
```
It would be good to have an OpenTelemetry trace here.
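For reference, a minimal sketch of what such a trace could look like with the OpenTelemetry Java API; Ozone has its own tracing utilities, so treat the tracer name and span placement here as illustrative only:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

// Illustrative only: wraps a single read() call in a span.
final class TracedReadSketch {
  private static final Tracer TRACER =
      GlobalOpenTelemetry.getTracer("StreamBlockInputStream");

  int tracedRead(java.io.InputStream in) throws java.io.IOException {
    Span span = TRACER.spanBuilder("StreamBlockInputStream.read").startSpan();
    try (Scope ignored = span.makeCurrent()) {
      return in.read();
    } finally {
      span.end();
    }
  }
}
```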
```java
}

@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
```
It would be good to have an OpenTelemetry trace here.
```java
}

@Override
public synchronized int read(ByteBuffer targetBuf) throws IOException {
```
It would be good to have an OpenTelemetry trace here.
```java
  }
}

private int fillBuffer() throws IOException {
```
It would be good to have an OpenTelemetry trace here.
```java
  ByteString checksum = blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex);
  checksumData = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum));
}
streamObserver.onNext(getReadBlockResponse(request, checksumData, buffer, adjustedOffset));
```
Here we need to check streamObserver.isReady(), and if it's not ready we should use setOnReadyHandler(Runnable onReadyHandler) to transfer the remaining data.
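A minimal sketch of that pattern using gRPC's ServerCallStreamObserver: write only while the transport is ready, and resume from the onReady callback otherwise. The chunk iterator is a hypothetical stand-in for the block-reading loop in the diff.

```java
import java.util.Iterator;

import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

// Sketch of server-side flow control; not the PR's actual code.
final class FlowControlledSender<T> {
  private final ServerCallStreamObserver<T> observer;
  private final Iterator<T> chunks;
  private boolean done;

  FlowControlledSender(StreamObserver<T> rawObserver, Iterator<T> chunks) {
    this.observer = (ServerCallStreamObserver<T>) rawObserver;
    this.chunks = chunks;
    this.observer.setOnReadyHandler(this::drain); // resume when writable again
  }

  synchronized void drain() {
    // Stop as soon as the transport buffer is full; onReady will call us back.
    while (!done && observer.isReady() && chunks.hasNext()) {
      observer.onNext(chunks.next());
    }
    if (!done && !chunks.hasNext()) {
      observer.onCompleted();
      done = true;
    }
  }
}
```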
@sodonnel, I tried adding a length field to ReadBlockRequestProto and it works well. See https://github.com/szetszwo/ozone/commits/HDDS-10338b/ (please feel free to use the code). Tested it with various client buffer sizes (17MB, 3.5MB, 4KB); below is the 3.5MB (= 3565158 bytes) case.

All three cases: test.log
```java
stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)
    .streamBlock(request, streamObserver);
streamObserver.setStreamingDatanode(dn);
return;
```
Do we do that only for one node?
Yes, it iterates over the list of available datanodes and uses the first one that works. If the first one fails to connect, it will try the next one, etc.
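A generic sketch of that failover behaviour; the types and helper here are placeholders, and real gRPC connection failures would surface via the observer's onError callback rather than an exception, which this sketch compresses for readability:

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

// Try each node in order; use the first one that accepts the stream.
final class FirstWorkingNodeSketch {
  static <N> N startOnFirstWorkingNode(List<N> nodes, Consumer<N> startStream)
      throws IOException {
    for (N node : nodes) {
      try {
        startStream.accept(node); // e.g. stub.streamBlock(request, observer)
        return node;              // remember which datanode is streaming
      } catch (RuntimeException e) {
        // log and fall through to the next replica
      }
    }
    throw new IOException("Unable to start the stream from any datanode");
  }
}
```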
Thanks @sodonnel for the update and @szetszwo for the review. After reviewing the discussion above, have we considered implementing gRPC flow control here? I think that could effectively free up the blocked threads. Please refer to: Additionally, since we are optimizing the read path by switching from a unary request to streaming, the size of the TCP socket buffer might impact performance; I think we should expose this configuration so users can tune it.

@chungen0126, let's merge the current code first and then work on further improvements. It is already a lot of code.
@chungen0126, @sodonnel, we should plan how to merge the code. How about we merge it in three parts, in the following order?

If you agree, please submit a new pull request against the current master; this PR is very long and hard to follow. (We may also consider merging to a dev branch first. However, since the feature is configurable and disabled by default, we probably don't need a dev branch.) We may also create subtasks under HDDS-10338.
Sure, I agree. I have no strong opinion on how to merge this PR; if that is an acceptable way, let's do it.

Please feel free to update the Summary and the Description of the JIRAs.
I am happy for @szetszwo to simply push his latest changes onto this branch and commit this as a single PR. It's just more work, and more confusing for people looking back later, to see the change split into three; and if we want to cherry-pick the change elsewhere for testing etc., it's harder with three commits than one.
I am not able to find time to do a complete review of this PR. I don't want to block it, so please go ahead and merge the changes if the other reviewers are good with them. I will review the changes offline and bring up bugs if I find any. Apologies for the delayed response.

@devabhishekpal, could you follow up on this?
@sodonnel @szetszwo Here's a commit that implements client-side gRPC flow control: chungen0126@73de01a. This effectively prevents blocking the gRPC event-loop thread. Since it's just a few lines of code, could you please take a look? If you agree with the change, I think it would be better to include it in this PR.
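For context, client-side gRPC flow control generally looks like the sketch below: disable automatic inbound requests and explicitly ask for the next message only once the previous one has been consumed. This shows the general pattern (it assumes a grpc-java version that provides disableAutoRequestWithInitial), not necessarily the exact content of the linked commit:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;

// Sketch of pull-based client-side flow control.
final class FlowControlledReader<ReqT, RespT>
    implements ClientResponseObserver<ReqT, RespT> {

  private final BlockingQueue<RespT> queue = new LinkedBlockingQueue<>();
  private ClientCallStreamObserver<ReqT> requestStream;

  @Override
  public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
    this.requestStream = requestStream;
    requestStream.disableAutoRequestWithInitial(1); // pull, don't push
  }

  @Override
  public void onNext(RespT value) {
    queue.add(value); // never blocks the event loop; at most one message is
                      // outstanding because we only request(1) after take()
  }

  public RespT take() throws InterruptedException {
    RespT value = queue.take();
    requestStream.request(1); // ask for the next chunk only when ready
    return value;
  }

  @Override
  public void onError(Throwable t) {
    // Propagate to the reading thread (omitted in this sketch).
  }

  @Override
  public void onCompleted() {
    // Mark end-of-stream so take() can return (omitted in this sketch).
  }
}
```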
@chungen0126 The goal is to get to the point where we commit https://github.com/szetszwo/ozone/commits/HDDS-10338b/, which @szetszwo created based on the work you and I did previously. Ideally, we should just close this PR, open a new one with the change above, and work to get it committed. It changes the approach, implementing flow control by asking for the next chunks as it needs them. In some initial testing it appears to perform better than the latest commit on this PR, which also implemented flow control on the server side.
…orage/StreamBlockInputStream.java Co-authored-by: Chung En Lee <[email protected]>
With some changes in HDDS-14055, all JIRAs in #6613 (comment) have been merged. Closing this ...
What changes were proposed in this pull request?
Introduce a new BlockInputStream to allow a block to be streamed to a client with a single RPC call.
Upon receiving the request to stream a block, the server seeks the block file to the given offset and starts sending pieces of the block to the client. Each piece sent is a checksum-sized piece of the block.
For example, if the checksum size is 16kb (the current default in Ozone) and a client requests a block starting at offset 17kb, the server will align the read to the checksum boundary and return 16kb of data starting at the 16kb offset, including the correct checksum. The server will then keep sending the subsequent pieces of the block to the client until it is blocked from doing so, at which point it will stall in the gRPC onNext() method. The block file and file position are maintained in the server thread while the file remains open on the client side; this means the server handler thread is also blocked. On the client side, it will buffer the first response and wait for it to be consumed by any wrapping block streams. A blocking queue is used to prevent the server from sending too much data, providing back pressure so the server sends no more until the client can consume it. In my testing, I found the server will send pieces of the block up to about 1MB, which block on the TCP/IP socket until the client is able to read them off.
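The alignment described above amounts to rounding the requested offset down to a checksum boundary; here is a small worked example with the numbers from the text:

```java
public class AlignmentExample {
  public static void main(String[] args) {
    long bytesPerChecksum = 16 * 1024; // 16KB boundary, the current Ozone default
    long requestedOffset = 17 * 1024;  // client asks for offset 17KB
    long alignedOffset = (requestedOffset / bytesPerChecksum) * bytesPerChecksum;
    long clientSkip = requestedOffset - alignedOffset;
    System.out.println(alignedOffset); // 16384: server reads from the 16KB boundary
    System.out.println(clientSkip);    // 1024: bytes the client discards
  }
}
```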
Compared to the current code in Ozone, with 16kb checksums, the time taken to cat a 1GB file on a docker cluster on my MacBook was about 30 seconds:
With this change, the response time dropped to about 7.6 seconds:
Even with 1MB checksums, the speed-up was from 4.39s to 2.6s.
This approach has some potential problems, the main one being that it blocks server threads in the name of efficiency. This means it will require scaling the server handler pool accordingly, and possibly creating a new handler pool for stream reads to prevent them impacting other short-lived calls (e.g. put block, traditional read block).
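A sketch of what such a dedicated pool for streaming reads could look like; the pool size and thread naming are illustrative, not values from the PR:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// A bounded, separately named pool for long-lived streaming reads, so they
// cannot starve the handlers serving short unary calls.
final class StreamReadExecutorSketch {
  static ExecutorService newStreamReadPool(int poolSize) {
    ThreadFactory factory = r -> {
      Thread t = new Thread(r, "stream-block-read");
      t.setDaemon(true);
      return t;
    };
    return Executors.newFixedThreadPool(poolSize, factory);
  }
}
```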
There are also a few pieces of this change that will need further work, e.g.:
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-10338
How was this patch tested?
Various new tests added