Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: fix
issue: 4846
title: "Job maintenance service would throw an exception if a job definition is unknown, this would run maintenance on every job instance after it.
Now the maintenance will skip over unknown job definitions and display a warning log message indication a job definition is missing."
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
* </p>
*/
public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasScheduledJobs {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
static final Logger ourLog = Logs.getBatchTroubleshootingLog();

public static final int INSTANCES_PER_PASS = 100;
public static final String SCHEDULED_JOB_ID = JobMaintenanceScheduledJob.class.getName();
Expand Down Expand Up @@ -218,12 +218,17 @@ private void doMaintenancePass() {

for (JobInstance instance : instances) {
String instanceId = instance.getInstanceId();
if (processedInstanceIds.add(instanceId)) {
myJobDefinitionRegistry.setJobDefinition(instance);
JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence,
myBatchJobSender, instanceId, progressAccumulator, myReductionStepExecutorService, myJobDefinitionRegistry);
ourLog.debug("Triggering maintenance process for instance {} in status {}", instanceId, instance.getStatus());
jobInstanceProcessor.process();
if (myJobDefinitionRegistry.getJobDefinition(instance.getJobDefinitionId(),instance.getJobDefinitionVersion()).isPresent()) {
if (processedInstanceIds.add(instanceId)) {
myJobDefinitionRegistry.setJobDefinition(instance);
JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence,
myBatchJobSender, instanceId, progressAccumulator, myReductionStepExecutorService, myJobDefinitionRegistry);
ourLog.debug("Triggering maintenance process for instance {} in status {}", instanceId, instance.getStatus());
jobInstanceProcessor.process();
}
}
else {
ourLog.warn("Job definition {} for instance {} is currently unavailable", instance.getJobDefinitionId(), instanceId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.test.util.LogbackCaptureTestExtension;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.google.common.collect.Lists;
import org.hl7.fhir.r4.model.DateTimeType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
Expand Down Expand Up @@ -53,6 +58,7 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand All @@ -61,6 +67,8 @@
@ExtendWith(MockitoExtension.class)
public class JobMaintenanceServiceImplTest extends BaseBatch2Test {

@RegisterExtension
LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) JobMaintenanceServiceImpl.ourLog, Level.WARN);
@Mock
IJobCompletionHandler<TestJobParameters> myCompletionHandler;
@Mock
Expand Down Expand Up @@ -115,6 +123,26 @@ public void testInProgress_CalculateProgress_FirstCompleteButNoOtherStepsYetComp
verify(myJobPersistence, times(1)).updateInstance(any(), any());
}

@Test
public void testInProgress_Calculate_progresss_JobDefinitionMissing() {
ArgumentCaptor<ILoggingEvent> logCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
List<WorkChunk> chunks = List.of(
JobCoordinatorImplTest.createWorkChunk(STEP_1, null).setStatus(WorkChunkStatusEnum.COMPLETED),
JobCoordinatorImplTest.createWorkChunk(STEP_2, null).setStatus(WorkChunkStatusEnum.QUEUED)
);

JobInstance instance = createInstance();
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(List.of(instance));

mySvc.runMaintenancePass();

String assumedRoleLogText = String.format("Job definition %s for instance %s is currently unavailable", JOB_DEFINITION_ID, instance.getInstanceId());
List<ILoggingEvent> fetchedCredentialLogs = myLogCapture.filterLoggingEventsWithMessageEqualTo(assumedRoleLogText);
assertEquals(1, fetchedCredentialLogs.size());

verify(myJobPersistence, never()).updateInstance(any(), any());
}

@Test
public void testInProgress_CalculateProgress_FirstStepComplete() {
List<WorkChunk> chunks = Arrays.asList(
Expand Down