Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: fix
issue: 4878
title: "Batch jobs occasionaly reported zero (0) record processed counts. This has been corrected."
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.api.model.BulkExportParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
Expand Down Expand Up @@ -33,6 +34,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
Expand All @@ -44,6 +46,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -181,7 +184,8 @@ private void duAbuseTest(int taskExecutions) throws InterruptedException, Execut

private void verifyBulkExportResults(String theInstanceId, List<String> theContainedList, List<String> theExcludedList) {
// Iterate over the files
String report = myJobRunner.getJobInfo(theInstanceId).getReport();
Batch2JobInfo jobInfo = myJobRunner.getJobInfo(theInstanceId);
String report = jobInfo.getReport();
ourLog.debug("Export job {} report: {}", theInstanceId, report);
if (!theContainedList.isEmpty()) {
assertThat("report for instance " + theInstanceId + " is empty", report, not(emptyOrNullString()));
Expand Down Expand Up @@ -227,6 +231,10 @@ private void verifyBulkExportResults(String theInstanceId, List<String> theConta
for (String excludedString : theExcludedList) {
assertThat("export doesn't have expected ids", foundIds, not(hasItem(excludedString)));
}

assertThat(jobInfo.getCombinedRecordsProcessed(), equalTo(2));

ourLog.info("Job {} ok", theInstanceId);
}

private String startJob(BulkDataExportOptions theOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private Batch2JobInfo fromJobInstanceToBatch2JobInfo(@Nonnull JobInstance theIns
info.setEndTime(theInstance.getEndTime());
info.setReport(theInstance.getReport());
info.setErrorMsg(theInstance.getErrorMessage());
info.setCombinedRecordsProcessed(theInstance.getCombinedRecordsProcessed());
if ( Batch2JobDefinitionConstants.BULK_EXPORT.equals(theInstance.getJobDefinitionId())) {
BulkExportJobParameters parameters = theInstance.getParameters(BulkExportJobParameters.class);
info.setRequestPartitionId(parameters.getPartitionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.InstanceProgress;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.model.api.IModelJson;
Expand Down Expand Up @@ -139,14 +138,6 @@ private void cleanupInstance(JobInstance theInstance) {

if (theInstance.isFinished() && !theInstance.isWorkChunksPurged()) {
myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(theInstance.getInstanceId());

// update final statistics.
// wipmb For 6.8 - do we need to run stats again? If the status changed to finished, then we just ran them above.
InstanceProgress progress = myJobInstanceProgressCalculator.calculateInstanceProgress(theInstance.getInstanceId());
myJobPersistence.updateInstance(theInstance.getInstanceId(), instance->{
progress.updateInstance(instance);
return true;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void updateInstance(JobInstance theInstance) {
* @param theInstance the instance to update with progress statistics
*/
public void updateInstance(JobInstance theInstance, boolean theCalledFromReducer) {
ourLog.debug("updateInstance {}: {}", theInstance.getInstanceId(), this);
if (myEarliestStartTime != null) {
theInstance.setStartTime(myEarliestStartTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public void testInProgress_CalculateProgress_AllStepsComplete() {

// Verify

verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any());
verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any());

assertEquals(1.0, instance.getProgress());
assertEquals(StatusEnum.COMPLETED, instance.getStatus());
Expand Down Expand Up @@ -342,7 +342,7 @@ public void testInProgress_CalculateProgress_OneStepFailed() {
assertEquals(parseTime("2022-02-12T14:10:00-04:00"), instance.getEndTime());

// twice - once to move to FAILED, and once to purge the chunks
verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any());
verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any());
verify(myJobPersistence, times(1)).deleteChunksAndMarkInstanceAsChunksPurged(eq(INSTANCE_ID));

verifyNoMoreInteractions(myJobPersistence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class Batch2JobInfo {
private String myReport;

private RequestPartitionId myRequestPartitionId;
private Integer myCombinedRecordsProcessed;

public String getJobId() {
return myJobId;
Expand Down Expand Up @@ -112,4 +113,13 @@ public RequestPartitionId getRequestPartitionId() {
public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
}

public Integer getCombinedRecordsProcessed() {
return myCombinedRecordsProcessed;
}

public void setCombinedRecordsProcessed(Integer theCombinedRecordsProcessed) {
myCombinedRecordsProcessed = theCombinedRecordsProcessed;
}

}