From ca4de69be96eb0d2398b9bb0fef5ca044d33461f Mon Sep 17 00:00:00 2001 From: wuchunfu <319355703@qq.com> Date: Fri, 6 Sep 2024 18:35:06 +0800 Subject: [PATCH 1/4] [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding --- .../seatunnel/app/dal/entity/JobInstance.java | 4 +- .../seatunnel/app/dal/entity/JobMetrics.java | 4 +- .../response/executor/JobExecutionStatus.java | 4 +- .../metrics/JobPipelineDetailMetricsRes.java | 4 +- .../metrics/JobPipelineSummaryMetricsRes.java | 4 +- .../metrics/JobSummaryMetricsRes.java | 4 +- .../response/task/JobSimpleInfoRes.java | 4 +- .../service/impl/JobExecutorServiceImpl.java | 9 ++-- .../service/impl/JobInstanceServiceImpl.java | 2 +- .../service/impl/JobMetricsServiceImpl.java | 42 ++++++++++--------- .../SeaTunnelEngineMetricsExtractor.java | 31 +++++++------- .../engine/SeaTunnelEngineProxy.java | 5 ++- .../metrics/IEngineMetricsExtractor.java | 3 +- 13 files changed, 68 insertions(+), 52 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java index db5cbeba2..221867d8f 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.app.dal.entity; +import org.apache.seatunnel.engine.core.job.JobStatus; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; @@ -42,7 +44,7 @@ public class JobInstance { private Long jobDefineId; @TableField("job_status") - private String jobStatus; + private JobStatus jobStatus; @TableField("job_config") private String jobConfig; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java index 633ad6453..6706c5921 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.dal.entity; +import org.apache.seatunnel.engine.core.job.JobStatus; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; @@ -65,7 +67,7 @@ public class JobMetrics { private long recordDelay; @TableField("status") - private String status; + private JobStatus status; @TableField("create_user_id") private Integer createUserId; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java index 833abd88a..8ab8c4c84 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.app.domain.response.executor; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -26,7 +28,7 @@ @NoArgsConstructor public class JobExecutionStatus { - private String jobStatus; + private JobStatus jobStatus; private String errorMessage; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java index b864817c1..a18c7c81c 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.domain.response.metrics; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -45,5 +47,5 @@ public class JobPipelineDetailMetricsRes { private long recordDelay; - private String status; + private JobStatus status; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java index cac0499e1..4c9a3211e 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.domain.response.metrics; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Data; @@ -28,5 +30,5 @@ public class JobPipelineSummaryMetricsRes { private long writeRowCount; - private String status; + private JobStatus status; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java index d4b36fc50..0bb943b58 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.domain.response.metrics; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Data; @@ -30,5 +32,5 @@ public class JobSummaryMetricsRes { private long writeRowCount; - private String status; + private JobStatus status; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java index 22fda68ca..d66c4640f 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.app.domain.response.task; +import org.apache.seatunnel.engine.core.job.JobStatus; + import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -40,7 +42,7 @@ public class JobSimpleInfoRes { private String jobName; @ApiModelProperty(value = "job status", dataType = "String") - private String jobStatus; + private JobStatus jobStatus; @ApiModelProperty(value = "job plan", dataType = "String") private String jobPlan; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index 15250caa2..ec1b37de2 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -55,7 +55,6 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Date; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -128,7 +127,7 @@ private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInst } catch (Throwable e) { log.error("Job execution submission failed.", e); JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - jobInstance.setJobStatus(JobStatus.FAILED.name()); + jobInstance.setJobStatus(JobStatus.FAILED); jobInstance.setEndTime(new Date()); String jobInstanceErrorMessage = JobUtils.getJobInstanceErrorMessage(e.getMessage()); jobInstance.setErrorMessage(jobInstanceErrorMessage); @@ -183,14 +182,14 @@ private SeaTunnelClient createSeaTunnelClient() { @Override public Result jobPause(Integer userId, Long jobInstanceId) { JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - if (Objects.equals( - getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()), "RUNNING")) { + if (getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()) + == JobStatus.RUNNING) { pauseJobInEngine(jobInstance.getJobEngineId()); } return Result.success(); } - private String getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) { + private JobStatus getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) { Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion()); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 8dff7bfff..d58d19b41 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -366,7 +366,7 @@ public void complete( funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId); JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId); jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId); - jobInstance.setJobStatus(jobResult.getStatus().name()); + jobInstance.setJobStatus(jobResult.getStatus()); jobInstance.setJobEngineId(jobEngineId); jobInstance.setUpdateUserId(userId); jobInstance.setErrorMessage(JobUtils.getJobInstanceErrorMessage(jobResult.getError())); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java index d65679b00..f424aee0d 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java @@ -33,6 +33,7 @@ import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory; import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.app.utils.JobUtils; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.server.common.CodeGenerateUtils; import org.apache.seatunnel.server.common.Constants; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; @@ -89,7 +90,7 @@ public JobSummaryMetricsRes getJobSummaryMetrics( Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion()); IEngineMetricsExtractor engineMetricsExtractor = (new EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor(); - String jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId); + JobStatus jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId); List jobPipelineDetailMetrics = getJobPipelineDetailMetrics(jobInstance, userId); @@ -160,8 +161,8 @@ private Map getMatricsListIfTaskTypeIsBatch( log.info("jobEngineId={}", jobInstance.getJobEngineId()); if (jobInstance.getJobStatus() == null - || jobInstance.getJobStatus().equals("FAILED") - || jobInstance.getJobStatus().equals("RUNNING")) { + || jobInstance.getJobStatus() == JobStatus.FAILED + || jobInstance.getJobStatus() == JobStatus.RUNNING) { // Obtain monitoring information from the collection of running jobs returned from // the engine if (!allRunningJobMetricsFromEngine.isEmpty() @@ -192,15 +193,15 @@ private Map getMatricsListIfTaskTypeIsBatch( if (jobMetricsFromDb != null) { jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb); } - if ("RUNNING".equals(jobInstance.getJobStatus())) { + if (jobInstance.getJobStatus() == JobStatus.RUNNING) { // Set the job status of jobInstance and jobMetrics in the database to // finished - jobInstance.setJobStatus("FINISHED"); + jobInstance.setJobStatus(JobStatus.FINISHED); jobInstanceDao.getJobInstanceMapper().updateById(jobInstance); } } - } else if (jobInstance.getJobStatus().equals("FINISHED") - || jobInstance.getJobStatus().equals("CANCELED")) { + } else if (jobInstance.getJobStatus() == JobStatus.FINISHED + || jobInstance.getJobStatus() == JobStatus.CANCELED) { // If the status of the job is finished or cancelled, the monitoring information is // directly obtained from MySQL JobSummaryMetricsRes jobMetricsFromDb = @@ -222,7 +223,7 @@ private void modifyAndUpdateJobInstanceAndJobMetrics( Map> allRunningJobMetricsFromEngine, Map jobInstanceIdAndJobEngineIdMap, Integer userId) { - jobInstance.setJobStatus("RUNNING"); + jobInstance.setJobStatus(JobStatus.RUNNING); HashMap jobMetricsFromEngine = allRunningJobMetricsFromEngine.get( jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId())); @@ -246,7 +247,7 @@ private void modifyAndUpdateJobInstanceAndJobMetrics( jobMetricsFromEngine .get(jobMetrics.getPipelineId()) .getWriteRowCount())); - jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus("RUNNING")); + jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus(JobStatus.RUNNING)); updateJobInstanceAndMetrics(jobInstance, jobMetricsFromDb); } @@ -265,7 +266,7 @@ private Map getMatricsListIfTaskTypeIsStreaming( try { if (jobInstance.getJobStatus() != null - && jobInstance.getJobStatus().equals("CANCELED")) { + && jobInstance.getJobStatus() == JobStatus.CANCELED) { // If the status of the job is finished or cancelled // the monitoring information is directly obtained from MySQL JobSummaryMetricsRes jobMetricsFromDb = @@ -278,8 +279,8 @@ private Map getMatricsListIfTaskTypeIsStreaming( jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb); } else if (jobInstance.getJobStatus() != null - && (jobInstance.getJobStatus().equals("FINISHED") - || jobInstance.getJobStatus().equals("FAILED"))) { + && (jobInstance.getJobStatus() == JobStatus.FINISHED + || jobInstance.getJobStatus() == JobStatus.FAILED)) { // Obtain monitoring information from the collection of running jobs returned // from // the engine @@ -332,9 +333,9 @@ private Map getMatricsListIfTaskTypeIsStreaming( jobInstance); jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes); } else { - String jobStatusByJobEngineId = null; + JobStatus jobStatus = null; try { - jobStatusByJobEngineId = + jobStatus = getJobStatusByJobEngineId( String.valueOf( jobInstanceIdAndJobEngineIdMap.get( @@ -345,8 +346,8 @@ private Map getMatricsListIfTaskTypeIsStreaming( jobInstance.getId()); } - if (jobStatusByJobEngineId != null) { - jobInstance.setJobStatus(jobStatusByJobEngineId); + if (jobStatus != null) { + jobInstance.setJobStatus(jobStatus); jobInstanceDao.update(jobInstance); JobSummaryMetricsRes jobSummaryMetricsResByDb = getJobSummaryMetricsResByDb( @@ -365,7 +366,7 @@ private Map getMatricsListIfTaskTypeIsStreaming( jobInstanceIdAndJobEngineIdMap.get( jobInstance.getId()))); if (!jobMetricsFromDb.isEmpty()) { - String finalJobStatusByJobEngineId = jobStatusByJobEngineId; + JobStatus finalJobStatusByJobEngineId = jobStatus; jobMetricsFromDb.forEach( jobMetrics -> jobMetrics.setStatus(finalJobStatusByJobEngineId)); @@ -402,7 +403,8 @@ private JobSummaryMetricsRes getRunningJobMetricsFromEngine( log.info("jobInstance={}", jobInstance); - return new JobSummaryMetricsRes(jobInstance.getId(), 1L, readCount, writeCount, "RUNNING"); + return new JobSummaryMetricsRes( + jobInstance.getId(), 1L, readCount, writeCount, JobStatus.RUNNING); } private JobSummaryMetricsRes getJobSummaryMetricsResByDb( @@ -442,7 +444,7 @@ private void updateJobInstanceAndMetrics(JobInstance jobInstance, List> getAllRunningJobMetrics() { return new HashMap<>(); } JsonNode jsonNode = JsonUtils.stringToJsonNode(allJobMetricsContent); - Iterator iterator = jsonNode.iterator(); - while (iterator.hasNext()) { + for (JsonNode item : jsonNode) { LinkedHashMap metricsMap = new LinkedHashMap<>(); - JsonNode next = iterator.next(); - JsonNode sourceReceivedCount = next.get("metrics").get("SourceReceivedCount"); - Long jobEngineId = 0L; + JsonNode sourceReceivedCount = item.get("metrics").get("SourceReceivedCount"); + long jobEngineId = 0L; if (sourceReceivedCount != null && sourceReceivedCount.isArray()) { for (JsonNode node : sourceReceivedCount) { jobEngineId = node.get("tags").get("jobId").asLong(); @@ -311,7 +310,7 @@ public Map> getAllRunningJobMetrics() { } } - JsonNode sinkWriteCount = next.get("metrics").get("SinkWriteCount"); + JsonNode sinkWriteCount = item.get("metrics").get("SinkWriteCount"); if (sinkWriteCount != null && sinkWriteCount.isArray()) { for (JsonNode node : sinkWriteCount) { jobEngineId = node.get("tags").get("jobId").asLong(); @@ -324,7 +323,7 @@ public Map> getAllRunningJobMetrics() { } } - JsonNode sinkWriteQPS = next.get("metrics").get("SinkWriteQPS"); + JsonNode sinkWriteQPS = item.get("metrics").get("SinkWriteQPS"); if (sinkWriteQPS != null && sinkWriteQPS.isArray()) { for (JsonNode node : sinkWriteQPS) { Integer pipelineId = node.get("tags").get("pipelineId").asInt(); @@ -336,7 +335,7 @@ public Map> getAllRunningJobMetrics() { } } - JsonNode sourceReceivedQPS = next.get("metrics").get("SourceReceivedQPS"); + JsonNode sourceReceivedQPS = item.get("metrics").get("SourceReceivedQPS"); if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) { for (JsonNode node : sourceReceivedQPS) { Integer pipelineId = node.get("tags").get("pipelineId").asInt(); @@ -348,7 +347,7 @@ public Map> getAllRunningJobMetrics() { } } - JsonNode cdcRecordEmitDelay = next.get("metrics").get("CDCRecordEmitDelay"); + JsonNode cdcRecordEmitDelay = item.get("metrics").get("CDCRecordEmitDelay"); if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) { Map> dataMap = new HashMap<>(); for (JsonNode node : cdcRecordEmitDelay) { @@ -387,7 +386,7 @@ private JobMetrics getOrCreatePipelineMetricsMapStatusRunning( JobMetrics currPipelineMetrics = metricsMap.get(pipelineId); if (currPipelineMetrics == null) { currPipelineMetrics = new JobMetrics(); - currPipelineMetrics.setStatus("RUNNING"); + currPipelineMetrics.setStatus(JobStatus.RUNNING); currPipelineMetrics.setPipelineId(pipelineId); metricsMap.put(pipelineId, currPipelineMetrics); } @@ -402,7 +401,7 @@ private JobMetrics getOrCreatePipelineMetricsMap( if (currPipelineMetrics == null) { currPipelineMetrics = new JobMetrics(); metricsMap.put(pipelineId, currPipelineMetrics); - currPipelineMetrics.setStatus(jobPipelineStatus.get(pipelineId)); + currPipelineMetrics.setStatus(JobStatus.valueOf(jobPipelineStatus.get(pipelineId))); currPipelineMetrics.setPipelineId(pipelineId); } return currPipelineMetrics; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java index 390ac6270..554a37e94 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder; import org.apache.seatunnel.engine.core.job.JobDAGInfo; +import org.apache.seatunnel.engine.core.job.JobStatus; import com.hazelcast.client.config.ClientConfig; import lombok.NonNull; @@ -74,10 +75,10 @@ public JobDAGInfo getJobInfo(@NonNull String jobEngineId) { } } - public String getJobStatus(@NonNull String jobEngineId) { + public JobStatus getJobStatus(@NonNull String jobEngineId) { SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig); try { - return seaTunnelClient.getJobStatus(Long.valueOf(jobEngineId)); + return JobStatus.valueOf(seaTunnelClient.getJobStatus(Long.valueOf(jobEngineId))); } catch (Exception e) { log.warn("Can not get job from engine.", e); return null; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java index 528d848ab..cee3af4e5 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java @@ -18,6 +18,7 @@ import org.apache.seatunnel.app.dal.entity.JobInstanceHistory; import org.apache.seatunnel.app.dal.entity.JobMetrics; +import org.apache.seatunnel.engine.core.job.JobStatus; import lombok.NonNull; @@ -39,7 +40,7 @@ public interface IEngineMetricsExtractor { List> getClusterHealthMetrics(); - String getJobStatus(@NonNull String jobEngineId); + JobStatus getJobStatus(@NonNull String jobEngineId); /** Obtain all running task metrics in the engine cluster */ Map> getAllRunningJobMetrics(); From 29007baac4e73bc3485e9f4aceb32f142d0404f7 Mon Sep 17 00:00:00 2001 From: wuchunfu <319355703@qq.com> Date: Fri, 6 Sep 2024 20:14:26 +0800 Subject: [PATCH 2/4] [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding --- .../org/apache/seatunnel/app/utils/JobTestingUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java index e0e2b8d5e..f4d6c5254 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java @@ -99,9 +99,9 @@ private static boolean isFinished(JobPipelineDetailMetricsRes metrics) { return false; } switch (metrics.getStatus()) { - case "FINISHED": - case "CANCELED": - case "FAILED": + case FINISHED: + case CANCELED: + case FAILED: return true; default: return false; From 00955992e7c8a3166a3c8eb5140685e68b4d2eed Mon Sep 17 00:00:00 2001 From: wuchunfu <319355703@qq.com> Date: Sat, 7 Sep 2024 02:50:09 +0800 Subject: [PATCH 3/4] [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding --- .../apache/seatunnel/app/test/JobControllerTest.java | 3 ++- .../seatunnel/app/test/JobExecutorControllerTest.java | 10 +++++----- .../seatunnel/app/test/TaskInstanceControllerTest.java | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java index 2ff5ef738..c963324ce 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; import org.apache.seatunnel.app.utils.JobTestingUtils; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.junit.jupiter.api.AfterAll; @@ -73,7 +74,7 @@ public void createJobWithSingleAPI_shouldExecuteSuccessfully() { Result> listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); assertEquals(5, listResult.getData().get(0).getWriteRowCount()); } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java index 8cf6f9997..6e1cb1ea2 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -73,7 +73,7 @@ public void executeJob_shouldReturnSuccess_whenValidRequest() { Result> listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); assertEquals(5, listResult.getData().get(0).getWriteRowCount()); } @@ -88,7 +88,7 @@ public void executeJobWithParameters() { Result> listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); assertEquals(5, listResult.getData().get(0).getWriteRowCount()); String generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId)); @@ -122,7 +122,7 @@ public void executeJobWithParameters() { assertTrue(result.getData() > 0); listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(numberOfRecords, listResult.getData().get(0).getReadRowCount()); assertEquals(numberOfRecords, listResult.getData().get(0).getWriteRowCount()); @@ -281,7 +281,7 @@ public void executeJob_JobStatusUpdate_WhenSubmissionFailed() { Result jobExecutionStatusResult = jobExecutorControllerWrapper.getJobExecutionStatus(jobInstanceId); assertTrue(jobExecutionStatusResult.isSuccess()); - assertEquals(JobStatus.FAILED.name(), jobExecutionStatusResult.getData().getJobStatus()); + assertEquals(JobStatus.FAILED, jobExecutionStatusResult.getData().getJobStatus()); assertNotNull(jobExecutionStatusResult.getData().getErrorMessage()); // Invalid jobInstanceId @@ -306,7 +306,7 @@ public void storeErrorMessageWhenJobFailed() throws InterruptedException { Result jobExecutionDetailResult = jobExecutorControllerWrapper.getJobExecutionDetail(jobInstanceId); assertTrue(jobExecutionDetailResult.isSuccess()); - assertEquals(JobStatus.FAILED.name(), jobExecutionDetailResult.getData().getJobStatus()); + assertEquals(JobStatus.FAILED, jobExecutionDetailResult.getData().getJobStatus()); assertNotNull(jobExecutionDetailResult.getData().getErrorMessage()); assertNotNull(jobExecutionDetailResult.getData().getJobDefineName()); diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java index 485e07cc2..b5dfd7333 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; import org.apache.seatunnel.app.utils.JobTestingUtils; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -75,7 +76,7 @@ private static void extractedJob1(String jobName) { Result> listResult = JobTestingUtils.waitForJobCompletion(execuitonResult.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); } @AfterAll From 1f21f2470fc50e6a12b99d57b14ad93ae9044e06 Mon Sep 17 00:00:00 2001 From: wuchunfu <319355703@qq.com> Date: Sat, 7 Sep 2024 15:47:15 +0800 Subject: [PATCH 4/4] [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding --- .../engine/SeaTunnelEngineMetricsExtractor.java | 5 ++++- .../java/org/apache/seatunnel/app/utils/JobUtils.java | 9 +++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java index f18fb4de7..b9003d828 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java @@ -401,7 +401,10 @@ private JobMetrics getOrCreatePipelineMetricsMap( if (currPipelineMetrics == null) { currPipelineMetrics = new JobMetrics(); metricsMap.put(pipelineId, currPipelineMetrics); - currPipelineMetrics.setStatus(JobStatus.valueOf(jobPipelineStatus.get(pipelineId))); + currPipelineMetrics.setStatus( + "DEPLOYING".equals(jobPipelineStatus.get(pipelineId)) + ? JobStatus.SCHEDULED + : JobStatus.valueOf(jobPipelineStatus.get(pipelineId))); currPipelineMetrics.setPipelineId(pipelineId); } return currPipelineMetrics; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java index b9801eca7..bfa368302 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.app.dal.entity.JobTask; import org.apache.seatunnel.app.domain.request.job.JobExecParam; +import org.apache.seatunnel.engine.core.job.JobStatus; import java.util.List; import java.util.Map; @@ -98,9 +99,9 @@ public static void updateDataSource(JobExecParam jobExecParam, List tas }); } - public static boolean isJobEndStatus(String jobStatus) { - return "finished".equalsIgnoreCase(jobStatus) - || "canceled".equalsIgnoreCase(jobStatus) - || "failed".equalsIgnoreCase(jobStatus); + public static boolean isJobEndStatus(JobStatus jobStatus) { + return JobStatus.FINISHED == jobStatus + || JobStatus.CANCELED == jobStatus + || JobStatus.FAILED == jobStatus; } }