Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…oard into develop-2.0.0
  • Loading branch information
idwenwen committed Dec 14, 2023
2 parents 797f921 + 65195e2 commit 6a674a5
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 138 deletions.
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
<version>2.7.12</version>
<relativePath/>
</parent>

Expand Down Expand Up @@ -40,7 +40,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.3.26</version>
<version>5.3.27</version>
<scope>compile</scope>
</dependency>

Expand Down Expand Up @@ -93,7 +93,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
<version>32.0.1-ANDROID</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -170,7 +170,7 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
<version>3.7.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
161 changes: 54 additions & 107 deletions src/main/java/org/fedai/fate/board/controller/JobDetailController.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ public ResponseResult getMetaInfo(@Valid @RequestBody ComponentQueryDTO componen
}
Preconditions.checkArgument(LogFileService.checkPathParameters(componentQueryDTO.getJob_id(), componentQueryDTO.getRole(), componentQueryDTO.getParty_id(), componentQueryDTO.getComponent_name()));

Map<String,Object> reqMap = new HashMap<>();
reqMap.put(Dict.JOBID,componentQueryDTO.getJob_id());
reqMap.put(Dict.ROLE,componentQueryDTO.getRole());
reqMap.put(Dict.PARTY_ID,componentQueryDTO.getParty_id());
reqMap.put(Dict.TASK_NAME,componentQueryDTO.getComponent_name());
Map<String, Object> reqMap = new HashMap<>();
reqMap.put(Dict.JOBID, componentQueryDTO.getJob_id());
reqMap.put(Dict.ROLE, componentQueryDTO.getRole());
reqMap.put(Dict.PARTY_ID, componentQueryDTO.getParty_id());
reqMap.put(Dict.TASK_NAME, componentQueryDTO.getComponent_name());
String result;
try {

Expand Down Expand Up @@ -152,12 +152,12 @@ public ResponseResult getDetailInfo(@Valid @RequestBody ComponentQueryDTO compon

String taskId = componentQueryDTO.getJob_id() + "_" + componentQueryDTO.getComponent_name();
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(Dict.JOBID, componentQueryDTO.getJob_id() );
paramMap.put(Dict.JOBID, componentQueryDTO.getJob_id());
paramMap.put(Dict.TASK_ID, taskId);

String result = null;
try {
result = flowFeign.get(Dict.URL_TASK_DATAVIEW,paramMap);
result = flowFeign.get(Dict.URL_TASK_DATAVIEW, paramMap);
} catch (Exception e) {
logger.error("connect fateflow error:", e);
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
Expand All @@ -169,32 +169,51 @@ public ResponseResult getDetailInfo(@Valid @RequestBody ComponentQueryDTO compon
Integer retcode = resultObject.getInteger(Dict.CODE);
String msg = resultObject.getString(Dict.RETMSG);

JSONObject detail = (JSONObject)resultObject.getJSONArray(Dict.DATA).get(0);
JSONObject detail = (JSONObject) resultObject.getJSONArray(Dict.DATA).get(0);
JSONObject newData = detail.getJSONObject(Dict.COMPONENT_PARAMETERS);

if (newData != null) {
JSONObject conf = newData.getJSONObject("conf");
if (conf != null) {
JSONObject federation = conf.getJSONObject("federation");
JSONObject federation = conf.getJSONObject("federation");
if (federation != null) {
JSONObject metadata = federation.getJSONObject("metadata");
if (metadata != null) {
JSONObject rollSiteConfig = metadata.getJSONObject("rollsite_config");
if (rollSiteConfig != null) {
String host = rollSiteConfig.getString("host");
if (null != host) {
newData.getJSONObject("conf").getJSONObject("federation").getJSONObject("metadata").getJSONObject("rollsite_config").put("host","xxx.xxx.xxx.xxx");
newData.getJSONObject("conf").getJSONObject("federation").getJSONObject("metadata").getJSONObject("rollsite_config").put("host", "xxx.xxx.xxx.xxx");
}
}
JSONObject osxConfig = metadata.getJSONObject("osx_config");
if (osxConfig != null) {
String host = osxConfig.getString("host");
if (null != host) {
newData.getJSONObject("conf").getJSONObject("federation").getJSONObject("metadata").getJSONObject("osx_config").put("host", "xxx.xxx.xxx.xxx");
}
}
}
}
JSONObject computing = conf.getJSONObject("computing");
if (computing != null) {
JSONObject metadata = computing.getJSONObject("metadata");
if (metadata != null) {
String host = metadata.getString("host");
if (null != host) {
newData.getJSONObject("conf").getJSONObject("computing").getJSONObject("metadata").put("host", "xxx.xxx.xxx.xxx");
}
}
}
}
JSONObject mlmd = newData.getJSONObject("mlmd");
if (null != mlmd) {
JSONObject metadata = mlmd.getJSONObject("metadata");
if (null != metadata) {
String host = metadata.getString("host");
newData.getJSONObject("mlmd").getJSONObject("metadata").put("host","xxx.xxx.xxx.xxx");
if (null != host) {
newData.getJSONObject("mlmd").getJSONObject("metadata").put("host", "xxx.xxx.xxx.xxx");
}
}
}
}
Expand Down Expand Up @@ -269,91 +288,6 @@ public ResponseResult getDagDependencies(@Valid @RequestBody JobQueryDTO jobQuer
}
}

public ResponseResult getDagDependenciesNew(String param) {
JSONObject jsonObject = JSON.parseObject(param);
String jobId = jsonObject.getString(Dict.JOBID);
String role = jsonObject.getString(Dict.ROLE);
String partyId = jsonObject.getString(Dict.PARTY_ID);

Map<String, Object> paramMap = new HashMap<>();
paramMap.put(Dict.JOBID, jobId);
paramMap.put(Dict.ROLE, role);
paramMap.put(Dict.PARTY_ID, partyId);

String result;
try {
result = flowFeign.get(Dict.URL_JOB_DATAVIEW, paramMap);
} catch (Exception e) {
logger.error("connect fateflow error:", e);
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
}

if ((result == null) || 0 == result.trim().length()) {
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_NULL_RESULT);
}

JSONObject resultObject = JSON.parseObject(result);
Integer retCode = resultObject.getInteger(Dict.CODE);
if (retCode == null) {
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_WRONG_RESULT);
}

if (retCode == 0) {
JSONArray jsonArray = resultObject.getJSONArray(Dict.DATA);
if (jsonArray != null && jsonArray.size() > 0) {
JSONObject data = new JSONObject();

JSONObject detailData = (JSONObject) jsonArray.get(0);
JSONObject dagData = (JSONObject) detailData.get("dag");
JSONObject dagDetail = (JSONObject) dagData.get("dag");
JSONObject tasks = (JSONObject) dagDetail.get("tasks");
Set<String> taskNames = tasks.keySet();
ArrayList<Map<String, Object>> componentList = new ArrayList<>();
Map<String, String> componentModule = new HashMap<>();
Map<String, Object> dependencies = new HashMap<>();
for (String taskName : taskNames) {
JSONObject taskInfo = (JSONObject) tasks.get(taskName);

// componentList handle
Map<String, Object> taskDetail = taskManagerService.findTaskDetail(jobId, role, partyId, taskName);
taskDetail.put("component_name", taskName);
componentList.add(taskDetail);

// componentModule handle
String component_ref = taskInfo.getString("component_ref");
componentModule.put(taskName, component_ref);

// dependencies handle
JSONArray dependentTasks = taskInfo.getJSONArray("dependent_tasks");
if (dependentTasks == null || dependentTasks.size() == 0) {
dependencies.put(taskName, null);
} else {
Object o = dependentTasks.get(0);
Map<String, Object> dependencyInfoMap = new HashMap<>();
dependencyInfoMap.put("component_name", o);
dependencyInfoMap.put("up_output_info", null);
dependencyInfoMap.put("type", null);
JSONArray array = new JSONArray();
array.add(dependencyInfoMap);
dependencies.put(taskName, array);
}
}

// componentNeedRun handle
JSONObject needRunInfo = getNeedRunInfo(paramMap);

data.put("component_list", componentList);
data.put("component_need_run", needRunInfo);
data.put("component_module", componentModule);
data.put("dependencies", dependencies);
return new ResponseResult<>(ErrorCode.SUCCESS, data);
}
} else {
return new ResponseResult<>(retCode, resultObject.getString(Dict.RETMSG));
}
return new ResponseResult<>(retCode, resultObject.getString(Dict.RETMSG));
}

public JSONObject getNeedRunInfo(Map paramMap) {
String result;
try {
Expand All @@ -377,6 +311,7 @@ public JSONObject getNeedRunInfo(Map paramMap) {
return null;
}


public ResponseResult getDagDependencies(String param) {
//check and get parameters
JSONObject jsonObject = JSON.parseObject(param);
Expand Down Expand Up @@ -409,13 +344,15 @@ public ResponseResult getDagDependencies(String param) {

if (retCode == 0) {
JSONObject data = resultObject.getJSONObject(Dict.DATA);
JSONObject component_need_run = data.getJSONObject(Dict.COMPONENT_NEED_RUN);
JSONArray jsonArray = data.getJSONArray(Dict.COMPONENT_LIST);
ArrayList<Map<String, Object>> componentList = new ArrayList<>();
Set<String> keys = component_need_run.keySet();
for (Object o : keys) {


for (int i = 0; i < jsonArray.size(); i++) {
String componentName = jsonArray.getString(i);
HashMap<String, Object> component = new HashMap<>();
component.put(Dict.COMPONENT_NAME, o);
Map<String, Object> taskDetail = taskManagerService.findTaskDetail(jobId, role, partyId, (String) o);
component.put(Dict.COMPONENT_NAME, componentName);
Map<String, Object> taskDetail = taskManagerService.findTaskDetail(jobId, role, partyId, (String) componentName);
String taskStatus = null;
Long createTime = null;
if (taskDetail != null) {
Expand All @@ -427,7 +364,6 @@ public ResponseResult getDagDependencies(String param) {
component.put(Dict.TIME, createTime);
componentList.add(component);
}

data.put(Dict.COMPONENT_LIST, componentList);
return new ResponseResult<>(ErrorCode.SUCCESS, data);

Expand Down Expand Up @@ -482,20 +418,31 @@ public ResponseResult getData(@Valid @RequestBody ComponentQueryDTO componentQue
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
}

if ((result == null) || 0 == result.trim().length()) {
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_NULL_RESULT);
}
JSONObject object = JSON.parseObject(result);
JSONArray outputDataArray = object.getJSONArray("output_data");
JSONObject outputData = (JSONObject)outputDataArray.get(0);
Set<String> set = object.keySet();
String[] keys = set.toArray(new String[0]);
JSONArray outputDataArray = new JSONArray();
if (keys.length > 0) {
String keyName = keys[0];
outputDataArray = object.getJSONArray(keyName);
}

JSONObject outputData = (JSONObject) outputDataArray.get(0);

JSONArray jsonArray = outputData.getJSONArray("data");
int count = 0;
if (jsonArray != null) {
count = jsonArray.size();
}
outputData.put(Dict.DATA_COUNT,count);
outputData.put(Dict.DATA_COUNT, count);
outputDataArray.clear();
outputDataArray.add(outputData);
object.put(Dict.OUTPUT_DATA,outputDataArray);
return new ResponseResult(ErrorCode.SUCCESS,object);
object.clear();
object.put(Dict.OUTPUT_DATA, outputDataArray);
return new ResponseResult(ErrorCode.SUCCESS, object);
}

@RequestMapping(value = "/tracking/component/metric_data/batch", method = RequestMethod.POST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public ResponseResult queryJobDataset(@Valid @RequestBody JobQueryDTO jobQueryDT

String result;
try {
result = flowFeign.get(Dict.URL_JOB_DATAVIEW, jobManagerService.generateURLParamJobQueryDTO(jobQueryDTO));
result = flowFeign.get(Dict.URL_JOB_QUERY, jobManagerService.generateURLParamJobQueryDTO(jobQueryDTO));
} catch (Exception e) {
logger.error("connect fateflow error:", e);
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
Expand All @@ -124,7 +124,6 @@ public ResponseResult queryJobById(@PathVariable("jobId") String jobId,
return new ResponseResult<>(ErrorCode.DATABASE_ERROR_RESULT_NULL);
}

// jobWithBLOBs.setfRunIp(null);
jobWithBLOBs.setfDsl(null);
jobWithBLOBs.setfRuntimeConf(null);
if (jobWithBLOBs.getfStatus().equals(Dict.TIMEOUT)) {
Expand All @@ -138,7 +137,7 @@ public ResponseResult queryJobById(@PathVariable("jobId") String jobId,

String result;
try {
result = flowFeign.get(Dict.URL_JOB_DATAVIEW, paramMap);
result = flowFeign.get(Dict.URL_JOB_QUERY, paramMap);
} catch (Exception e) {
logger.error("connect fateflow error:", e);
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
Expand All @@ -152,10 +151,23 @@ public ResponseResult queryJobById(@PathVariable("jobId") String jobId,
if (retcode == null) {
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_WRONG_RESULT);
}

if (retcode == 0) {
JSONArray jsonArray = resultObject.getJSONArray(Dict.DATA);
JSONObject data = jsonArray == null ? null : (JSONObject)jsonArray.get(0);
resultMap.put(Dict.JOB, jobWithBLOBs);
JSONObject data = jsonArray == null ? null : (JSONObject) jsonArray.get(0);
if(data != null) {
String dataViewResult;
try {
dataViewResult = flowFeign.get(Dict.URL_JOB_DATA_VIEW, paramMap);
JSONObject dataViewResultObj = JSON.parseObject(dataViewResult);
JSONObject dataObject = dataViewResultObj.getJSONObject(Dict.DATA);
JSONObject dataViewObject = dataObject.getJSONObject(Dict.DATA_VIEW);
data.put(Dict.DATA_SET,dataViewObject);
} catch (Exception e) {
logger.error("add data view error:", e);
}
}
resultMap.put(Dict.JOB, jobWithBLOBs);
resultMap.put(Dict.DATASET, data);
return new ResponseResult<>(ErrorCode.SUCCESS, resultMap);
} else {
Expand Down Expand Up @@ -185,7 +197,6 @@ public ResponseResult<PageBean<Map<String, Object>>> queryPagedJob(@RequestBody
}



PageBean<Map<String, Object>> listPageBean = jobManagerService.queryPagedJobs(pagedJobQO);
return new ResponseResult<>(ErrorCode.SUCCESS, listPageBean);
}
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/org/fedai/fate/board/global/Dict.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class Dict {
static public final String LOG_TYPE = "log_type";
static public final String CODE = "code";
static public final String DATA = "data";
static public final String DATA_VIEW = "data_view";
static public final String DATA_SET = "dataset";
static public final String OUTPUT_DATA = "output_data";
static public final String DATA_COUNT = "total";
static public final String JOB = "job";
Expand Down Expand Up @@ -80,8 +82,10 @@ public class Dict {
static public final String URL_OUTPUT_DATA = "/v2/output/data/display";

static public final String URL_JOB_STOP = "/v2/job/stop";
static public final String URL_JOB_QUERY = "/v2/job/list/query";
static public final String URL_JOB_DATAVIEW = "/v2/job/query";
static public final String URL_JOB_QUERY_LIST = "/v2/job/list/query";

static public final String URL_JOB_QUERY = "/v2/job/query";
static public final String URL_JOB_DATA_VIEW = "/v2/job/data/view";
static public final String URL_JOB_UPDATE = "/v2/job/notes/add";
static public final String URL_JOB_RERUN = "/v2/job/rerun";
static public final String URL_DAG_DEPENDENCY = "/v2/job/dag/dependency";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public List<JobDO> queryJobStatus() {
private Map<String, Object> getJobMap(Map<String, Object> params) {
String result = null;
try {
result = flowFeign.get(Dict.URL_JOB_QUERY, params);
result = flowFeign.get(Dict.URL_JOB_QUERY_LIST, params);
} catch (Exception e) {
logger.error("connect fateflow error:", e);
LogicException.throwError(ErrorCode.FATEFLOW_ERROR_CONNECTION);
Expand Down Expand Up @@ -219,7 +219,7 @@ public PageBean<Map<String, Object>> queryPagedJobs(PagedJobQO pagedJobQO) {
jobParams.put(Dict.JOBID, jobId1);
jobParams.put((Dict.ROLE), role1);
jobParams.put(Dict.PARTY_ID, partyId1);
String result = flowFeign.get(Dict.URL_JOB_DATAVIEW, jobParams);
String result = flowFeign.get(Dict.URL_JOB_QUERY, jobParams);

JSONObject resultObject = JSON.parseObject(result);
Integer retCode = resultObject.getInteger(Dict.CODE);
Expand Down Expand Up @@ -290,7 +290,7 @@ public ResponseResult download(DownloadQO downloadQO, HttpServletResponse respon
jobParams.put(Dict.PARTY_ID, partyId);
String result = null;
try {
result = flowFeign.get(Dict.URL_JOB_DATAVIEW, jobParams);
result = flowFeign.get(Dict.URL_JOB_QUERY, jobParams);
} catch (Exception e) {
logger.error("connect fateflow error:", e);
return new ResponseResult<>(ErrorCode.FATEFLOW_ERROR_CONNECTION);
Expand Down
Loading

0 comments on commit 6a674a5

Please sign in to comment.