-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11326. [Federation] Add RM FederationStateStoreService Metrics. #4963
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
fa42bae
b164bd3
8aa81a9
a913c33
460d332
d5e7538
b14d43b
a127c35
7e0a9d4
9ed1f3a
6f7fc19
5614886
52a72ab
b9a4eb0
30ee750
9e2e40c
64ab71e
77b784d
6abdc0e
3ae3e39
2b9ba9f
3c4cef5
46ed46b
17cefac
13b5870
2298bae
2505955
ad3f2c8
fe0c056
1e99166
8387258
001a96a
fa52991
d15542a
fac9676
c21a998
4ef0b90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.yarn.server.federation.utils; | ||
|
|
||
| import org.apache.hadoop.yarn.exceptions.YarnException; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.util.Arrays; | ||
|
|
||
| /** | ||
| * Class to define client method,params and arguments. | ||
| */ | ||
| public class FederationClientMethod { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(FederationClientMethod.class); | ||
|
|
||
| /** | ||
| * List of parameters: static and dynamic values, matchings types. | ||
| */ | ||
| private final Object[] params; | ||
| /** | ||
| * List of method parameters types, matches parameters. | ||
| */ | ||
| private final Class<?>[] types; | ||
| /** | ||
| * String name of the method. | ||
| */ | ||
| private final String methodName; | ||
|
|
||
| public FederationClientMethod(String method, Class<?>[] pTypes, Object... pParams) | ||
| throws YarnException { | ||
| if (pParams.length != pTypes.length) { | ||
| throw new YarnException("Invalid parameters for method " + method); | ||
| } | ||
|
|
||
| this.params = pParams; | ||
| this.types = Arrays.copyOf(pTypes, pTypes.length); | ||
| this.methodName = method; | ||
| } | ||
|
|
||
| public FederationClientMethod(String method, Class pTypes, Object pParams) | ||
| throws YarnException { | ||
| this(method, new Class[]{pTypes}, new Object[]{pParams}); | ||
| } | ||
|
|
||
| public Object[] getParams() { | ||
| return Arrays.copyOf(this.params, this.params.length); | ||
| } | ||
|
|
||
| public String getMethodName() { | ||
| return methodName; | ||
| } | ||
|
|
||
| /** | ||
| * Get the calling types for this method. | ||
| * | ||
| * @return An array of calling types. | ||
| */ | ||
| public Class<?>[] getTypes() { | ||
| return Arrays.copyOf(this.types, this.types.length); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,14 +19,14 @@ | |
| package org.apache.hadoop.yarn.server.resourcemanager.federation; | ||
|
|
||
| import java.io.IOException; | ||
| import java.lang.reflect.Method; | ||
| import java.net.InetSocketAddress; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.List; | ||
|
|
||
| import org.apache.commons.lang3.NotImplementedException; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.io.retry.RetryPolicy; | ||
| import org.apache.hadoop.net.NetUtils; | ||
|
|
@@ -79,11 +79,14 @@ | |
| import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; | ||
| import org.apache.hadoop.yarn.server.federation.utils.FederationClientMethod; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; | ||
| import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; | ||
| import org.apache.hadoop.yarn.server.records.Version; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; | ||
| import org.apache.hadoop.yarn.util.Clock; | ||
| import org.apache.hadoop.yarn.util.MonotonicClock; | ||
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; | ||
| import org.apache.hadoop.yarn.webapp.util.WebAppUtils; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -109,6 +112,8 @@ public class FederationStateStoreService extends AbstractService | |
| private long heartbeatInterval; | ||
| private long heartbeatInitialDelay; | ||
| private RMContext rmContext; | ||
| private final Clock clock = new MonotonicClock(); | ||
| private FederationStateStoreServiceMetrics metrics; | ||
| private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread"; | ||
| private int cleanUpRetryCountNum; | ||
| private long cleanUpRetrySleepTime; | ||
|
|
@@ -170,6 +175,9 @@ protected void serviceInit(Configuration conf) throws Exception { | |
|
|
||
| LOG.info("Initialized federation membership service."); | ||
|
|
||
| this.metrics = FederationStateStoreServiceMetrics.getMetrics(); | ||
| LOG.info("Initialized federation statestore service metrics."); | ||
|
|
||
| super.serviceInit(conf); | ||
| } | ||
|
|
||
|
|
@@ -272,130 +280,191 @@ public Version loadVersion() { | |
| @Override | ||
| public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( | ||
| GetSubClusterPolicyConfigurationRequest request) throws YarnException { | ||
| return stateStoreClient.getPolicyConfiguration(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getPolicyConfiguration", GetSubClusterPolicyConfigurationRequest.class, request); | ||
| return invoke(clientMethod, GetSubClusterPolicyConfigurationResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( | ||
| SetSubClusterPolicyConfigurationRequest request) throws YarnException { | ||
| return stateStoreClient.setPolicyConfiguration(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "setPolicyConfiguration", SetSubClusterPolicyConfigurationRequest.class, request); | ||
| return invoke(clientMethod, SetSubClusterPolicyConfigurationResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( | ||
| GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { | ||
| return stateStoreClient.getPoliciesConfigurations(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getPoliciesConfigurations", GetSubClusterPoliciesConfigurationsRequest.class, request); | ||
| return invoke(clientMethod, GetSubClusterPoliciesConfigurationsResponse.class); | ||
|
||
| } | ||
|
|
||
| @Override | ||
| public SubClusterRegisterResponse registerSubCluster( | ||
| SubClusterRegisterRequest registerSubClusterRequest) | ||
| throws YarnException { | ||
| return stateStoreClient.registerSubCluster(registerSubClusterRequest); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "registerSubCluster", SubClusterRegisterRequest.class, registerSubClusterRequest); | ||
| return invoke(clientMethod, SubClusterRegisterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public SubClusterDeregisterResponse deregisterSubCluster( | ||
| SubClusterDeregisterRequest subClusterDeregisterRequest) | ||
| throws YarnException { | ||
| return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "deregisterSubCluster", SubClusterDeregisterRequest.class, subClusterDeregisterRequest); | ||
| return invoke(clientMethod, SubClusterDeregisterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public SubClusterHeartbeatResponse subClusterHeartbeat( | ||
| SubClusterHeartbeatRequest subClusterHeartbeatRequest) | ||
| throws YarnException { | ||
| return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "subClusterHeartbeat", SubClusterHeartbeatRequest.class, subClusterHeartbeatRequest); | ||
| return invoke(clientMethod, SubClusterHeartbeatResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public GetSubClusterInfoResponse getSubCluster( | ||
| GetSubClusterInfoRequest subClusterRequest) throws YarnException { | ||
| return stateStoreClient.getSubCluster(subClusterRequest); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getSubClusters", GetSubClusterInfoRequest.class, subClusterRequest); | ||
| return invoke(clientMethod, GetSubClusterInfoResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public GetSubClustersInfoResponse getSubClusters( | ||
| GetSubClustersInfoRequest subClustersRequest) throws YarnException { | ||
| return stateStoreClient.getSubClusters(subClustersRequest); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getSubClusters", GetSubClustersInfoRequest.class, subClustersRequest); | ||
| return invoke(clientMethod, GetSubClustersInfoResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( | ||
| AddApplicationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.addApplicationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "addApplicationHomeSubCluster", AddApplicationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, AddApplicationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( | ||
| UpdateApplicationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.updateApplicationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "updateApplicationHomeSubCluster", UpdateApplicationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, UpdateApplicationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | ||
| GetApplicationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.getApplicationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getApplicationHomeSubCluster", GetApplicationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, GetApplicationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.getApplicationsHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getApplicationsHomeSubCluster", GetApplicationsHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, GetApplicationsHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( | ||
| DeleteApplicationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.deleteApplicationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "deleteApplicationHomeSubCluster", DeleteApplicationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, DeleteApplicationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( | ||
| AddReservationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.addReservationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "addReservationHomeSubCluster", AddReservationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, AddReservationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public GetReservationHomeSubClusterResponse getReservationHomeSubCluster( | ||
| GetReservationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.getReservationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getReservationHomeSubCluster", GetReservationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, GetReservationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster( | ||
| GetReservationsHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.getReservationsHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getReservationsHomeSubCluster", GetReservationsHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, GetReservationsHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( | ||
| UpdateReservationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.updateReservationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "updateReservationHomeSubCluster", UpdateReservationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, UpdateReservationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster( | ||
| DeleteReservationHomeSubClusterRequest request) throws YarnException { | ||
| return stateStoreClient.deleteReservationHomeSubCluster(request); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "deleteReservationHomeSubCluster", DeleteReservationHomeSubClusterRequest.class, request); | ||
| return invoke(clientMethod, DeleteReservationHomeSubClusterResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) | ||
| throws YarnException, IOException { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "storeNewMasterKey", RouterMasterKeyRequest.class, request); | ||
| return invoke(clientMethod, RouterMasterKeyResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) | ||
| throws YarnException, IOException { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "removeStoredMasterKey", RouterMasterKeyRequest.class, request); | ||
| return invoke(clientMethod, RouterMasterKeyResponse.class); | ||
| } | ||
|
|
||
| @Override | ||
| public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) | ||
| throws YarnException, IOException { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
| FederationClientMethod clientMethod = new FederationClientMethod( | ||
| "getMasterKeyByDelegationKey", RouterMasterKeyRequest.class, request); | ||
| return invoke(clientMethod, RouterMasterKeyResponse.class); | ||
|
||
| } | ||
|
|
||
| private <R> R invoke(FederationClientMethod request, Class<R> clazz) | ||
|
||
| throws YarnException { | ||
| try { | ||
| long startTime = clock.getTime(); | ||
| Method method = FederationStateStore.class. | ||
| getMethod(request.getMethodName(), request.getTypes()); | ||
| R result = clazz.cast(method.invoke(stateStoreClient, request.getParams())); | ||
| long stopTime = clock.getTime(); | ||
| FederationStateStoreServiceMetrics.succeededStateStoreServiceCall( | ||
| request.getMethodName(), stopTime - startTime); | ||
| return result; | ||
| } catch (Exception e) { | ||
| LOG.error("stateStoreClient call method {} error.", request.getMethodName(), e); | ||
| FederationStateStoreServiceMetrics.failedStateStoreServiceCall( | ||
|
||
| request.getMethodName()); | ||
| throw new YarnException(e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it make more sense to make it:
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or actually just specify it as an arg:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good idea, I will modify the code.