diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index 3d309235fe891..14adb74c6d59e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -46,7 +46,7 @@ public interface AlignmentContext { void updateResponseState(RpcResponseHeaderProto.Builder header); /** - * This is the intended client method call to implement to recieve state info + * This is the intended client method call to implement to receive state info * during RPC response processing. * * @param header The RPC response header. @@ -73,7 +73,7 @@ public interface AlignmentContext { * @return state id required for the server to execute the call. * @throws IOException raised on errors performing I/O. */ - long receiveRequestState(RpcRequestHeaderProto header, long threshold) + long receiveRequestState(RpcRequestHeaderProto header, long threshold, boolean isCoordinatedCall) throws IOException; /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/NameServiceStateIdMode.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/NameServiceStateIdMode.java new file mode 100644 index 0000000000000..2ab674e8a2e2c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/NameServiceStateIdMode.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +public enum NameServiceStateIdMode { + DISABLE("DISABLE"), + TRANSMISSION("TRANSMISSION"), + PROXY("PROXY"); + + private String name; + + NameServiceStateIdMode(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + + public boolean isDisable() { + return this == NameServiceStateIdMode.DISABLE; + } + + public boolean isTransmission() { + return this == NameServiceStateIdMode.TRANSMISSION; + } + + public boolean isProxy() { + return this == NameServiceStateIdMode.PROXY; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java index 4af35ad9270f1..298779ebf4a08 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java @@ -30,6 +30,8 @@ import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.NameserviceStateIdContextProto.NameServiceStateIdModeProto; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.thirdparty.protobuf.RpcController; @@ -238,4 +240,29 @@ public static String toTraceName(String fullName) { return 
fullName.substring(secondLastPeriod + 1, lastPeriod) + "#" + fullName.substring(lastPeriod + 1); } + + public static NameServiceStateIdMode toStateIdMode(String mode) { + return NameServiceStateIdMode.valueOf(mode.toUpperCase()); + } + + public static NameServiceStateIdMode toStateIdMode(RpcRequestHeaderProto proto) { + if (proto.hasNameserviceStateIdsContext()) { + return NameServiceStateIdMode.valueOf(proto.getNameserviceStateIdsContext().getMode().name()); + } + return null; + } + + public static NameServiceStateIdModeProto toNameServiceStateIdModeProto( + NameServiceStateIdMode mode) { + switch(mode) { + case DISABLE: + return NameServiceStateIdModeProto.DISABLE; + case TRANSMISSION: + return NameServiceStateIdModeProto.TRANSMISSION; + case PROXY: + return NameServiceStateIdModeProto.PROXY; + default: + return null; + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index e79612f7a5a0f..1a275a8a2b9c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -928,7 +928,7 @@ public static class Call implements Schedulable, private volatile String detailedMetricsName = ""; final int callId; // the client's call id final int retryCount; // the retry count of the call - long timestampNanos; // time the call was received + private final long timestampNanos; // time the call was received long responseTimestampNanos; // time the call was served private AtomicInteger responseWaitCount = new AtomicInteger(1); final RPC.RpcKind rpcKind; @@ -1110,6 +1110,18 @@ public void setDeferredResponse(Writable response) { public void setDeferredError(Throwable t) { } + + public long getTimestampNanos() { + return timestampNanos; + } + + public int getCallId() { + return callId; + } + + public byte[] getClientId() 
{ + return clientId; + } } /** A RPC extended call queued for handling. */ @@ -1190,8 +1202,7 @@ public Void run() throws Exception { ResponseParams responseParams = new ResponseParams(); try { - value = call( - rpcKind, connection.protocolName, rpcRequest, timestampNanos); + value = call(rpcKind, connection.protocolName, rpcRequest, getTimestampNanos()); } catch (Throwable e) { populateResponseParamsOnError(e, responseParams); } @@ -2884,11 +2895,10 @@ private void processRpcRequest(RpcRequestHeaderProto header, protoName = req.getRequestHeader().getDeclaringClassProtocolName(); if (alignmentContext.isCoordinatedCall(protoName, methodName)) { call.markCallCoordinated(true); - long stateId; - stateId = alignmentContext.receiveRequestState( - header, getMaxIdleTime()); - call.setClientStateId(stateId); } + long stateId = alignmentContext.receiveRequestState(header, getMaxIdleTime(), + call.isCallCoordinated()); + call.setClientStateId(stateId); } catch (IOException ioe) { throw new RpcServerException("Processing RPC request caught ", ioe); } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 042928c2aee18..b34ef57a72350 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -74,6 +74,21 @@ message RPCCallerContextProto { optional bytes signature = 2; } +message NameserviceStateIdContextProto { + enum NameServiceStateIdModeProto { + DISABLE = 0; // NameserviceStateIdContextProto will be ignored. + TRANSMISSION = 1; // NameserviceStateIdContextProto will be transparently transmitted by the router. + PROXY = 2; // State id is proxied by the router, NameserviceStateIdContextProto will be ignored. + } + required NameServiceStateIdModeProto mode = 1 [default = DISABLE]; + repeated NameserviceStateIdProto nameserviceStateIds = 2; // Last seen state IDs for multiple nameservices. 
+} + +message NameserviceStateIdProto { + required string nsId = 1; + required int64 stateId = 2; +} + message RpcRequestHeaderProto { // the header for the RpcRequest enum OperationProto { RPC_FINAL_PACKET = 0; // The final RPC Packet @@ -91,6 +106,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest optional RPCTraceInfoProto traceInfo = 6; // tracing info optional RPCCallerContextProto callerContext = 7; // call context optional int64 stateId = 8; // The last seen Global State ID + optional NameserviceStateIdContextProto nameserviceStateIdsContext = 9; } @@ -157,6 +173,7 @@ message RpcResponseHeaderProto { optional bytes clientId = 7; // Globally unique client ID optional sint32 retryCount = 8 [default = -1]; optional int64 stateId = 9; // The last written Global State ID + optional NameserviceStateIdContextProto nameserviceStateIdsContext = 10; } message RpcSaslProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 4de969642d574..f55c853f40794 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.NameServiceStateIdMode; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -37,18 +38,28 @@ @InterfaceStability.Evolving public class ClientGSIContext implements AlignmentContext { - private final LongAccumulator lastSeenStateId = - new LongAccumulator(Math::max, Long.MIN_VALUE); + public final static String 
DEFAULT_NS = ""; + private final FederatedNamespaceIds federatedNamespaceIds; + private final String nsId; + + public ClientGSIContext(NameServiceStateIdMode mode) { + this(mode, DEFAULT_NS); + } + + public ClientGSIContext(NameServiceStateIdMode mode, String nsId) { + this.federatedNamespaceIds = new FederatedNamespaceIds(mode); + this.nsId = nsId; + } + @Override public long getLastSeenStateId() { - return lastSeenStateId.get(); + return federatedNamespaceIds.getNamespaceId(nsId, true).get(); } @Override public boolean isCoordinatedCall(String protocolName, String method) { - throw new UnsupportedOperationException( - "Client should not be checking uncoordinated call"); + throw new UnsupportedOperationException("Client should not be checking uncoordinated call"); } /** @@ -66,7 +77,11 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) { */ @Override public void receiveResponseState(RpcResponseHeaderProto header) { - lastSeenStateId.accumulate(header.getStateId()); + if (federatedNamespaceIds.isDisable()) { + federatedNamespaceIds.updateNameserviceState(this.nsId, header.getStateId()); + } else { + federatedNamespaceIds.updateStateUsingResponseHeader(header); + } } /** @@ -74,7 +89,12 @@ public void receiveResponseState(RpcResponseHeaderProto header) { */ @Override public void updateRequestState(RpcRequestHeaderProto.Builder header) { - header.setStateId(lastSeenStateId.longValue()); + if (federatedNamespaceIds.isDisable()) { + header.setStateId(federatedNamespaceIds.getNamespaceId(this.nsId, true).get()); + header.clearNameserviceStateIdsContext(); + } else { + federatedNamespaceIds.setRequestHeaderState(header); + } } /** @@ -82,8 +102,8 @@ public void updateRequestState(RpcRequestHeaderProto.Builder header) { * Client does not receive RPC requests therefore this does nothing. 
*/ @Override - public long receiveRequestState(RpcRequestHeaderProto header, long threshold) - throws IOException { + public long receiveRequestState(RpcRequestHeaderProto header, long threshold, + boolean isCoordinatedCall) throws IOException { // Do nothing. return 0; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java new file mode 100644 index 0000000000000..f04a3533d1f84 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.ipc.RpcClientUtil; +import org.apache.hadoop.ipc.NameServiceStateIdMode; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.NameserviceStateIdProto; + + +/** Collection of last-seen namespace state Ids for a set of namespaces. */ +public class FederatedNamespaceIds { + + private final Map namespaceIdMap = new ConcurrentHashMap<>(); + private NameServiceStateIdMode mode; + + public FederatedNamespaceIds(NameServiceStateIdMode mode) { + this.mode = mode; + } + + public void updateStateUsingRequestHeader(RpcRequestHeaderProto header) { + mode = RpcClientUtil.toStateIdMode(header); + header.getNameserviceStateIdsContext().getNameserviceStateIdsList() + .forEach(this::updateNameserviceState); + } + + public void updateStateUsingResponseHeader(RpcResponseHeaderProto header) { + header.getNameserviceStateIdsContext().getNameserviceStateIdsList() + .forEach(this::updateNameserviceState); + } + + public void updateNameserviceState(NameserviceStateIdProto proto) { + namespaceIdMap.computeIfAbsent(proto.getNsId(), n -> new NamespaceStateId()); + namespaceIdMap.get(proto.getNsId()).update(proto.getStateId()); + } + + public void updateNameserviceState(String nsId, long stateId) { + namespaceIdMap.computeIfAbsent(nsId, n -> new NamespaceStateId()); + namespaceIdMap.get(nsId).update(stateId); + } + + public void setRequestHeaderState(RpcRequestHeaderProto.Builder headerBuilder) { + headerBuilder.getNameserviceStateIdsContextBuilder() + .setMode(RpcClientUtil.toNameServiceStateIdModeProto(mode)); + namespaceIdMap.forEach((k, v) -> headerBuilder.getNameserviceStateIdsContextBuilder() + .addNameserviceStateIds( + NameserviceStateIdProto.newBuilder() + .setNsId(k) + .setStateId(v.get()) 
+ .build()) + ); + } + + public void setRequestHeaderState(RpcRequestHeaderProto.Builder headerBuilder, String nsId) { + NamespaceStateId namespaceStateId = namespaceIdMap.get(nsId); + long stateId = (namespaceStateId == null) ? NamespaceStateId.DEFAULT : namespaceStateId.get(); + headerBuilder.setStateId(stateId); + } + + public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) { + headerBuilder.getNameserviceStateIdsContextBuilder() + .setMode(RpcClientUtil.toNameServiceStateIdModeProto(mode)); + namespaceIdMap.forEach((k, v) -> headerBuilder.getNameserviceStateIdsContextBuilder() + .addNameserviceStateIds( + NameserviceStateIdProto.newBuilder() + .setNsId(k) + .setStateId(v.get()) + .build()) + ); + } + + public NamespaceStateId getNamespaceId(String nsId, boolean useDefault) { + if (useDefault) { + namespaceIdMap.computeIfAbsent(nsId, n -> new NamespaceStateId()); + } + return namespaceIdMap.get(nsId); + } + + public boolean isProxyMode() { + return mode == NameServiceStateIdMode.PROXY; + } + + public boolean isTransmissionMode() { + return mode == NameServiceStateIdMode.TRANSMISSION; + } + + public boolean isDisable() { + return mode == NameServiceStateIdMode.DISABLE; + } + + public NameServiceStateIdMode getMode() { + return mode; + } + + public boolean contains(String nsId) { + return this.namespaceIdMap.containsKey(nsId); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java new file mode 100644 index 0000000000000..bf864df6f2b3e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import java.util.concurrent.atomic.LongAccumulator; + +/** + * Object to hold the last-seen state Id for a namespace. + */ +public class NamespaceStateId { + public static final Long DEFAULT = Long.MIN_VALUE; + private final LongAccumulator lastSeenStateId = new LongAccumulator(Math::max, DEFAULT); + + public long get() { + return lastSeenStateId.get(); + } + + public void update(Long stateId) { + lastSeenStateId.accumulate(stateId); + } + + public long getThenReset() { + return lastSeenStateId.getThenReset(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 8e9a5b62490d0..65240d8f1c048 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -537,4 +537,8 @@ interface HttpClient { String FAILOVER_SLEEPTIME_MAX_KEY = PREFIX + "failover.sleep.max.millis"; int FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000; } + + public static final String 
DFS_CLIENT_NAMESERVICE_STATE_ID_MODE = + "dfs.client.nameservice-state-id.mode"; + public static final String DFS_CLIENT_NAMESERVICE_STATE_ID_MODE_DEFAULT = "DISABLE"; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index e1a7c2f8030cb..e4ca8167c145b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -41,9 +41,11 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.NameServiceStateIdMode; import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.util.Time; @@ -74,7 +76,7 @@ public class ObserverReadProxyProvider ObserverReadProxyProvider.class); /** Configuration key for {@link #autoMsyncPeriodMs}. */ - static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = + public static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period"; /** Auto-msync disabled by default. */ static final long AUTO_MSYNC_PERIOD_DEFAULT = -1; @@ -83,7 +85,7 @@ public class ObserverReadProxyProvider private final AlignmentContext alignmentContext; /** Configuration key for {@link #observerProbeRetryPeriodMs}. 
*/ - static final String OBSERVER_PROBE_RETRY_PERIOD_KEY = + public static final String OBSERVER_PROBE_RETRY_PERIOD_KEY = HdfsClientConfigKeys.Failover.PREFIX + "observer.probe.retry.period"; /** Observer probe retry period default to 10 min. */ static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000; @@ -177,7 +179,10 @@ public ObserverReadProxyProvider( AbstractNNFailoverProxyProvider failoverProxy) { super(conf, uri, xface, factory); this.failoverProxy = failoverProxy; - this.alignmentContext = new ClientGSIContext(); + NameServiceStateIdMode mode = RpcClientUtil.toStateIdMode( + conf.get(HdfsClientConfigKeys.DFS_CLIENT_NAMESERVICE_STATE_ID_MODE, + HdfsClientConfigKeys.DFS_CLIENT_NAMESERVICE_STATE_ID_MODE_DEFAULT)); + this.alignmentContext = new ClientGSIContext(mode); factory.setAlignmentContext(alignmentContext); this.lastObserverProbeTime = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java index 979e7504a872b..65c6c34eb2ff6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -30,6 +30,10 @@ public interface FederationRPCMBean { long getProxyOps(); + long getActiveProxyOps(); + + long getObserverProxyOps(); + double getProxyAvg(); long getProcessingOps(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 823bc7b8af21c..83175a372d4b0 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; @@ -49,6 +50,10 @@ public class FederationRPCMetrics implements FederationRPCMBean { private MutableRate proxy; @Metric("Number of operations the Router proxied to a Namenode") private MutableCounterLong proxyOp; + @Metric("Number of operations the Router proxied to a Active Namenode") + private MutableCounterLong activeProxyOp; + @Metric("Number of operations the Router proxied to a Observer Namenode") + private MutableCounterLong observerProxyOp; @Metric("Number of operations to hit a standby NN") private MutableCounterLong proxyOpFailureStandby; @@ -257,8 +262,13 @@ public String getAsyncCallerPool() { * the Namenode until it replied. * @param time Proxy time of an operation in nanoseconds. 
*/ - public void addProxyTime(long time) { + public void addProxyTime(long time, FederationNamenodeServiceState state) { proxy.add(time); + if (FederationNamenodeServiceState.ACTIVE == state) { + activeProxyOp.incr(); + } else if (FederationNamenodeServiceState.OBSERVER == state) { + observerProxyOp.incr(); + } proxyOp.incr(); } @@ -272,6 +282,16 @@ public long getProxyOps() { return proxyOp.value(); } + @Override + public long getActiveProxyOps() { + return activeProxyOp.value(); + } + + @Override + public long getObserverProxyOps() { + return observerProxyOp.value(); + } + /** * Add the time to process a request in the Router from the time we receive * the call until we send it to the Namenode. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index 159d08e26a161..f88a3418abc55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -28,6 +28,7 @@ import javax.management.StandardMBean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; @@ -147,12 +148,12 @@ public long proxyOp() { } @Override - public void proxyOpComplete(boolean success, String nsId) { + public void proxyOpComplete(boolean success, String nsId, FederationNamenodeServiceState state) { if (success) { long proxyTime = getProxyTime(); if 
(proxyTime >= 0) { if (metrics != null) { - metrics.addProxyTime(proxyTime); + metrics.addProxyTime(proxyTime, state); } if (nameserviceRPCMetricsMap != null && nameserviceRPCMetricsMap.containsKey(nsId)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java index be88069b49166..fa828e51cdc21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java @@ -877,7 +877,7 @@ private List getActiveNamenodeRegistrations() // Fetch the most recent namenode registration String nsId = nsInfo.getNameserviceId(); List nns = - namenodeResolver.getNamenodesForNameserviceId(nsId); + namenodeResolver.getNamenodesForNameserviceId(nsId, false); if (nns != null) { FederationNamenodeContext nn = nns.get(0); if (nn instanceof MembershipState) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java index f06df70b517cf..2878f1b3ef11a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -43,6 +43,16 @@ @InterfaceStability.Evolving public interface ActiveNamenodeResolver { + /** + * Report a failed, unavailable NN address for a nameservice or blockPool. + * + * @param ns Nameservice identifier. + * @param failedAddress The address of the namenode that failed to respond to the command. 
+ * + * @throws IOException If the state store cannot be accessed. + */ + void updateUnavailableNamenode(String ns, InetSocketAddress failedAddress) throws IOException; + /** * Report a successful, active NN address for a nameservice or blockPool. * @@ -56,20 +66,30 @@ void updateActiveNamenode( /** * Returns a prioritized list of the most recent cached registration entries - * for a single nameservice ID. - * Returns an empty list if none are found. Returns entries in preference of: + * for a single nameservice ID. Returns an empty list if none are found. + * In the case of not observerRead Returns entries in preference of : + *

+ * + * In the case of observerRead Returns entries in preference of : * * * @param nameserviceId Nameservice identifier. + * @param observerRead Observer read case, observer NN will be ranked first. * @return Prioritized list of namenode contexts. * @throws IOException If the state store cannot be accessed. */ List - getNamenodesForNameserviceId(String nameserviceId) throws IOException; + getNamenodesForNameserviceId(String nameserviceId, boolean observerRead) throws IOException; /** * Returns a prioritized list of the most recent cached registration entries @@ -77,6 +97,7 @@ void updateActiveNamenode( * Returns an empty list if none are found. Returns entries in preference of: * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index 9f0f78067aedd..63e67d12f9a5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE; import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED; +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.OBSERVER; import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE; import java.io.IOException; @@ -73,8 +74,12 @@ public class MembershipNamenodeResolver /** Parent router ID. */ private String routerId; - /** Cached lookup of NN for nameservice. Invalidated on cache refresh. 
*/ + /** Cached lookup of NN for nameservice with active state ranked first. + * Invalidated on cache refresh. */ private Map> cacheNS; + /** Cached lookup of NN for nameservice with observer state ranked first. + * Invalidated on cache refresh. */ + private Map> observerFirstCacheNS; /** Cached lookup of NN for block pool. Invalidated on cache refresh. */ private Map> cacheBP; @@ -84,6 +89,7 @@ public MembershipNamenodeResolver( this.stateStore = store; this.cacheNS = new ConcurrentHashMap<>(); + this.observerFirstCacheNS = new ConcurrentHashMap<>(); this.cacheBP = new ConcurrentHashMap<>(); if (this.stateStore != null) { @@ -132,13 +138,24 @@ public boolean loadCache(boolean force) { // Force refresh of active NN cache cacheBP.clear(); + observerFirstCacheNS.clear(); cacheNS.clear(); return true; } @Override - public void updateActiveNamenode( - final String nsId, final InetSocketAddress address) throws IOException { + public void updateUnavailableNamenode(String nsId, InetSocketAddress address) throws IOException { + updateNameNodeState(nsId, address, UNAVAILABLE); + } + + @Override + public void updateActiveNamenode(final String nsId, final InetSocketAddress address) + throws IOException { + updateNameNodeState(nsId, address, ACTIVE); + } + + private void updateNameNodeState(final String nsId, final InetSocketAddress address, + FederationNamenodeServiceState state) throws IOException { // Called when we have an RPC miss and successful hit on an alternate NN. // Temporarily update our cache, it will be overwritten on the next update. 
@@ -160,10 +177,11 @@ public void updateActiveNamenode( MembershipState record = records.get(0); UpdateNamenodeRegistrationRequest updateRequest = UpdateNamenodeRegistrationRequest.newInstance( - record.getNameserviceId(), record.getNamenodeId(), ACTIVE); + record.getNameserviceId(), record.getNamenodeId(), state); membership.updateNamenodeRegistration(updateRequest); cacheNS.remove(nsId); + observerFirstCacheNS.remove(nsId); // Invalidating the full cacheBp since getting the blockpool id from // namespace id is quite costly. cacheBP.clear(); @@ -174,10 +192,12 @@ public void updateActiveNamenode( } @Override - public List getNamenodesForNameserviceId( - final String nsId) throws IOException { + public List getNamenodesForNameserviceId(final String nsId, + boolean observerRead) throws IOException { + Map> cache = + observerRead ? observerFirstCacheNS : cacheNS; - List ret = cacheNS.get(nsId); + List ret = cache.get(nsId); if (ret != null) { return ret; } @@ -189,7 +209,7 @@ public List getNamenodesForNameserviceId( partial.setNameserviceId(nsId); GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest.newInstance(partial); - result = getRecentRegistrationForQuery(request, true, false); + result = getRecentRegistrationForQuery(request, true, false, observerRead); } catch (StateStoreUnavailableException e) { LOG.error("Cannot get active NN for {}, State Store unavailable", nsId); return null; @@ -218,7 +238,7 @@ public List getNamenodesForNameserviceId( // Cache the response ret = Collections.unmodifiableList(result); - cacheNS.put(nsId, result); + cache.put(nsId, result); return ret; } @@ -235,7 +255,7 @@ public List getNamenodesForBlockPoolId( GetNamenodeRegistrationsRequest.newInstance(partial); final List result = - getRecentRegistrationForQuery(request, true, false); + getRecentRegistrationForQuery(request, true, false, false); if (result == null || result.isEmpty()) { LOG.error("Cannot locate eligible NNs for {}", bpId); } else { @@ -346,22 
+366,34 @@ public Set getDisabledNamespaces() throws IOException { } /** - * Picks the most relevant record registration that matches the query. Return - * registrations matching the query in this preference: 1) Most recently - * updated ACTIVE registration 2) Most recently updated STANDBY registration - * (if showStandby) 3) Most recently updated UNAVAILABLE registration (if - * showUnavailable). EXPIRED registrations are ignored. + * Picks the most relevant record registration that matches the query. + * If not observer read, + * return registrations matching the query in this preference: + * 1) Most recently updated ACTIVE registration + * 2) Most recently updated Observer registration + * 3) Most recently updated STANDBY registration (if showStandby) + * 4) Most recently updated UNAVAILABLE registration (if showUnavailable). + * + * If observer read, + * return registrations matching the query in this preference: + * 1) Most recently updated Observer registration + * 2) Most recently updated ACTIVE registration + * 3) Most recently updated STANDBY registration (if showStandby) + * 4) Most recently updated UNAVAILABLE registration (if showUnavailable). + * + * EXPIRED registrations are ignored. * * @param request The select query for NN registrations. * @param addUnavailable include UNAVAILABLE registrations. * @param addExpired include EXPIRED registrations. + * @param observerRead Observer read case, observer NN will be ranked first * @return List of memberships or null if no registrations that * both match the query AND the selected states. * @throws IOException */ private List getRecentRegistrationForQuery( GetNamenodeRegistrationsRequest request, boolean addUnavailable, - boolean addExpired) throws IOException { + boolean addExpired, boolean observerRead) throws IOException { // Retrieve a list of all registrations that match this query. 
// This may include all NN records for a namespace/blockpool, including @@ -371,24 +403,36 @@ private List getRecentRegistrationForQuery( membershipStore.getNamenodeRegistrations(request); List memberships = response.getNamenodeMemberships(); - if (!addExpired || !addUnavailable) { - Iterator iterator = memberships.iterator(); - while (iterator.hasNext()) { - MembershipState membership = iterator.next(); - if (membership.getState() == EXPIRED && !addExpired) { - iterator.remove(); - } else if (membership.getState() == UNAVAILABLE && !addUnavailable) { - iterator.remove(); - } + List observerMemberships = new ArrayList<>(); + Iterator iterator = memberships.iterator(); + while (iterator.hasNext()) { + MembershipState membership = iterator.next(); + if (membership.getState() == EXPIRED && !addExpired) { + iterator.remove(); + } else if (membership.getState() == UNAVAILABLE && !addUnavailable) { + iterator.remove(); + } else if (membership.getState() == OBSERVER && observerRead) { + iterator.remove(); + observerMemberships.add(membership); } } - List priorityList = new ArrayList<>(); - priorityList.addAll(memberships); - Collections.sort(priorityList, new NamenodePriorityComparator()); - - LOG.debug("Selected most recent NN {} for query", priorityList); - return priorityList; + if(!observerRead) { + Collections.sort(memberships, new NamenodePriorityComparator()); + LOG.debug("Selected most recent NN {} for query", memberships); + return memberships; + } else { + List ret = new ArrayList<>( + memberships.size() + observerMemberships.size()); + Collections.sort(memberships, new NamenodePriorityComparator()); + if(observerMemberships.size() > 1) { + Collections.shuffle(observerMemberships); + } + ret.addAll(observerMemberships); + ret.addAll(memberships); + LOG.debug("Selected most recent NN {} for query", ret); + return ret; + } } @Override diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 5fe797bf5ce2c..ddba8ff14caf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -34,6 +34,9 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.ClientGSIContext; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.NameServiceStateIdMode; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.eclipse.jetty.util.ajax.JSON; @@ -85,13 +88,15 @@ public class ConnectionManager { /** If the connection manager is running. */ private boolean running = false; + private NameServiceStateIdMode nsIdMode; + private Map alignmentContexts; /** * Creates a proxy client connection pool manager. * * @param config Configuration for the connections. */ - public ConnectionManager(Configuration config) { + public ConnectionManager(Configuration config, NameServiceStateIdMode mode) { this.conf = config; // Configure minimum, maximum and active connection pools @@ -125,6 +130,8 @@ public ConnectionManager(Configuration config) { RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT); LOG.info("Cleaning connections every {} seconds", TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs)); + this.alignmentContexts = new HashMap<>(); + this.nsIdMode = mode; } /** @@ -176,7 +183,7 @@ public void close() { * @throws IOException If the connection cannot be obtained. 
*/ public ConnectionContext getConnection(UserGroupInformation ugi, - String nnAddress, Class protocol) throws IOException { + String nnAddress, Class protocol, String nsId) throws IOException { // Check if the manager is shutdown if (!this.running) { @@ -203,9 +210,22 @@ public ConnectionContext getConnection(UserGroupInformation ugi, try { pool = this.pools.get(connectionId); if (pool == null) { + if (alignmentContexts != null && !alignmentContexts.containsKey(nsId)) { + synchronized (alignmentContexts) { + if (!alignmentContexts.containsKey(nsId)) { + AlignmentContext context = null; + if (nsIdMode.isTransmission()) { + context = new RouterNameNodeSideStateIdContext(nsId); + } else if (nsIdMode.isProxy()) { + context = new ClientGSIContext(NameServiceStateIdMode.DISABLE, nsId); + } + alignmentContexts.put(nsId, context); + } + } + } pool = new ConnectionPool( this.conf, nnAddress, ugi, this.minSize, this.maxSize, - this.minActiveRatio, protocol); + this.minActiveRatio, protocol, alignmentContexts.get(nsId)); this.pools.put(connectionId, pool); } } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 293a4b64d2031..03ac6286e4a0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -47,6 +47,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryUtils; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -105,6 +106,8 @@ public class ConnectionPool { /** The last 
time a connection was active. */ private volatile long lastActiveTime = 0; + private final AlignmentContext alignmentContext; + /** Map for the protocols and their protobuf implementations. */ private final static Map, ProtoImpl> PROTO_MAP = new HashMap<>(); static { @@ -134,7 +137,7 @@ private static class ProtoImpl { protected ConnectionPool(Configuration config, String address, UserGroupInformation user, int minPoolSize, int maxPoolSize, - float minActiveRatio, Class proto) throws IOException { + float minActiveRatio, Class proto, AlignmentContext alignmentContext) throws IOException { this.conf = config; @@ -149,6 +152,7 @@ protected ConnectionPool(Configuration config, String address, this.minSize = minPoolSize; this.maxSize = maxPoolSize; this.minActiveRatio = minActiveRatio; + this.alignmentContext = alignmentContext; // Add minimum connections to the pool for (int i=0; i ConnectionContext newConnection(Configuration conf, - String nnAddress, UserGroupInformation ugi, Class proto) + protected static ConnectionContext newConnection(Configuration conf, String nnAddress, + UserGroupInformation ugi, Class proto, AlignmentContext alignmentContext) throws IOException { if (!PROTO_MAP.containsKey(proto)) { String msg = "Unsupported protocol for connection to NameNode: " @@ -438,7 +442,7 @@ protected static ConnectionContext newConnection(Configuration conf, InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); final long version = RPC.getProtocolVersion(classes.protoPb); Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, - conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null, alignmentContext).getProxy(); T client = newProtoClient(proto, classes, proxy); Text dtService = SecurityUtil.buildTokenService(socket); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index c0a9e3f294cd8..db1f6e35122ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -181,6 +181,18 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_STORE_PREFIX + "enable"; public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_NAMESERVICE_STATE_ID_MODE = + FEDERATION_ROUTER_PREFIX + "nameservice-state-id.mode"; + public static final String DFS_ROUTER_NAMESERVICE_STATE_ID_MODE_DEFAULT = "DISABLE"; + + public static final String DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD = + FEDERATION_ROUTER_PREFIX + "observer.auto-msync-period"; + public static final long DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD_DEFAULT = 0; + + public static final String DFS_ROUTER_CACHE_STATE_ID_TIMEOUT = + FEDERATION_ROUTER_PREFIX + "cache.state.id.timeout"; + public static final long DFS_ROUTER_CACHE_STATE_ID_TIMEOUT_DEFAULT = 600000; + public static final String FEDERATION_STORE_SERIALIZER_CLASS = FEDERATION_STORE_PREFIX + "serializer"; public static final Class diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 73445595de7ad..1064b3d1118d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -20,6 +20,8 @@ import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY; import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus; + +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; @@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.FederatedNamespaceIds; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; @@ -84,12 +87,15 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; +import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdCache.UniqueCallID; import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.NameServiceStateIdMode; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -115,6 +121,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; 
+import java.util.stream.Collectors; /** * Module that implements all the RPC calls in {@link ClientProtocol} in the @@ -159,6 +166,8 @@ public class RouterClientProtocol implements ClientProtocol { /** Router security manager to handle token operations. */ private RouterSecurityManager securityManager = null; + private NameServiceStateIdMode defaultNsIdMode; + RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) { this.rpcServer = rpcServer; this.rpcClient = rpcServer.getRPCClient(); @@ -193,6 +202,9 @@ public class RouterClientProtocol implements ClientProtocol { this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); this.securityManager = rpcServer.getRouterSecurityManager(); this.rbfRename = new RouterFederationRename(rpcServer, conf); + this.defaultNsIdMode = NameServiceStateIdMode.valueOf( + conf.get(RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE, + RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE_DEFAULT)); } @Override @@ -1847,7 +1859,27 @@ public BatchedEntries listOpenFiles(long prevId, @Override public void msync() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + if (defaultNsIdMode.isTransmission()) { + Set nss = getMsyncNames(); + if (CollectionUtils.isNotEmpty(nss)) { + RemoteMethod method = new RemoteMethod("msync"); + rpcClient.invokeConcurrent(nss, method); + } + } + } + + private Set getMsyncNames() throws IOException { + Server.Call call = Server.getCurCall().get(); + assert call != null; + UniqueCallID suid = new UniqueCallID(call.getClientId(), call.getCallId()); + FederatedNamespaceIds ids = RouterStateIdCache.get(suid); + if (ids != null) { + Set nss = namenodeResolver.getNamespaces(); + return nss.stream().filter(ns -> ids.contains(ns.getNameserviceId())) + .collect(Collectors.toSet()); + } + return null; } @Override diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientSideStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientSideStateIdContext.java new file mode 100644 index 0000000000000..ede74c7829221 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientSideStateIdContext.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.FederatedNamespaceIds; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.RpcClientUtil; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; +import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdCache.UniqueCallID; + +import java.io.IOException; + +/* + * Only router's state id is CLIENT mode, this class will be constructed. + * */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class RouterClientSideStateIdContext implements AlignmentContext { + + RouterClientSideStateIdContext() { + } + + @Override + public void updateResponseState(RpcResponseHeaderProto.Builder header) { + // Fill header with federatedNamespaceIds updated by namenode. Then send back to client. 
+ UniqueCallID uid = new UniqueCallID(header.getClientId().toByteArray(), header.getCallId()); + FederatedNamespaceIds ids = RouterStateIdCache.get(uid); + if (ids != null) { + ids.setResponseHeaderState(header); + RouterStateIdCache.remove(uid); + } + } + + @Override + public void receiveResponseState(RpcResponseHeaderProto header) { + throw new UnsupportedOperationException("Router server should not receive response state"); + } + + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + throw new UnsupportedOperationException("Router server should not update request state"); + } + + @Override + public long receiveRequestState(RpcRequestHeaderProto header,long clientWaitTime, + boolean isCoordinatedCall) throws IOException { + // Receive request from client, cache the federatedNamespaceIds + UniqueCallID uid = new UniqueCallID(header.getClientId().toByteArray(), header.getCallId()); + if (header.hasNameserviceStateIdsContext()) { + // Only cache FederatedNamespaceIds which mode is PROXY or TRANSMISSION + FederatedNamespaceIds ids = new FederatedNamespaceIds(RpcClientUtil.toStateIdMode(header)); + ids.updateStateUsingRequestHeader(header); + RouterStateIdCache.put(uid, ids); + } else { + RouterStateIdCache.remove(uid); + } + return 0; + } + + @Override + public long getLastSeenStateId() { + // In Router, getLastSeenStateId always larger than receiveRequestState. 
+ return Long.MAX_VALUE; + } + + @Override + public boolean isCoordinatedCall(String protocolName, String methodName) { + return true; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNameNodeSideStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNameNodeSideStateIdContext.java new file mode 100644 index 0000000000000..3b79980b9d956 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNameNodeSideStateIdContext.java @@ -0,0 +1,80 @@ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.FederatedNamespaceIds; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdCache.UniqueCallID; + +import java.io.IOException; + +public class RouterNameNodeSideStateIdContext implements AlignmentContext { + + private String nsId; + + RouterNameNodeSideStateIdContext(String nsId) { + this.nsId = nsId; + } + + @Override + public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) { + throw new UnsupportedOperationException("Router rpc Client should not update response state"); + } + + @Override + public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) { + // Receive from NameNode, then update the cached variable. 
+ UniqueCallID uid = new UniqueCallID(header.getClientId().toByteArray(), header.getCallId()); + FederatedNamespaceIds ids = RouterStateIdCache.get(uid); + if (ids != null) { + ids.updateNameserviceState(nsId, header.getStateId()); + if (ids.isProxyMode()) { + RouterStateIdCache.get(nsId).updateNameserviceState(nsId, header.getStateId()); + } else if (ids.isTransmissionMode()) { + ids.updateNameserviceState(nsId, header.getStateId()); + RouterStateIdCache.remove(uid); + } + } + } + + @Override + public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) { + // Fill header with the cached thread local variable from client. + // Then send to NameNode from router. + Server.Call call = Server.getCurCall().get(); + assert call != null; + UniqueCallID suid = new UniqueCallID(call.getClientId(), call.getCallId()); + FederatedNamespaceIds ids = RouterStateIdCache.get(suid); + if (ids != null) { + if (ids.isProxyMode()) { + RouterStateIdCache.get(nsId).setRequestHeaderState(header, nsId); + } else if (ids.isTransmissionMode()) { + ids.setRequestHeaderState(header, nsId); + } + // Update to newCallId, it is used for receiveResponseState to find FederatedNamespaceIds + UniqueCallID clientCallId = new UniqueCallID(header.getClientId().toByteArray(), + header.getCallId()); + RouterStateIdCache.put(clientCallId, ids); + } else { + // If rpc request from old version hdfs client, ids will be null. + // Then we need to disable observe read. 
+ header.clearStateId(); + } + } + + @Override + public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold, + boolean isCoordinatedCall) throws IOException { + throw new UnsupportedOperationException("Router rpc Client should not receive request state"); + } + + @Override + public long getLastSeenStateId() { + return 0; + } + + @Override + public boolean isCoordinatedCall(String protocolName, String method) { + throw new UnsupportedOperationException("Client should not be checking uncoordinated call"); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index e90cc5fda41d1..6442e085df9dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -37,6 +37,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -55,13 +56,16 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.FederatedNamespaceIds; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import 
org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; @@ -69,18 +73,24 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdCache.UniqueCallID; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.NameServiceStateIdMode; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +133,8 @@ public class RouterRpcClient { private final ThreadPoolExecutor executorService; /** Retry policy for router -> NN communication. */ private final RetryPolicy retryPolicy; + /* Retry policy for router -> observer NN communication. */ + private final RetryPolicy observerRetryPolicy; /** Optional perf monitor. */ private final RouterRpcMonitor rpcMonitor; /** Field separator of CallerContext. 
*/ @@ -137,6 +149,24 @@ public class RouterRpcClient { private Map rejectedPermitsPerNs = new ConcurrentHashMap<>(); private Map acceptedPermitsPerNs = new ConcurrentHashMap<>(); + private NameServiceStateIdMode defaultNsIdMode; + private Map nsIdMode = new HashMap<>(); + /** Auto msync period. */ + private long autoMsyncPeriodMs; + /** Last msync times. */ + private Map lastMsyncTimes; + + private static final Method MSYNC_METHOD; + + static { + try { + MSYNC_METHOD = ClientProtocol.class.getDeclaredMethod("msync"); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Failed to create msync method instance.", e); + } + } + + /** * Create a router RPC client to manage remote procedure calls to NNs. * @@ -155,7 +185,21 @@ public RouterRpcClient(Configuration conf, Router router, this.contextFieldSeparator = clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); - this.connectionManager = new ConnectionManager(clientConf); + this.defaultNsIdMode = NameServiceStateIdMode.valueOf( + conf.get(RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE, + RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE_DEFAULT)); + Map observerReadOverrides = conf + .getPropsWithPrefix(RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE + "."); + observerReadOverrides + .forEach((nsId, mode) -> + nsIdMode.put(nsId, NameServiceStateIdMode.valueOf(mode))); + + this.lastMsyncTimes = new HashMap<>(); + this.autoMsyncPeriodMs = + conf.getTimeDuration(RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, + RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); + + this.connectionManager = new ConnectionManager(clientConf, defaultNsIdMode); this.connectionManager.start(); this.routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); @@ -194,6 +238,8 @@ public RouterRpcClient(Configuration conf, Router router, this.retryPolicy = RetryPolicies.failoverOnNetworkException( 
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis, failoverSleepMaxMillis); + this.observerRetryPolicy = + RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, 1); } /** @@ -369,7 +415,7 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, ugi.getUserName(), routerUser); } connection = this.connectionManager.getConnection( - connUGI, rpcAddress, proto); + connUGI, rpcAddress, proto, nsId); LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, connection); } catch (Exception ex) { @@ -412,7 +458,7 @@ private static IOException toIOException(Exception e) { * @throws NoNamenodesAvailableException Exception that the retry policy * generates for no available namenodes. */ - private RetryDecision shouldRetry(final IOException ioe, final int retryCount, + private RetryDecision shouldRetry(final IOException ioe, RetryPolicy policy, final int retryCount, final String nsId) throws IOException { // check for the case of cluster unavailable state if (isClusterUnAvailable(nsId)) { @@ -425,8 +471,7 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount, } try { - final RetryPolicy.RetryAction a = - this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); + final RetryPolicy.RetryAction a = policy.shouldRetry(ioe, retryCount, 0, true); return a.action; } catch (Exception ex) { LOG.error("Re-throwing API exception, no more retries", ex); @@ -456,7 +501,7 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount, public Object invokeMethod( final UserGroupInformation ugi, final List namenodes, - final Class protocol, final Method method, final Object... params) + final Class protocol, final Method method, boolean skipObserver, final Object... 
params) throws ConnectException, StandbyException, IOException { if (namenodes == null || namenodes.isEmpty()) { @@ -472,8 +517,13 @@ public Object invokeMethod( rpcMonitor.proxyOp(); } boolean failover = false; + boolean tryActive = false; Map ioes = new LinkedHashMap<>(); for (FederationNamenodeContext namenode : namenodes) { + boolean isObserver = namenode.getState().equals(FederationNamenodeServiceState.OBSERVER); + if ((tryActive || skipObserver) && isObserver) { + continue; + } ConnectionContext connection = null; String nsId = namenode.getNameserviceId(); String rpcAddress = namenode.getRpcAddress(); @@ -482,14 +532,15 @@ public Object invokeMethod( ProxyAndInfo client = connection.getClient(); final Object proxy = client.getProxy(); - ret = invoke(nsId, 0, method, proxy, params); - if (failover) { + RetryPolicy policy = isObserver ? observerRetryPolicy : retryPolicy; + ret = invoke(nsId, policy, 0, method, proxy, params); + if (failover && !isObserver) { // Success on alternate server, update InetSocketAddress address = client.getAddress(); namenodeResolver.updateActiveNamenode(nsId, address); } if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true, nsId); + this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); } if (this.router.getRouterClientMetrics() != null) { this.router.getRouterClientMetrics().incInvokedMethod(method); @@ -497,26 +548,52 @@ public Object invokeMethod( return ret; } catch (IOException ioe) { ioes.put(namenode, ioe); - if (ioe instanceof StandbyException) { + if (ioe instanceof ObserverRetryOnActiveException) { + LOG.info("Encountered ObserverRetryOnActiveException from {}." 
+ + " Retry active namenode directly."); + tryActive = true; + } else if (ioe instanceof StandbyException) { // Fail over indicated by retry policy and/or NN if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpFailureStandby(nsId); } - failover = true; + if (isObserver) { + // A StandbyException from an observer namenode indicates a network problem, so + // mark the observer unavailable. See FailoverOnNetworkExceptionRetry::shouldRetry. + namenodeResolver.updateUnavailableNamenode(nsId, + NetUtils.createSocketAddr(namenode.getRpcAddress())); + } else { + failover = true; + } + } else if (isUnavailableException(ioe)) { if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpFailureCommunicate(nsId); } - failover = true; - } else if (ioe instanceof RemoteException) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true, nsId); + if (isObserver) { + // An unavailable (network) exception from an observer namenode likewise means we + // should mark the observer unavailable. See FailoverOnNetworkExceptionRetry::shouldRetry. 
+ namenodeResolver.updateUnavailableNamenode(nsId, + NetUtils.createSocketAddr(namenode.getRpcAddress())); + } else { + failover = true; } + } else if (ioe instanceof RemoteException) { RemoteException re = (RemoteException) ioe; ioe = re.unwrapRemoteException(); ioe = getCleanException(ioe); - // RemoteException returned by NN - throw ioe; + if (ioe instanceof RetriableException && isObserver) { + // If observer Node is too far behind, we should failover to other observer or active + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureStandby(nsId); + } + failover = true; + } else { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); + } + // RemoteException returned by NN + throw ioe; + } } else if (ioe instanceof ConnectionNullException) { if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpFailureCommunicate(nsId); @@ -540,7 +617,7 @@ public Object invokeMethod( // Communication retries are handled by the retry policy if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpFailureCommunicate(nsId); - this.rpcMonitor.proxyOpComplete(false, nsId); + this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState()); } throw ioe; } @@ -551,7 +628,7 @@ public Object invokeMethod( } } if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(false, null); + this.rpcMonitor.proxyOpComplete(false, null, null); } // All namenodes were unavailable or in standby @@ -636,7 +713,7 @@ private void addClientInfoToCallerContext() { * @throws IOException * @throws InterruptedException */ - private Object invoke(String nsId, int retryCount, final Method method, + private Object invoke(String nsId, RetryPolicy retryPolicy, int retryCount, final Method method, final Object obj, final Object... 
params) throws IOException { try { return method.invoke(obj, params); @@ -652,14 +729,14 @@ private Object invoke(String nsId, int retryCount, final Method method, IOException ioe = (IOException) cause; // Check if we should retry. - RetryDecision decision = shouldRetry(ioe, retryCount, nsId); + RetryDecision decision = shouldRetry(ioe, retryPolicy, retryCount, nsId); if (decision == RetryDecision.RETRY) { if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpRetries(); } // retry - return invoke(nsId, ++retryCount, method, obj, params); + return invoke(nsId, retryPolicy, ++retryCount, method, obj, params); } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { // failover, invoker looks for standby exceptions for failover. if (ioe instanceof StandbyException) { @@ -707,7 +784,7 @@ public static boolean isUnavailableException(IOException ioe) { */ private boolean isClusterUnAvailable(String nsId) throws IOException { List nnState = this.namenodeResolver - .getNamenodesForNameserviceId(nsId); + .getNamenodesForNameserviceId(nsId, false); if (nnState != null) { for (FederationNamenodeContext nnContext : nnState) { @@ -838,13 +915,18 @@ public Object invokeSingle(final String nsId, RemoteMethod method) RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); acquirePermit(nsId, ugi, method, controller); try { - List nns = - getNamenodesForNameservice(nsId); + NameServiceStateIdMode mode = nsIdMode.getOrDefault(nsId, defaultNsIdMode); + boolean isObserverRead = !mode.isDisable() && isReadCall(method.getMethod()); + final List nns = + getNamenodesForNameservice(nsId, isObserverRead); + if (isObserverRead) { + msync(nsId, nns, ugi, defaultNsIdMode); + } RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); Class proto = method.getProtocol(); Method m = method.getMethod(); Object[] params = method.getParams(loc); - return invokeMethod(ugi, nns, proto, m, params); + return invokeMethod(ugi, nns, proto, m, !isObserverRead, 
params); } finally { releasePermit(nsId, ugi, method, controller); } @@ -921,7 +1003,7 @@ public T invokeSingle(final RemoteLocationContext location, * @throws IOException if the success condition is not met and one of the RPC * calls generated a remote exception. */ - public Object invokeSequential( + public T invokeSequential( final List locations, final RemoteMethod remoteMethod) throws IOException { return invokeSequential(locations, remoteMethod, null, null); @@ -1006,12 +1088,17 @@ public RemoteResult invokeSequential( for (final RemoteLocationContext loc : locations) { String ns = loc.getNameserviceId(); acquirePermit(ns, ugi, remoteMethod, controller); - List namenodes = - getNamenodesForNameservice(ns); + NameServiceStateIdMode mode = nsIdMode.getOrDefault(ns, defaultNsIdMode); + boolean isObserverRead = !mode.isDisable() && isReadCall(remoteMethod.getMethod()); + final List namenodes = + getNamenodesForNameservice(ns, isObserverRead); + if (isObserverRead) { + msync(ns, namenodes, ugi, defaultNsIdMode); + } try { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); - Object result = invokeMethod(ugi, namenodes, proto, m, params); + Object result = invokeMethod(ugi, namenodes, proto, m, !isObserverRead, params); // Check if the result is what we expected if (isExpectedClass(expectedResultClass, result) && isExpectedValue(expectedResultValue, result)) { @@ -1367,12 +1454,17 @@ public Map invokeConcurrent( String ns = location.getNameserviceId(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); acquirePermit(ns, ugi, method, controller); + NameServiceStateIdMode mode = nsIdMode.getOrDefault(ns, defaultNsIdMode); + boolean isObserverRead = !mode.isDisable() && isReadCall(method.getMethod()); final List namenodes = - getNamenodesForNameservice(ns); + getNamenodesForNameservice(ns, isObserverRead); + if (isObserverRead) { + msync(ns, namenodes, ugi, defaultNsIdMode); + } try { Class proto 
= method.getProtocol(); Object[] paramList = method.getParams(location); - R result = (R) invokeMethod(ugi, namenodes, proto, m, paramList); + R result = (R) invokeMethod(ugi, namenodes, proto, m, !isObserverRead, paramList); RemoteResult remoteResult = new RemoteResult<>(location, result); return Collections.singletonList(remoteResult); } catch (IOException ioe) { @@ -1390,8 +1482,13 @@ public Map invokeConcurrent( final CallerContext originContext = CallerContext.getCurrent(); for (final T location : locations) { String nsId = location.getNameserviceId(); + NameServiceStateIdMode mode = nsIdMode.getOrDefault(nsId, defaultNsIdMode); + boolean isObserverRead = !mode.isDisable() && isReadCall(method.getMethod()); final List namenodes = - getNamenodesForNameservice(nsId); + getNamenodesForNameservice(nsId, isObserverRead); + if (isObserverRead) { + msync(nsId, namenodes, ugi, defaultNsIdMode); + } final Class proto = method.getProtocol(); final Object[] paramList = method.getParams(location); if (standby) { @@ -1408,7 +1505,7 @@ public Map invokeConcurrent( callables.add( () -> { transferThreadLocalContext(originCall, originContext); - return invokeMethod(ugi, nnList, proto, m, paramList); + return invokeMethod(ugi, nnList, proto, m, !isObserverRead, paramList); }); } } else { @@ -1417,7 +1514,7 @@ public Map invokeConcurrent( callables.add( () -> { transferThreadLocalContext(originCall, originContext); - return invokeMethod(ugi, namenodes, proto, m, paramList); + return invokeMethod(ugi, namenodes, proto, m, !isObserverRead, paramList); }); } } @@ -1508,17 +1605,20 @@ private void transferThreadLocalContext( /** * Get a prioritized list of NNs that share the same nameservice ID (in the - * same namespace). NNs that are reported as ACTIVE will be first in the list. + * same namespace). + * In observer read case, OBSERVER NNs will be first in the list. + * Otherwise, ACTIVE NNs will be first in the list. * * @param nsId The nameservice ID for the namespace. 
+ * @param observerRead Whether the read may be served by an observer namenode. + * @return A prioritized list of NNs to use for communication. + * @throws IOException If a NN cannot be located for the nameservice ID. */ private List getNamenodesForNameservice( - final String nsId) throws IOException { + final String nsId, boolean observerRead) throws IOException { final List namenodes = - namenodeResolver.getNamenodesForNameserviceId(nsId); + namenodeResolver.getNamenodesForNameserviceId(nsId, observerRead); if (namenodes == null || namenodes.isEmpty()) { throw new IOException("Cannot locate a registered namenode for " + nsId + @@ -1664,4 +1764,98 @@ private String getCurrentFairnessPolicyControllerClassName() { } return null; } + + private FederatedNamespaceIds getFederatedNamespaceIdsFromClient() { + Server.Call call = Server.getCurCall().get(); + if (call != null) { + UniqueCallID uid = new UniqueCallID(call.getClientId(), call.getCallId()); + FederatedNamespaceIds federatedNamespaceIds = RouterStateIdCache.get(uid); + return federatedNamespaceIds; + } + // A null return may mean the request came from an old-version HDFS client; + // in that case the observer should be ignored. + return null; + } + + private void msync(String ns, List namenodes, + UserGroupInformation ugi, NameServiceStateIdMode mode) throws IOException { + + if (autoMsyncPeriodMs < 0) { + LOG.debug("Skipping msync because " + + RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD + + " is less than 0"); + return; // no need for msync + } + + FederatedNamespaceIds ids = null; + if (mode == NameServiceStateIdMode.TRANSMISSION) { + ids = getFederatedNamespaceIdsFromClient(); + if (ids != null) { + mode = ids.getMode(); + } + } + + // If mode is PROXY, we just msync. + // If mode is TRANSMISSION and the namespaceId is null, the initial msync has not been invoked yet. 
+ if (mode == NameServiceStateIdMode.PROXY || (mode == NameServiceStateIdMode.TRANSMISSION && ids != null && ids.getNamespaceId(ns, false) == null)) { + long callStartTime = callTime(); + LongHolder latestMsyncTime = lastMsyncTimes.get(ns); + + if (latestMsyncTime == null) { + // initialize + synchronized (lastMsyncTimes) { + latestMsyncTime = lastMsyncTimes.get(ns); + if(latestMsyncTime == null) { + latestMsyncTime = new LongHolder(0L); + lastMsyncTimes.put(ns, latestMsyncTime); + } + } + } + + if (callStartTime - latestMsyncTime.getValue() > autoMsyncPeriodMs) { + long requestTime = Time.monotonicNow(); + invokeMethod(ugi, namenodes, ClientProtocol.class, MSYNC_METHOD, + true, new Object[0]); + latestMsyncTime.setValue(requestTime); + } + } + return; + } + + private static long callTime() { + Server.Call call = Server.getCurCall().get(); + if (call != null) { + return call.getTimestampNanos() / 1000000L; + } + return Time.monotonicNow(); + } + + /** + * Check if a method is read-only. + * @return whether the 'method' is a read-only operation. 
+ */ + private static boolean isReadCall(Method method) { + if (!method.isAnnotationPresent(ReadOnly.class)) { + return false; + } + return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly(); + } + + private final static class LongHolder { + private AtomicLong value; + + LongHolder(long value) { + this.value = new AtomicLong(value); + } + + public void setValue(long value) { + long expect; + do { + expect = getValue(); + } while (expect < value && !this.value.compareAndSet(expect, value)); + } + + public long getValue() { + return value.get(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java index 039b40ae2e585..9c99ed3530015 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; /** @@ -62,7 +63,7 @@ void init( * Mark a proxy operation as completed. * @param success If the operation was successful. */ - void proxyOpComplete(boolean success, String nsId); + void proxyOpComplete(boolean success, String nsId, FederationNamenodeServiceState state); /** * Failed to proxy an operation to a Namenode because it was in standby. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 423e0ba8e483e..84be66a36b625 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.ipc.NameServiceStateIdMode; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; @@ -322,6 +323,10 @@ public RouterRpcServer(Configuration configuration, Router router, // Create security manager this.securityManager = new RouterSecurityManager(this.conf); + NameServiceStateIdMode mode = NameServiceStateIdMode.valueOf( + conf.get(RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE, + RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE_DEFAULT)); + this.rpcServer = new RPC.Builder(this.conf) .setProtocol(ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) @@ -331,6 +336,8 @@ public RouterRpcServer(Configuration configuration, Router router, .setnumReaders(readerCount) .setQueueSizePerHandler(handlerQueueSize) .setVerbose(false) + .setAlignmentContext(mode == NameServiceStateIdMode.TRANSMISSION ? 
+ new RouterClientSideStateIdContext() : null) .setSecretManager(this.securityManager.getSecretManager()) .build(); @@ -1329,7 +1336,7 @@ public void modifyAclEntries(String src, List aclSpec) clientProto.modifyAclEntries(src, aclSpec); } - @Override // ClienProtocol + @Override // ClientProtocol public void removeAclEntries(String src, List aclSpec) throws IOException { clientProto.removeAclEntries(src, aclSpec); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdCache.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdCache.java new file mode 100644 index 0000000000000..ef680c987e85b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdCache.java @@ -0,0 +1,85 @@ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.hdfs.FederatedNamespaceIds; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.ipc.NameServiceStateIdMode; +import org.apache.hadoop.thirdparty.com.google.common.cache.Cache; +import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; + +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +public class RouterStateIdCache { + + private static final Map stateIdCachByNs = new ConcurrentHashMap(); + private static Cache stateIdCacheByCallId; + + static { + HdfsConfiguration conf = new HdfsConfiguration(); + long timeout = conf.getTimeDuration(RBFConfigKeys.DFS_ROUTER_CACHE_STATE_ID_TIMEOUT, + RBFConfigKeys.DFS_ROUTER_CACHE_STATE_ID_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + stateIdCacheByCallId = + CacheBuilder.newBuilder().expireAfterWrite(timeout, TimeUnit.MILLISECONDS).build(); + } + + static 
FederatedNamespaceIds get(String nsId) { + stateIdCachByNs.computeIfAbsent(nsId, n -> new FederatedNamespaceIds(NameServiceStateIdMode.PROXY)); + return stateIdCachByNs.get(nsId); + } + + static FederatedNamespaceIds get(UniqueCallID id) { + return stateIdCacheByCallId.getIfPresent(id); + } + + static void put(UniqueCallID uid, FederatedNamespaceIds ids) { + stateIdCacheByCallId.put(uid, ids); + } + + static void remove(UniqueCallID uid) { + stateIdCacheByCallId.invalidate(uid); + } + + @VisibleForTesting + static long size() { + return stateIdCacheByCallId.size() + stateIdCachByNs.size(); + } + + @VisibleForTesting + static void clear() { + stateIdCacheByCallId.cleanUp(); + stateIdCachByNs.clear(); + } + + static class UniqueCallID { + final byte[] clientId; + final int callId; + + UniqueCallID(byte[] clientId, int callId) { + this.clientId = clientId; + this.callId = callId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UniqueCallID that = (UniqueCallID) o; + return callId == that.callId && Arrays.equals(clientId, that.clientId); + } + + @Override + public int hashCode() { + int result = Objects.hash(callId); + result = 31 * result + Arrays.hashCode(clientId); + return result; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index fcf6a28475fbd..99cc80ef90002 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -781,4 +781,24 @@ (delete the source path directly) and skip (skip both trash and deletion). + + + dfs.federation.router.observer.read.mode + ROUTER + + Observer read Mode in router. 
+ + + + + dfs.federation.router.observer.auto-msync-period + 0 + Observer auto msync period + + + + dfs.federation.router.cache.state.id.timeout + 600000 + how long the cache key be alive in TRANSMISSION mode + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 107a1ba9551a3..c88309e5a43cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.ipc.NameServiceStateIdMode; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; @@ -175,7 +176,7 @@ public static void waitNamenodeRegistered( GenericTestUtils.waitFor(() -> { try { List namenodes = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); if (namenodes != null) { for (FederationNamenodeContext namenode : namenodes) { // Check if this is the Namenode we are checking @@ -207,7 +208,7 @@ public static void waitNamenodeRegistered( GenericTestUtils.waitFor(() -> { try { List nns = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); for (FederationNamenodeContext nn : nns) { if (nn.getState().equals(state)) { return true; @@ -377,14 +378,14 @@ public static void simulateThrowExceptionRouterRpcServer( final RouterRpcServer server) throws IOException { RouterRpcClient rpcClient 
= server.getRPCClient(); ConnectionManager connectionManager = - new ConnectionManager(server.getConfig()); + new ConnectionManager(server.getConfig(), NameServiceStateIdMode.DISABLE); ConnectionManager spyConnectionManager = spy(connectionManager); doAnswer(invocation -> { LOG.info("Simulating connectionManager throw IOException {}", invocation.getMock()); throw new IOException("Simulate connectionManager throw IOException"); }).when(spyConnectionManager).getConnection( - any(UserGroupInformation.class), any(String.class), any(Class.class)); + any(UserGroupInformation.class), any(String.class), any(Class.class), any(String.class)); Whitebox.setInternalState(rpcClient, "connectionManager", spyConnectionManager); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 53247262cefb1..0c68fd71ed9a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -115,6 +115,7 @@ public class MiniRouterDFSCluster { /** Nameservices in the federated cluster. */ private List nameservices; + private String defaultNameservice; /** Namenodes in the federated cluster. */ private List namenodes; /** Routers in the federated cluster. 
*/ @@ -558,7 +559,7 @@ public Configuration generateRouterConfiguration(String nsId, String nnId) { conf.set(DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0"); conf.set(DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0"); - conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0)); + conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, this.defaultNameservice); conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval); conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval); @@ -627,6 +628,7 @@ public void configureNameservices(int numNameservices, int numNamenodes, } } } + this.defaultNameservice = nameservices.get(0); } public void setNumDatanodesPerNameservice(int num) { @@ -806,6 +808,7 @@ public void startCluster(Configuration overrideConf) { .numDataNodes(numDNs) .nnTopology(topology) .dataNodeConfOverlays(dnConfs) + .checkExitOnShutdown(false) .storageTypes(storageTypes) .racks(racks) .build(); @@ -1038,6 +1041,26 @@ public void switchToStandby(String nsId, String nnId) { } } + /** + * Switch a namenode in a nameservice to be the observer. + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + */ + public void switchToObserver(String nsId, String nnId) { + try { + int total = cluster.getNumNameNodes(); + NameNodeInfo[] nns = cluster.getNameNodeInfos(); + for (int i = 0; i < total; i++) { + NameNodeInfo nn = nns[i]; + if (nn.getNameserviceId().equals(nsId) && nn.getNamenodeId().equals(nnId)) { + cluster.transitionToObserver(i); + } + } + } catch (Throwable e) { + LOG.error("Cannot transition to active", e); + } + } + /** * Stop the federated HDFS cluster. 
*/ @@ -1160,4 +1183,28 @@ public void waitClusterUp() throws IOException { throw new IOException("Cannot wait for the namenodes", e); } } + + public FileSystem getFileSystem(String nsId, Configuration overrideConf) throws IOException { + Configuration conf = new HdfsConfiguration(false); + conf.set(DFS_NAMESERVICES, nsId); + conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId); + if (overrideConf != null) { + conf.addResource(overrideConf); + } + StringBuilder routers = new StringBuilder(); + for (int i = 0; i < getRouters().size(); i++) { + String routerId = "r" + i; + routers.append(routerId).append(","); + InetSocketAddress rpcAddress = getRouters().get(i).getRouter().getRpcServerAddress(); + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + nsId + "." + routerId, + NetUtils.getHostPortString(rpcAddress)); + } + routers.delete(routers.length() - 1, routers.length()); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + nsId, routers.toString()); + return DistributedFileSystem.get(conf); + } + + public String getDefaultNameservice() { + return defaultNameservice; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 1519bad74b5c1..7b2b8d3074c80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -118,12 +119,20 @@ public void setDisableRegistration(boolean isDisable) { disableRegistration = isDisable; } + public void updateUnavailableNamenode(String ns, InetSocketAddress failedAddress) + throws IOException { 
+ updateNameNodeState(ns, failedAddress, FederationNamenodeServiceState.UNAVAILABLE); + } + @Override - public void updateActiveNamenode( - String nsId, InetSocketAddress successfulAddress) { + public void updateActiveNamenode(String nsId, InetSocketAddress successfulAddress) { + updateNameNodeState(nsId, successfulAddress, FederationNamenodeServiceState.ACTIVE); + } + + private void updateNameNodeState(String nsId, InetSocketAddress iAddr, + FederationNamenodeServiceState state) { + String sAddress = iAddr.getHostName() + ":" + iAddr.getPort(); - String address = successfulAddress.getHostName() + ":" + - successfulAddress.getPort(); String key = nsId; if (key != null) { // Update the active entry @@ -131,9 +140,9 @@ public void updateActiveNamenode( List namenodes = (List) this.resolver.get(key); for (FederationNamenodeContext namenode : namenodes) { - if (namenode.getRpcAddress().equals(address)) { + if (namenode.getRpcAddress().equals(sAddress)) { MockNamenodeContext nn = (MockNamenodeContext) namenode; - nn.setState(FederationNamenodeServiceState.ACTIVE); + nn.setState(state); break; } } @@ -146,14 +155,37 @@ public void updateActiveNamenode( @Override public synchronized List - getNamenodesForNameserviceId(String nameserviceId) { + getNamenodesForNameserviceId(String nameserviceId, boolean observerRead) { // Return a copy of the list because it is updated periodically List namenodes = this.resolver.get(nameserviceId); if (namenodes == null) { namenodes = new ArrayList<>(); } - return Collections.unmodifiableList(new ArrayList<>(namenodes)); + List ret = new ArrayList<>(); + + if (observerRead) { + Iterator iterator = namenodes.iterator(); + List observerNN = new ArrayList<>(); + List nonObserverNN = new ArrayList<>(); + while (iterator.hasNext()) { + FederationNamenodeContext membership = iterator.next(); + if (membership.getState() == FederationNamenodeServiceState.OBSERVER) { + observerNN.add(membership); + } else { + nonObserverNN.add(membership); + } + } + 
Collections.shuffle(observerNN); + Collections.sort(nonObserverNN, new NamenodePriorityComparator()); + ret.addAll(observerNN); + ret.addAll(nonObserverNN); + } else { + ret.addAll(namenodes); + Collections.sort(ret, new NamenodePriorityComparator()); + } + + return Collections.unmodifiableList(ret); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java index 065209060220e..46ca8f131391e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java @@ -160,8 +160,8 @@ public void testRefreshStaticChangeHandlers() throws Exception { Mockito.doAnswer(invocationOnMock -> { Thread.sleep(sleepTime); return null; - }).when(client) - .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + }).when(client).invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.anyBoolean(), Mockito.any()); // No calls yet assertEquals("{}", diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java index ed10a3a87317d..def799ec44ae0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java @@ -129,7 +129,7 
@@ private void verifyFirstRegistration(String nsId, String nnId, int resultsCount, FederationNamenodeServiceState state) throws IOException { List namenodes = - namenodeResolver.getNamenodesForNameserviceId(nsId); + namenodeResolver.getNamenodesForNameserviceId(nsId, false); if (resultsCount == 0) { assertNull(namenodes); } else { @@ -292,7 +292,7 @@ public void testCacheUpdateOnNamenodeStateUpdate() throws IOException { stateStore.refreshCaches(true); // Check whether the namenpde state is reported correct as standby. FederationNamenodeContext namenode = - namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0); + namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0); assertEquals(FederationNamenodeServiceState.STANDBY, namenode.getState()); String rpcAddr = namenode.getRpcAddress(); InetSocketAddress inetAddr = getInetSocketAddress(rpcAddr); @@ -302,7 +302,7 @@ public void testCacheUpdateOnNamenodeStateUpdate() throws IOException { // Check whether correct updated state is returned post update. 
namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr); FederationNamenodeContext namenode1 = - namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0); + namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0); assertEquals("The namenode state should be ACTIVE post update.", FederationNamenodeServiceState.ACTIVE, namenode1.getState()); } @@ -319,7 +319,7 @@ public void testCacheUpdateOnNamenodeStateUpdateWithIp() InetSocketAddress inetAddr = getInetSocketAddress(rpcAddress); namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr); FederationNamenodeContext namenode = - namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0); + namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0); assertEquals("The namenode state should be ACTIVE post update.", FederationNamenodeServiceState.ACTIVE, namenode.getState()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index acb79cb470119..a9df625ed6e77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.ipc.NameServiceStateIdMode; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; @@ -60,7 +61,7 @@ public class TestConnectionManager { @Before public void setup() throws Exception { conf = new Configuration(); - 
connManager = new ConnectionManager(conf); + connManager = new ConnectionManager(conf, NameServiceStateIdMode.DISABLE); NetUtils.addStaticResolution("nn1", "localhost"); NetUtils.createSocketAddrForHost("nn1", 8080); connManager.start(); @@ -81,14 +82,14 @@ public void testCleanup() throws Exception { Map poolMap = connManager.getPools(); ConnectionPool pool1 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool1, 9, 4); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), pool1); ConnectionPool pool2 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool2, 10, 10); poolMap.put( new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), @@ -111,7 +112,7 @@ public void testCleanup() throws Exception { // Make sure the number of connections doesn't go below minSize ConnectionPool pool3 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool3, 8, 0); poolMap.put( new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), @@ -136,7 +137,7 @@ public void testConnectionCreatorWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. 
ConnectionPool badPool = new ConnectionPool( conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); BlockingQueue queue = new ArrayBlockingQueue<>(1); queue.add(badPool); ConnectionManager.ConnectionCreator connectionCreator = @@ -162,7 +163,7 @@ public void testGetConnectionWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. ConnectionPool badPool = new ConnectionPool( conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); } @Test @@ -172,7 +173,7 @@ public void testGetConnection() throws Exception { int activeConns = 5; ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), @@ -197,7 +198,7 @@ public void testGetConnection() throws Exception { @Test public void testValidClientIndex() throws Exception { ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class, null); for(int i = -3; i <= 3; i++) { pool.getClientIndex().set(i); ConnectionContext conn = pool.getConnection(); @@ -213,7 +214,7 @@ public void getGetConnectionNamenodeProtocol() throws Exception { int activeConns = 5; ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId( @@ -281,12 +282,12 @@ private void testConnectionCleanup(float ratio, int totalConns, // Set 
dfs.federation.router.connection.min-active-ratio tmpConf.setFloat( RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, ratio); - ConnectionManager tmpConnManager = new ConnectionManager(tmpConf); + ConnectionManager tmpConnManager = new ConnectionManager(tmpConf, + NameServiceStateIdMode.DISABLE); tmpConnManager.start(); // Create one new connection pool - tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, - NamenodeProtocol.class); + tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0"); Map poolMap = tmpConnManager.getPools(); ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, @@ -317,6 +318,6 @@ public void testUnsupportedProtoExceptionMsg() throws Exception { "Unsupported protocol for connection to NameNode: " + TestConnectionManager.class.getName(), () -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1, - TestConnectionManager.class)); + TestConnectionManager.class, null)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java new file mode 100644 index 0000000000000..5a73d78fdf527 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -0,0 +1,668 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.ipc.NameServiceStateIdMode; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Test; + +public class TestObserverWithRouter { + + public static final String FEDERATION_NS = "ns-fed"; + public static final int LOOPS = 10; + + private MiniRouterDFSCluster cluster; + + public void startUpCluster(int numberOfObserver) throws Exception { + startUpCluster(numberOfObserver, NameServiceStateIdMode.PROXY, null); + } + + public void startUpCluster(int numberOfObserver, NameServiceStateIdMode mode, + Configuration confOverrides) throws Exception { + int numberOfNamenode = 2 + numberOfObserver; + Configuration conf = new Configuration(false); + conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, 0); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); + conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + conf.setInt(DFSConfigKeys.DFS_ESTIMATED_SERVER_TIME_MULTIPLIER_KEY, 0); + conf.setInt(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, 1); + conf.set(RBFConfigKeys.DFS_ROUTER_NAMESERVICE_STATE_ID_MODE, mode.name()); + + if (confOverrides != null) { + conf.addResource(confOverrides); + } + cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode); + cluster.addNamenodeOverrides(conf); + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Making one Namenodes active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + for (int i = 2; i < numberOfNamenode; i++) { + cluster.switchToObserver(ns, NAMENODES[i]); + } + } + } + + Configuration 
routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + + cluster.addRouterOverrides(conf); + cluster.addRouterOverrides(routerConf); + + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + // Setup the mount table + cluster.installMockLocations(); + + cluster.waitActiveNamespaces(); + } + + @After + public void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + RouterStateIdCache.clear(); + } + + @Test(timeout = 600000) + public void testObserveRead() throws Exception { + /* TestCase 1 + * If the NameserviceStateIdStateIdMode of router is DISABLE, + * whatever the NameserviceStateIdStateIdMode and provider of client is, + * router will connect active namenode directly! + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will just connect active + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will just connect active. 
+ * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.DISABLE, NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.DISABLE, + ConfiguredFailoverProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.DISABLE, + ConfiguredFailoverProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 1); + + /* TestCase 2 + * If the NameserviceStateIdStateIdMode of router is PROXY, + * whatever the NameserviceStateIdStateIdMode and provider of client is, + * router will connect observe with state id managed by router. + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will connect observer because state is update in step (1) + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will connect active. the state manged by router is updated in + * step (3), but observer is far behind. So router will failover to active. 
+ * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's msync) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 2 (fs1's open + msync) * LOOP + * observer ops is 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.DISABLE, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.PROXY, + ConfiguredFailoverProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.PROXY, + ConfiguredFailoverProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 1); + + /* TestCase 3 + * If the NameserviceStateIdStateIdMode of router is TRANSMISSION, + * The behavior of the router depend on client's NameserviceStateIdStateIdMode. + * TestCase 3.1 + * If the NameserviceStateIdStateIdMode of client is DISABLE or provider is + * nonObserverReadProxyProvider, router will connect active namenode directly! + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will just connect active + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will just connect active. 
+ * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.DISABLE, NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.TRANSMISSION, + ConfiguredFailoverProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 1); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, + NameServiceStateIdMode.TRANSMISSION, ConfiguredFailoverProxyProvider.class, 6 * LOOPS, + 0 * LOOPS, false, 1); + + /* TestCase 3.2 + * If the NameserviceStateIdStateIdMode of client is PROXY, + * router will connect observe with state id managed by router. + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will connect observer because state is update in step (1) + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will connect active. the state manged by router is updated in + * step (3), but observer is far behind. So router will failover to active. + * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's msync) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 2 (fs1's open + msync) * LOOP + * observer ops is 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 1); + + /* TestCase 3.3 + * If the NameserviceStateIdStateIdMode of client is TRANSMISSION, + * router will connect observe with state id managed by client. + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will connect observer because state is update in step (1) + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. 
+ * (5) fs1 read Router will connect observer. Though observer is far behind, fs use their + * own state id, observer will not reject fs1. + * active ops is 2 (fs1's create, complete) * LOOP + * + 2 (fs2's create, complete) * LOOP + * observer ops is 1 (fs1's open) * LOOP + * + 1 (fs3's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 4 * LOOPS, 2 * LOOPS, false, 1); + } + + @Test(timeout = 600000) + public void testMultipleObserveReadWithRouter() throws Exception { + /* TestCase 1 + * If the NameserviceStateIdStateIdMode of router is DISABLE, + * whatever the NameserviceStateIdStateIdMode and provider of client is, + * router will connect active namenode directly! + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will just connect active + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will just connect active. 
+ * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.DISABLE, NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.DISABLE, + ConfiguredFailoverProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.DISABLE, + ConfiguredFailoverProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 2); + + /* TestCase 2 + * If the NameserviceStateIdStateIdMode of router is PROXY, + * whatever the NameserviceStateIdStateIdMode and provider of client is, + * router will connect observe with state id managed by router. + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will connect observer because state is update in step (1) + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will connect active. the state manged by router is updated in + * step (3), but observer is far behind. So router will failover to active. 
+ * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's msync) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 2 (fs1's open + msync) * LOOP + * observer ops is 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.DISABLE, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.PROXY, + ConfiguredFailoverProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.PROXY, + ConfiguredFailoverProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 2); + + /* TestCase 3 + * If the NameserviceStateIdStateIdMode of router is TRANSMISSION, + * The behavior of the router depend on client's NameserviceStateIdStateIdMode. + * TestCase 3.1 + * If the NameserviceStateIdStateIdMode of client is DISABLE or provider is + * nonObserverReadProxyProvider, router will connect active namenode directly! + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will just connect active + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will just connect active. 
+ * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.DISABLE, NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.TRANSMISSION, + ConfiguredFailoverProxyProvider.class, 6 * LOOPS, 0 * LOOPS, false, 2); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, + NameServiceStateIdMode.TRANSMISSION, ConfiguredFailoverProxyProvider.class, 6 * LOOPS, + 0 * LOOPS, false, 2); + + /* TestCase 3.2 + * If the NameserviceStateIdStateIdMode of client is PROXY, + * router will connect observe with state id managed by router. + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will connect observer because state is update in step (1) + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. + * (5) fs1 read Router will connect active. the state manged by router is updated in + * step (3), but observer is far behind. So router will failover to active. + * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's msync) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 2 (fs1's open + msync) * LOOP + * observer ops is 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 7 * LOOPS, 1 * LOOPS, false, 2); + + /* TestCase 3.3 + * If the NameserviceStateIdStateIdMode of client is TRANSMISSION, + * router will connect observe with state id managed by client. + * In this unit-test, we will do below steps: + * (1) fs1 create Router will just connect active + * (2) fs1 read Router will connect observer because state is update in step (1) + * (3) fs2 create Router will just connect active + * (4) stop observer EditLogTailer. 
+ * (5) fs1 read Router will connect observer. Though observer is far behind, fs use their + * own state id, observer will not reject fs1. + * active ops is 2 (fs1's create, complete) * LOOP + * + 2 (fs2's create, complete) * LOOP + * observer ops is 1 (fs1's open) * LOOP + * + 1 (fs3's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 4 * LOOPS, 2 * LOOPS, false, 2); + } + + /* TestCases for observer is down. */ + @Test(timeout = 120000) + public void testObserveReadWithRouterWhenObserverIsDown() throws Exception { + /* TestCase1: + * If the state id is managed by client, router just connector active NameNode. + * active ops is 2 (fs1's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 1 (fs1's open) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.TRANSMISSION, + NameServiceStateIdMode.TRANSMISSION, ObserverReadProxyProvider.class, 6 * LOOPS, 0 * LOOPS, + true, 1); + + /* TestCase2: + * If the state id is managed by router, in every read opeation, router will add a sync + * operation. 
+ * active ops is 2 (fs1's create, complete) * LOOP + * + 2 (fs1's open, msync) * LOOP + * + 2 (fs2's create, complete) * LOOP + * + 2 (fs1's open, msync) * LOOP + * */ + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 8 * LOOPS, 0 * LOOPS, true, 1); + testObserveRead(NameServiceStateIdMode.DISABLE, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 8 * LOOPS, 0 * LOOPS, true, 1); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 8 * LOOPS, 0 * LOOPS, true, 1); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 8 * LOOPS, 0 * LOOPS, true, 1); + testObserveRead(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.PROXY, + ConfiguredFailoverProxyProvider.class, 8 * LOOPS, 0 * LOOPS, true, 1); + testObserveRead(NameServiceStateIdMode.PROXY, NameServiceStateIdMode.PROXY, + ConfiguredFailoverProxyProvider.class, 8 * LOOPS, 0 * LOOPS, true, 1); + } + + @Test(timeout = 60000) + public void testUnavaliableObserverNN() throws Exception { + startUpCluster(2); + Configuration clientConf = createClientConf(NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class); + FileSystem fileSystem = cluster.getFileSystem(FEDERATION_NS, clientConf); + + stopObserver(2); + + fileSystem.listStatus(new Path("/")); + + // msync, getBlockLocation call should send to active when observer is stoped. 
+ assertProxyOps(cluster, 2, -1); + + fileSystem.close(); + + boolean hasUnavailable = false; + for(String ns : cluster.getNameservices()) { + for (RouterContext routerContext : cluster.getRouters()) { + List nns = routerContext.getRouter() + .getNamenodeResolver().getNamenodesForNameserviceId(ns, false); + for(FederationNamenodeContext nn : nns) { + if(FederationNamenodeServiceState.UNAVAILABLE == nn.getState()) { + hasUnavailable = true; + } + } + } + } + // After communicate with unavailable observer namenode, + // we will update state to unavailable. + assertTrue("There must has unavailable NN", hasUnavailable); + } + + @Test(timeout = 120000) + public void testRouterReadWithMsync() throws Exception { + /* + * Case 1: + * Router' mode is PROXY or DISABLE, msync will be ignored. + * */ + testObserveReadWithMsync(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.PROXY, + ObserverReadProxyProvider.class, 1 + 3 * LOOPS, 1 + 1 * LOOPS); + testObserveReadWithMsync(NameServiceStateIdMode.TRANSMISSION, NameServiceStateIdMode.DISABLE, + ObserverReadProxyProvider.class, 1 + 3 * LOOPS, 0 * LOOPS); + + /* + * Case 2: + * Router is TRANSMISSION, msync will be called by their demand. + * */ + testObserveReadWithMsync(NameServiceStateIdMode.TRANSMISSION, + NameServiceStateIdMode.TRANSMISSION, ObserverReadProxyProvider.class, + 1 + 3 * LOOPS, 1 + 1 * LOOPS); + } + + @Test(timeout = 600000) + public void testObserveReadWithRouterAndContinuousMsync() throws Exception { + // Here we simulate the whole pipline, include msync from namenode. + // Because client will trigger msync and observer is far behind, so the second time of fs1 + // read will connect to active. 
+ startUpCluster(1, NameServiceStateIdMode.TRANSMISSION, null); + Configuration clientConf = createClientConf(NameServiceStateIdMode.TRANSMISSION, + ObserverReadProxyProvider.class, 0); + testExamples(cluster, clientConf, -1, 1 * LOOPS, false); + } + + private void testObserveReadWithMsync(NameServiceStateIdMode clientMode, + NameServiceStateIdMode routerMode, + Class provider, + int activeExpected, + int observerExpected) throws Exception { + try { + startUpCluster(1, routerMode, null); + Configuration clientConf = createClientConf(clientMode, provider); + testExamplesWithMsync(cluster, clientConf, activeExpected, observerExpected); + // assertEquals(0L, RouterStateIdCache.size()); + } finally { + teardown(); + } + } + + private void testObserveRead(NameServiceStateIdMode clientMode, + NameServiceStateIdMode routerMode, + Class provider, + int activeExpected, + int observerExpected, + boolean stopObserver, + int observers) throws Exception { + try { + startUpCluster(observers, routerMode, null); + Configuration clientConf = createClientConf(clientMode, provider); + testExamples(cluster, clientConf, activeExpected, observerExpected, stopObserver); + // assertEquals(0L, RouterStateIdCache.size()); + } finally { + teardown(); + } + } + + private Configuration createClientConf(NameServiceStateIdMode mode, + Class provider) { + return createClientConf(mode, provider, -1); + } + + private Configuration createClientConf(NameServiceStateIdMode mode, + Class provider, int period){ + Configuration clientConf = new Configuration(); + clientConf.set(PROXY_PROVIDER_KEY_PREFIX + "." + FEDERATION_NS, provider.getCanonicalName()); + clientConf.setInt(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + FEDERATION_NS, period); + clientConf.set(HdfsClientConfigKeys.DFS_CLIENT_NAMESERVICE_STATE_ID_MODE, mode.name()); + clientConf.setLong(ObserverReadProxyProvider.OBSERVER_PROBE_RETRY_PERIOD_KEY, 0L); + // For TRANSMISSION mode, different user should use their own AlignmentContext in test. 
+ clientConf.setBoolean("fs.hdfs.impl.disable.cache", true); + return clientConf; + } + + private void testExamplesWithMsync(MiniRouterDFSCluster cluster, Configuration clientConf, + int activeExpected, int observerExpected) throws Exception { + Path path = new Path("/testFile"); + String defaultNs = cluster.getDefaultNameservice(); + FileSystem fs1 = cluster.getFileSystem(FEDERATION_NS, clientConf); + try { + waitObserverKeepUpWithActive(cluster, defaultNs); + // initializeMsync. mysnc to active only at first time, list to observer. + // In PROXY mode, RouterClientProtocol::msync will be ignored. But RouterRpcClient::msync will + // be called. + // In TRANSMISSION, msync will be ignored firstly, then RouterRpcClient::msync found no nn was + // connected, so trigger msync. Add one rpc to active. + fs1.listStatus(new Path("/")); + + for (int i = 0; i < LOOPS; i ++) { + // Send create and complete call to active + fs1.create(path).close(); + + waitObserverKeepUpWithActive(cluster, defaultNs); + + // Send sync. + // In PROXY mode, RouterClientProtocol::msync will be ignored. But RouterRpcClient::msync + // will be called. + // In TRANSMISSION, msync will be called by demand. + // Here fs1 only visit ns1's namenode, because we only have connected ns1. + fs1.msync(); + + // Send read request to observer. 
msync to active, open to observer + fs1.open(path).close(); + } + } finally { + if (fs1 != null) { + fs1.close(); + } + } + assertProxyOps(cluster, activeExpected, observerExpected); + } + + private void testExamples(MiniRouterDFSCluster cluster, Configuration clientConf, + int activeExpected, int observerExpected, boolean stopObserver) throws Exception { + if (stopObserver) { + int nnIndex = stopObserver(1); + assertNotEquals("No observer found", 3, nnIndex); + } + String defaultNs = cluster.getDefaultNameservice(); + FileSystem fs1 = cluster.getFileSystem(FEDERATION_NS, clientConf); + FileSystem fs2 = cluster.getFileSystem(FEDERATION_NS, clientConf); + try { + fs1 = cluster.getFileSystem(FEDERATION_NS, clientConf); + fs2 = cluster.getFileSystem(FEDERATION_NS, clientConf); + for (int i = 0; i < LOOPS; i++) { + Path path1 = new Path("/testFile1" + i); + // Send create call and complete to active + fs1.create(path1).close(); + + if (!stopObserver) { + // wait for observer to keep up with active + waitObserverKeepUpWithActive(cluster, defaultNs); + } + + // Send read request to observer + fs1.open(path1).close(); + + // stop observer namenode EditLogTailer, observer's state id will lag behind + stopObserverEditLogTailer(cluster, defaultNs); + + Path path2 = new Path("/testFile2" + i); + // Send create call to active + fs2.create(path2).close(); + + // observer state id is delayed. 
+ fs1.open(path1).close(); + + startObserverEditLogTailer(cluster, defaultNs); + } + } finally { + if (fs1 != null) { + fs1.close(); + } + if (fs2 != null) { + fs2.close(); + } + } + assertProxyOps(cluster, activeExpected, observerExpected); + } + + private int stopObserver(int num) { + int nnIndex; + for (nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) { + NameNode nameNode = cluster.getCluster().getNameNode(nnIndex); + if (nameNode != null && nameNode.isObserverState()) { + cluster.getCluster().shutdownNameNode(nnIndex); + num--; + if (num == 0) { + break; + } + } + } + return nnIndex; + } + + public List getNameNodes(MiniRouterDFSCluster cluster, + String nsId, + HAServiceState state) { + List nameNodes = new ArrayList<>(); + for (MiniRouterDFSCluster.NamenodeContext context :cluster.getNamenodes(nsId)) { + if (context.getNamenode().getState().equals(state.toString())) { + nameNodes.add(context.getNamenode()); + } + } + return nameNodes; + } + + private void waitObserverKeepUpWithActive(MiniRouterDFSCluster cluster, String nsId) + throws Exception { + List namenodeContexts = cluster.getNamenodes(nsId); + LambdaTestUtils.await(2000, 10, () -> { + List actives = getNameNodes(cluster, nsId, HAServiceState.ACTIVE); + List observers = getNameNodes(cluster, nsId, HAServiceState.OBSERVER); + assertEquals("Active namenode must have one instance.", 1, actives.size()); + assertNotNull("Can't find observer namenode.", observers.size() > 0); + return observers.stream() + .map(o -> o.getNamesystem().getLastAppliedOrWrittenTxId() == + actives.get(0).getNamesystem().getLastAppliedOrWrittenTxId()) + .reduce(true, (a, b) -> a && b); + }); + } + + private void assertProxyOps(MiniRouterDFSCluster cluster, int active, int observer) { + long rpcCountForActive = cluster.getRouters().stream() + .map(s -> s.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps()) + .reduce(0L, Long::sum); + long rpcCountForObserver = cluster.getRouters().stream() + .map(s -> 
s.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps()) + .reduce(0L, Long::sum); + if (active >= 0) { + assertEquals(active, rpcCountForActive); + } + if (observer >= 0) { + assertEquals(observer, rpcCountForObserver); + } + } + + private void stopObserverEditLogTailer(MiniRouterDFSCluster cluster, String defaultNs) + throws InterruptedException { + List observers = getNameNodes(cluster, defaultNs, HAServiceState.OBSERVER); + observers.forEach(o -> o.getNamesystem().getEditLogTailer().setSkipForTest(true)); + Thread.sleep(20); + } + + private void startObserverEditLogTailer(MiniRouterDFSCluster cluster, String defaultNs) + throws InterruptedException { + List observers = getNameNodes(cluster, defaultNs, HAServiceState.OBSERVER); + observers.forEach(o -> o.getNamesystem().getEditLogTailer().setSkipForTest(false)); + Thread.sleep(20); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java index 94f2baeaed136..04b4b58bcb6e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java @@ -167,7 +167,7 @@ public void testHearbeat() throws InterruptedException, IOException { // Verify the locator has matching NN entries for each NS for (String ns : cluster.getNameservices()) { List nns = - namenodeResolver.getNamenodesForNameserviceId(ns); + namenodeResolver.getNamenodesForNameserviceId(ns, false); // Active FederationNamenodeContext active = nns.get(0); @@ -191,7 +191,7 @@ public void testHearbeat() throws InterruptedException, IOException { // Verify the locator has recorded the failover 
for the failover NS List failoverNSs = - namenodeResolver.getNamenodesForNameserviceId(failoverNS); + namenodeResolver.getNamenodesForNameserviceId(failoverNS, false); // Active FederationNamenodeContext active = failoverNSs.get(0); assertEquals(NAMENODES[1], active.getNamenodeId()); @@ -202,7 +202,7 @@ public void testHearbeat() throws InterruptedException, IOException { // Verify the locator has the same records for the other ns List normalNss = - namenodeResolver.getNamenodesForNameserviceId(normalNs); + namenodeResolver.getNamenodesForNameserviceId(normalNs, false); // Active active = normalNss.get(0); assertEquals(NAMENODES[0], active.getNamenodeId()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java index 4fae86b01d399..bae2dea3ceabf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java @@ -204,7 +204,7 @@ public void testNamenodeMonitoring() throws Exception { final List namespaceInfo = new ArrayList<>(); for (String nsId : nns.keySet()) { List nnReports = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); namespaceInfo.addAll(nnReports); } for (FederationNamenodeContext nnInfo : namespaceInfo) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java index ab507aaf9ecd4..f23b02092a299 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java @@ -194,7 +194,7 @@ private void testWebScheme(HttpConfig.Policy httpPolicy, final List namespaceInfo = new ArrayList<>(); for (String nsId : nns.keySet()) { List nnReports = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); namespaceInfo.addAll(nnReports); } for (FederationNamenodeContext nnInfo : namespaceInfo) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java index b2bfb2f5121bb..1054e5ac8cf97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java @@ -166,7 +166,7 @@ public void testRetryWhenOneNameServiceDown() throws Exception { private void registerInvalidNameReport() throws IOException { String ns0 = cluster.getNameservices().get(0); List origin = resolver - .getNamenodesForNameserviceId(ns0); + .getNamenodesForNameserviceId(ns0, false); FederationNamenodeContext nnInfo = origin.get(0); NamenodeStatusReport report = new NamenodeStatusReport(ns0, nnInfo.getNamenodeId(), nnInfo.getRpcAddress(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 9e1333f95295b..140a2bba119bb 100755 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1640,6 +1640,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.state.context.enabled"; public static final boolean DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT = false; + public static final String DFS_ESTIMATED_TXNS_PER_SECOND_KEY = "dfs.estimated.txns.per.second"; + public static final long DFS_ESTIMATED_TXNS_PER_SECOND_DEFAULT = 10000L; + + public static final String DFS_ESTIMATED_SERVER_TIME_MULTIPLIER_KEY = + "dfs.estimated.server.time.multiplier"; + public static final float DFS_ESTIMATED_SERVER_TIME_MULTIPLIER_DEFAULT = 0.8f; + /** * whether to protect the subdirectories of directories which * set on fs.protected.directories. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 13894b4fecf8c..a200bfe180b8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4688,7 +4688,7 @@ public long getMissingReplOneBlocksCount() { public int getExpiredHeartbeats() { return datanodeStatistics.getExpiredHeartbeats(); } - + @Metric({"TransactionsSinceLastCheckpoint", "Number of transactions since last checkpoint"}) public long getTransactionsSinceLastCheckpoint() { @@ -4724,7 +4724,13 @@ private long getCorrectTransactionsSinceLastLogRoll() { public long getLastWrittenTransactionId() { return getEditLog().getLastWrittenTxIdWithoutLock(); } - + + @Metric({"LastAppliedOrWrittenTxId", + "Last Transaction ID applied to FSImage"}) + public long getLastAppliedOrWrittenTxId() { + return 
this.getFSImage().getLastAppliedOrWrittenTxId(); + } + @Metric({"LastCheckpointTime", "Time in milliseconds since the epoch of the last checkpoint"}) public long getLastCheckpointTime() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index 7d613594efd64..3ae8f11549fba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -25,7 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; @@ -48,7 +50,7 @@ class GlobalStateIdContext implements AlignmentContext { * RPC request will wait in the call queue before the Observer catches up * with its state id. */ - private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L; + private final long estimatedTxnsPerSecond; /** * The client wait time on an RPC request is composed of @@ -56,7 +58,7 @@ class GlobalStateIdContext implements AlignmentContext { * This is an expected fraction of the total wait time spent on * server execution. */ - private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f; + private final float estimatedServerTimeMultiplier; private final FSNamesystem namesystem; private final HashSet coordinatedMethods; @@ -65,8 +67,13 @@ class GlobalStateIdContext implements AlignmentContext { * Server side constructor. 
* @param namesystem server side state provider */ - GlobalStateIdContext(FSNamesystem namesystem) { + GlobalStateIdContext(FSNamesystem namesystem, Configuration conf) { this.namesystem = namesystem; + this.estimatedTxnsPerSecond = conf.getLong(DFSConfigKeys.DFS_ESTIMATED_TXNS_PER_SECOND_KEY, + DFSConfigKeys.DFS_ESTIMATED_TXNS_PER_SECOND_DEFAULT); + this.estimatedServerTimeMultiplier = conf + .getFloat(DFSConfigKeys.DFS_ESTIMATED_SERVER_TIME_MULTIPLIER_KEY, + DFSConfigKeys.DFS_ESTIMATED_SERVER_TIME_MULTIPLIER_DEFAULT); this.coordinatedMethods = new HashSet<>(); // For now, only ClientProtocol methods can be coordinated, so only checking // against ClientProtocol. @@ -125,8 +132,11 @@ public void updateRequestState(RpcRequestHeaderProto.Builder header) { * @throws RetriableException if Observer is too far behind. */ @Override - public long receiveRequestState(RpcRequestHeaderProto header, - long clientWaitTime) throws IOException { + public long receiveRequestState(RpcRequestHeaderProto header, long clientWaitTime, + boolean isCoordinatedCall) throws IOException { + if (!isCoordinatedCall) { + return -1; + } if (!header.hasStateId() && HAServiceState.OBSERVER.equals(namesystem.getState())) { // This could happen if client configured with non-observer proxy provider @@ -151,11 +161,9 @@ public long receiveRequestState(RpcRequestHeaderProto header, clientStateId, serverStateId); return serverStateId; } - if (HAServiceState.OBSERVER.equals(namesystem.getState()) && - clientStateId - serverStateId > - ESTIMATED_TRANSACTIONS_PER_SECOND - * TimeUnit.MILLISECONDS.toSeconds(clientWaitTime) - * ESTIMATED_SERVER_TIME_MULTIPLIER) { + if (HAServiceState.OBSERVER.equals(namesystem.getState()) && clientStateId - serverStateId > + estimatedTxnsPerSecond * TimeUnit.MILLISECONDS.toSeconds(clientWaitTime) * + estimatedServerTimeMultiplier) { throw new RetriableException( "Observer Node is too far behind: serverStateId = " + serverStateId + " clientStateId = " + clientStateId); diff 
--git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b64530337ee51..5bead6d6064c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -457,7 +457,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) GlobalStateIdContext stateIdContext = null; if (enableStateContext) { - stateIdContext = new GlobalStateIdContext(namesystem); + stateIdContext = new GlobalStateIdContext(namesystem, conf); } clientRpcServer = new RPC.Builder(conf) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 25596dce9f51d..834101ae820c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -475,7 +475,8 @@ void sleep(long sleepTimeMillis) throws InterruptedException { */ private class EditLogTailerThread extends Thread { private volatile boolean shouldRun = true; - + private volatile boolean skipForTest = false; + private EditLogTailerThread() { super("Edit log tailer"); } @@ -483,7 +484,11 @@ private EditLogTailerThread() { private void setShouldRun(boolean shouldRun) { this.shouldRun = shouldRun; } - + + public void setSkipForTest(boolean skip) { + this.skipForTest = skip; + } + @Override public void run() { SecurityUtil.doAsLoginUserOrFatal( @@ -499,6 +504,14 @@ public Object run() { private void doWork() { long currentSleepTimeMs = sleepTimeMs; while (shouldRun) { + 
if (skipForTest) { + try { + EditLogTailer.this.sleep(10); + } catch (InterruptedException e) { + LOG.warn("Edit log tailer interrupted", e); + } + continue; + } long editsTailed = 0; try { // There's no point in triggering a log roll if the Standby hasn't @@ -637,4 +650,9 @@ private NamenodeProtocol getActiveNodeProxy() throws IOException { return cachedActiveProxy; } } + + @VisibleForTesting + public void setSkipForTest(boolean skip) { + tailerThread.setSkipForTest(skip); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 33ffd07c8de2b..1413242561077 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6446,4 +6446,22 @@ frequently than this time, the client will give up waiting. + + + dfs.estimated.txns.per.second + 10000 + + Estimated number of journal transactions a typical NameNode can execute per second. The number is used to estimate + how long a client's RPC request will wait in the call queue before the Observer catches up with its state id. + + + + + dfs.estimated.server.time.multiplier + 0.8f + + The client wait time on an RPC request is composed of the server execution time plus the communication time. + This is an expected fraction of the total wait time spent on server execution. 
+ + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 484958e3c302c..57425381974fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2308,6 +2308,8 @@ public synchronized void restartNameNode(int nnIndex, boolean waitActive, nn.getHttpServer() .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false); info.nameNode = nn; + info.nameserviceId = info.conf.get(DFS_NAMESERVICE_ID); + info.nnId = info.conf.get(DFS_HA_NAMENODE_ID_KEY); info.setStartOpt(startOpt); if (waitActive) { if (numDataNodes > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 307fe04618ba6..a898fbe165295 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -35,8 +35,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.LongAccumulator; +import org.apache.hadoop.hdfs.NamespaceStateId; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.slf4j.Logger; @@ -367,9 +367,9 @@ public static long setACStateId(DistributedFileSystem dfs, ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext()); Field f = ac.getClass().getDeclaredField("lastSeenStateId"); f.setAccessible(true); - LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac); + NamespaceStateId lastSeenStateId = (NamespaceStateId)f.get(ac); long currentStateId 
= lastSeenStateId.getThenReset(); - lastSeenStateId.accumulate(stateId); + lastSeenStateId.update(stateId); return currentStateId; }