Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils
import picocli.CommandLine.{Command, ParameterException}

import org.apache.celeborn.cli.config.CliConfigManager
Expand Down Expand Up @@ -224,18 +225,34 @@ class MasterSubcommandImpl extends Runnable with MasterSubcommand {
private[master] def runShowContainerInfo: ContainerInfo = defaultApi.getContainerInfo

override private[master] def reviseLostShuffles: HandleResponse = {
if (StringUtils.isAnyBlank(commonOptions.apps, reviseLostShuffleOptions.shuffleIds)) {
throw new ParameterException(
spec.commandLine(),
"Application id and Shuffle ids must be provided for this command.")
}

val app = commonOptions.apps
if (app.contains(",")) {
throw new ParameterException(
spec.commandLine(),
"Only one application id can be provided for this command.")
}
val shuffleIds = reviseLostShuffleOptions.shuffleIds
applicationApi.reviseLostShuffles(app, shuffleIds)

val shuffleIds = util.Arrays.asList[Integer](
reviseLostShuffleOptions.shuffleIds.split(",").map(Integer.valueOf): _*)
val request =
new ReviseLostShufflesRequest().appId(app).shuffleIds(shuffleIds)
applicationApi.reviseLostShuffles(request)
}

override private[master] def deleteApps: HandleResponse = {
val apps = commonOptions.apps
applicationApi.deleteApps(apps)
if (StringUtils.isBlank(commonOptions.apps)) {
throw new ParameterException(
spec.commandLine(),
"Applications must be provided for this command.")
}
val appIds = util.Arrays.asList[String](commonOptions.apps.split(","): _*)
val request = new DeleteAppsRequest().apps(appIds)
applicationApi.deleteApps(request)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1467,14 +1467,6 @@ private[celeborn] class Master(
}
}

override def reviseLostShuffles(appId: String, shuffles: java.util.List[Integer]): Unit = {
statusSystem.reviseLostShuffles(appId, shuffles)
}

override def deleteApps(appIds: String): Unit = {
appIds.split(",").foreach(id => statusSystem.deleteApp(id))
}

override def getWorkerEventInfo(): String = {
val sb = new StringBuilder
sb.append("======================= Workers Event in Master ========================\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.celeborn.service.deploy.master.http.api.v1

import java.util
import javax.ws.rs.{Consumes, GET, Path, Produces, QueryParam}
import javax.ws.rs.{Consumes, DELETE, GET, Path, POST, Produces}
import javax.ws.rs.core.MediaType

import scala.collection.JavaConverters._
Expand All @@ -28,7 +27,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag

import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.rest.v1.model.{AppDiskUsageData, AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse, ApplicationHeartbeatData, ApplicationsHeartbeatResponse, HandleResponse, HostnamesResponse}
import org.apache.celeborn.rest.v1.model.{AppDiskUsageData, AppDiskUsageSnapshotData, AppDiskUsageSnapshotsResponse, ApplicationHeartbeatData, ApplicationsHeartbeatResponse, DeleteAppsRequest, HandleResponse, HostnamesResponse, ReviseLostShufflesRequest}
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.service.deploy.master.Master

Expand All @@ -55,6 +54,19 @@ class ApplicationResource extends ApiRequestContext {
}.toSeq.asJava)
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))),
description = "Delete resource of apps.")
@DELETE
def deleteApps(request: DeleteAppsRequest): HandleResponse = {
val apps = request.getApps.asScala
apps.foreach(app => statusSystem.deleteApp(app))
new HandleResponse().success(true).message(s"deleted shuffles of app ${apps}")
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
Expand Down Expand Up @@ -96,41 +108,18 @@ class ApplicationResource extends ApiRequestContext {
new HostnamesResponse().hostnames(statusSystem.hostnameSet.asScala.toSeq.asJava)
}

@Path("/reviseLostShuffles")
@Path("/revise_lost_shuffles")
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))),
description =
"Revise lost shuffles")
@GET
def reviseLostShuffles(
@QueryParam("app") appId: String,
@QueryParam("shuffleIds") shufflesIds: String): HandleResponse = {
val shuffles = new util.ArrayList[Integer]()
shufflesIds.split(",").foreach { p =>
shuffles.add(Integer.parseInt(p))
}
if (!shuffles.isEmpty) {
httpService.reviseLostShuffles(appId, shuffles)
}
new HandleResponse().success(true).message("revise lost shuffle done")
description = "Revise lost shuffles or deleted shuffles of an application.")
@POST
def reviseLostShuffles(request: ReviseLostShufflesRequest): HandleResponse = {
val appId = request.getAppId
val shuffleIds = request.getShuffleIds
statusSystem.reviseLostShuffles(appId, shuffleIds)
new HandleResponse().success(true).message(s"revised app:$appId lost shuffles:$shuffleIds")
}

@Path("/deleteApps")
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))),
description =
"Delete resource of an app")
@GET
def deleteApp(
@QueryParam("apps") apps: String): HandleResponse = {
httpService.deleteApps(apps)
new HandleResponse().success(true).message(s"delete shuffles of app ${apps}")
}

}
2 changes: 2 additions & 0 deletions openapi/openapi-client/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Celeborn OpenAPI Client

**Note:** It is recommended to use `under_score` style naming for new RESTful APIs to maintain consistency.

To update the OpenAPI specification
- just update the specification under `openapi/openapi-client/src/main/openapi3/` and keep the schema definitions consistent between master and worker.
- Install JDK 11 or above by whatever mechanism is appropriate for your system, and set that version to be the default Java version (e.g., by setting env variable `JAVA_HOME`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@

import org.apache.celeborn.rest.v1.model.AppDiskUsageSnapshotsResponse;
import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse;
import org.apache.celeborn.rest.v1.model.DeleteAppsRequest;
import org.apache.celeborn.rest.v1.model.HandleResponse;
import org.apache.celeborn.rest.v1.model.HostnamesResponse;
import org.apache.celeborn.rest.v1.model.ReviseLostShufflesRequest;


import java.util.ArrayList;
Expand All @@ -51,29 +53,29 @@ public ApplicationApi(ApiClient apiClient) {

/**
*
* Delete resource of apps
* @param apps (optional)
* Delete resource of apps.
* @param deleteAppsRequest (optional)
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse deleteApps(String apps) throws ApiException {
return this.deleteApps(apps, Collections.emptyMap());
public HandleResponse deleteApps(DeleteAppsRequest deleteAppsRequest) throws ApiException {
return this.deleteApps(deleteAppsRequest, Collections.emptyMap());
}


/**
*
* Delete resource of apps
* @param apps (optional)
* Delete resource of apps.
* @param deleteAppsRequest (optional)
* @param additionalHeaders additionalHeaders for this call
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse deleteApps(String apps, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = null;
public HandleResponse deleteApps(DeleteAppsRequest deleteAppsRequest, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = deleteAppsRequest;

// create path and map variables
String localVarPath = "/api/v1/applications/deleteApps";
String localVarPath = "/api/v1/applications";

StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
Expand All @@ -83,7 +85,6 @@ public HandleResponse deleteApps(String apps, Map<String, String> additionalHead
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();

localVarQueryParams.addAll(apiClient.parameterToPair("apps", apps));

localVarHeaderParams.putAll(additionalHeaders);

Expand All @@ -95,7 +96,7 @@ public HandleResponse deleteApps(String apps, Map<String, String> additionalHead
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);

final String[] localVarContentTypes = {

"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);

Expand All @@ -104,7 +105,7 @@ public HandleResponse deleteApps(String apps, Map<String, String> additionalHead
TypeReference<HandleResponse> localVarReturnType = new TypeReference<HandleResponse>() {};
return apiClient.invokeAPI(
localVarPath,
"GET",
"DELETE",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
Expand Down Expand Up @@ -322,31 +323,29 @@ public AppDiskUsageSnapshotsResponse getApplicationsDiskUsageSnapshots(Map<Strin

/**
*
* Revise lost shuffles or delete shuffles of an application.
* @param app (optional)
* @param shuffleIds (optional)
* Revise lost shuffles or deleted shuffles of an application.
* @param reviseLostShufflesRequest (optional)
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse reviseLostShuffles(String app, String shuffleIds) throws ApiException {
return this.reviseLostShuffles(app, shuffleIds, Collections.emptyMap());
public HandleResponse reviseLostShuffles(ReviseLostShufflesRequest reviseLostShufflesRequest) throws ApiException {
return this.reviseLostShuffles(reviseLostShufflesRequest, Collections.emptyMap());
}


/**
*
* Revise lost shuffles or delete shuffles of an application.
* @param app (optional)
* @param shuffleIds (optional)
* Revise lost shuffles or deleted shuffles of an application.
* @param reviseLostShufflesRequest (optional)
* @param additionalHeaders additionalHeaders for this call
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse reviseLostShuffles(String app, String shuffleIds, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = null;
public HandleResponse reviseLostShuffles(ReviseLostShufflesRequest reviseLostShufflesRequest, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = reviseLostShufflesRequest;

// create path and map variables
String localVarPath = "/api/v1/applications/reviseLostShuffles";
String localVarPath = "/api/v1/applications/revise_lost_shuffles";

StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
Expand All @@ -356,8 +355,6 @@ public HandleResponse reviseLostShuffles(String app, String shuffleIds, Map<Stri
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();

localVarQueryParams.addAll(apiClient.parameterToPair("app", app));
localVarQueryParams.addAll(apiClient.parameterToPair("shuffleIds", shuffleIds));

localVarHeaderParams.putAll(additionalHeaders);

Expand All @@ -369,7 +366,7 @@ public HandleResponse reviseLostShuffles(String app, String shuffleIds, Map<Stri
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);

final String[] localVarContentTypes = {

"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);

Expand All @@ -378,7 +375,7 @@ public HandleResponse reviseLostShuffles(String app, String shuffleIds, Map<Stri
TypeReference<HandleResponse> localVarReturnType = new TypeReference<HandleResponse>() {};
return apiClient.invokeAPI(
localVarPath,
"GET",
"POST",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
Expand Down Expand Up @@ -411,7 +408,7 @@ public <T> T invokeAPI(String url, String method, Object request, TypeReference<
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);

final String[] localVarContentTypes = {

"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);

Expand Down
Loading
Loading