Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,37 @@ public String getTaskName() {
@Override
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
long startTime = System.currentTimeMillis();
boolean success = nsSummaryTaskWithFSO.processWithFSO(events);
if (!success) {
LOG.error("processWithFSO failed.");
}
success = nsSummaryTaskWithLegacy.processWithLegacy(events);
if (!success) {
LOG.error("processWithLegacy failed.");
}
success = nsSummaryTaskWithOBS.processWithOBS(events);
if (!success) {
LOG.error("processWithOBS failed.");

// Thread pool to execute tasks concurrently
ExecutorService executorService = Executors.newFixedThreadPool(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a thread pool is heavy-weight, it should be reused.


// Create a list of tasks
List<Callable<Boolean>> tasks = new ArrayList<>();
tasks.add(() -> nsSummaryTaskWithFSO.processWithFSO(events));
tasks.add(() -> nsSummaryTaskWithLegacy.processWithLegacy(events));
tasks.add(() -> nsSummaryTaskWithOBS.processWithOBS(events));

try {
// Execute all tasks in parallel
List<Future<Boolean>> results = executorService.invokeAll(tasks);

// Check results and return failure if any task failed
for (Future<Boolean> result : results) {
if (!result.get()) {
LOG.error("One or more process tasks failed.");
return new ImmutablePair<>(getTaskName(), false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for all results before returning.

}
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error executing process tasks in parallel", e);
return new ImmutablePair<>(getTaskName(), false);
} finally {
executorService.shutdown();
}
LOG.debug("{} successfully processed in {} milliseconds",
getTaskName(), (System.currentTimeMillis() - startTime));
return new ImmutablePair<>(getTaskName(), success);

return new ImmutablePair<>(getTaskName(), true);
}

@Override
Expand Down