-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11160. Support getResourceProfiles, getResourceProfile API's for Federation #4540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
4b0af99
16680a8
1f109eb
3079a70
52176b8
ed92e5b
e4b7a92
5198950
01c2415
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1448,13 +1448,52 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( | |
| @Override | ||
| public GetAllResourceProfilesResponse getResourceProfiles( | ||
| GetAllResourceProfilesRequest request) throws YarnException, IOException { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
| if (request == null) { | ||
| routerMetrics.incrGetResourceProfilesFailedRetrieved(); | ||
| RouterServerUtil.logAndThrowException("Missing getResourceProfiles request.", null); | ||
| } | ||
| long startTime = clock.getTime(); | ||
| ClientMethod remoteMethod = new ClientMethod("getResourceProfiles", | ||
| new Class[] {GetAllResourceProfilesRequest.class}, new Object[] {request}); | ||
| Collection<GetAllResourceProfilesResponse> resourceProfiles; | ||
| try { | ||
| resourceProfiles = invokeAppClientProtocolMethod(true, remoteMethod, | ||
| GetAllResourceProfilesResponse.class); | ||
| } catch (Exception ex) { | ||
| routerMetrics.incrGetResourceProfilesFailedRetrieved(); | ||
| LOG.error("Unable to get resource profiles due to exception.", ex); | ||
| throw ex; | ||
| } | ||
| long stopTime = clock.getTime(); | ||
| routerMetrics.succeededGetResourceProfilesRetrieved(stopTime - startTime); | ||
| // Merge the GetAllResourceProfilesResponse | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is not very useful
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will remove it. |
||
| return RouterYarnClientUtils.mergeClusterResourceProfilesResponse(resourceProfiles); | ||
| } | ||
|
|
||
| @Override | ||
| public GetResourceProfileResponse getResourceProfile( | ||
| GetResourceProfileRequest request) throws YarnException, IOException { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
| if (request == null || request.getProfileName() == null) { | ||
| routerMetrics.incrGetResourceProfileFailedRetrieved(); | ||
| RouterServerUtil.logAndThrowException("Missing getResourceProfile request or profileName.", | ||
| null); | ||
| } | ||
| long startTime = clock.getTime(); | ||
| ClientMethod remoteMethod = new ClientMethod("getResourceProfile", | ||
| new Class[] {GetResourceProfileRequest.class}, new Object[] {request}); | ||
| Collection<GetResourceProfileResponse> resourceProfile; | ||
| try { | ||
| resourceProfile = invokeAppClientProtocolMethod(true, remoteMethod, | ||
| GetResourceProfileResponse.class); | ||
| } catch (Exception ex) { | ||
| routerMetrics.incrGetResourceProfileFailedRetrieved(); | ||
| LOG.error("Unable to get resource profile due to exception.", ex); | ||
| throw ex; | ||
| } | ||
| long stopTime = clock.getTime(); | ||
| routerMetrics.succeededGetResourceProfileRetrieved(stopTime - startTime); | ||
| // Merge the GetResourceProfileResponse | ||
| return RouterYarnClientUtils.mergeClusterResourceProfileResponse(resourceProfile); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,8 @@ | |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; | ||
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; | ||
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; | ||
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; | ||
| import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationId; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationReport; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; | ||
|
|
@@ -46,6 +48,7 @@ | |
| import org.apache.hadoop.yarn.api.records.ReservationAllocationState; | ||
| import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; | ||
| import org.apache.hadoop.yarn.api.records.QueueInfo; | ||
| import org.apache.hadoop.yarn.api.records.Resource; | ||
| import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; | ||
| import org.apache.hadoop.yarn.util.Records; | ||
| import org.apache.hadoop.yarn.util.resource.Resources; | ||
|
|
@@ -417,5 +420,59 @@ public static GetQueueInfoResponse mergeQueues( | |
| queueResponse.setQueueInfo(queueInfo); | ||
| return queueResponse; | ||
| } | ||
|
|
||
| /** | ||
| * Merges a list of GetAllResourceProfilesResponse. | ||
| * | ||
| * @param responses a list of GetAllResourceProfilesResponse to merge. | ||
| * @return the merged GetAllResourceProfilesResponse. | ||
| */ | ||
| public static GetAllResourceProfilesResponse mergeClusterResourceProfilesResponse( | ||
| Collection<GetAllResourceProfilesResponse> responses) { | ||
| GetAllResourceProfilesResponse profilesResponse = | ||
| Records.newRecord(GetAllResourceProfilesResponse.class); | ||
| Map<String, Resource> profilesMap = new HashMap<>(); | ||
| for (GetAllResourceProfilesResponse response : responses) { | ||
| if (response != null && response.getResourceProfiles() != null) { | ||
| for (Map.Entry<String, Resource> entry : response.getResourceProfiles().entrySet()) { | ||
| String key = entry.getKey(); | ||
| Resource value = entry.getValue(); | ||
| if (profilesMap.containsKey(key)) { | ||
| Resource resourceValue = profilesMap.get(key); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in Resource or ResourceUtils there was something to add to an existing resource.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your suggestion, I will modify the code. |
||
| resourceValue.setVirtualCores( | ||
| resourceValue.getVirtualCores() + value.getVirtualCores()); | ||
| resourceValue.setMemorySize(resourceValue.getMemorySize() + value.getMemorySize()); | ||
| profilesMap.put(key, resourceValue); | ||
| } else { | ||
| profilesMap.put(key, value); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| profilesResponse.setResourceProfiles(profilesMap); | ||
| return profilesResponse; | ||
| } | ||
|
|
||
| /** | ||
| * Merges a list of GetResourceProfileResponse. | ||
| * | ||
| * @param responses a list of GetResourceProfileResponse to merge. | ||
| * @return the merged GetResourceProfileResponse. | ||
| */ | ||
| public static GetResourceProfileResponse mergeClusterResourceProfileResponse( | ||
| Collection<GetResourceProfileResponse> responses) { | ||
| GetResourceProfileResponse profileResponse = | ||
| Records.newRecord(GetResourceProfileResponse.class); | ||
| Resource resource = Resource.newInstance(0, 0); | ||
| for (GetResourceProfileResponse response : responses) { | ||
| if (response != null && response.getResource() != null) { | ||
| resource.setMemorySize(resource.getMemorySize() + response.getResource().getMemorySize()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ResourceUtils?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, i will fix it. |
||
| resource.setVirtualCores(resource.getVirtualCores() + | ||
| response.getResource().getVirtualCores()); | ||
| } | ||
| } | ||
| profileResponse.setResource(resource); | ||
| return profileResponse; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't we move this and the setting inside of the try?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code show as below:
startTime - stopTime represents the execution time of the code fragment
I think the current way should be a little better, can this part stay as it is?