From 4167f48da6c39af30362263504b0afad593cfc4f Mon Sep 17 00:00:00 2001 From: Luke deGruchy Date: Wed, 10 May 2023 10:14:15 -0400 Subject: [PATCH 1/9] Add failing unit test. --- .../fhir/jpa/batch2/Batch2CoordinatorIT.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java index 38730c336348..be9f2a4b00c2 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java @@ -219,6 +219,15 @@ public void testFirstStepToSecondStep_singleChunkFasttracks() throws Interrupted // Since there was only one chunk, the job should proceed without requiring a maintenance pass myBatch2JobHelper.awaitJobCompletion(batchJobId); myLastStepLatch.awaitExpected(); + + final List jobInstances = myJobPersistence.fetchInstances(10, 0); + + assertEquals(1, jobInstances.size()); + + final JobInstance jobInstance = jobInstances.get(0); + + assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus()); + assertEquals(1.0, jobInstance.getProgress()); } private void createThreeStepReductionJob( @@ -360,6 +369,15 @@ private void complete( testInfo + i )); } + + final List jobInstances = myJobPersistence.fetchInstances(10, 0); + + assertEquals(1, jobInstances.size()); + + final JobInstance jobInstance = jobInstances.get(0); + + assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus()); + assertEquals(1.0, jobInstance.getProgress()); } @Test From 686ea69c32b9c2b6451ce78cc8501f999172f5a7 Mon Sep 17 00:00:00 2001 From: Luke deGruchy Date: Wed, 10 May 2023 13:55:58 -0400 Subject: [PATCH 2/9] Add conditional logic to the InstanceProgress progress percentage to disregard the incomplete count if this is called from the reduction step. This is to get around a race condition in which a work chunk is QUEUED and not yet complete when the reduction step calculates the progress. --- .../coordinator/ReductionStepDataSink.java | 2 +- .../batch2/progress/InstanceProgress.java | 22 +++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java index 7c348888219e..639d8f9e4cbf 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/ReductionStepDataSink.java @@ -86,7 +86,7 @@ public void accept(WorkChunkData theData) { * here. Until then though, this is safer. */ - progress.updateInstance(instance); + progress.updateInstanceForReductionStep(instance); instance.setReport(dataString); instance.setStatus(StatusEnum.COMPLETED); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java index e7ade344c4cf..29cf9a1a82e6 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java @@ -105,13 +105,29 @@ private void updateRecordsProcessed(WorkChunk theChunk) { } } + /** + * Signal to the progress calculator to skip the incomplete work chunk count when determining the completed percentage. + *

+ * This is a hack: The reason we do this is to get around a race condition in which all work chunks are complete but + * the last chunk is * still in QUEUED status and will only be marked COMPLETE later. + * + * @param theInstance The Batch 2 {@link JobInstance} that we're updating + */ + public void updateInstanceForReductionStep(JobInstance theInstance) { + updateInstance(theInstance, true); + } + + public void updateInstance(JobInstance theInstance) { + updateInstance(theInstance, false); + } + /** * Update the job instance with status information. * We shouldn't read any values from theInstance here -- just write. * * @param theInstance the instance to update with progress statistics */ - public void updateInstance(JobInstance theInstance) { + public void updateInstance(JobInstance theInstance, boolean theCalledFromReducer) { if (myEarliestStartTime != null) { theInstance.setStartTime(myEarliestStartTime); } @@ -122,7 +138,9 @@ public void updateInstance(JobInstance theInstance) { theInstance.setCombinedRecordsProcessed(myRecordsProcessed); if (getChunkCount() > 0) { - double percentComplete = (double) (myCompleteChunkCount) / (double) getChunkCount(); + final int chunkCount = getChunkCount(); + final int conditionalChunkCount = theCalledFromReducer ? (chunkCount - myIncompleteChunkCount) : chunkCount; + double percentComplete = (double) (myCompleteChunkCount) / (double) conditionalChunkCount; theInstance.setProgress(percentComplete); } From 36dec09853bec3759a632c618852fc3b4834e446 Mon Sep 17 00:00:00 2001 From: Luke deGruchy Date: Wed, 10 May 2023 13:58:25 -0400 Subject: [PATCH 3/9] Add final. --- .../main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java index 29cf9a1a82e6..d5386c08bb67 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java @@ -140,7 +140,7 @@ public void updateInstance(JobInstance theInstance, boolean theCalledFromReducer if (getChunkCount() > 0) { final int chunkCount = getChunkCount(); final int conditionalChunkCount = theCalledFromReducer ? (chunkCount - myIncompleteChunkCount) : chunkCount; - double percentComplete = (double) (myCompleteChunkCount) / (double) conditionalChunkCount; + final double percentComplete = (double) (myCompleteChunkCount) / (double) conditionalChunkCount; theInstance.setProgress(percentComplete); } From fbf788335570bbdabcc3b0fb92f67f7b15e057e0 Mon Sep 17 00:00:00 2001 From: Luke deGruchy Date: Wed, 10 May 2023 14:09:37 -0400 Subject: [PATCH 4/9] Add changelog. --- .../4860-bulk-export-progress-less-than-100-percent.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4860-bulk-export-progress-less-than-100-percent.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4860-bulk-export-progress-less-than-100-percent.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4860-bulk-export-progress-less-than-100-percent.yaml new file mode 100644 index 000000000000..2da2365fb737 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4860-bulk-export-progress-less-than-100-percent.yaml @@ -0,0 +1,5 @@ +--- +type: fix +issue: 4860 +title: "Running an $export that completes successfully results in a progress percentage of less than 100%. + This has now been fixed." From 49d50ee57a4712ff57a9c7386eafe74da6fea2ce Mon Sep 17 00:00:00 2001 From: Michael Buckley Date: Fri, 12 May 2023 17:17:41 -0400 Subject: [PATCH 5/9] Failing test and debug info --- .../uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java | 11 ++++++++++- .../batch2/jobs/services/Batch2JobRunnerImpl.java | 1 + .../ca/uhn/fhir/batch2/progress/InstanceProgress.java | 1 + .../java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java | 10 ++++++++++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java index 6a1c1174aa3a..9b09ab7780c5 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.jpa.batch2; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; +import ca.uhn.fhir.jpa.api.model.Batch2JobInfo; import ca.uhn.fhir.jpa.api.model.BulkExportJobResults; import ca.uhn.fhir.jpa.api.model.BulkExportParameters; import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner; @@ -33,6 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -44,6 +46,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.emptyOrNullString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -63,6 +66,7 @@ public class BulkDataErrorAbuseTest extends BaseResourceProviderR4Test { @BeforeEach void beforeEach() { afterPurgeDatabase(); + //myStorageSettings.setJobFastTrackingEnabled(false); } @AfterEach @@ -181,7 +185,8 @@ private void duAbuseTest(int taskExecutions) throws InterruptedException, Execut private void verifyBulkExportResults(String theInstanceId, List theContainedList, List theExcludedList) { // Iterate over the files - String report = myJobRunner.getJobInfo(theInstanceId).getReport(); + Batch2JobInfo jobInfo = myJobRunner.getJobInfo(theInstanceId); + String report = jobInfo.getReport(); ourLog.debug("Export job {} report: {}", theInstanceId, report); if (!theContainedList.isEmpty()) { assertThat("report for instance " + theInstanceId + " is empty", report, not(emptyOrNullString())); @@ -227,6 +232,10 @@ private void verifyBulkExportResults(String theInstanceId, List theConta for (String excludedString : theExcludedList) { assertThat("export doesn't have expected ids", foundIds, not(hasItem(excludedString))); } + if (!Objects.equals(jobInfo.getCombinedRecordsProcessed(), 2)) { + assertThat(jobInfo.getCombinedRecordsProcessed(), equalTo(2)); + } + ourLog.info("Job {} ok", theInstanceId); } private String startJob(BulkDataExportOptions theOptions) { diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java index 90e7e7fd261c..05fecaf6b358 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/services/Batch2JobRunnerImpl.java @@ -104,6 +104,7 @@ private Batch2JobInfo fromJobInstanceToBatch2JobInfo(@Nonnull JobInstance theIns info.setEndTime(theInstance.getEndTime()); info.setReport(theInstance.getReport()); info.setErrorMsg(theInstance.getErrorMessage()); + info.setCombinedRecordsProcessed(theInstance.getCombinedRecordsProcessed()); if ( Batch2JobDefinitionConstants.BULK_EXPORT.equals(theInstance.getJobDefinitionId())) { BulkExportJobParameters parameters = theInstance.getParameters(BulkExportJobParameters.class); info.setRequestPartitionId(parameters.getPartitionId()); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java index d5386c08bb67..a90e9a7d2564 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java @@ -128,6 +128,7 @@ public void updateInstance(JobInstance theInstance) { * @param theInstance the instance to update with progress statistics */ public void updateInstance(JobInstance theInstance, boolean theCalledFromReducer) { + ourLog.debug("updateInstance {}: {}", theInstance.getInstanceId(), this); if (myEarliestStartTime != null) { theInstance.setStartTime(myEarliestStartTime); } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java index bdbdf32c5e0a..945e24c9564f 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/model/Batch2JobInfo.java @@ -48,6 +48,7 @@ public class Batch2JobInfo { private String myReport; private RequestPartitionId myRequestPartitionId; + private Integer myCombinedRecordsProcessed; public String getJobId() { return myJobId; @@ -112,4 +113,13 @@ public RequestPartitionId getRequestPartitionId() { public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) { myRequestPartitionId = theRequestPartitionId; } + + public Integer getCombinedRecordsProcessed() { + return myCombinedRecordsProcessed; + } + + public void setCombinedRecordsProcessed(Integer theCombinedRecordsProcessed) { + myCombinedRecordsProcessed = theCombinedRecordsProcessed; + } + } From d912c5724bd5274ab0af0ef4c8c4917901832fd7 Mon Sep 17 00:00:00 2001 From: Michael Buckley Date: Fri, 12 May 2023 18:02:19 -0400 Subject: [PATCH 6/9] Remove racy stats recalc. --- .../fhir/batch2/maintenance/JobInstanceProcessor.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index 84fbfbe31bfa..3d95da060138 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -29,7 +29,6 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; -import ca.uhn.fhir.batch2.progress.InstanceProgress; import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; import ca.uhn.fhir.model.api.IModelJson; @@ -139,14 +138,6 @@ private void cleanupInstance(JobInstance theInstance) { if (theInstance.isFinished() && !theInstance.isWorkChunksPurged()) { myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(theInstance.getInstanceId()); - - // update final statistics. - // wipmb For 6.8 - do we need to run stats again? If the status changed to finished, then we just ran them above. - InstanceProgress progress = myJobInstanceProgressCalculator.calculateInstanceProgress(theInstance.getInstanceId()); - myJobPersistence.updateInstance(theInstance.getInstanceId(), instance->{ - progress.updateInstance(instance); - return true; - }); } } From cccda49be5cabf2e62c85f92f7da721aa1adb9cb Mon Sep 17 00:00:00 2001 From: Michael Buckley Date: Fri, 12 May 2023 18:24:40 -0400 Subject: [PATCH 7/9] Changelog and cleanup --- .../changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml | 4 ++++ .../java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml new file mode 100644 index 000000000000..1a439cf6ef5f --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4878-mb-bulk-export-processed-count.yaml @@ -0,0 +1,4 @@ +--- +type: fix +issue: 4878 +title: "Batch jobs occasionaly reported zero (0) record processed counts. This has been corrected." diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java index 9b09ab7780c5..a5e14f9f861c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java @@ -66,7 +66,6 @@ public class BulkDataErrorAbuseTest extends BaseResourceProviderR4Test { @BeforeEach void beforeEach() { afterPurgeDatabase(); - //myStorageSettings.setJobFastTrackingEnabled(false); } @AfterEach From 53694073b3eb3b7a48471b43e4c7fb5f72ac0ded Mon Sep 17 00:00:00 2001 From: Michael Buckley Date: Mon, 15 May 2023 10:04:44 -0400 Subject: [PATCH 8/9] Mock fixes --- .../batch2/maintenance/JobMaintenanceServiceImplTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java index b42955683a09..13104c121879 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java @@ -295,7 +295,7 @@ public void testInProgress_CalculateProgress_AllStepsComplete() { // Verify - verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any()); + verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any()); assertEquals(1.0, instance.getProgress()); assertEquals(StatusEnum.COMPLETED, instance.getStatus()); @@ -342,7 +342,7 @@ public void testInProgress_CalculateProgress_OneStepFailed() { assertEquals(parseTime("2022-02-12T14:10:00-04:00"), instance.getEndTime()); // twice - once to move to FAILED, and once to purge the chunks - verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any()); + verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any()); verify(myJobPersistence, times(1)).deleteChunksAndMarkInstanceAsChunksPurged(eq(INSTANCE_ID)); verifyNoMoreInteractions(myJobPersistence); From fba8a43f7a2b4e34c728a01de91cbed6e4a75fb0 Mon Sep 17 00:00:00 2001 From: Michael Buckley Date: Mon, 15 May 2023 11:29:04 -0400 Subject: [PATCH 9/9] Cleanup --- .../java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java index a5e14f9f861c..766f3edd748c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/BulkDataErrorAbuseTest.java @@ -231,9 +231,9 @@ private void verifyBulkExportResults(String theInstanceId, List theConta for (String excludedString : theExcludedList) { assertThat("export doesn't have expected ids", foundIds, not(hasItem(excludedString))); } - if (!Objects.equals(jobInfo.getCombinedRecordsProcessed(), 2)) { - assertThat(jobInfo.getCombinedRecordsProcessed(), equalTo(2)); - } + + assertThat(jobInfo.getCombinedRecordsProcessed(), equalTo(2)); + ourLog.info("Job {} ok", theInstanceId); }