diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java index aa1bbd5934bd..e8ee03a5013a 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java @@ -66,7 +66,7 @@ public static T retryWithException( backoff); Thread.sleep(backoff); } else { - log.debug(attemptMessage, ExceptionUtils.getMessage(e), i, retryTimes, 0); + log.info(attemptMessage, ExceptionUtils.getMessage(e), i, retryTimes, 0); } } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java index e99940defec6..f295b941eb95 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java @@ -548,9 +548,11 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() @Test public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws ExecutionException, InterruptedException { - String testCaseName = "testTwoPipelineBatchJobRestoreIn2NodeMasterDown"; + String testCaseName = + "testTwoPipelineBatchJobRestoreIn2NodeMasterDown" + System.currentTimeMillis(); String testClusterName = - "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown"; + "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown" + + System.currentTimeMillis(); long testRowNumber = 1000; int testParallelism = 6; HazelcastInstanceImpl node1 = null; @@ -657,9 +659,11 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() @Test public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws ExecutionException, InterruptedException { - String testCaseName = "testTwoPipelineStreamJobRestoreIn2NodeMasterDown"; + String testCaseName = + "testTwoPipelineStreamJobRestoreIn2NodeMasterDown" + System.currentTimeMillis(); String testClusterName = - "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown"; + "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown" + + System.currentTimeMillis(); long testRowNumber = 1000; int testParallelism = 6; HazelcastInstanceImpl node1 = null; diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index cba498e99922..4ecee663ae52 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -145,6 +145,28 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException { Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException")); } + @Test + public void testGetUnKnownJobID() { + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + + ClientJobProxy newClientJobProxy = + engineClient.createJobClient().getJobProxy(System.currentTimeMillis()); + CompletableFuture waitForJobCompleteFuture = + CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete); + + await().atMost(20000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.UNKNOWABLE, waitForJobCompleteFuture.get())); + + Assertions.assertEquals( + "UNKNOWABLE", engineClient.getJobClient().getJobStatus(System.currentTimeMillis())); + } + @Test public void testExpiredJobWasDeleted() throws Exception { Common.setDeployMode(DeployMode.CLIENT); @@ -164,8 +186,8 @@ public void testExpiredJobWasDeleted() throws Exception { await().atMost(65, TimeUnit.SECONDS) .untilAsserted( () -> - Assertions.assertThrowsExactly( - NullPointerException.class, clientJobProxy::getJobStatus)); + Assertions.assertEquals( + JobStatus.UNKNOWABLE, clientJobProxy.getJobStatus())); } @AfterEach diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java index f9dbfb4c6cc3..7c50744dba0a 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java @@ -60,7 +60,10 @@ public enum JobStatus { SUSPENDED(EndState.LOCALLY), /** The job is currently reconciling and waits for task execution report to recover state. */ - RECONCILING(EndState.NOT_END); + RECONCILING(EndState.NOT_END), + + /** Cannot find the JobID or the job status has already been cleared. */ + UNKNOWABLE(EndState.GLOBALLY); // -------------------------------------------------------------------------------------------- diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 1f87218de0b5..b09083884148 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -507,9 +507,22 @@ public PassiveCompletableFuture savePoint(long jobId) { public PassiveCompletableFuture waitForJobComplete(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { - JobHistoryService.JobState jobState = jobHistoryService.getJobDetailState(jobId); + // Because operations on Imap cannot be performed within Operation. + CompletableFuture jobStateFuture = + CompletableFuture.supplyAsync( + () -> { + return jobHistoryService.getJobDetailState(jobId); + }, + executorService); + JobHistoryService.JobState jobState = null; + try { + jobState = jobStateFuture.get(); + } catch (Exception e) { + throw new SeaTunnelEngineException("get job state error", e); + } + CompletableFuture future = new CompletableFuture<>(); - if (jobState == null) future.complete(new JobResult(JobStatus.FAILED, null)); + if (jobState == null) future.complete(new JobResult(JobStatus.UNKNOWABLE, null)); else future.complete(new JobResult(jobState.getJobStatus(), jobState.getErrorMessage())); return new PassiveCompletableFuture<>(future); @@ -539,7 +552,7 @@ public JobStatus getJobStatus(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { JobHistoryService.JobState jobDetailState = jobHistoryService.getJobDetailState(jobId); - return null == jobDetailState ? null : jobDetailState.getJobStatus(); + return null == jobDetailState ? JobStatus.UNKNOWABLE : jobDetailState.getJobStatus(); } return runningJobMaster.getJobStatus(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index f75f5af8e4d8..f5311798f392 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -233,7 +233,7 @@ public boolean isMasterNode() { // must retry until the cluster have master node try { return RetryUtils.retryWithException( - () -> nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress()), + () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()), new RetryUtils.RetryMaterial( Constant.OPERATION_RETRY_TIME, true,