Skip to content

Commit

Permalink
Merge branch 'develop-2.0.0-beta' of https://github.com/FederatedAI/F…
Browse files Browse the repository at this point in the history
…ATE-Board into develop-2.0.0-beta
  • Loading branch information
idwenwen committed Aug 30, 2023
2 parents 26ed04e + 8b8c925 commit af95528
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,16 @@ public ResponseResult getMetaInfo(@Valid @RequestBody ComponentQueryDTO componen
}
Preconditions.checkArgument(LogFileService.checkPathParameters(componentQueryDTO.getJob_id(), componentQueryDTO.getRole(), componentQueryDTO.getParty_id(), componentQueryDTO.getComponent_name()));


Map<String,Object> reqMap = new HashMap<>();
reqMap.put(Dict.JOBID,componentQueryDTO.getJob_id());
reqMap.put(Dict.ROLE,componentQueryDTO.getRole());
reqMap.put(Dict.PARTY_ID,componentQueryDTO.getParty_id());
reqMap.put(Dict.TASK_NAME,componentQueryDTO.getComponent_name());
String result;
try {

//generateURLParamJobQueryDTO
result = flowFeign.post(Dict.URL_COPONENT_METRIC, JSON.toJSONString(componentQueryDTO));
result = flowFeign.get(Dict.URL_COPONENT_METRIC, reqMap);
} catch (Exception e) {
logger.error("connect fateflow error:", e);
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
Expand All @@ -109,7 +113,7 @@ public ResponseResult getMetricInfo(@Valid @RequestBody MetricDTO metricDTO, Bin

reqMap.put(Dict.ROLE, metricDTO.getRole());
reqMap.put(Dict.PARTY_ID, metricDTO.getParty_id());
reqMap.put(Dict.COMPONENT_NAME, metricDTO.getComponent_name());
reqMap.put(Dict.TASK_NAME, metricDTO.getComponent_name());
reqMap.put(Dict.METRIC_NAME, metricDTO.getMetric_name());
reqMap.put(Dict.METRIC_NAMESPACE, metricDTO.getMetric_namespace());
String result;
Expand All @@ -134,17 +138,18 @@ public ResponseResult getDetailInfo(@Valid @RequestBody ComponentQueryDTO compon
}
Preconditions.checkArgument(LogFileService.checkPathParameters(componentQueryDTO.getJob_id(), componentQueryDTO.getRole(), componentQueryDTO.getParty_id(), componentQueryDTO.getComponent_name()));

String result;
try {
result = flowFeign.post(Dict.URL_COPONENT_PARAMETERS, JSON.toJSONString(componentQueryDTO));
} catch (Exception e) {
logger.error("connect fateflow error:", e);
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
}
if (StringUtils.isEmpty(result)) {
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_NULL_RESULT);

}
//todo
String result = null;
// try {
// result = flowFeign.post(Dict.URL_COPONENT_PARAMETERS, JSON.toJSONString(componentQueryDTO));
// } catch (Exception e) {
// logger.error("connect fateflow error:", e);
// return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
// }
// if (StringUtils.isEmpty(result)) {
// return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_NULL_RESULT);

// }
JSONObject resultObject = JSON.parseObject(result);
Integer retcode = resultObject.getInteger(Dict.CODE);
String msg = resultObject.getString(Dict.REMOTE_RETURN_MSG);
Expand Down Expand Up @@ -262,40 +267,40 @@ public ResponseResult getDagDependenciesNew(String param) {
Map<String, String> componentModule = new HashMap<>();
Map<String, Object> dependencies = new HashMap<>();
for (String taskName : taskNames) {
JSONObject taskInfo = (JSONObject)tasks.get(taskName);
JSONObject taskInfo = (JSONObject) tasks.get(taskName);

// componentList handle
Map<String, Object> taskDetail = taskManagerService.findTaskDetail(jobId, role, partyId, taskName);
taskDetail.put("component_name",taskName);
taskDetail.put("component_name", taskName);
componentList.add(taskDetail);

// componentModule handle
String component_ref = taskInfo.getString("component_ref");
componentModule.put(taskName,component_ref);
componentModule.put(taskName, component_ref);

// dependencies handle
JSONArray dependentTasks = taskInfo.getJSONArray("dependent_tasks");
if (dependentTasks == null || dependentTasks.size() == 0) {
dependencies.put(taskName,null);
}else {
dependencies.put(taskName, null);
} else {
Object o = dependentTasks.get(0);
Map<String,Object> dependencyInfoMap = new HashMap<>();
dependencyInfoMap.put("component_name",o);
dependencyInfoMap.put("up_output_info",null);
dependencyInfoMap.put("type",null);
Map<String, Object> dependencyInfoMap = new HashMap<>();
dependencyInfoMap.put("component_name", o);
dependencyInfoMap.put("up_output_info", null);
dependencyInfoMap.put("type", null);
JSONArray array = new JSONArray();
array.add(dependencyInfoMap);
dependencies.put(taskName,array);
dependencies.put(taskName, array);
}
}

// componentNeedRun handle
JSONObject needRunInfo = getNeedRunInfo(paramMap);

data.put("component_list",componentList);
data.put("component_need_run",needRunInfo);
data.put("component_module",componentModule);
data.put("dependencies",dependencies);
data.put("component_list", componentList);
data.put("component_need_run", needRunInfo);
data.put("component_module", componentModule);
data.put("dependencies", dependencies);
return new ResponseResult<>(ErrorCode.SUCCESS, data);
}
} else {
Expand Down Expand Up @@ -372,8 +377,8 @@ public ResponseResult getDagDependencies(String param) {
String taskStatus = null;
Long createTime = null;
if (taskDetail != null) {
taskStatus = String.valueOf(taskDetail.get("status"));
createTime = Long.valueOf(taskDetail.get("time").toString());
taskStatus = taskDetail.get("status") == null ? null : String.valueOf(taskDetail.get("status"));
createTime = taskDetail.get("time") == null ? null : Long.valueOf(taskDetail.get("time").toString());
}

component.put(Dict.STATUS, taskStatus);
Expand Down Expand Up @@ -476,7 +481,7 @@ private Map<String, Object> parseQueryParam(ComponentQueryDTO componentQueryDTO)
reqMap.put(Dict.JOBID, componentQueryDTO.getJob_id());
reqMap.put(Dict.ROLE, componentQueryDTO.getRole());
reqMap.put(Dict.PARTY_ID, componentQueryDTO.getParty_id());
reqMap.put(Dict.COMPONENT_NAME, componentQueryDTO.getComponent_name());
reqMap.put(Dict.TASK_NAME, componentQueryDTO.getComponent_name());
return reqMap;
}
}
22 changes: 13 additions & 9 deletions src/main/java/org/fedai/fate/board/global/Dict.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ public class Dict {
static public final String ID = "id";
static public final String NAME = "name";
static public final String JOBID = "job_id";
static public final String LOG_TYPE = "log_type";
static public final String CODE = "code";
static public final String DATA = "data";
static public final String JOB = "job";
static public final String DATASET = "dataset";
static public final String COMPONENT_NAME = "component_name";
static public final String TASK_NAME = "task_name";
static public final String INSTANCE_ID = "instance_id";
static public final String BEGIN = "begin";
static public final String END = "end";
static public final String ROLE = "role";
static public final String PARTY_ID = "party_id";
static public final String RETMSG = "message";
Expand Down Expand Up @@ -65,9 +69,9 @@ public class Dict {
static public final String JOB_STATUS = "status";
static public final String REMOTE_RETURN_MSG = "retmsg";

static public final String URL_COPONENT_METRIC_DATA = "/v2/output/metric/query";
static public final String URL_COPONENT_METRIC = "/v1/tracking/component/metrics";
static public final String URL_COPONENT_PARAMETERS = "/v1/tracking/component/parameters";
static public final String URL_COPONENT_METRIC_DATA = "/v2/output/metric/key/query";
static public final String URL_COPONENT_METRIC = "/v2/output/metric/query";
static public final String URL_COPONENT_PARAMETERS = "/v2/output/metric/key/query";
static public final String URL_OUTPUT_MODEL = "/v2/output/model/query";
static public final String URL_OUTPUT_DATA = "/v2/output/data/display";

Expand Down Expand Up @@ -119,12 +123,12 @@ public class Dict {
//前端传来的logType和flow的logType对应表
static public final HashMap<String, String> logTypeMap = new HashMap<>();
static {
logTypeMap.put("jobSchedule", "jobSchedule");
logTypeMap.put("jobError", "jobScheduleError");
logTypeMap.put("partyError", "partyError");
logTypeMap.put("partyWarning", "partyWarning");
logTypeMap.put("partyInfo", "partyInfo");
logTypeMap.put("partyDebug", "partyDebug");
logTypeMap.put("jobSchedule", "schedule_info");
logTypeMap.put("jobError", "schedule_error");
logTypeMap.put("partyError", "task_error");
logTypeMap.put("partyWarning", "task_warning");
logTypeMap.put("partyInfo", "task_info");
logTypeMap.put("partyDebug", "task_debug");
logTypeMap.put("componentInfo", "componentInfo");
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/fedai/fate/board/services/FlowLogFeign.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
import org.fedai.fate.board.intercept.FeignRequestInterceptor;
import org.fedai.fate.board.pojo.flow.*;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.cloud.openfeign.SpringQueryMap;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import java.util.List;
import java.util.Map;

@FeignClient(url = RouteTargeter.URL_PLACE_HOLDER + "/v2/log", name = "flowLogFeign", configuration = FeignRequestInterceptor.class)
public interface FlowLogFeign {

@RequestMapping(value = "/query", method = RequestMethod.GET)
FlowResponse<List<FlowLogCatResp>> logCat(FlowLogCatReq request);
String logCat( @SpringQueryMap(encoded = true) Map<String, Object> paramMap);

@RequestMapping(value = "/count", method = RequestMethod.GET)
FlowResponse<FlowLogSizeResp> logSize(FlowLogSizeReq request);
String logSize( @SpringQueryMap(encoded = true) Map<String, Object> paramMap);

}
4 changes: 2 additions & 2 deletions src/main/java/org/fedai/fate/board/utils/ResponseUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public static ResponseResult buildResponse(String result, String dataName) {
}
JSONObject resultObject = JSON.parseObject(result);

Integer retcode = resultObject.getInteger(Dict.CODE);
String msg = resultObject.getString(Dict.REMOTE_RETURN_MSG);
Integer retcode = resultObject.getInteger(Dict.CODE) == null ? 200 : resultObject.getInteger(Dict.CODE);
String msg = resultObject.getString(Dict.RETMSG) == null ? "ok" : resultObject.getString(Dict.RETMSG);

if (dataName != null) {
JSONArray jsonArray = resultObject.getJSONArray(Dict.DATA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.fedai.fate.board.websocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import org.fedai.fate.board.conf.Configurator;
import org.fedai.fate.board.global.Dict;
Expand All @@ -40,7 +42,9 @@
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@ServerEndpoint(value = "/log/new/{jobId}/{role}/{partyId}/{componentId}", configurator = Configurator.class)
Expand Down Expand Up @@ -125,36 +129,43 @@ private void logSize(Session session, String jobId, String role, String partyId,

LogSizeResponse logSizeResponse = new LogSizeResponse();
for (LogTypeEnum logTypeEnum : logTypes) {
FlowLogSizeReq logSizeReq = new FlowLogSizeReq();
logSizeReq.setJob_id(jobId);
logSizeReq.setLog_type(logTypeEnum.getFlowValue());
logSizeReq.setRole(role);
logSizeReq.setParty_id(partyId);
// logSizeReq.setComponent_name(componentId);
logSizeReq.setTask_name(componentId);
logSizeReq.setInstance_id(logQuery.getInstanceId());
FlowResponse<FlowLogSizeResp> resp = flowLogFeign.logSize(logSizeReq);
Map<String, Object> reqMap = new HashMap<>();
reqMap.put(Dict.JOBID, jobId);
reqMap.put(Dict.LOG_TYPE, logTypeEnum.getFlowValue());
reqMap.put(Dict.ROLE, role);
reqMap.put(Dict.PARTY_ID, partyId);
reqMap.put(Dict.TASK_NAME, componentId);
if (logQuery != null && StringUtils.isNotBlank(logQuery.getInstanceId())) {
reqMap.put(Dict.INSTANCE_ID,logQuery.getInstanceId());
}
String resp = flowLogFeign.logSize(reqMap);

JSONObject jsonData = JSON.parseObject(resp);
Integer logCount = 0;
if (jsonData != null) {
logCount = (Integer) jsonData.get(Dict.DATA);
}
switch (logTypeEnum) {
case JOB_SCHEDULE:
logSizeResponse.setJobSchedule(resp.getData().getSize());
logSizeResponse.setJobSchedule(logCount);
break;
case JOB_ERROR:
logSizeResponse.setJobError(resp.getData().getSize());
logSizeResponse.setJobError(logCount);
break;
case PARTY_ERROR:
logSizeResponse.setPartyError(resp.getData().getSize());
logSizeResponse.setPartyError(logCount);
break;
case PARTY_WARNING:
logSizeResponse.setPartyWarning(resp.getData().getSize());
logSizeResponse.setPartyWarning(logCount);
break;
case PARTY_INFO:
logSizeResponse.setPartyInfo(resp.getData().getSize());
logSizeResponse.setPartyInfo(logCount);
break;
case PARTY_DEBUG:
logSizeResponse.setPartyDebug(resp.getData().getSize());
logSizeResponse.setPartyDebug(logCount);
break;
case COMPONENT_INFO:
logSizeResponse.setComponentInfo(resp.getData().getSize());
logSizeResponse.setComponentInfo(logCount);
break;
}
}
Expand All @@ -176,23 +187,35 @@ private void logCat(Session session, String jobId, String role, String partyId,

Preconditions.checkArgument(LogFileService.checkPathParameters(jobId, role, partyId, componentId, logQuery.getType()));

FlowLogCatReq flowLogCatReq = new FlowLogCatReq();
flowLogCatReq.setJob_id(jobId);
flowLogCatReq.setLog_type(Dict.logTypeMap.get(logQuery.getType()));
flowLogCatReq.setRole(role);
flowLogCatReq.setParty_id(Integer.valueOf(partyId));
flowLogCatReq.setTask_name(componentId);
flowLogCatReq.setInstance_id(logQuery.getInstanceId());
flowLogCatReq.setBegin(logQuery.getBegin());
flowLogCatReq.setEnd(logQuery.getEnd());
Map<String, Object> reqMap = new HashMap<>();
reqMap.put(Dict.JOBID, jobId);
reqMap.put(Dict.LOG_TYPE, Dict.logTypeMap.get(logQuery.getType()));
reqMap.put(Dict.ROLE, role);
reqMap.put(Dict.PARTY_ID, Integer.valueOf(partyId));
reqMap.put(Dict.TASK_NAME, componentId);
reqMap.put(Dict.BEGIN, logQuery.getBegin());
reqMap.put(Dict.END, logQuery.getEnd());
if (logQuery != null && StringUtils.isNotBlank(logQuery.getInstanceId())) {
reqMap.put(Dict.INSTANCE_ID,logQuery.getInstanceId());
}
String resultFlow = flowLogFeign.logCat(reqMap);
JSONObject object = JSONObject.parseObject(resultFlow);
JSONArray jsonArray = object.getJSONArray(Dict.DATA);

FlowResponse<List<FlowLogCatResp>> resultFlow = flowLogFeign.logCat(flowLogCatReq);
List<LogContentResponse.LogContent> contentData = new ArrayList<>();

for (int i = 0; i < jsonArray.size(); i++) {
JSONObject logData = (JSONObject) jsonArray.get(i);
String content = logData.get(Dict.LOG_CONTENT) == null ? "" : (String) logData.get(Dict.LOG_CONTENT);
String lineNum = logData.get(Dict.LOG_CONTENT) == null ? "" : (String) logData.get(Dict.LOG_CONTENT);
LogContentResponse.LogContent logContent = new LogContentResponse.LogContent();
logContent.setContent(content);
logContent.setLineNum(lineNum);
contentData.add(logContent);
}
LogContentResponse logContentResponse = new LogContentResponse();
logContentResponse.setType(logQuery.getType());
logContentResponse.setData(resultFlow.getData().stream()
.map(LogContentResponse.LogContent::fromFlowContent)
.collect(Collectors.toList()));
logContentResponse.setData(contentData);
try {
session.getBasicRemote().sendText(JSON.toJSONString(logContentResponse));
} catch (IOException e) {
Expand Down Expand Up @@ -221,12 +244,13 @@ public void onError(Session session, Throwable error) {
}

public enum LogTypeEnum {
JOB_SCHEDULE("jobSchedule", "jobSchedule"),
JOB_ERROR("jobError", "jobScheduleError"),
PARTY_ERROR("partyError", "partyError"),
PARTY_WARNING("partyWarning", "partyWarning"),
PARTY_INFO("partyInfo", "partyInfo"),
PARTY_DEBUG("partyDebug", "partyDebug"),
//"schedule_info", "schedule_error","task_error", "task_info", "task_warning", "task_debug"
JOB_SCHEDULE("jobSchedule", "schedule_info"),
JOB_ERROR("jobError", "schedule_error"),
PARTY_ERROR("partyError", "task_error"),
PARTY_WARNING("partyWarning", "task_warning"),
PARTY_INFO("partyInfo", "task_info"),
PARTY_DEBUG("partyDebug", "task_debug"),
COMPONENT_INFO("componentInfo", "componentInfo"),
LOG_SIZE("logSize", null),
;
Expand Down

0 comments on commit af95528

Please sign in to comment.