diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5632-bulk-export-response-must-have-required-fields.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5632-bulk-export-response-must-have-required-fields.yaml
new file mode 100644
index 000000000000..cc207611c3f3
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5632-bulk-export-response-must-have-required-fields.yaml
@@ -0,0 +1,6 @@
+---
+type: fix
+issue: 5632
+title: "Previously, the bulk export operation returned an empty response when no resources matched the request, which
+  did not comply with the [HL7 Bulk Data Access IG](https://hl7.org/fhir/uv/bulkdata/export/index.html#response---complete-status).
+  This has been corrected."
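Note: the IG section linked above requires the complete-status response to carry `transactionTime`, `request`, `requiresAccessToken`, `output`, and `error` even when nothing matched; an empty export simply has empty arrays. A minimal illustration of a compliant empty response (all values here are invented for the example):

```json
{
  "transactionTime": "2024-01-01T12:00:00.000Z",
  "request": "https://example.org/fhir/$export?_type=Patient",
  "requiresAccessToken": true,
  "output": [ ],
  "error": [ ]
}
```

This is also why the tests below assert on the raw response body: as the in-test comment notes, a deserialized copy would silently re-create the missing fields.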
diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java
index 6899c125cb72..88e989e2ebd1 100644
--- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java
+++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/bulk/BulkExportUseCaseTest.java
@@ -87,6 +87,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -299,9 +300,9 @@ public void export_shouldExportPatientAndObservationAndEncounterResources_whenTy
 		assertThat(result.getRequiresAccessToken(), is(equalTo(true)));
 		assertThat(result.getTransactionTime(), is(notNullValue()));
 		assertEquals(result.getOutput().size(), 3);
-		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).collect(Collectors.toList()).size());
-		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Observation")).collect(Collectors.toList()).size());
-		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Encounter")).collect(Collectors.toList()).size());
+		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).count());
+		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Observation")).count());
+		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Encounter")).count());

 		//We assert specifically on content as the deserialized version will "helpfully" fill in missing fields.
 		assertThat(responseContent, containsString("\"error\" : [ ]"));
@@ -338,8 +339,8 @@ public void export_shouldNotExportBinaryResource_whenTypeParameterOmitted() thro
 		assertThat(result.getRequiresAccessToken(), is(equalTo(true)));
 		assertThat(result.getTransactionTime(), is(notNullValue()));
 		assertEquals(result.getOutput().size(), 1);
-		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).collect(Collectors.toList()).size());
-		assertEquals(0, result.getOutput().stream().filter(o -> o.getType().equals("Binary")).collect(Collectors.toList()).size());
+		assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).count());
+		assertEquals(0, result.getOutput().stream().filter(o -> o.getType().equals("Binary")).count());

 		//We assert specifically on content as the deserialized version will "helpfully" fill in missing fields.
 		assertThat(responseContent, containsString("\"error\" : [ ]"));
@@ -381,7 +382,7 @@ public void testBinariesAreStreamedWithRespectToAcceptHeader() throws IOExceptio
 		}

 		HashSet<String> types = Sets.newHashSet("Patient");
-		BulkExportJobResults bulkExportJobResults = startSystemBulkExportJobAndAwaitCompletion(types, new HashSet());
+		BulkExportJobResults bulkExportJobResults = startSystemBulkExportJobAndAwaitCompletion(types, new HashSet<>());
 		Map<String, List<String>> resourceTypeToBinaryIds = bulkExportJobResults.getResourceTypeToBinaryIds();
 		assertThat(resourceTypeToBinaryIds.get("Patient"), hasSize(1));
 		String patientBinaryId = resourceTypeToBinaryIds.get("Patient").get(0);
@@ -477,7 +478,7 @@ public void testResourceCountIsCorrect() {
 			String entities = myJobInstanceRepository
 				.findAll()
 				.stream()
-				.map(t -> t.toString())
+				.map(Batch2JobInstanceEntity::toString)
 				.collect(Collectors.joining("\n * "));
 			ourLog.info("Entities:\n * " + entities);
 		});
@@ -492,6 +493,41 @@ public void testResourceCountIsCorrect() {
 		assertEquals(patientCount, jobInstance.getCombinedRecordsProcessed());
 	}

+	@Test
+	public void testEmptyExport() {
+		BulkExportJobParameters options = new BulkExportJobParameters();
+		options.setResourceTypes(Collections.singleton("Patient"));
+		options.setFilters(Collections.emptySet());
+		options.setExportStyle(BulkExportJobParameters.ExportStyle.SYSTEM);
+		options.setOutputFormat(Constants.CT_FHIR_NDJSON);
+
+		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+		startRequest.setJobDefinitionId(Batch2JobDefinitionConstants.BULK_EXPORT);
+		startRequest.setParameters(options);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
+
+		assertNotNull(startResponse);
+
+		final String jobId = startResponse.getInstanceId();
+
+		// Run a scheduled pass to build the export
+		myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId());
+		runInTransaction(() -> {
+			String entities = myJobInstanceRepository
+				.findAll()
+				.stream()
+				.map(Batch2JobInstanceEntity::toString)
+				.collect(Collectors.joining("\n * "));
+			ourLog.info("Entities:\n * " + entities);
+		});
+
+		final Optional<JobInstance> optJobInstance = myJobPersistence.fetchInstance(jobId);
+		assertNotNull(optJobInstance);
+		assertTrue(optJobInstance.isPresent());
+		assertThat(optJobInstance.get().getReport(),
+			containsString("Export complete, but no data to generate report for job instance:"));
+	}
+
 	private void logContentTypeAndResponse(Header[] headers, String response) {
 		ourLog.info("**************************");
 		ourLog.info("Content-Type is: {}", headers[0]);
@@ -542,7 +578,7 @@ public void testPatientExportIgnoresResourcesNotInPatientCompartment() {

 		// test
 		HashSet<String> types = Sets.newHashSet("Patient", "Observation");
-		BulkExportJobResults bulkExportJobResults = startPatientBulkExportJobAndAwaitResults(types, new HashSet(), "ha");
+		BulkExportJobResults bulkExportJobResults = startPatientBulkExportJobAndAwaitResults(types, new HashSet<>(), "ha");
 		Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
 		assertThat(typeToResources.get("Patient"), hasSize(1));
 		assertThat(typeToResources.get("Observation"), hasSize(1));
@@ -605,6 +641,34 @@ public void testBulkExportWithLowMaxFileCapacity() {
 			assertTrue(patientIds.contains(resourceId));
 		}
 	}
+
+	@Test
+	public void testExportEmptyResult() {
+		BulkExportJobParameters options = new BulkExportJobParameters();
+		options.setResourceTypes(Sets.newHashSet("Patient"));
+		options.setExportStyle(BulkExportJobParameters.ExportStyle.PATIENT);
+		options.setOutputFormat(Constants.CT_FHIR_NDJSON);
+
+		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+		startRequest.setJobDefinitionId(Batch2JobDefinitionConstants.BULK_EXPORT);
+		startRequest.setParameters(options);
+		Batch2JobStartResponse job = myJobCoordinator.startInstance(mySrd, startRequest);
+		myBatch2JobHelper.awaitJobCompletion(job.getInstanceId(), 60);
+		ourLog.debug("Job status after awaiting - {}", myJobCoordinator.getInstance(job.getInstanceId()).getStatus());
+		await()
+			.atMost(300, TimeUnit.SECONDS)
+			.until(() -> {
+				StatusEnum status = myJobCoordinator.getInstance(job.getInstanceId()).getStatus();
+				if (!StatusEnum.COMPLETED.equals(status)) {
+					fail("Job status was changed from COMPLETED to " + status);
+				}
+				return myJobCoordinator.getInstance(job.getInstanceId()).getReport() != null;
+			});
+
+		String report = myJobCoordinator.getInstance(job.getInstanceId()).getReport();
+		assertThat(report,
+			containsString("Export complete, but no data to generate report for job instance:"));
+	}
 }
@@ -1081,7 +1145,7 @@ public void testGroupBulkExportWithTypeFilter_ReturnsOnlyResourcesInTypeFilter()
 		    }
 		  }
 		]
-	}
+	}
 """;
 		Bundle bundle = parser.parseResource(Bundle.class, bundleStr);
 		myClient.transaction().withBundle(bundle).execute();
@@ -1218,6 +1282,21 @@ public void testGroupExportPatientOnly() {
 		assertThat(typeToContents.get("Patient"), not(containsString("POG2")));
 	}

+	@Test
+	public void testExportEmptyResult() {
+		Group group = new Group();
+		group.setId("Group/G-empty");
+		group.setActive(true);
+		myClient.update().resource(group).execute();
+
+		HashSet<String> resourceTypes = Sets.newHashSet("Patient");
+		BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(
+			resourceTypes, new HashSet<>(), "G-empty");
+
+		assertThat(bulkExportJobResults.getReportMsg(),
+			startsWith("Export complete, but no data to generate report for job instance:"));
+	}
+
 	@Test
 	public void testGroupBulkExportMultipleResourceTypes() {
 		Patient patient = new Patient();
@@ -1398,7 +1477,7 @@ private Map<String, String> convertJobResultsToStringContents(BulkExportJobResul

 	Map<String, List<IBaseResource>> convertJobResultsToResources(BulkExportJobResults theResults) {
 		Map<String, String> stringStringMap = convertJobResultsToStringContents(theResults);
 		Map<String, List<IBaseResource>> typeToResources = new HashMap<>();
-		stringStringMap.entrySet().forEach(entry -> typeToResources.put(entry.getKey(), convertNDJSONToResources(entry.getValue())));
+		stringStringMap.forEach((key, value) -> typeToResources.put(key, convertNDJSONToResources(value)));
 		return typeToResources;
 	}
@@ -1412,8 +1491,7 @@ private List<IBaseResource> convertNDJSONToResources(String theValue) {
 	private String getBinaryContentsAsString(String theBinaryId) {
 		Binary binary = myBinaryDao.read(new IdType(theBinaryId));
 		assertEquals(Constants.CT_FHIR_NDJSON, binary.getContentType());
-		String contents = new String(binary.getContent(), Constants.CHARSET_UTF8);
-		return contents;
+		return new String(binary.getContent(), Constants.CHARSET_UTF8);
 	}

 	BulkExportJobResults startGroupBulkExportJobAndAwaitCompletion(HashSet<String> theResourceTypes, HashSet<String> theFilters, String theGroupId) {
@@ -1509,8 +1587,7 @@ BulkExportJobResults startBulkExportJobAndAwaitCompletion(
 		await().atMost(300, TimeUnit.SECONDS).until(() -> myJobCoordinator.getInstance(jobInstanceId).getReport() != null);

 		String report = myJobCoordinator.getInstance(jobInstanceId).getReport();
-		BulkExportJobResults results = JsonUtil.deserialize(report, BulkExportJobResults.class);
-		return results;
+		return JsonUtil.deserialize(report, BulkExportJobResults.class);
 	}

 	private void verifyBulkExportResults(String theGroupId, HashSet<String> theFilters, List<String> theContainedList, List<String> theExcludedList) {
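Note: the new tests reuse the suite's existing polling idiom, Awaitility's `await().atMost(...).until(...)`, spinning until the coordinator exposes a non-null report. A self-contained sketch of that idiom, with a plain `AtomicReference` standing in for the job coordinator (all names here are illustrative, not HAPI's):

```java
import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AwaitReportDemo {
	public static void main(String[] args) {
		// Stand-in for myJobCoordinator.getInstance(id).getReport(): starts null,
		// becomes non-null once the (simulated) reduction step has run.
		AtomicReference<String> report = new AtomicReference<>();
		new Thread(() -> {
			try {
				Thread.sleep(200);
			} catch (InterruptedException ignored) {
				// no-op for the demo
			}
			report.set("Export complete, but no data to generate report for job instance: example-id");
		}).start();

		// Poll until the report appears; Awaitility throws if the timeout elapses first.
		await().atMost(10, TimeUnit.SECONDS).until(() -> report.get() != null);
		System.out.println(report.get());
	}
}
```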
diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java
index 686297507405..1df5e286a99c 100644
--- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java
+++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/export/BulkDataExportProvider.java
@@ -502,6 +502,9 @@ public void exportPollStatus(
 			String serverBase = getServerBase(theRequestDetails);

+			// an output is required, even if empty, according to the HL7 Bulk Data Access IG
+			bulkResponseDocument.getOutput();
+
 			for (Map.Entry<String, List<String>> entrySet :
 					results.getResourceTypeToBinaryIds().entrySet()) {
 				String resourceType = entrySet.getKey();
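Note: the added `bulkResponseDocument.getOutput()` call looks like a no-op. It works because the getter follows the lazy-initialization idiom common in HAPI's JSON model classes: merely calling it replaces the null field with an empty list, which then serializes as `[]` instead of being omitted. A minimal sketch of that idiom (class and field names are illustrative, not HAPI's actual ones):

```java
import java.util.ArrayList;
import java.util.List;

class ResponseDocumentSketch {
	private List<String> myOutput;

	// Lazy getter: the first call materializes the list, so a serializer that
	// skips null fields will still render "output": [ ] afterwards.
	public List<String> getOutput() {
		if (myOutput == null) {
			myOutput = new ArrayList<>();
		}
		return myOutput;
	}

	public static void main(String[] args) {
		ResponseDocumentSketch doc = new ResponseDocumentSketch();
		doc.getOutput(); // called for its side effect: field is now [] instead of null
		System.out.println(doc.getOutput()); // prints []
	}
}
```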
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java
index b0ae258fd272..a39dccb9a757 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobStepExecutor.java
@@ -75,9 +75,10 @@ public void executeStep() {
 			return;
 		}

-		if (stepExecutorOutput.getDataSink().firstStepProducedNothing()) {
+		if (stepExecutorOutput.getDataSink().firstStepProducedNothing() && !myDefinition.isLastStepReduction()) {
 			ourLog.info(
-					"First step of job myInstance {} produced no work chunks, marking as completed and setting end date",
+					"First step of job instance {} produced no work chunks and last step is not a reduction, "
+							+ "marking as completed and setting end date",
 					myInstanceId);
 			myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> {
 				instance.setEndTime(new Date());
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java
index 29ac2faf4f5c..d10ca861a3f6 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobDefinition.java
@@ -145,6 +145,11 @@ public boolean isGatedExecution() {
 		return myGatedExecution;
 	}

+	public boolean isLastStepReduction() {
+		int stepCount = getSteps().size();
+		return stepCount >= 1 && getSteps().get(stepCount - 1).isReductionStep();
+	}
+
 	public int getStepIndex(String theStepId) {
 		int retVal = myStepIds.indexOf(theStepId);
 		Validate.isTrue(retVal != -1);
@@ -304,9 +309,9 @@ public Builder addFinalReducerStep(
 				throw new ConfigurationException(Msg.code(2106) + String.format("Job Definition %s has a reducer step but is not gated", myJobDefinitionId));
 			}
-			mySteps.add(new JobDefinitionReductionStep(
+			mySteps.add(new JobDefinitionReductionStep<>(
 					theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType));
-			return new Builder(
+			return new Builder<>(
 					mySteps,
 					myJobDefinitionId,
 					myJobDefinitionVersion,
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java
index a21d6c595e2b..790ed970c1ae 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java
@@ -192,12 +192,12 @@ private int getChunkCount() {
 	/**
 	 * Transitions from IN_PROGRESS/ERRORED based on chunk statuses.
 	 */
-	public void calculateNewStatus() {
+	public void calculateNewStatus(boolean theLastStepIsReduction) {
 		if (myFailedChunkCount > 0) {
 			myNewStatus = StatusEnum.FAILED;
 		} else if (myErroredChunkCount > 0) {
 			myNewStatus = StatusEnum.ERRORED;
-		} else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0) {
+		} else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0 && !theLastStepIsReduction) {
 			myNewStatus = StatusEnum.COMPLETED;
 		}
 	}
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java
index e5ce87c8a589..348fd30e5404 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java
@@ -22,19 +22,26 @@
 import ca.uhn.fhir.batch2.api.IJobPersistence;
 import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
 import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
+import ca.uhn.fhir.batch2.model.JobDefinition;
+import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.WorkChunk;
+import ca.uhn.fhir.i18n.Msg;
+import ca.uhn.fhir.model.api.IModelJson;
+import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
 import ca.uhn.fhir.util.Logs;
 import ca.uhn.fhir.util.StopWatch;
 import jakarta.annotation.Nonnull;
 import org.slf4j.Logger;

 import java.util.Iterator;
+import java.util.Optional;

 public class JobInstanceProgressCalculator {
 	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
 	private final IJobPersistence myJobPersistence;
 	private final JobChunkProgressAccumulator myProgressAccumulator;
 	private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
+	private final JobDefinitionRegistry myJobDefinitionRegistry;

 	public JobInstanceProgressCalculator(
 			IJobPersistence theJobPersistence,
@@ -42,6 +49,7 @@ public JobInstanceProgressCalculator(
 			JobDefinitionRegistry theJobDefinitionRegistry) {
 		myJobPersistence = theJobPersistence;
 		myProgressAccumulator = theProgressAccumulator;
+		myJobDefinitionRegistry = theJobDefinitionRegistry;
 		myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
 	}

@@ -96,8 +104,20 @@ public InstanceProgress calculateInstanceProgress(String instanceId) {
 		}

 		// wipmb separate status update from stats collection in 6.8
-		instanceProgress.calculateNewStatus();
+		instanceProgress.calculateNewStatus(lastStepIsReduction(instanceId));

 		return instanceProgress;
 	}
+
+	private boolean lastStepIsReduction(String theInstanceId) {
+		JobInstance jobInstance = getJobInstance(theInstanceId);
+		JobDefinition<? extends IModelJson> jobDefinition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance);
+		return jobDefinition.isLastStepReduction();
+	}
+
+	private JobInstance getJobInstance(String theInstanceId) {
+		Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(theInstanceId);
+		return oInstance.orElseThrow(() ->
+				new InternalErrorException(Msg.code(2486) + "Failed to fetch JobInstance with id: " + theInstanceId));
+	}
 }
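Note: taken together, the changes above keep a reduction job alive past the point where a chunk-counting pass would otherwise declare it done, because the reducer still has to run and write the report even when earlier steps produced zero chunks. A condensed, self-contained model of the new transition logic (simplified relative to `InstanceProgress`, which leaves the status unset rather than returning `IN_PROGRESS`):

```java
enum StatusSketch { IN_PROGRESS, ERRORED, FAILED, COMPLETED }

public class StatusGuardDemo {
	// Mirrors calculateNewStatus(boolean): failures and errors win, and a job
	// whose last step is a reduction is never flipped to COMPLETED here.
	static StatusSketch newStatus(int failed, int errored, int incomplete,
			int complete, boolean lastStepIsReduction) {
		if (failed > 0) {
			return StatusSketch.FAILED;
		}
		if (errored > 0) {
			return StatusSketch.ERRORED;
		}
		if (incomplete == 0 && complete > 0 && !lastStepIsReduction) {
			return StatusSketch.COMPLETED;
		}
		return StatusSketch.IN_PROGRESS;
	}

	public static void main(String[] args) {
		// An empty bulk export: every chunk is complete, but the job ends in a
		// reducer, so the maintenance pass must not mark it COMPLETED yet.
		System.out.println(newStatus(0, 0, 0, 1, true));  // IN_PROGRESS
		System.out.println(newStatus(0, 0, 0, 1, false)); // COMPLETED
	}
}
```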
diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java
index 425f1d4b66e4..28d246ccd4fb 100644
--- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java
+++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSinkTest.java
@@ -22,6 +22,7 @@
 import org.mockito.junit.jupiter.MockitoExtension;

 import java.util.Collections;
+import java.util.Optional;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -30,6 +31,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

 @ExtendWith(MockitoExtension.class)
@@ -90,12 +92,16 @@ public void accept_validInputSubmittedOnlyOnce_updatesInstanceWithData() {
 		String data = "data";
 		StepOutputData stepData = new StepOutputData(data);
 		WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(stepData);
+		@SuppressWarnings("unchecked")
+		JobDefinition jobDefinition = mock(JobDefinition.class);

 		// when
 		JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
 		instance.setStatus(StatusEnum.FINALIZE);
 		stubUpdateInstanceCallback(instance);
 		when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
+		when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
+		when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(instance)).thenReturn(jobDefinition);

 		// test
 		myDataSink.accept(chunkData);
@@ -111,6 +117,8 @@ public void accept_multipleCalls_firstInWins() {
 		String data2 = "data2";
 		WorkChunkData<StepOutputData> firstData = new WorkChunkData<>(new StepOutputData(data));
 		WorkChunkData<StepOutputData> secondData = new WorkChunkData<>(new StepOutputData(data2));
+		@SuppressWarnings("unchecked")
+		JobDefinition jobDefinition = mock(JobDefinition.class);

 		ourLogger.setLevel(Level.ERROR);

@@ -118,6 +126,8 @@ public void accept_multipleCalls_firstInWins() {
 		instance.setStatus(StatusEnum.FINALIZE);
 		when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
 		stubUpdateInstanceCallback(instance);
+		when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
+		when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(instance)).thenReturn(jobDefinition);

 		// test
 		myDataSink.accept(firstData);
@@ -136,10 +146,15 @@ private void stubUpdateInstanceCallback(JobInstance theJobInstance) {
 	@Test
 	public void accept_noInstanceIdFound_throwsJobExecutionFailed() {
 		// setup
+		JobInstance jobInstance = mock(JobInstance.class);
+		@SuppressWarnings("unchecked")
+		JobDefinition jobDefinition = (JobDefinition) mock(JobDefinition.class);
 		String data = "data";
 		WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(new StepOutputData(data));
 		when(myJobPersistence.updateInstance(any(), any())).thenReturn(false);
 		when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
+		when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(jobInstance));
+		when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance)).thenReturn(jobDefinition);

 		// test
 		try {
@@ -151,5 +166,4 @@ public void accept_noInstanceIdFound_throwsJobExecutionFailed() {
 			fail("Unexpected exception", anyOtherEx);
 		}
 	}
-
 }
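Note: the extra stubs in this test class exist because the reduction data sink now walks `fetchInstance` and then `getJobDefinitionOrThrowException` to learn whether the last step is a reduction, so those collaborators must return something. For readers wondering about the `@SuppressWarnings("unchecked")` on the `JobDefinition` mocks: `mock(SomeGeneric.class)` can only return the raw type, and using or casting that raw mock against a parameterized target is an unchecked operation. A minimal, self-contained illustration (assumes only Mockito on the classpath; `Box` is a made-up type, not part of HAPI):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GenericMockDemo {
	static class Box<T> {
		T get() {
			return null;
		}
	}

	@SuppressWarnings("unchecked")
	public static void main(String[] args) {
		// mock(Box.class) yields a raw Box; the cast to Box<String> is what
		// triggers the unchecked warning that the annotation suppresses.
		Box<String> box = (Box<String>) mock(Box.class);
		when(box.get()).thenReturn("stubbed");
		System.out.println(box.get()); // prints: stubbed
	}
}
```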