-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-10338. Implement a Client Datanode API to stream a block #6613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 77 commits
d6d7b2a
d8a519d
ffebd66
a6ed056
fbd20eb
a663604
0f35371
80329dc
e398d17
a404266
c57c2e6
4dc9082
af4a25b
79ae3eb
b1b301e
ad0fd8d
e86aee9
5442761
8d97d47
b694448
d5dc908
74eac1e
29c8f80
741effb
f1d4d7f
6a67375
b0c64d7
b4cfd3f
91631ac
6e36ec1
e0b1d2b
0e8576e
f243c46
1d8500f
80eb936
7afa66b
b26b51d
c4e8b2b
56d8ca9
7807741
a8ea3ec
57b874c
7b98108
db82e3b
158d2e4
4935668
d87d2c4
4b5e14c
0d0e31c
d7e9c92
3cefe36
61b79ae
5fb1478
b857fba
92505bc
8202468
a1925c4
ef23455
7a7388f
7bbe3d9
71094bd
9e5f77c
281e91e
397ef40
fe5f8ec
9566b89
200da6d
0814a22
e040768
ece66fc
398692e
fead0b7
2fb2851
dec005d
1968756
5c70fd4
bc804ff
0e4e41e
f8e5f28
228de8d
c24da9b
f457ac2
c1fbad2
5e2b3cf
ba100c8
1fec95f
7daa875
5db133a
2bf716d
d53bdcb
aa51cbb
03986e4
2d07ec0
d39fd6b
7656d66
b794dd9
eec8afb
dbab8e2
fc95079
ec0aa17
3f2396f
1f90a0a
009938c
8ba23fc
7a3cd39
815a0cb
4c91e42
e2cdb12
f729fcc
94ad468
03898c2
10f3094
877df66
5525770
325e833
8d3ddff
7a37fcc
559c8a1
71c89f0
aee40b4
5b99b8d
3f117b7
7234baa
af70910
4b37064
4d74479
6bd9885
b12e235
27d4c66
6e4463e
ca61312
79ceaa9
5b70412
726f8e4
117fa17
4375666
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,8 @@ | |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; | ||
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; | ||
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; | ||
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; | ||
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; | ||
| import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; | ||
| import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub; | ||
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; | ||
|
|
@@ -441,7 +443,11 @@ private XceiverClientReply sendCommandWithRetry( | |
| // sendCommandAsyncCall will create a new channel and async stub | ||
| // in case these don't exist for the specific datanode. | ||
| reply.addDatanode(dn); | ||
| responseProto = sendCommandAsync(request, dn).getResponse().get(); | ||
| if (request.getCmdType() == ContainerProtos.Type.ReadBlock) { | ||
| responseProto = sendCommandReadBlock(request, dn).getResponse().get(); | ||
| } else { | ||
| responseProto = sendCommandAsync(request, dn).getResponse().get(); | ||
| } | ||
| if (validators != null && !validators.isEmpty()) { | ||
| for (Validator validator : validators) { | ||
| validator.accept(request, responseProto); | ||
|
|
@@ -485,7 +491,7 @@ private XceiverClientReply sendCommandWithRetry( | |
| String message = "Failed to execute command {}"; | ||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug(message + " on the pipeline {}.", | ||
| processForDebug(request), pipeline); | ||
| processForDebug(request), pipeline); | ||
| } else { | ||
| LOG.warn(message + " on the pipeline {}.", | ||
| request.getCmdType(), pipeline); | ||
|
|
@@ -618,6 +624,69 @@ private void decreasePendingMetricsAndReleaseSemaphore() { | |
| return new XceiverClientReply(replyFuture); | ||
| } | ||
|
|
||
| public XceiverClientReply sendCommandReadBlock( | ||
szetszwo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ContainerCommandRequestProto request, DatanodeDetails dn) | ||
| throws IOException, InterruptedException { | ||
|
|
||
| CompletableFuture<ContainerCommandResponseProto> future = | ||
| new CompletableFuture<>(); | ||
| ContainerCommandResponseProto.Builder response = | ||
| ContainerCommandResponseProto.newBuilder(); | ||
| ContainerProtos.ReadBlockResponseProto.Builder readBlock = | ||
| ContainerProtos.ReadBlockResponseProto.newBuilder(); | ||
| checkOpen(dn); | ||
| UUID dnID = dn.getUuid(); | ||
| Type cmdType = request.getCmdType(); | ||
| semaphore.acquire(); | ||
| long requestTime = System.currentTimeMillis(); | ||
| metrics.incrPendingContainerOpsMetrics(cmdType); | ||
|
|
||
| final StreamObserver<ContainerCommandRequestProto> requestObserver = | ||
| asyncStubs.get(dnID).withDeadlineAfter(timeout, TimeUnit.SECONDS) | ||
| .send(new StreamObserver<ContainerCommandResponseProto>() { | ||
| @Override | ||
| public void onNext( | ||
| ContainerCommandResponseProto responseProto) { | ||
| if (responseProto.getResult() == Result.SUCCESS) { | ||
| readBlock.addReadChunk(responseProto.getReadChunk()); | ||
| } else { | ||
| future.complete( | ||
| ContainerCommandResponseProto.newBuilder(responseProto) | ||
szetszwo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .setCmdType(Type.ReadBlock).build()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| future.completeExceptionally(t); | ||
| metrics.decrPendingContainerOpsMetrics(cmdType); | ||
| metrics.addContainerOpsLatency( | ||
| cmdType, System.currentTimeMillis() - requestTime); | ||
chungen0126 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
chungen0126 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| if (readBlock.getReadChunkCount() > 0) { | ||
| future.complete(response.setReadBlock(readBlock) | ||
| .setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build()); | ||
| } | ||
| if (!future.isDone()) { | ||
| future.completeExceptionally(new IOException( | ||
| "Stream completed but no reply for request " + | ||
| processForDebug(request))); | ||
| } | ||
| metrics.decrPendingContainerOpsMetrics(cmdType); | ||
| metrics.addContainerOpsLatency( | ||
| cmdType, System.currentTimeMillis() - requestTime); | ||
| } | ||
| }); | ||
| requestObserver.onNext(request); | ||
| requestObserver.onCompleted(); | ||
| semaphore.release(); | ||
|
||
| return new XceiverClientReply(future); | ||
| } | ||
|
|
||
| private synchronized void checkOpen(DatanodeDetails dn) | ||
| throws IOException { | ||
| if (closed) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.