diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml new file mode 100644 index 000000000000..1a439cf6ef5f --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml @@ -0,0 +1,4 @@ +--- +type: fix +issue: 4878 +title: "Batch jobs occasionally reported zero (0) record processed counts. This has been corrected." diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java index 6a1c1174aa3a..766f3edd748c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.jpa.batch2; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; +import ca.uhn.fhir.jpa.api.model.Batch2JobInfo; import ca.uhn.fhir.jpa.api.model.BulkExportJobResults; import ca.uhn.fhir.jpa.api.model.BulkExportParameters; import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner; @@ -33,6 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -44,6 +46,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.emptyOrNullString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -181,7 +184,8 @@ private void duAbuseTest(int taskExecutions) throws InterruptedException, Execut private void 
verifyBulkExportResults(String theInstanceId, List theContainedList, List theExcludedList) { // Iterate over the files - String report = myJobRunner.getJobInfo(theInstanceId).getReport(); + Batch2JobInfo jobInfo = myJobRunner.getJobInfo(theInstanceId); + String report = jobInfo.getReport(); ourLog.debug("Export job {} report: {}", theInstanceId, report); if (!theContainedList.isEmpty()) { assertThat("report for instance " + theInstanceId + " is empty", report, not(emptyOrNullString())); @@ -227,6 +231,10 @@ private void verifyBulkExportResults(String theInstanceId, List theConta for (String excludedString : theExcludedList) { assertThat("export doesn't have expected ids", foundIds, not(hasItem(excludedString))); } + + assertThat(jobInfo.getCombinedRecordsProcessed(), equalTo(2)); + + ourLog.info("Job {} ok", theInstanceId); } private String startJob(BulkDataExportOptions theOptions) { diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java index 90e7e7fd261c..05fecaf6b358 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java @@ -104,6 +104,7 @@ private Batch2JobInfo fromJobInstanceToBatch2JobInfo(@Nonnull JobInstance theIns info.setEndTime(theInstance.getEndTime()); info.setReport(theInstance.getReport()); info.setErrorMsg(theInstance.getErrorMessage()); + info.setCombinedRecordsProcessed(theInstance.getCombinedRecordsProcessed()); if ( Batch2JobDefinitionConstants.BULK_EXPORT.equals(theInstance.getJobDefinitionId())) { BulkExportJobParameters parameters = theInstance.getParameters(BulkExportJobParameters.class); info.setRequestPartitionId(parameters.getPartitionId()); diff --git 
a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index 84fbfbe31bfa..3d95da060138 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -29,7 +29,6 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; -import ca.uhn.fhir.batch2.progress.InstanceProgress; import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; import ca.uhn.fhir.model.api.IModelJson; @@ -139,14 +138,6 @@ private void cleanupInstance(JobInstance theInstance) { if (theInstance.isFinished() && !theInstance.isWorkChunksPurged()) { myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(theInstance.getInstanceId()); - - // update final statistics. - // wipmb For 6.8 - do we need to run stats again? If the status changed to finished, then we just ran them above. 
- InstanceProgress progress = myJobInstanceProgressCalculator.calculateInstanceProgress(theInstance.getInstanceId()); - myJobPersistence.updateInstance(theInstance.getInstanceId(), instance->{ - progress.updateInstance(instance); - return true; - }); } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java index d5386c08bb67..a90e9a7d2564 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java @@ -128,6 +128,7 @@ public void updateInstance(JobInstance theInstance) { * @param theInstance the instance to update with progress statistics */ public void updateInstance(JobInstance theInstance, boolean theCalledFromReducer) { + ourLog.debug("updateInstance {}: {}", theInstance.getInstanceId(), this); if (myEarliestStartTime != null) { theInstance.setStartTime(myEarliestStartTime); } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java index b42955683a09..13104c121879 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java @@ -295,7 +295,7 @@ public void testInProgress_CalculateProgress_AllStepsComplete() { // Verify - verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any()); + verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any()); assertEquals(1.0, instance.getProgress()); assertEquals(StatusEnum.COMPLETED, instance.getStatus()); @@ -342,7 +342,7 @@ public void testInProgress_CalculateProgress_OneStepFailed() { 
assertEquals(parseTime("2022-02-12T14:10:00-04:00"), instance.getEndTime()); // twice - once to move to FAILED, and once to purge the chunks - verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any()); + verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any()); verify(myJobPersistence, times(1)).deleteChunksAndMarkInstanceAsChunksPurged(eq(INSTANCE_ID)); verifyNoMoreInteractions(myJobPersistence); diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java index bdbdf32c5e0a..945e24c9564f 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java @@ -48,6 +48,7 @@ public class Batch2JobInfo { private String myReport; private RequestPartitionId myRequestPartitionId; + private Integer myCombinedRecordsProcessed; public String getJobId() { return myJobId; @@ -112,4 +113,13 @@ public RequestPartitionId getRequestPartitionId() { public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) { myRequestPartitionId = theRequestPartitionId; } + + public Integer getCombinedRecordsProcessed() { + return myCombinedRecordsProcessed; + } + + public void setCombinedRecordsProcessed(Integer theCombinedRecordsProcessed) { + myCombinedRecordsProcessed = theCombinedRecordsProcessed; + } + }