Skip to content

Commit 4e0b405

Browse files
authored
HDFS-17544. [ARR] The router client rpc protocol PB supports asynchrony. (#6870). Contributed by Jian Zhang.
Signed-off-by: He Xiaoqiao <[email protected]>
1 parent 6de630b commit 4e0b405

14 files changed

+3100
-22
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -274,47 +274,47 @@ public class ClientNamenodeProtocolTranslatorPB implements
274274
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
275275
final private ClientNamenodeProtocolPB rpcProxy;
276276

277-
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
277+
protected static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
278278
GetServerDefaultsRequestProto.newBuilder().build();
279279

280-
private final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST =
280+
protected final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST =
281281
GetFsStatusRequestProto.newBuilder().build();
282282

283-
private final static GetFsReplicatedBlockStatsRequestProto
283+
protected final static GetFsReplicatedBlockStatsRequestProto
284284
VOID_GET_FS_REPLICATED_BLOCK_STATS_REQUEST =
285285
GetFsReplicatedBlockStatsRequestProto.newBuilder().build();
286286

287-
private final static GetFsECBlockGroupStatsRequestProto
287+
protected final static GetFsECBlockGroupStatsRequestProto
288288
VOID_GET_FS_ECBLOCKGROUP_STATS_REQUEST =
289289
GetFsECBlockGroupStatsRequestProto.newBuilder().build();
290290

291-
private final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST =
291+
protected final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST =
292292
RollEditsRequestProto.getDefaultInstance();
293293

294-
private final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST =
294+
protected final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST =
295295
RefreshNodesRequestProto.newBuilder().build();
296296

297-
private final static FinalizeUpgradeRequestProto
297+
protected final static FinalizeUpgradeRequestProto
298298
VOID_FINALIZE_UPGRADE_REQUEST =
299299
FinalizeUpgradeRequestProto.newBuilder().build();
300300

301-
private final static UpgradeStatusRequestProto
301+
protected final static UpgradeStatusRequestProto
302302
VOID_UPGRADE_STATUS_REQUEST =
303303
UpgradeStatusRequestProto.newBuilder().build();
304304

305-
private final static GetDataEncryptionKeyRequestProto
305+
protected final static GetDataEncryptionKeyRequestProto
306306
VOID_GET_DATA_ENCRYPTIONKEY_REQUEST =
307307
GetDataEncryptionKeyRequestProto.newBuilder().build();
308308

309-
private final static GetStoragePoliciesRequestProto
309+
protected final static GetStoragePoliciesRequestProto
310310
VOID_GET_STORAGE_POLICIES_REQUEST =
311311
GetStoragePoliciesRequestProto.newBuilder().build();
312312

313-
private final static GetErasureCodingPoliciesRequestProto
313+
protected final static GetErasureCodingPoliciesRequestProto
314314
VOID_GET_EC_POLICIES_REQUEST = GetErasureCodingPoliciesRequestProto
315315
.newBuilder().build();
316316

317-
private final static GetErasureCodingCodecsRequestProto
317+
protected final static GetErasureCodingCodecsRequestProto
318318
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
319319
.newBuilder().build();
320320

@@ -1137,7 +1137,7 @@ public void removeCacheDirective(long id)
11371137
setId(id).build()));
11381138
}
11391139

1140-
private static class BatchedCacheEntries
1140+
protected static class BatchedCacheEntries
11411141
implements BatchedEntries<CacheDirectiveEntry> {
11421142
private final ListCacheDirectivesResponseProto response;
11431143

@@ -1200,7 +1200,7 @@ public void removeCachePool(String cachePoolName) throws IOException {
12001200
setPoolName(cachePoolName).build()));
12011201
}
12021202

1203-
private static class BatchedCachePoolEntries
1203+
protected static class BatchedCachePoolEntries
12041204
implements BatchedEntries<CachePoolEntry> {
12051205
private final ListCachePoolsResponseProto proto;
12061206

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.protocolPB;
20+
21+
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
22+
import org.apache.hadoop.io.Writable;
23+
import org.apache.hadoop.ipc.CallerContext;
24+
import org.apache.hadoop.ipc.Client;
25+
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
26+
import org.apache.hadoop.ipc.Server;
27+
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
28+
import org.apache.hadoop.util.concurrent.AsyncGet;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.io.IOException;
33+
import java.util.concurrent.CompletableFuture;
34+
35+
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
36+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
37+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
38+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
39+
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
40+
41+
public final class AsyncRpcProtocolPBUtil {
42+
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
43+
44+
private AsyncRpcProtocolPBUtil() {}
45+
46+
public static <T, R> R asyncIpcClient(
47+
ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response,
48+
Class<R> clazz) throws IOException {
49+
ipc(call);
50+
AsyncGet<T, Exception> asyncReqMessage =
51+
(AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
52+
CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
53+
// transfer originCall & callerContext to worker threads of executor.
54+
final Server.Call originCall = Server.getCurCall().get();
55+
final CallerContext originContext = CallerContext.getCurrent();
56+
asyncCompleteWith(responseFuture);
57+
asyncApply(o -> {
58+
try {
59+
Server.getCurCall().set(originCall);
60+
CallerContext.setCurrent(originContext);
61+
T res = asyncReqMessage.get(-1, null);
62+
return response.apply(res);
63+
} catch (Exception e) {
64+
throw warpCompletionException(e);
65+
}
66+
});
67+
return asyncReturn(clazz);
68+
}
69+
}

0 commit comments

Comments
 (0)