File tree Expand file tree Collapse file tree 2 files changed +13
-2
lines changed
main/java/org/apache/flink/runtime/executiongraph
test/java/org/apache/flink/runtime/jobmaster Expand file tree Collapse file tree 2 files changed +13
-2
lines changed Original file line number Diff line number Diff line change @@ -1648,6 +1648,15 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
16481648 public void registerJobStatusListener (JobStatusListener listener ) {
16491649 if (listener != null ) {
16501650 jobStatusListeners .add (listener );
1651+ // Emit current state to the newly registered listener
1652+ // This ensures listeners don't miss the initial state
1653+ try {
1654+ listener .jobStatusChanges (getJobID (), state , stateTimestamps [state .ordinal ()]);
1655+ } catch (Throwable t ) {
1656+ LOG .warn (
1657+ "Error while notifying newly registered JobStatusListener of current state" ,
1658+ t );
1659+ }
16511660 }
16521661 }
16531662
Original file line number Diff line number Diff line change @@ -1714,7 +1714,8 @@ void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
17141714 .equals (event .getName ()))
17151715 .map (Event ::getAttributes )
17161716 .map (x -> x .get ("newJobStatus" )))
1717- .containsSubsequence (
1717+ .containsExactly (
1718+ JobStatus .CREATED .toString (),
17181719 JobStatus .RUNNING .toString (),
17191720 JobStatus .FAILING .toString (),
17201721 JobStatus .FAILED .toString ());
@@ -1755,7 +1756,8 @@ void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
17551756 .equals (event .getName ()))
17561757 .map (Event ::getAttributes )
17571758 .map (x -> x .get ("newJobStatus" )))
1758- .containsSubsequence (
1759+ .containsExactly (
1760+ JobStatus .CREATED .toString (),
17591761 JobStatus .RUNNING .toString (),
17601762 JobStatus .FAILING .toString (),
17611763 JobStatus .FAILED .toString ());
You can’t perform that action at this time.
0 commit comments