Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
type: fix
issue: 5632
title: "Previously bulk export operation was returning an empty response when no resources matched the request, which
didn't comply with [HL7 HAPI IG](https://hl7.org/fhir/uv/bulkdata/export/index.html#response---complete-status).
This has been corrected."
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,9 @@ public void exportPollStatus(

String serverBase = getServerBase(theRequestDetails);

// an output is required, even if empty, according to HL7 FHIR IG
bulkResponseDocument.getOutput();

for (Map.Entry<String, List<String>> entrySet :
results.getResourceTypeToBinaryIds().entrySet()) {
String resourceType = entrySet.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ public void executeStep() {
return;
}

if (stepExecutorOutput.getDataSink().firstStepProducedNothing()) {
if (stepExecutorOutput.getDataSink().firstStepProducedNothing() && !lastStepIsReduction()) {
ourLog.info(
"First step of job myInstance {} produced no work chunks, marking as completed and setting end date",
"First step of job myInstance {} produced no work chunks and last step is not a reduction, "
+ "marking as completed and setting end date",
myInstanceId);
myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> {
instance.setEndTime(new Date());
Expand All @@ -92,6 +93,11 @@ public void executeStep() {
}
}

private boolean lastStepIsReduction() {
int stepCount = myDefinition.getSteps().size();
return stepCount >= 1 && myDefinition.getSteps().get(stepCount - 1).isReductionStep();
}

private void handleFastTracking(BaseDataSink<PT, IT, OT> theDataSink) {
if (theDataSink.getWorkChunkCount() <= 1) {
ourLog.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ private int getChunkCount() {
/**
* Transitions from IN_PROGRESS/ERRORED based on chunk statuses.
*/
public void calculateNewStatus() {
public void calculateNewStatus(boolean theLastStepIsReduction) {
if (myFailedChunkCount > 0) {
myNewStatus = StatusEnum.FAILED;
} else if (myErroredChunkCount > 0) {
myNewStatus = StatusEnum.ERRORED;
} else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0) {
} else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0 && !theLastStepIsReduction) {
myNewStatus = StatusEnum.COMPLETED;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,34 @@
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;

import java.util.Iterator;
import java.util.Optional;

public class JobInstanceProgressCalculator {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IJobPersistence myJobPersistence;
private final JobChunkProgressAccumulator myProgressAccumulator;
private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
private final JobDefinitionRegistry myJobDefinitionRegistry;

public JobInstanceProgressCalculator(
IJobPersistence theJobPersistence,
JobChunkProgressAccumulator theProgressAccumulator,
JobDefinitionRegistry theJobDefinitionRegistry) {
myJobPersistence = theJobPersistence;
myProgressAccumulator = theProgressAccumulator;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
}

Expand Down Expand Up @@ -96,8 +104,21 @@ public InstanceProgress calculateInstanceProgress(String instanceId) {
}

// wipmb separate status update from stats collection in 6.8
instanceProgress.calculateNewStatus();
instanceProgress.calculateNewStatus(lastStepIsReduction(instanceId));

return instanceProgress;
}

private boolean lastStepIsReduction(String theInstanceId) {
JobInstance jobInstance = getJobInstance(theInstanceId);
JobDefinition<IModelJson> jobDefinition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance);
int stepCount = jobDefinition.getSteps().size();
return stepCount >= 1 && jobDefinition.getSteps().get(stepCount - 1).isReductionStep();
}

private JobInstance getJobInstance(String theInstanceId) {
Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(theInstanceId);
return oInstance.orElseThrow(() ->
new InternalErrorException(Msg.code(2486) + "Failed to fetch JobInstance with id: " + theInstanceId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Collections;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -30,6 +31,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -90,12 +92,16 @@ public void accept_validInputSubmittedOnlyOnce_updatesInstanceWithData() {
String data = "data";
StepOutputData stepData = new StepOutputData(data);
WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(stepData);
@SuppressWarnings("unchecked")
JobDefinition<IModelJson> jobDefinition = mock(JobDefinition.class);

// when
JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
instance.setStatus(StatusEnum.FINALIZE);
stubUpdateInstanceCallback(instance);
when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(instance)).thenReturn(jobDefinition);

// test
myDataSink.accept(chunkData);
Expand All @@ -111,13 +117,17 @@ public void accept_multipleCalls_firstInWins() {
String data2 = "data2";
WorkChunkData<StepOutputData> firstData = new WorkChunkData<>(new StepOutputData(data));
WorkChunkData<StepOutputData> secondData = new WorkChunkData<>(new StepOutputData(data2));
@SuppressWarnings("unchecked")
JobDefinition<IModelJson> jobDefinition = mock(JobDefinition.class);

ourLogger.setLevel(Level.ERROR);

JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
instance.setStatus(StatusEnum.FINALIZE);
when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
stubUpdateInstanceCallback(instance);
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(instance)).thenReturn(jobDefinition);

// test
myDataSink.accept(firstData);
Expand All @@ -136,10 +146,15 @@ private void stubUpdateInstanceCallback(JobInstance theJobInstance) {
@Test
public void accept_noInstanceIdFound_throwsJobExecutionFailed() {
// setup
JobInstance jobInstance = mock(JobInstance.class);
@SuppressWarnings("unchecked")
JobDefinition<IModelJson> jobDefinition = (JobDefinition<IModelJson>) mock(JobDefinition.class);
String data = "data";
WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(new StepOutputData(data));
when(myJobPersistence.updateInstance(any(), any())).thenReturn(false);
when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(jobInstance));
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance)).thenReturn(jobDefinition);

// test
try {
Expand All @@ -151,5 +166,4 @@ public void accept_noInstanceIdFound_throwsJobExecutionFailed() {
fail("Unexpected exception", anyOtherEx);
}
}

}