diff --git a/docs/changelog/136886.yaml b/docs/changelog/136886.yaml new file mode 100644 index 0000000000000..258a760e1b4fd --- /dev/null +++ b/docs/changelog/136886.yaml @@ -0,0 +1,5 @@ +pr: 136886 +summary: Fix ML calendar event update scalability issues +area: Machine Learning +type: bug +issues: [] diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java index 3a242104fc027..569b3e17c5be7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -6,14 +6,24 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -28,8 +38,10 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; @@ -45,6 +57,13 @@ public void cleanUpTest() { cleanUp(); } + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(ScheduledEventsIT.UpdateProcessActionTrackerPlugin.class); + return plugins; + } + public void testScheduledEvents() throws IOException { TimeValue bucketSpan = TimeValue.timeValueMinutes(30); @@ -464,6 +483,166 @@ public void testNewJobWithGlobalCalendar() throws Exception { assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened")); } + /** + * Test that verifies UpdateProcessAction is called with correct parameters when calendar events + * are posted asynchronously, using ActionFilter to directly intercept the calls + */ + public void testCalendarUpdateCallsUpdateProcessAction() throws Exception { + // Reset tracker + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset(); + + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + String jobId = "calendar-update-test"; + String calendarId = "test-calendar"; + + // Create and open single job + createJob(jobId, bucketSpan); + openJob(jobId); + + // Create calendar with the job + putCalendar(calendarId, List.of(jobId), "Update process action test"); + + // Create scheduled event + List events = List.of( + new ScheduledEvent.Builder().description("Direct Test Event") + .startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000)) + .endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000)) + .calendarId(calendarId) + .build() + ); + + // Post events - API should return immediately with async implementation + postScheduledEvents(calendarId, events); + + // Wait for and verify ActionFilter captured the UpdateProcessAction call + // We intercept the call to UpdateProcessAction using an ActionFilter to verify the call was made + assertBusy(() -> { + assertThat( + "Should have intercepted UpdateProcessAction call", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(), + equalTo(1) + ); + assertThat( + "Should have called UpdateProcessAction for the correct job", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds, + contains(jobId) + ); + }); + + // Verify calendar events were stored correctly + verifyCalendarEventsStored(calendarId, 1); + + // Cleanup + closeJob(jobId); + + logger.info("Successfully verified UpdateProcessAction call with updateScheduledEvents=true for job [{}]", jobId); + } + + /** + * Test calendar updates with closed jobs (should not fail) + */ + public void testCalendarUpdateWithClosedJobs() throws IOException { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + String jobId = "closed-job-test"; + + // Create and run job, then close it + Job.Builder job = createJob(jobId, bucketSpan); + long startTime = 1514764800000L; + runJob(job, startTime, bucketSpan, 10); + + // Create calendar with the closed job + String calendarId = "closed-job-calendar"; + putCalendar(calendarId, Collections.singletonList(jobId), "Calendar with closed job"); + + // Create scheduled event + List events = new ArrayList<>(); + long eventStartTime = startTime + (bucketSpan.millis() * 5); + long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); + events.add( + new ScheduledEvent.Builder().description("Closed Job Event") + .startTime(Instant.ofEpochMilli(eventStartTime)) + .endTime(Instant.ofEpochMilli(eventEndTime)) + .calendarId(calendarId) + .build() + ); + + // This should not fail even though the job is closed + // The async implementation should gracefully skip closed jobs + postScheduledEvents(calendarId, events); + + // Verify job is still closed and buckets don't have the new event + // (since the job was closed when the event was added) + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + List buckets = getBuckets(getBucketsRequest); + + // All buckets should be empty of scheduled events since job was closed when event was added + for (Bucket bucket : buckets) { + assertThat("Closed job buckets should not contain new scheduled events", bucket.getScheduledEvents(), empty()); + } + } + + /** + * Test calendar updates with mixed open and closed jobs - verify open jobs are updated and closed jobs are skipped + */ + public void testCalendarUpdateWithMixedOpenAndClosedJobs() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + + // Create two jobs + String openJobId = "mixed-test-open-job"; + String closedJobId = "mixed-test-closed-job"; + + // Create and open first job + createJob(openJobId, bucketSpan); + openJob(openJobId); + + // Create and run second job, then close it + Job.Builder closedJob = createJob(closedJobId, bucketSpan); + long startTime = 1514764800000L; + runJob(closedJob, startTime, bucketSpan, 10); + + // Create calendar with both jobs + String calendarId = "mixed-jobs-calendar"; + putCalendar(calendarId, List.of(openJobId, closedJobId), "Calendar with mixed open and closed jobs"); + + // Reset tracker + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset(); + + // Create scheduled event + List events = List.of( + new ScheduledEvent.Builder().description("Mixed Jobs Event") + .startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000)) + .endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000)) + .calendarId(calendarId) + .build() + ); + + // Post events - should update open job and skip closed job + postScheduledEvents(calendarId, events); + + // Wait for ActionFilter to capture the UpdateProcessAction call + // Should only be called for the open job, not the closed one + assertBusy(() -> { + assertThat( + "Should have intercepted UpdateProcessAction call for open job only", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(), + equalTo(1) + ); + assertThat( + "Should have called UpdateProcessAction for the open job only", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds, + contains(openJobId) + ); + assertThat( + "Should not have called UpdateProcessAction for the closed job", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds.contains(closedJobId), + is(false) + ); + }); + + // Cleanup + closeJob(openJobId); + } + private Job.Builder createJob(String jobId, TimeValue bucketSpan) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); @@ -486,4 +665,62 @@ private void runJob(Job.Builder job, long startTime, TimeValue bucketSpan, int b ); closeJob(job.getId()); } + + /** + * Helper method to verify that calendar events are stored and retrievable + */ + private void verifyCalendarEventsStored(String calendarId, int expectedEventCount) { + GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request(calendarId); + GetCalendarEventsAction.Response response = client().execute(GetCalendarEventsAction.INSTANCE, request).actionGet(); + + assertThat( + "Calendar should have the expected number of events", + response.getResources().results().size(), + equalTo(expectedEventCount) + ); + } + + /** + * Test plugin that tracks UpdateProcessAction calls with updateScheduledEvents=true + * using an ActionFilter to verify native process interaction in integration tests + */ + public static class UpdateProcessActionTrackerPlugin extends Plugin implements ActionPlugin { + + public static final AtomicInteger updateProcessCallCount = new AtomicInteger(0); + public static final List updatedJobIds = Collections.synchronizedList(new ArrayList<>()); + + public static void reset() { + updateProcessCallCount.set(0); + updatedJobIds.clear(); + } + + @Override + public List getActionFilters() { + return List.of(new ActionFilter() { + @Override + public int order() { + return 0; + } + + @Override + public void apply( + Task task, + String action, + Request request, + ActionListener listener, + ActionFilterChain chain + ) { + if (UpdateProcessAction.NAME.equals(action) && request instanceof UpdateProcessAction.Request) { + UpdateProcessAction.Request updateRequest = (UpdateProcessAction.Request) request; + if (updateRequest.isUpdateScheduledEvents()) { + updateProcessCallCount.incrementAndGet(); + updatedJobIds.add(updateRequest.getJobId()); + } + } + chain.proceed(task, action, request, listener); + } + }); + } + } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index 9d43cb5366381..a832142f6b39d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -17,6 +17,8 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.ToXContent; @@ -42,6 +44,8 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction< PostCalendarEventsAction.Request, PostCalendarEventsAction.Response> { + private static final Logger logger = LogManager.getLogger(TransportPostCalendarEventsAction.class); + private final Client client; private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; @@ -75,6 +79,13 @@ protected void doExecute( List events = request.getScheduledEvents(); ActionListener calendarListener = ActionListener.wrap(calendar -> { + logger.debug( + "Calendar [{}] accepted for background update: {} jobs with {} events", + request.getCalendarId(), + calendar.getJobIds().size(), + events.size() + ); + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); for (ScheduledEvent event : events) { @@ -102,13 +113,10 @@ protected void doExecute( new ActionListener() { @Override public void onResponse(BulkResponse response) { - jobManager.updateProcessOnCalendarChanged( - calendar.getJobIds(), - ActionListener.wrap( - r -> listener.onResponse(new PostCalendarEventsAction.Response(events)), - listener::onFailure - ) - ); + jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> { + logger.debug("Calendar [{}] update initiated successfully", request.getCalendarId()); + listener.onResponse(new PostCalendarEventsAction.Response(events)); + }, listener::onFailure)); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 139ae471bc388..f715f05db706b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -6,12 +6,11 @@ */ package org.elasticsearch.xpack.ml.job; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; @@ -24,6 +23,8 @@ import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -37,6 +38,7 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; @@ -62,18 +64,20 @@ import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Allows interactions with jobs. The managed interactions include: @@ -451,7 +455,7 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti )); } } else { - logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString); + logger.debug(() -> format("[%s] No process update required for job update: %s", jobUpdate.getJobId(), jobUpdate.toString())); auditJobUpdatedIfNotInternal(request); } @@ -615,46 +619,120 @@ private static void appendCommaSeparatedSet(Set items, StringBuilder sb) } public void updateProcessOnCalendarChanged(List calendarJobIds, ActionListener updateListener) { + // Respond immediately to prevent API timeouts + updateListener.onResponse(Boolean.TRUE); + ClusterState clusterState = clusterService.state(); Set openJobIds = openJobIds(clusterState); + + logger.debug("Updating process for calendar change: {} calendar job IDs, {} open jobs", calendarJobIds.size(), openJobIds.size()); + if (openJobIds.isEmpty()) { - updateListener.onResponse(Boolean.TRUE); return; } + // Continue with background processing + processCalendarUpdatesAsync(calendarJobIds, openJobIds); + } + + private void processCalendarUpdatesAsync(List calendarJobIds, Set openJobIds) { boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals); if (appliesToAllJobs) { - submitJobEventUpdate(openJobIds, updateListener); + logger.debug("Calendar change applies to all jobs - starting background update for {} jobs", openJobIds.size()); + submitJobCalendarEventUpdateAsync(openJobIds); return; } - // calendarJobIds may be a group or job - jobConfigProvider.expandGroupIds( - calendarJobIds, - updateListener.delegateFailureAndWrap( - (delegate, expandedIds) -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expanded group members with the request Ids. - // Ids that aren't jobs will be filtered by isJobOpen() - expandedIds.addAll(calendarJobIds); - openJobIds.retainAll(expandedIds); - submitJobEventUpdate(openJobIds, delegate); - }) - ) - ); + // Process group expansion asynchronously + jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap(expandedIds -> { + expandedIds.addAll(calendarJobIds); + openJobIds.retainAll(expandedIds); + logger.debug("Calendar change expanded to {} jobs - starting background update", openJobIds.size()); + submitJobCalendarEventUpdateAsync(openJobIds); + }, e -> logger.error("Failed to expand calendar job groups for background update", e))); } - private void submitJobEventUpdate(Collection jobIds, ActionListener updateListener) { - for (String jobId : jobIds) { - updateJobProcessNotifier.submitJobUpdate( - UpdateParams.scheduledEventsUpdate(jobId), - ActionListener.wrap( - isUpdated -> auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)), - e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e) - ) + private void submitJobCalendarEventUpdateAsync(Set jobIds) { + if (jobIds.isEmpty()) { + return; + } + + logger.debug("Starting background calendar event updates for [{}] jobs", jobIds.size()); + AtomicInteger succeeded = new AtomicInteger(); + AtomicInteger failed = new AtomicInteger(); + AtomicInteger skipped = new AtomicInteger(); + long startTime = System.currentTimeMillis(); + + ActionListener backgroundListener = ActionListener.wrap(success -> { + long duration = System.currentTimeMillis() - startTime; + logger.debug( + "Background calendar updates completed in [{}ms]: {} succeeded, {} failed, {} skipped", + duration, + succeeded.get(), + failed.get(), + skipped.get() + ); + }, failure -> { + long duration = System.currentTimeMillis() - startTime; + logger.error( + "Background calendar updates failed after [{}ms]: {} succeeded, {} failed, {} skipped", + duration, + succeeded.get(), + failed.get(), + skipped.get(), + failure ); + }); + + // Execute on utility thread pool to avoid blocking transport threads + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { + submitJobCalendarEventUpdateSync(jobIds, backgroundListener, succeeded, failed, skipped); + }); + } + + private void submitJobCalendarEventUpdateSync( + Set jobIds, + ActionListener updateListener, + AtomicInteger succeeded, + AtomicInteger failed, + AtomicInteger skipped + ) { + // Use RefCountingListener to track all parallel updates and complete the listener when all updates are completed. + try (var refs = new RefCountingListener(updateListener.delegateFailureAndWrap((l, v) -> { l.onResponse(true); }))) { + // Instead of calling `updateJobProcessNotifier.submitJobUpdate()`, directly call `UpdateProcessAction` + // to bypass the queue and avoid the scalability issues. Since calendar and filter updates fetch the latest state from the + // index and can run on any node, they don't need ordering guarantees. + for (String jobId : jobIds) { + UpdateProcessAction.Request request = new UpdateProcessAction.Request(jobId, null, null, null, null, true); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, refs.acquire().delegateResponse((l, e) -> { + if (isExpectedFailure(e)) { + skipped.incrementAndGet(); + logger.debug("[{}] Calendar update skipped: {}", jobId, e.getMessage()); + } else { + failed.incrementAndGet(); + logger.warn("[{}] Calendar update failed", jobId, e); + } + l.onResponse(null); // Don't fail the whole operation + }).map(response -> { + succeeded.incrementAndGet(); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); + return null; + })); + } } + } - updateListener.onResponse(Boolean.TRUE); + private boolean isExpectedFailure(Exception e) { + // Job deleted, closed, etc. - not real errors + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof ResourceNotFoundException) { + return true; + } + // Check for the specific error message format from TransportJobTaskAction + // Message format: "Cannot perform requested action because job [jobId] is not open" + String message = e.getMessage(); + return message != null && message.contains("Cannot perform requested action because job [") && message.contains("] is not open"); } public void revertSnapshot( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 46c9ae029e60f..437dd08fff677 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -75,7 +75,11 @@ public void beforeStop() { } boolean submitJobUpdate(UpdateParams update, ActionListener listener) { - return orderedJobUpdates.offer(new UpdateHolder(update, listener)); + boolean offered = orderedJobUpdates.offer(new UpdateHolder(update, listener)); + if (offered == false) { + logger.warn("Update queue is full ({}), failed to submit update for job [{}]", orderedJobUpdates.size(), update.getJobId()); + } + return offered; } private void start() { @@ -95,7 +99,15 @@ private void processNextUpdate() { List updates = new ArrayList<>(orderedJobUpdates.size()); try { orderedJobUpdates.drainTo(updates); - executeProcessUpdates(new VolatileCursorIterator<>(updates)); + if (updates.isEmpty() == false) { + logger.debug("Draining [{}] queued job updates from queue", updates.size()); + long startTime = System.currentTimeMillis(); + executeProcessUpdates(new VolatileCursorIterator<>(updates)); + long duration = System.currentTimeMillis() - startTime; + // Note: This duration only measures queue draining and request submission initiation, + // not the actual completion of all updates, which happens asynchronously on response threads. + logger.debug("Completed draining and submitting [{}] job update requests in [{}ms]", updates.size(), duration); + } } catch (Exception e) { logger.error("Error while processing next job update", e); } @@ -134,7 +146,7 @@ void executeProcessUpdates(Iterator updatesIterator) { @Override public void onResponse(Response response) { if (response.isUpdated()) { - logger.info("Successfully updated remote job [{}]", update.getJobId()); + logger.debug("Successfully updated remote job [{}]", update.getJobId()); updateHolder.listener.onResponse(true); } else { String msg = "Failed to update remote job [" + update.getJobId() + "]"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index e4e3adb471d48..d1f009a00306d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -414,6 +414,11 @@ public void onResponse(Job job) { Optional>> stats = getStatistics(jobTask); DataCounts dataCounts = stats.isPresent() ? stats.get().v1() : new DataCounts(job.getId()); ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); + logger.debug( + "[{}] Fetching scheduled events for calendar update, time range: [{}]", + jobTask.getJobId(), + job.earliestValidTimestamp(dataCounts) + ); jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index cb10eead972ec..8d01fc081df16 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlConfigVersion; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -75,10 +76,12 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class JobManagerTests extends ESTestCase { @@ -326,22 +329,37 @@ public void testUpdateProcessOnCalendarChanged() { // The search will not return any results mockClientBuilder.prepareSearchFields(MlConfigIndex.indexName(), Collections.emptyList()); - JobManager jobManager = createJobManager(mockClientBuilder.build()); + Client mockClient = mockClientBuilder.build(); + + // Mock UpdateProcessAction calls - calendar updates now bypass the queue and call client.execute directly + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation + .getArguments()[2]; + listener.onResponse(new UpdateProcessAction.Response()); + return null; + }).when(mockClient).execute(eq(UpdateProcessAction.INSTANCE), any(UpdateProcessAction.Request.class), any()); + + JobManager jobManager = createJobManager(mockClient); jobManager.updateProcessOnCalendarChanged( Arrays.asList("job-1", "job-3", "job-4"), ActionTestUtils.assertNoFailureListener(r -> {}) ); - ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); - verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any()); + // Verify that UpdateProcessAction is called directly for each open job + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateProcessAction.Request.class); + verify(mockClient, times(2)).execute(eq(UpdateProcessAction.INSTANCE), requestCaptor.capture(), any()); - List capturedUpdateParams = updateParamsCaptor.getAllValues(); - assertThat(capturedUpdateParams.size(), equalTo(2)); - assertThat(capturedUpdateParams.get(0).getJobId(), equalTo("job-1")); - assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true)); - assertThat(capturedUpdateParams.get(1).getJobId(), equalTo("job-3")); - assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true)); + List capturedRequests = requestCaptor.getAllValues(); + assertThat(capturedRequests.size(), equalTo(2)); + assertThat(capturedRequests.get(0).getJobId(), equalTo("job-1")); + assertThat(capturedRequests.get(0).isUpdateScheduledEvents(), is(true)); + assertThat(capturedRequests.get(1).getJobId(), equalTo("job-3")); + assertThat(capturedRequests.get(1).isUpdateScheduledEvents(), is(true)); + + // Verify updateJobProcessNotifier is not called for calendar updates + verifyNoInteractions(updateJobProcessNotifier); } public void testUpdateProcessOnCalendarChanged_GivenGroups() { @@ -373,19 +391,34 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() { ); mockClientBuilder.prepareSearchFields(MlConfigIndex.indexName(), fieldHits); - JobManager jobManager = createJobManager(mockClientBuilder.build()); + Client mockClient = mockClientBuilder.build(); + + // Mock UpdateProcessAction calls - calendar updates now bypass the queue and call client.execute directly + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation + .getArguments()[2]; + listener.onResponse(new UpdateProcessAction.Response()); + return null; + }).when(mockClient).execute(eq(UpdateProcessAction.INSTANCE), any(UpdateProcessAction.Request.class), any()); + + JobManager jobManager = createJobManager(mockClient); jobManager.updateProcessOnCalendarChanged(Collections.singletonList("group-1"), ActionTestUtils.assertNoFailureListener(r -> {})); - ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); - verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any()); + // Verify that UpdateProcessAction is called directly for each open job in the group + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateProcessAction.Request.class); + verify(mockClient, times(2)).execute(eq(UpdateProcessAction.INSTANCE), requestCaptor.capture(), any()); - List capturedUpdateParams = updateParamsCaptor.getAllValues(); - assertThat(capturedUpdateParams.size(), equalTo(2)); - assertThat(capturedUpdateParams.get(0).getJobId(), equalTo("job-1")); - assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true)); - assertThat(capturedUpdateParams.get(1).getJobId(), equalTo("job-2")); - assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true)); + List capturedRequests = requestCaptor.getAllValues(); + assertThat(capturedRequests.size(), equalTo(2)); + assertThat(capturedRequests.get(0).getJobId(), equalTo("job-1")); + assertThat(capturedRequests.get(0).isUpdateScheduledEvents(), is(true)); + assertThat(capturedRequests.get(1).getJobId(), equalTo("job-2")); + assertThat(capturedRequests.get(1).isUpdateScheduledEvents(), is(true)); + + // Verify updateJobProcessNotifier is not called for calendar updates + verifyNoInteractions(updateJobProcessNotifier); } public void testValidateCategorizationAnalyzer_GivenValid() throws IOException {