From 4df6cde71e4fc4d9881d8249a7077fd3e383d846 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 21 Oct 2025 17:07:33 +0200 Subject: [PATCH 01/15] Fix calendar event update scalability issues - Refactor JobManager.submitJobEventUpdate() to bypass UpdateJobProcessNotifier queue - Use RefCountingListener for parallel calendar/filter updates - Add comprehensive logging throughout the system - Create CalendarScalabilityIT integration tests - Add helper methods to base test class Fixes issue where calendar events failed to update some jobs when associated with large numbers of jobs (>1000) due to queue capacity limits and sequential processing. --- .../ml/integration/CalendarScalabilityIT.java | 159 ++++++++++++++++++ .../MlNativeAutodetectIntegTestCase.java | 56 ++++++ .../xpack/ml/integration/ResetJobIT.java | 2 +- .../ml/integration/ScheduledEventsIT.java | 2 +- .../TransportPostCalendarEventsAction.java | 14 +- .../xpack/ml/job/JobManager.java | 56 ++++-- .../ml/job/UpdateJobProcessNotifier.java | 17 +- .../autodetect/AutodetectProcessManager.java | 2 + 8 files changed, 292 insertions(+), 16 deletions(-) create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java new file mode 100644 index 0000000000000..1a0bf41d3aadf --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; +import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.junit.After; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Integration tests for calendar event scalability + */ +public class CalendarScalabilityIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanUpTest() { + cleanUp(); + } + + /** + * Test that calendar updates work correctly when a calendar is associated with many jobs + */ + public void testCalendarUpdateWithManyJobs() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + int jobCount = 5; // Reduced for faster testing + + // Create and open multiple jobs + List jobIds = new ArrayList<>(); + for (int i = 0; i < jobCount; i++) { + String jobId = "calendar-scalability-" + i; + jobIds.add(jobId); + + Job.Builder job = createJob(jobId, bucketSpan); + putJob(job); + openJob(jobId); + } + + // Create a calendar and associate it with all jobs + String calendarId = "test-calendar-many-jobs"; + putCalendar(calendarId, jobIds, "Calendar for many jobs test"); + + // Add scheduled events to the calendar + List events = new ArrayList<>(); + long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); // 5 buckets in + long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); // 2 buckets duration + events.add( + new ScheduledEvent.Builder() + .description("Scalability Test Event") + 
.startTime(Instant.ofEpochMilli(eventStartTime)) + .endTime(Instant.ofEpochMilli(eventEndTime)) + .calendarId(calendarId) + .build() + ); + + PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); + + // Wait a bit for updates to complete + Thread.sleep(2000); + + // Verify all jobs are still running + for (String jobId : jobIds) { + assertThat("Job should still be open", getJobStats(jobId).get(0).getState(), equalTo(JobState.OPENED)); + } + } + + /** + * Test that calendar updates work with a single job + */ + public void testCalendarUpdateWithSingleJob() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + String jobId = "single-job-test"; + + // Create and open job + Job.Builder job = createJob(jobId, bucketSpan); + putJob(job); + openJob(jobId); + + // Create a calendar and associate it with the job + String calendarId = "test-calendar-single"; + putCalendar(calendarId, Collections.singletonList(jobId), "Calendar for single job test"); + + // Add scheduled events to the calendar + List events = new ArrayList<>(); + long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); + long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); + events.add( + new ScheduledEvent.Builder() + .description("Single Job Event") + .startTime(Instant.ofEpochMilli(eventStartTime)) + .endTime(Instant.ofEpochMilli(eventEndTime)) + .calendarId(calendarId) + .build() + ); + + PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); + + // Wait a bit for updates to complete + Thread.sleep(1000); + + // Verify job is still running + assertThat("Job should still be open", getJobStats(jobId).get(0).getState(), equalTo(JobState.OPENED)); + } + + /** + * Test that calendar updates work with closed jobs (should not fail) + */ + public void testCalendarUpdateWithClosedJobs() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + String jobId = "closed-job-test"; + + // 
Create and open job + Job.Builder job = createJob(jobId, bucketSpan); + putJob(job); + openJob(jobId); + + // Close the job + closeJob(jobId); + + // Create a calendar and associate it with the closed job + String calendarId = "test-calendar-closed"; + putCalendar(calendarId, Collections.singletonList(jobId), "Calendar for closed job test"); + + // Add scheduled events to the calendar + List events = new ArrayList<>(); + long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); + long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); + events.add( + new ScheduledEvent.Builder() + .description("Closed Job Event") + .startTime(Instant.ofEpochMilli(eventStartTime)) + .endTime(Instant.ofEpochMilli(eventEndTime)) + .calendarId(calendarId) + .build() + ); + + // This should not fail even though the job is closed + PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); + + // Wait a bit for updates to complete + Thread.sleep(1000); + + // Verify job is still closed + assertThat("Job should still be closed", getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)); + } +} \ No newline at end of file diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 9864c88d1405c..baccb295cc822 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -8,6 +8,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import 
org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; @@ -55,6 +56,9 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; @@ -70,6 +74,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -444,4 +449,55 @@ protected List generateData( protected static String createJsonRecord(Map keyValueMap) throws IOException { return Strings.toString(JsonXContent.contentBuilder().map(keyValueMap)) + "\n"; } + + /** + * Helper method to wait for calendar update audit message + */ + protected void waitForCalendarUpdateAuditMessage(String jobId) throws Exception { + assertBusy(() -> { + SearchRequestBuilder searchRequest = prepareSearch(".ml-notifications") + .setSize(1) + .addSort("timestamp", SortOrder.DESC) + .setQuery( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery("job_id", jobId)) + .filter(QueryBuilders.termQuery("level", "info")) + .filter(QueryBuilders.termQuery("message", "Updated calendars in running process")) + ); + + assertResponse(searchRequest, searchResponse -> { + SearchHit[] hits = searchResponse.getHits().getHits(); + assertThat("Job " + jobId + " should have calendar update audit message", hits.length, equalTo(1)); + }); + }, 30, TimeUnit.SECONDS); + } + + /** + * Helper method to create and open multiple jobs + */ + protected 
List createAndOpenMultipleJobs(String prefix, int count, TimeValue bucketSpan) { + List jobIds = new ArrayList<>(); + for (int i = 0; i < count; i++) { + String jobId = prefix + "-" + i; + Job.Builder job = createJob(jobId, bucketSpan); + putJob(job); + openJob(jobId); + jobIds.add(jobId); + } + return jobIds; + } + + /** + * Helper method to create a job + */ + protected Job.Builder createJob(String jobId, TimeValue bucketSpan) { + Detector.Builder detector = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + job.setDataDescription(dataDescription); + return job; + } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java index 61ce2224c1ed9..844ac118b6027 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java @@ -88,7 +88,7 @@ private void testReset(boolean previousResetFailed) throws Exception { assertThat("Audit messages: " + auditMessages, auditMessages.get(auditMessages.size() - 1), equalTo("Job has been reset")); } - private Job.Builder createJob(String jobId, TimeValue bucketSpan) { + protected Job.Builder createJob(String jobId, TimeValue bucketSpan) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); 
analysisConfig.setBucketSpan(bucketSpan); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java index 3a242104fc027..033cb27355533 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -464,7 +464,7 @@ public void testNewJobWithGlobalCalendar() throws Exception { assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened")); } - private Job.Builder createJob(String jobId, TimeValue bucketSpan) { + protected Job.Builder createJob(String jobId, TimeValue bucketSpan) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index 9d43cb5366381..620491a522d75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -6,6 +6,8 @@ */ package org.elasticsearch.xpack.ml.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -42,6 +44,8 @@ public class 
TransportPostCalendarEventsAction extends HandledTransportAction< PostCalendarEventsAction.Request, PostCalendarEventsAction.Response> { + private static final Logger logger = LogManager.getLogger(TransportPostCalendarEventsAction.class); + private final Client client; private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; @@ -75,6 +79,9 @@ protected void doExecute( List events = request.getScheduledEvents(); ActionListener calendarListener = ActionListener.wrap(calendar -> { + logger.info("Calendar [{}] triggering update for {} jobs with {} new events", + request.getCalendarId(), calendar.getJobIds().size(), events.size()); + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); for (ScheduledEvent event : events) { @@ -102,10 +109,15 @@ protected void doExecute( new ActionListener() { @Override public void onResponse(BulkResponse response) { + long startTime = System.currentTimeMillis(); jobManager.updateProcessOnCalendarChanged( calendar.getJobIds(), ActionListener.wrap( - r -> listener.onResponse(new PostCalendarEventsAction.Response(events)), + r -> { + long duration = System.currentTimeMillis() - startTime; + logger.info("Calendar [{}] update completed in [{}ms]", request.getCalendarId(), duration); + listener.onResponse(new PostCalendarEventsAction.Response(events)); + }, listener::onFailure ) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 164a6ea8ad560..8b99c09f33cf5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -12,6 +12,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.support.RefCountingListener; import 
org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; @@ -37,6 +38,7 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; @@ -68,12 +70,15 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Allows interactions with jobs. 
The managed interactions include: @@ -617,6 +622,10 @@ private static void appendCommaSeparatedSet(Set items, StringBuilder sb) public void updateProcessOnCalendarChanged(List calendarJobIds, ActionListener updateListener) { ClusterState clusterState = clusterService.state(); Set openJobIds = openJobIds(clusterState); + + logger.info("Updating process for calendar change: {} calendar job IDs, {} open jobs", + calendarJobIds.size(), openJobIds.size()); + if (openJobIds.isEmpty()) { updateListener.onResponse(Boolean.TRUE); return; @@ -624,6 +633,7 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals); if (appliesToAllJobs) { + logger.info("Calendar change applies to all jobs"); submitJobEventUpdate(openJobIds, updateListener); return; } @@ -644,17 +654,43 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi } private void submitJobEventUpdate(Collection jobIds, ActionListener updateListener) { - for (String jobId : jobIds) { - updateJobProcessNotifier.submitJobUpdate( - UpdateParams.scheduledEventsUpdate(jobId), - ActionListener.wrap( - isUpdated -> auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)), - e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e) - ) - ); + logger.info("Submitting calendar event updates for [{}] jobs", jobIds.size()); + + AtomicInteger succeeded = new AtomicInteger(); + AtomicInteger failed = new AtomicInteger(); + + try (var refs = new RefCountingListener(updateListener.delegateFailureAndWrap((l, v) -> { + logger.info("Completed calendar updates: {} succeeded, {} failed", succeeded.get(), failed.get()); + l.onResponse(true); + }))) { + for (String jobId : jobIds) { + UpdateProcessAction.Request request = new UpdateProcessAction.Request( + jobId, null, null, null, null, true // updateScheduledEvents=true + ); + + 
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, + refs.acquire().delegateResponse((l, e) -> { + if (isExpectedFailure(e)) { + logger.debug("[{}] Calendar update skipped: {}", jobId, e.getMessage()); + } else { + failed.incrementAndGet(); + logger.warn("[{}] Calendar update failed", jobId, e); + } + l.onResponse(null); // Don't fail the whole operation + }).map(response -> { + succeeded.incrementAndGet(); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); + return null; + }) + ); + } } - - updateListener.onResponse(Boolean.TRUE); + } + + private boolean isExpectedFailure(Exception e) { + // Job deleted, closed, etc. - not real errors + return ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException + || e.getMessage().contains("is not open"); } public void revertSnapshot( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 46c9ae029e60f..1eb8c2f01d6fc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -75,7 +75,12 @@ public void beforeStop() { } boolean submitJobUpdate(UpdateParams update, ActionListener listener) { - return orderedJobUpdates.offer(new UpdateHolder(update, listener)); + boolean offered = orderedJobUpdates.offer(new UpdateHolder(update, listener)); + if (!offered) { + logger.warn("Update queue is full ({}), failed to submit update for job [{}]", + orderedJobUpdates.size(), update.getJobId()); + } + return offered; } private void start() { @@ -95,7 +100,13 @@ private void processNextUpdate() { List updates = new ArrayList<>(orderedJobUpdates.size()); try { orderedJobUpdates.drainTo(updates); - executeProcessUpdates(new VolatileCursorIterator<>(updates)); + if 
(!updates.isEmpty()) { + logger.info("Processing [{}] queued job updates", updates.size()); + long startTime = System.currentTimeMillis(); + executeProcessUpdates(new VolatileCursorIterator<>(updates)); + long duration = System.currentTimeMillis() - startTime; + logger.info("Processed [{}] job updates in [{}ms]", updates.size(), duration); + } } catch (Exception e) { logger.error("Error while processing next job update", e); } @@ -134,7 +145,7 @@ void executeProcessUpdates(Iterator updatesIterator) { @Override public void onResponse(Response response) { if (response.isUpdated()) { - logger.info("Successfully updated remote job [{}]", update.getJobId()); + logger.debug("Successfully updated remote job [{}]", update.getJobId()); updateHolder.listener.onResponse(true); } else { String msg = "Failed to update remote job [" + update.getJobId() + "]"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index e4e3adb471d48..c49c05ff3cddb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -414,6 +414,8 @@ public void onResponse(Job job) { Optional>> stats = getStatistics(jobTask); DataCounts dataCounts = stats.isPresent() ? 
stats.get().v1() : new DataCounts(job.getId()); ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); + logger.debug("[{}] Fetching scheduled events for calendar update, time range: [{}]", + jobTask.getJobId(), job.earliestValidTimestamp(dataCounts)); jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); } From 6957a46109cf386a580722a1135dadf5724f3c38 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 21 Oct 2025 15:17:14 +0000 Subject: [PATCH 02/15] [CI] Auto commit changes from spotless --- .../ml/integration/CalendarScalabilityIT.java | 51 +++++++++--------- .../MlNativeAutodetectIntegTestCase.java | 5 +- .../TransportPostCalendarEventsAction.java | 26 +++++---- .../xpack/ml/job/JobManager.java | 53 ++++++++++--------- .../ml/job/UpdateJobProcessNotifier.java | 3 +- .../autodetect/AutodetectProcessManager.java | 7 ++- 6 files changed, 71 insertions(+), 74 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java index 1a0bf41d3aadf..662c2060c7926 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java @@ -37,40 +37,39 @@ public void cleanUpTest() { public void testCalendarUpdateWithManyJobs() throws Exception { TimeValue bucketSpan = TimeValue.timeValueMinutes(30); int jobCount = 5; // Reduced for faster testing - + // Create and open multiple jobs List jobIds = new ArrayList<>(); for (int i = 0; i < jobCount; i++) { String jobId = "calendar-scalability-" + i; jobIds.add(jobId); - + 
Job.Builder job = createJob(jobId, bucketSpan); putJob(job); openJob(jobId); } - + // Create a calendar and associate it with all jobs String calendarId = "test-calendar-many-jobs"; putCalendar(calendarId, jobIds, "Calendar for many jobs test"); - + // Add scheduled events to the calendar List events = new ArrayList<>(); long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); // 5 buckets in long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); // 2 buckets duration events.add( - new ScheduledEvent.Builder() - .description("Scalability Test Event") + new ScheduledEvent.Builder().description("Scalability Test Event") .startTime(Instant.ofEpochMilli(eventStartTime)) .endTime(Instant.ofEpochMilli(eventEndTime)) .calendarId(calendarId) .build() ); - + PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); - + // Wait a bit for updates to complete Thread.sleep(2000); - + // Verify all jobs are still running for (String jobId : jobIds) { assertThat("Job should still be open", getJobStats(jobId).get(0).getState(), equalTo(JobState.OPENED)); @@ -83,34 +82,33 @@ public void testCalendarUpdateWithManyJobs() throws Exception { public void testCalendarUpdateWithSingleJob() throws Exception { TimeValue bucketSpan = TimeValue.timeValueMinutes(30); String jobId = "single-job-test"; - + // Create and open job Job.Builder job = createJob(jobId, bucketSpan); putJob(job); openJob(jobId); - + // Create a calendar and associate it with the job String calendarId = "test-calendar-single"; putCalendar(calendarId, Collections.singletonList(jobId), "Calendar for single job test"); - + // Add scheduled events to the calendar List events = new ArrayList<>(); long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); events.add( - new ScheduledEvent.Builder() - .description("Single Job Event") + new ScheduledEvent.Builder().description("Single Job Event") 
.startTime(Instant.ofEpochMilli(eventStartTime)) .endTime(Instant.ofEpochMilli(eventEndTime)) .calendarId(calendarId) .build() ); - + PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); - + // Wait a bit for updates to complete Thread.sleep(1000); - + // Verify job is still running assertThat("Job should still be open", getJobStats(jobId).get(0).getState(), equalTo(JobState.OPENED)); } @@ -121,39 +119,38 @@ public void testCalendarUpdateWithSingleJob() throws Exception { public void testCalendarUpdateWithClosedJobs() throws Exception { TimeValue bucketSpan = TimeValue.timeValueMinutes(30); String jobId = "closed-job-test"; - + // Create and open job Job.Builder job = createJob(jobId, bucketSpan); putJob(job); openJob(jobId); - + // Close the job closeJob(jobId); - + // Create a calendar and associate it with the closed job String calendarId = "test-calendar-closed"; putCalendar(calendarId, Collections.singletonList(jobId), "Calendar for closed job test"); - + // Add scheduled events to the calendar List events = new ArrayList<>(); long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); events.add( - new ScheduledEvent.Builder() - .description("Closed Job Event") + new ScheduledEvent.Builder().description("Closed Job Event") .startTime(Instant.ofEpochMilli(eventStartTime)) .endTime(Instant.ofEpochMilli(eventEndTime)) .calendarId(calendarId) .build() ); - + // This should not fail even though the job is closed PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); - + // Wait a bit for updates to complete Thread.sleep(1000); - + // Verify job is still closed assertThat("Job should still be closed", getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)); } -} \ No newline at end of file +} diff --git 
a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index baccb295cc822..9c82f69716d62 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -455,8 +455,7 @@ protected static String createJsonRecord(Map keyValueMap) throws */ protected void waitForCalendarUpdateAuditMessage(String jobId) throws Exception { assertBusy(() -> { - SearchRequestBuilder searchRequest = prepareSearch(".ml-notifications") - .setSize(1) + SearchRequestBuilder searchRequest = prepareSearch(".ml-notifications").setSize(1) .addSort("timestamp", SortOrder.DESC) .setQuery( QueryBuilders.boolQuery() @@ -464,7 +463,7 @@ protected void waitForCalendarUpdateAuditMessage(String jobId) throws Exception .filter(QueryBuilders.termQuery("level", "info")) .filter(QueryBuilders.termQuery("message", "Updated calendars in running process")) ); - + assertResponse(searchRequest, searchResponse -> { SearchHit[] hits = searchResponse.getHits().getHits(); assertThat("Job " + jobId + " should have calendar update audit message", hits.length, equalTo(1)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index 620491a522d75..d82e38c27ee1c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ 
-79,9 +79,13 @@ protected void doExecute( List events = request.getScheduledEvents(); ActionListener calendarListener = ActionListener.wrap(calendar -> { - logger.info("Calendar [{}] triggering update for {} jobs with {} new events", - request.getCalendarId(), calendar.getJobIds().size(), events.size()); - + logger.info( + "Calendar [{}] triggering update for {} jobs with {} new events", + request.getCalendarId(), + calendar.getJobIds().size(), + events.size() + ); + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); for (ScheduledEvent event : events) { @@ -110,17 +114,11 @@ protected void doExecute( @Override public void onResponse(BulkResponse response) { long startTime = System.currentTimeMillis(); - jobManager.updateProcessOnCalendarChanged( - calendar.getJobIds(), - ActionListener.wrap( - r -> { - long duration = System.currentTimeMillis() - startTime; - logger.info("Calendar [{}] update completed in [{}ms]", request.getCalendarId(), duration); - listener.onResponse(new PostCalendarEventsAction.Response(events)); - }, - listener::onFailure - ) - ); + jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> { + long duration = System.currentTimeMillis() - startTime; + logger.info("Calendar [{}] update completed in [{}ms]", request.getCalendarId(), duration); + listener.onResponse(new PostCalendarEventsAction.Response(events)); + }, listener::onFailure)); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 8b99c09f33cf5..96fdacd36de67 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -622,10 +622,9 @@ private static void appendCommaSeparatedSet(Set items, StringBuilder sb) public void updateProcessOnCalendarChanged(List calendarJobIds, ActionListener 
updateListener) { ClusterState clusterState = clusterService.state(); Set openJobIds = openJobIds(clusterState); - - logger.info("Updating process for calendar change: {} calendar job IDs, {} open jobs", - calendarJobIds.size(), openJobIds.size()); - + + logger.info("Updating process for calendar change: {} calendar job IDs, {} open jobs", calendarJobIds.size(), openJobIds.size()); + if (openJobIds.isEmpty()) { updateListener.onResponse(Boolean.TRUE); return; @@ -655,42 +654,44 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi private void submitJobEventUpdate(Collection jobIds, ActionListener updateListener) { logger.info("Submitting calendar event updates for [{}] jobs", jobIds.size()); - + AtomicInteger succeeded = new AtomicInteger(); AtomicInteger failed = new AtomicInteger(); - + try (var refs = new RefCountingListener(updateListener.delegateFailureAndWrap((l, v) -> { logger.info("Completed calendar updates: {} succeeded, {} failed", succeeded.get(), failed.get()); l.onResponse(true); }))) { for (String jobId : jobIds) { UpdateProcessAction.Request request = new UpdateProcessAction.Request( - jobId, null, null, null, null, true // updateScheduledEvents=true - ); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, - refs.acquire().delegateResponse((l, e) -> { - if (isExpectedFailure(e)) { - logger.debug("[{}] Calendar update skipped: {}", jobId, e.getMessage()); - } else { - failed.incrementAndGet(); - logger.warn("[{}] Calendar update failed", jobId, e); - } - l.onResponse(null); // Don't fail the whole operation - }).map(response -> { - succeeded.incrementAndGet(); - auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); - return null; - }) + jobId, + null, + null, + null, + null, + true // updateScheduledEvents=true ); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, refs.acquire().delegateResponse((l, e) -> { + if 
(isExpectedFailure(e)) { + logger.debug("[{}] Calendar update skipped: {}", jobId, e.getMessage()); + } else { + failed.incrementAndGet(); + logger.warn("[{}] Calendar update failed", jobId, e); + } + l.onResponse(null); // Don't fail the whole operation + }).map(response -> { + succeeded.incrementAndGet(); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); + return null; + })); } } } - + private boolean isExpectedFailure(Exception e) { // Job deleted, closed, etc. - not real errors - return ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException - || e.getMessage().contains("is not open"); + return ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException || e.getMessage().contains("is not open"); } public void revertSnapshot( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 1eb8c2f01d6fc..4a5754ace3a1a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -77,8 +77,7 @@ public void beforeStop() { boolean submitJobUpdate(UpdateParams update, ActionListener listener) { boolean offered = orderedJobUpdates.offer(new UpdateHolder(update, listener)); if (!offered) { - logger.warn("Update queue is full ({}), failed to submit update for job [{}]", - orderedJobUpdates.size(), update.getJobId()); + logger.warn("Update queue is full ({}), failed to submit update for job [{}]", orderedJobUpdates.size(), update.getJobId()); } return offered; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 
c49c05ff3cddb..d1f009a00306d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -414,8 +414,11 @@ public void onResponse(Job job) { Optional>> stats = getStatistics(jobTask); DataCounts dataCounts = stats.isPresent() ? stats.get().v1() : new DataCounts(job.getId()); ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); - logger.debug("[{}] Fetching scheduled events for calendar update, time range: [{}]", - jobTask.getJobId(), job.earliestValidTimestamp(dataCounts)); + logger.debug( + "[{}] Fetching scheduled events for calendar update, time range: [{}]", + jobTask.getJobId(), + job.earliestValidTimestamp(dataCounts) + ); jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); } From 0311f6c24e1c7cf4fbae08f26e24aa54ae249b61 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 21 Oct 2025 17:17:40 +0200 Subject: [PATCH 03/15] Update docs/changelog/136886.yaml --- docs/changelog/136886.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/136886.yaml diff --git a/docs/changelog/136886.yaml b/docs/changelog/136886.yaml new file mode 100644 index 0000000000000..258a760e1b4fd --- /dev/null +++ b/docs/changelog/136886.yaml @@ -0,0 +1,5 @@ +pr: 136886 +summary: Fix ML calendar event update scalability issues +area: Machine Learning +type: bug +issues: [] From 9eda7931afea155836ee3deec369ffdc0a627938 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 21 Oct 2025 21:01:04 +0200 Subject: [PATCH 04/15] checkstyle --- .../xpack/ml/job/UpdateJobProcessNotifier.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 4a5754ace3a1a..0327cc9d73574 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -76,8 +76,9 @@ public void beforeStop() { boolean submitJobUpdate(UpdateParams update, ActionListener listener) { boolean offered = orderedJobUpdates.offer(new UpdateHolder(update, listener)); - if (!offered) { - logger.warn("Update queue is full ({}), failed to submit update for job [{}]", orderedJobUpdates.size(), update.getJobId()); + if (offered == false) { + logger.warn("Update queue is full ({}), failed to submit update for job [{}]", + orderedJobUpdates.size(), update.getJobId()); } return offered; } @@ -99,7 +100,7 @@ private void processNextUpdate() { List updates = new ArrayList<>(orderedJobUpdates.size()); try { orderedJobUpdates.drainTo(updates); - if (!updates.isEmpty()) { + if (updates.isEmpty() == false) { logger.info("Processing [{}] queued job updates", updates.size()); long startTime = System.currentTimeMillis(); executeProcessUpdates(new VolatileCursorIterator<>(updates)); From 8514752b342ea16e6ea56647f80af7c1173a5339 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 21 Oct 2025 19:07:58 +0000 Subject: [PATCH 05/15] [CI] Auto commit changes from spotless --- .../elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 0327cc9d73574..4ce4c492a41af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -77,8 +77,7 @@ public void beforeStop() { boolean submitJobUpdate(UpdateParams update, ActionListener listener) { boolean offered = orderedJobUpdates.offer(new UpdateHolder(update, listener)); if (offered == false) { - logger.warn("Update queue is full ({}), failed to submit update for job [{}]", - orderedJobUpdates.size(), update.getJobId()); + logger.warn("Update queue is full ({}), failed to submit update for job [{}]", orderedJobUpdates.size(), update.getJobId()); } return offered; } From ddd9e538aeac3c6081c4c523d0d6fb056c4a3f12 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 22 Oct 2025 13:26:14 +0200 Subject: [PATCH 06/15] Progress-Based Response for calendar updates by immediately responding to API calls and processing job updates asynchronously in the background. --- .../TransportPostCalendarEventsAction.java | 15 ++-- .../xpack/ml/job/JobManager.java | 73 +++++++++++++------ 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index d82e38c27ee1c..10bb4d62698a3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -80,7 +80,7 @@ protected void doExecute( ActionListener calendarListener = ActionListener.wrap(calendar -> { logger.info( - "Calendar [{}] triggering update for {} jobs with {} new events", + "Calendar [{}] accepted for background update: {} jobs with {} events", request.getCalendarId(), calendar.getJobIds().size(), events.size() @@ -113,12 +113,13 @@ protected void doExecute( new ActionListener() { 
@Override public void onResponse(BulkResponse response) { - long startTime = System.currentTimeMillis(); - jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> { - long duration = System.currentTimeMillis() - startTime; - logger.info("Calendar [{}] update completed in [{}ms]", request.getCalendarId(), duration); - listener.onResponse(new PostCalendarEventsAction.Response(events)); - }, listener::onFailure)); + jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap( + r -> { + logger.info("Calendar [{}] update initiated successfully", request.getCalendarId()); + listener.onResponse(new PostCalendarEventsAction.Response(events)); + }, + listener::onFailure + )); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 96fdacd36de67..31c95e8c32dcb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -622,54 +622,85 @@ private static void appendCommaSeparatedSet(Set items, StringBuilder sb) public void updateProcessOnCalendarChanged(List calendarJobIds, ActionListener updateListener) { ClusterState clusterState = clusterService.state(); Set openJobIds = openJobIds(clusterState); - + logger.info("Updating process for calendar change: {} calendar job IDs, {} open jobs", calendarJobIds.size(), openJobIds.size()); - + if (openJobIds.isEmpty()) { updateListener.onResponse(Boolean.TRUE); return; } + + // Respond immediately to prevent API timeouts + updateListener.onResponse(Boolean.TRUE); + + // Continue with background processing + processCalendarUpdatesAsync(calendarJobIds, openJobIds); + } + private void processCalendarUpdatesAsync(List calendarJobIds, Set openJobIds) { boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals); if 
(appliesToAllJobs) { - logger.info("Calendar change applies to all jobs"); - submitJobEventUpdate(openJobIds, updateListener); + logger.info("Calendar change applies to all jobs - starting background update for {} jobs", openJobIds.size()); + submitJobEventUpdateAsync(openJobIds); return; } - // calendarJobIds may be a group or job + // Process group expansion asynchronously jobConfigProvider.expandGroupIds( calendarJobIds, - updateListener.delegateFailureAndWrap( - (delegate, expandedIds) -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expanded group members with the request Ids. - // Ids that aren't jobs will be filtered by isJobOpen() + ActionListener.wrap( + expandedIds -> { expandedIds.addAll(calendarJobIds); openJobIds.retainAll(expandedIds); - submitJobEventUpdate(openJobIds, delegate); - }) + logger.info("Calendar change expanded to {} jobs - starting background update", openJobIds.size()); + submitJobEventUpdateAsync(openJobIds); + }, + e -> logger.error("Failed to expand calendar job groups for background update", e) ) ); } - private void submitJobEventUpdate(Collection jobIds, ActionListener updateListener) { - logger.info("Submitting calendar event updates for [{}] jobs", jobIds.size()); - + private void submitJobEventUpdateAsync(Collection jobIds) { + if (jobIds.isEmpty()) { + return; + } + + logger.info("Starting background calendar event updates for [{}] jobs", jobIds.size()); + AtomicInteger succeeded = new AtomicInteger(); AtomicInteger failed = new AtomicInteger(); + long startTime = System.currentTimeMillis(); + + ActionListener backgroundListener = ActionListener.wrap( + success -> { + long duration = System.currentTimeMillis() - startTime; + logger.info("Background calendar updates completed in [{}ms]: {} succeeded, {} failed", + duration, succeeded.get(), failed.get()); + }, + failure -> { + long duration = System.currentTimeMillis() - startTime; + logger.error("Background calendar updates failed 
after [{}ms]: {} succeeded, {} failed", + duration, succeeded.get(), failed.get(), failure); + } + ); + + // Execute on utility thread pool to avoid blocking transport threads + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { + submitJobEventUpdateSync(jobIds, backgroundListener, succeeded, failed); + }); + } + private void submitJobEventUpdateSync(Collection jobIds, ActionListener updateListener, + AtomicInteger succeeded, AtomicInteger failed) { try (var refs = new RefCountingListener(updateListener.delegateFailureAndWrap((l, v) -> { - logger.info("Completed calendar updates: {} succeeded, {} failed", succeeded.get(), failed.get()); l.onResponse(true); }))) { + // Instead of calling `updateJobProcessNotifier.submitJobUpdate()`, directly call `UpdateProcessAction` + // to bypass the queue and avoid the scalability issues. Since calendar and filter updates fetch the latest state from the + // index and can run on any node, they don't need ordering guarantees. 
for (String jobId : jobIds) { UpdateProcessAction.Request request = new UpdateProcessAction.Request( - jobId, - null, - null, - null, - null, - true // updateScheduledEvents=true + jobId, null, null, null, null, true ); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, refs.acquire().delegateResponse((l, e) -> { From 771b430ba0f01bf79a5c5034c3fe2582014cba3d Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 22 Oct 2025 13:28:27 +0200 Subject: [PATCH 07/15] spotless --- .../TransportPostCalendarEventsAction.java | 11 +-- .../xpack/ml/job/JobManager.java | 82 +++++++++---------- 2 files changed, 45 insertions(+), 48 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index 10bb4d62698a3..e1e4c42089e4c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -113,13 +113,10 @@ protected void doExecute( new ActionListener() { @Override public void onResponse(BulkResponse response) { - jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap( - r -> { - logger.info("Calendar [{}] update initiated successfully", request.getCalendarId()); - listener.onResponse(new PostCalendarEventsAction.Response(events)); - }, - listener::onFailure - )); + jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> { + logger.info("Calendar [{}] update initiated successfully", request.getCalendarId()); + listener.onResponse(new PostCalendarEventsAction.Response(events)); + }, listener::onFailure)); } @Override diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 31c95e8c32dcb..86608c39fa125 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -622,17 +622,17 @@ private static void appendCommaSeparatedSet(Set items, StringBuilder sb) public void updateProcessOnCalendarChanged(List calendarJobIds, ActionListener updateListener) { ClusterState clusterState = clusterService.state(); Set openJobIds = openJobIds(clusterState); - + logger.info("Updating process for calendar change: {} calendar job IDs, {} open jobs", calendarJobIds.size(), openJobIds.size()); - + if (openJobIds.isEmpty()) { updateListener.onResponse(Boolean.TRUE); return; } - + // Respond immediately to prevent API timeouts updateListener.onResponse(Boolean.TRUE); - + // Continue with background processing processCalendarUpdatesAsync(calendarJobIds, openJobIds); } @@ -646,62 +646,62 @@ private void processCalendarUpdatesAsync(List calendarJobIds, Set { - expandedIds.addAll(calendarJobIds); - openJobIds.retainAll(expandedIds); - logger.info("Calendar change expanded to {} jobs - starting background update", openJobIds.size()); - submitJobEventUpdateAsync(openJobIds); - }, - e -> logger.error("Failed to expand calendar job groups for background update", e) - ) - ); + jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap(expandedIds -> { + expandedIds.addAll(calendarJobIds); + openJobIds.retainAll(expandedIds); + logger.info("Calendar change expanded to {} jobs - starting background update", openJobIds.size()); + submitJobEventUpdateAsync(openJobIds); + }, e -> logger.error("Failed to expand calendar job groups for background update", e))); } private void submitJobEventUpdateAsync(Collection jobIds) { if (jobIds.isEmpty()) { return; } - + logger.info("Starting 
background calendar event updates for [{}] jobs", jobIds.size()); - + AtomicInteger succeeded = new AtomicInteger(); AtomicInteger failed = new AtomicInteger(); long startTime = System.currentTimeMillis(); - - ActionListener backgroundListener = ActionListener.wrap( - success -> { - long duration = System.currentTimeMillis() - startTime; - logger.info("Background calendar updates completed in [{}ms]: {} succeeded, {} failed", - duration, succeeded.get(), failed.get()); - }, - failure -> { - long duration = System.currentTimeMillis() - startTime; - logger.error("Background calendar updates failed after [{}ms]: {} succeeded, {} failed", - duration, succeeded.get(), failed.get(), failure); - } - ); - + + ActionListener backgroundListener = ActionListener.wrap(success -> { + long duration = System.currentTimeMillis() - startTime; + logger.info( + "Background calendar updates completed in [{}ms]: {} succeeded, {} failed", + duration, + succeeded.get(), + failed.get() + ); + }, failure -> { + long duration = System.currentTimeMillis() - startTime; + logger.error( + "Background calendar updates failed after [{}ms]: {} succeeded, {} failed", + duration, + succeeded.get(), + failed.get(), + failure + ); + }); + // Execute on utility thread pool to avoid blocking transport threads threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { submitJobEventUpdateSync(jobIds, backgroundListener, succeeded, failed); }); } - private void submitJobEventUpdateSync(Collection jobIds, ActionListener updateListener, - AtomicInteger succeeded, AtomicInteger failed) { - try (var refs = new RefCountingListener(updateListener.delegateFailureAndWrap((l, v) -> { - l.onResponse(true); - }))) { + private void submitJobEventUpdateSync( + Collection jobIds, + ActionListener updateListener, + AtomicInteger succeeded, + AtomicInteger failed + ) { + try (var refs = new RefCountingListener(updateListener.delegateFailureAndWrap((l, v) -> { l.onResponse(true); }))) { // Instead of 
calling `updateJobProcessNotifier.submitJobUpdate()`, directly call `UpdateProcessAction` - // to bypass the queue and avoid the scalability issues. Since calendar and filter updates fetch the latest state from the + // to bypass the queue and avoid the scalability issues. Since calendar and filter updates fetch the latest state from the // index and can run on any node, they don't need ordering guarantees. for (String jobId : jobIds) { - UpdateProcessAction.Request request = new UpdateProcessAction.Request( - jobId, null, null, null, null, true - ); + UpdateProcessAction.Request request = new UpdateProcessAction.Request(jobId, null, null, null, null, true); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, refs.acquire().delegateResponse((l, e) -> { if (isExpectedFailure(e)) { From 95a07dd1cfb4ba8f4ebeab0ba4c293126ba7cfc3 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:23:15 +0200 Subject: [PATCH 08/15] Remove CalendarScalabilityIT integration tests and refactor job update handling in JobManager to include skipped updates. Update logging to reflect skipped updates during background calendar processing. 
--- .../ml/integration/CalendarScalabilityIT.java | 156 ------------------ .../MlNativeAutodetectIntegTestCase.java | 15 -- .../xpack/ml/job/JobManager.java | 18 +- 3 files changed, 11 insertions(+), 178 deletions(-) delete mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java deleted file mode 100644 index 662c2060c7926..0000000000000 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CalendarScalabilityIT.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. 
- */ -package org.elasticsearch.xpack.ml.integration; - -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; -import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.junit.After; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -/** - * Integration tests for calendar event scalability - */ -public class CalendarScalabilityIT extends MlNativeAutodetectIntegTestCase { - - @After - public void cleanUpTest() { - cleanUp(); - } - - /** - * Test that calendar updates work correctly when a calendar is associated with many jobs - */ - public void testCalendarUpdateWithManyJobs() throws Exception { - TimeValue bucketSpan = TimeValue.timeValueMinutes(30); - int jobCount = 5; // Reduced for faster testing - - // Create and open multiple jobs - List jobIds = new ArrayList<>(); - for (int i = 0; i < jobCount; i++) { - String jobId = "calendar-scalability-" + i; - jobIds.add(jobId); - - Job.Builder job = createJob(jobId, bucketSpan); - putJob(job); - openJob(jobId); - } - - // Create a calendar and associate it with all jobs - String calendarId = "test-calendar-many-jobs"; - putCalendar(calendarId, jobIds, "Calendar for many jobs test"); - - // Add scheduled events to the calendar - List events = new ArrayList<>(); - long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); // 5 buckets in - long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); // 2 buckets duration - events.add( - new ScheduledEvent.Builder().description("Scalability Test Event") - .startTime(Instant.ofEpochMilli(eventStartTime)) - .endTime(Instant.ofEpochMilli(eventEndTime)) - .calendarId(calendarId) - .build() - ); - - 
PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); - - // Wait a bit for updates to complete - Thread.sleep(2000); - - // Verify all jobs are still running - for (String jobId : jobIds) { - assertThat("Job should still be open", getJobStats(jobId).get(0).getState(), equalTo(JobState.OPENED)); - } - } - - /** - * Test that calendar updates work with a single job - */ - public void testCalendarUpdateWithSingleJob() throws Exception { - TimeValue bucketSpan = TimeValue.timeValueMinutes(30); - String jobId = "single-job-test"; - - // Create and open job - Job.Builder job = createJob(jobId, bucketSpan); - putJob(job); - openJob(jobId); - - // Create a calendar and associate it with the job - String calendarId = "test-calendar-single"; - putCalendar(calendarId, Collections.singletonList(jobId), "Calendar for single job test"); - - // Add scheduled events to the calendar - List events = new ArrayList<>(); - long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); - long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); - events.add( - new ScheduledEvent.Builder().description("Single Job Event") - .startTime(Instant.ofEpochMilli(eventStartTime)) - .endTime(Instant.ofEpochMilli(eventEndTime)) - .calendarId(calendarId) - .build() - ); - - PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); - - // Wait a bit for updates to complete - Thread.sleep(1000); - - // Verify job is still running - assertThat("Job should still be open", getJobStats(jobId).get(0).getState(), equalTo(JobState.OPENED)); - } - - /** - * Test that calendar updates work with closed jobs (should not fail) - */ - public void testCalendarUpdateWithClosedJobs() throws Exception { - TimeValue bucketSpan = TimeValue.timeValueMinutes(30); - String jobId = "closed-job-test"; - - // Create and open job - Job.Builder job = createJob(jobId, bucketSpan); - putJob(job); - openJob(jobId); - - // Close the job - closeJob(jobId); - - // 
Create a calendar and associate it with the closed job - String calendarId = "test-calendar-closed"; - putCalendar(calendarId, Collections.singletonList(jobId), "Calendar for closed job test"); - - // Add scheduled events to the calendar - List events = new ArrayList<>(); - long eventStartTime = 1514764800000L + (bucketSpan.millis() * 5); - long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); - events.add( - new ScheduledEvent.Builder().description("Closed Job Event") - .startTime(Instant.ofEpochMilli(eventStartTime)) - .endTime(Instant.ofEpochMilli(eventEndTime)) - .calendarId(calendarId) - .build() - ); - - // This should not fail even though the job is closed - PostCalendarEventsAction.Response response = postScheduledEvents(calendarId, events); - - // Wait a bit for updates to complete - Thread.sleep(1000); - - // Verify job is still closed - assertThat("Job should still be closed", getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)); - } -} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 9c82f69716d62..2dc8155ee7414 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -471,21 +471,6 @@ protected void waitForCalendarUpdateAuditMessage(String jobId) throws Exception }, 30, TimeUnit.SECONDS); } - /** - * Helper method to create and open multiple jobs - */ - protected List createAndOpenMultipleJobs(String prefix, int count, TimeValue bucketSpan) { - List jobIds = new ArrayList<>(); - for (int i = 0; i < count; i++) { - String jobId = prefix 
+ "-" + i; - Job.Builder job = createJob(jobId, bucketSpan); - putJob(job); - openJob(jobId); - jobIds.add(jobId); - } - return jobIds; - } - /** * Helper method to create a job */ diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 86608c39fa125..a729f0b21a4e0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -632,7 +632,6 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi // Respond immediately to prevent API timeouts updateListener.onResponse(Boolean.TRUE); - // Continue with background processing processCalendarUpdatesAsync(calendarJobIds, openJobIds); } @@ -660,33 +659,35 @@ private void submitJobEventUpdateAsync(Collection jobIds) { } logger.info("Starting background calendar event updates for [{}] jobs", jobIds.size()); - AtomicInteger succeeded = new AtomicInteger(); AtomicInteger failed = new AtomicInteger(); + AtomicInteger skipped = new AtomicInteger(); long startTime = System.currentTimeMillis(); ActionListener backgroundListener = ActionListener.wrap(success -> { long duration = System.currentTimeMillis() - startTime; logger.info( - "Background calendar updates completed in [{}ms]: {} succeeded, {} failed", + "Background calendar updates completed in [{}ms]: {} succeeded, {} failed, {} skipped", duration, succeeded.get(), - failed.get() + failed.get(), + skipped.get() ); }, failure -> { long duration = System.currentTimeMillis() - startTime; logger.error( - "Background calendar updates failed after [{}ms]: {} succeeded, {} failed", + "Background calendar updates failed after [{}ms]: {} succeeded, {} failed, {} skipped", duration, succeeded.get(), failed.get(), + skipped.get(), failure ); }); // Execute on utility thread pool to avoid blocking transport threads 
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - submitJobEventUpdateSync(jobIds, backgroundListener, succeeded, failed); + submitJobEventUpdateSync(jobIds, backgroundListener, succeeded, failed, skipped); }); } @@ -694,8 +695,10 @@ private void submitJobEventUpdateSync( Collection jobIds, ActionListener updateListener, AtomicInteger succeeded, - AtomicInteger failed + AtomicInteger failed, + AtomicInteger skipped ) { + // Use RefCountingListener to track all parallel updates and complete the listener when all updates are completed. try (var refs = new RefCountingListener(updateListener.delegateFailureAndWrap((l, v) -> { l.onResponse(true); }))) { // Instead of calling `updateJobProcessNotifier.submitJobUpdate()`, directly call `UpdateProcessAction` // to bypass the queue and avoid the scalability issues. Since calendar and filter updates fetch the latest state from the @@ -705,6 +708,7 @@ private void submitJobEventUpdateSync( executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, refs.acquire().delegateResponse((l, e) -> { if (isExpectedFailure(e)) { + skipped.incrementAndGet(); logger.debug("[{}] Calendar update skipped: {}", jobId, e.getMessage()); } else { failed.incrementAndGet(); From 613bccee8370a7927cb877bd4db48fb41511f359 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:46:32 +0200 Subject: [PATCH 09/15] Refactor integration tests for ML job handling by removing unused methods and updating job creation visibility. Enhance ScheduledEventsIT to verify asynchronous calendar updates and add a plugin for tracking UpdateProcessAction calls. 
--- .../MlNativeAutodetectIntegTestCase.java | 40 ---- .../xpack/ml/integration/ResetJobIT.java | 2 +- .../ml/integration/ScheduledEventsIT.java | 183 +++++++++++++++++- 3 files changed, 183 insertions(+), 42 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 2dc8155ee7414..9864c88d1405c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -8,7 +8,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; @@ -56,9 +55,6 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; -import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; -import org.elasticsearch.xpack.core.ml.job.config.DataDescription; -import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; @@ -74,7 +70,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -449,39 +444,4 @@ protected List 
generateData( protected static String createJsonRecord(Map keyValueMap) throws IOException { return Strings.toString(JsonXContent.contentBuilder().map(keyValueMap)) + "\n"; } - - /** - * Helper method to wait for calendar update audit message - */ - protected void waitForCalendarUpdateAuditMessage(String jobId) throws Exception { - assertBusy(() -> { - SearchRequestBuilder searchRequest = prepareSearch(".ml-notifications").setSize(1) - .addSort("timestamp", SortOrder.DESC) - .setQuery( - QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery("job_id", jobId)) - .filter(QueryBuilders.termQuery("level", "info")) - .filter(QueryBuilders.termQuery("message", "Updated calendars in running process")) - ); - - assertResponse(searchRequest, searchResponse -> { - SearchHit[] hits = searchResponse.getHits().getHits(); - assertThat("Job " + jobId + " should have calendar update audit message", hits.length, equalTo(1)); - }); - }, 30, TimeUnit.SECONDS); - } - - /** - * Helper method to create a job - */ - protected Job.Builder createJob(String jobId, TimeValue bucketSpan) { - Detector.Builder detector = new Detector.Builder("count", null); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); - analysisConfig.setBucketSpan(bucketSpan); - Job.Builder job = new Job.Builder(jobId); - job.setAnalysisConfig(analysisConfig); - DataDescription.Builder dataDescription = new DataDescription.Builder(); - job.setDataDescription(dataDescription); - return job; - } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java index 844ac118b6027..61ce2224c1ed9 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java +++ 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java @@ -88,7 +88,7 @@ private void testReset(boolean previousResetFailed) throws Exception { assertThat("Audit messages: " + auditMessages, auditMessages.get(auditMessages.size() - 1), equalTo("Job has been reset")); } - protected Job.Builder createJob(String jobId, TimeValue bucketSpan) { + private Job.Builder createJob(String jobId, TimeValue bucketSpan) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java index 033cb27355533..046d560815092 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -6,14 +6,24 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.Task; import 
org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -28,8 +38,11 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; @@ -37,6 +50,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase { @@ -45,6 +59,13 @@ public void cleanUpTest() { cleanUp(); } + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(ScheduledEventsIT.UpdateProcessActionTrackerPlugin.class); + return plugins; + } + public void testScheduledEvents() throws IOException { TimeValue bucketSpan = TimeValue.timeValueMinutes(30); @@ -464,7 +485,109 @@ public void testNewJobWithGlobalCalendar() throws Exception { assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened")); } - protected Job.Builder createJob(String jobId, TimeValue bucketSpan) { + /** + * Test that verifies UpdateProcessAction is called with correct parameters when calendar events + * are posted asynchronously, using ActionFilter to directly intercept the calls + 
*/ + public void testCalendarUpdateCallsUpdateProcessAction() throws Exception { + // Reset tracker + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset(); + + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + String jobId = "calendar-update-test"; + String calendarId = "test-calendar"; + + // Create and open single job + createJob(jobId, bucketSpan); + openJob(jobId); + + // Create calendar with the job + putCalendar(calendarId, List.of(jobId), "Update process action test"); + + // Create scheduled event + List events = List.of( + new ScheduledEvent.Builder().description("Direct Test Event") + .startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000)) + .endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000)) + .calendarId(calendarId) + .build() + ); + + // Post events and verify API completes quickly (async behavior) + long startTime = System.currentTimeMillis(); + postScheduledEvents(calendarId, events); + long duration = System.currentTimeMillis() - startTime; + + assertThat("API should complete quickly with async implementation", duration, lessThan(5000L)); + + // Wait for and verify ActionFilter captured the UpdateProcessAction call + // We intercept the call to UpdateProcessAction using an ActionFilter to verify the call was made + assertBusy(() -> { + assertThat( + "Should have intercepted UpdateProcessAction call", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(), + equalTo(1) + ); + assertThat( + "Should have called UpdateProcessAction for the correct job", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds, + contains(jobId) + ); + }, 5, TimeUnit.SECONDS); + + // Verify calendar events were stored correctly + verifyCalendarEventsStored(calendarId, 1); + + // Cleanup + closeJob(jobId); + + logger.info("Successfully verified UpdateProcessAction call with updateScheduledEvents=true for job [{}]", jobId); + } + + /** + * Test calendar updates with closed jobs (should not fail) 
+ */ + public void testCalendarUpdateWithClosedJobs() throws IOException { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + String jobId = "closed-job-test"; + + // Create and run job, then close it + Job.Builder job = createJob(jobId, bucketSpan); + long startTime = 1514764800000L; + runJob(job, startTime, bucketSpan, 10); + + // Create calendar with the closed job + String calendarId = "closed-job-calendar"; + putCalendar(calendarId, Collections.singletonList(jobId), "Calendar with closed job"); + + // Create scheduled event + List events = new ArrayList<>(); + long eventStartTime = startTime + (bucketSpan.millis() * 5); + long eventEndTime = eventStartTime + (bucketSpan.millis() * 2); + events.add( + new ScheduledEvent.Builder().description("Closed Job Event") + .startTime(Instant.ofEpochMilli(eventStartTime)) + .endTime(Instant.ofEpochMilli(eventEndTime)) + .calendarId(calendarId) + .build() + ); + + // This should not fail even though the job is closed + // The async implementation should gracefully skip closed jobs + postScheduledEvents(calendarId, events); + + // Verify job is still closed and buckets don't have the new event + // (since the job was closed when the event was added) + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + List buckets = getBuckets(getBucketsRequest); + + // All buckets should be empty of scheduled events since job was closed when event was added + for (Bucket bucket : buckets) { + assertThat("Closed job buckets should not contain new scheduled events", bucket.getScheduledEvents(), empty()); + } + } + + private Job.Builder createJob(String jobId, TimeValue bucketSpan) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); @@ -486,4 +609,62 @@ private void runJob(Job.Builder job, long startTime, TimeValue bucketSpan, int b ); 
closeJob(job.getId()); } + + /** + * Helper method to verify that calendar events are stored and retrievable + */ + private void verifyCalendarEventsStored(String calendarId, int expectedEventCount) { + GetCalendarEventsAction.Request request = new GetCalendarEventsAction.Request(calendarId); + GetCalendarEventsAction.Response response = client().execute(GetCalendarEventsAction.INSTANCE, request).actionGet(); + + assertThat( + "Calendar should have the expected number of events", + response.getResources().results().size(), + equalTo(expectedEventCount) + ); + } + + /** + * Test plugin that tracks UpdateProcessAction calls with updateScheduledEvents=true + * using an ActionFilter to verify native process interaction in integration tests + */ + public static class UpdateProcessActionTrackerPlugin extends Plugin implements ActionPlugin { + + public static final AtomicInteger updateProcessCallCount = new AtomicInteger(0); + public static final List updatedJobIds = Collections.synchronizedList(new ArrayList<>()); + + public static void reset() { + updateProcessCallCount.set(0); + updatedJobIds.clear(); + } + + @Override + public List getActionFilters() { + return List.of(new ActionFilter() { + @Override + public int order() { + return 0; + } + + @Override + public void apply( + Task task, + String action, + Request request, + ActionListener listener, + ActionFilterChain chain + ) { + if (UpdateProcessAction.NAME.equals(action) && request instanceof UpdateProcessAction.Request) { + UpdateProcessAction.Request updateRequest = (UpdateProcessAction.Request) request; + if (updateRequest.isUpdateScheduledEvents()) { + updateProcessCallCount.incrementAndGet(); + updatedJobIds.add(updateRequest.getJobId()); + } + } + chain.proceed(task, action, request, listener); + } + }); + } + } + } From 8bbe8af4ec445c9be9247b1356c7c81fa5f7c42a Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:51:57 +0200 Subject: [PATCH 
10/15] Refactor logging imports in TransportPostCalendarEventsAction to use the updated logging package. This change improves consistency and aligns with recent codebase updates. --- .../xpack/ml/action/TransportPostCalendarEventsAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index e1e4c42089e4c..7bee769318bd7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -6,8 +6,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -19,6 +17,8 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.ToXContent; From 8361516240e8224c14a70f3e7d4dc96063be1aa1 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 22 Oct 2025 16:46:59 +0200 Subject: [PATCH 11/15] fix logger check --- .../java/org/elasticsearch/xpack/ml/job/JobManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java 
index a729f0b21a4e0..9f5a38b21a745 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -6,8 +6,6 @@ */ package org.elasticsearch.xpack.ml.job; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -25,6 +23,8 @@ import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -456,7 +456,7 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti )); } } else { - logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString); + logger.debug("[{}] No process update required for job update: {}", jobUpdate.getJobId(), jobUpdate.toString()); auditJobUpdatedIfNotInternal(request); } From 6ea309f49da5b438f9309eb7855aba21de801e28 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Thu, 23 Oct 2025 10:56:56 +0200 Subject: [PATCH 12/15] Update unit tests. 
--- .../xpack/ml/job/JobManagerTests.java | 69 ++++++++++++++----- 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index cb10eead972ec..8d01fc081df16 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlConfigVersion; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -75,10 +76,12 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class JobManagerTests extends ESTestCase { @@ -326,22 +329,37 @@ public void testUpdateProcessOnCalendarChanged() { // The search will not return any results mockClientBuilder.prepareSearchFields(MlConfigIndex.indexName(), Collections.emptyList()); - JobManager jobManager = createJobManager(mockClientBuilder.build()); + Client mockClient = mockClientBuilder.build(); + + // Mock UpdateProcessAction calls - calendar updates now bypass the queue and call client.execute directly + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + 
ActionListener listener = (ActionListener) invocation + .getArguments()[2]; + listener.onResponse(new UpdateProcessAction.Response()); + return null; + }).when(mockClient).execute(eq(UpdateProcessAction.INSTANCE), any(UpdateProcessAction.Request.class), any()); + + JobManager jobManager = createJobManager(mockClient); jobManager.updateProcessOnCalendarChanged( Arrays.asList("job-1", "job-3", "job-4"), ActionTestUtils.assertNoFailureListener(r -> {}) ); - ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); - verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any()); + // Verify that UpdateProcessAction is called directly for each open job + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateProcessAction.Request.class); + verify(mockClient, times(2)).execute(eq(UpdateProcessAction.INSTANCE), requestCaptor.capture(), any()); - List capturedUpdateParams = updateParamsCaptor.getAllValues(); - assertThat(capturedUpdateParams.size(), equalTo(2)); - assertThat(capturedUpdateParams.get(0).getJobId(), equalTo("job-1")); - assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true)); - assertThat(capturedUpdateParams.get(1).getJobId(), equalTo("job-3")); - assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true)); + List capturedRequests = requestCaptor.getAllValues(); + assertThat(capturedRequests.size(), equalTo(2)); + assertThat(capturedRequests.get(0).getJobId(), equalTo("job-1")); + assertThat(capturedRequests.get(0).isUpdateScheduledEvents(), is(true)); + assertThat(capturedRequests.get(1).getJobId(), equalTo("job-3")); + assertThat(capturedRequests.get(1).isUpdateScheduledEvents(), is(true)); + + // Verify updateJobProcessNotifier is not called for calendar updates + verifyNoInteractions(updateJobProcessNotifier); } public void testUpdateProcessOnCalendarChanged_GivenGroups() { @@ -373,19 +391,34 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() 
{ ); mockClientBuilder.prepareSearchFields(MlConfigIndex.indexName(), fieldHits); - JobManager jobManager = createJobManager(mockClientBuilder.build()); + Client mockClient = mockClientBuilder.build(); + + // Mock UpdateProcessAction calls - calendar updates now bypass the queue and call client.execute directly + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation + .getArguments()[2]; + listener.onResponse(new UpdateProcessAction.Response()); + return null; + }).when(mockClient).execute(eq(UpdateProcessAction.INSTANCE), any(UpdateProcessAction.Request.class), any()); + + JobManager jobManager = createJobManager(mockClient); jobManager.updateProcessOnCalendarChanged(Collections.singletonList("group-1"), ActionTestUtils.assertNoFailureListener(r -> {})); - ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); - verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any()); + // Verify that UpdateProcessAction is called directly for each open job in the group + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateProcessAction.Request.class); + verify(mockClient, times(2)).execute(eq(UpdateProcessAction.INSTANCE), requestCaptor.capture(), any()); - List capturedUpdateParams = updateParamsCaptor.getAllValues(); - assertThat(capturedUpdateParams.size(), equalTo(2)); - assertThat(capturedUpdateParams.get(0).getJobId(), equalTo("job-1")); - assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true)); - assertThat(capturedUpdateParams.get(1).getJobId(), equalTo("job-2")); - assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true)); + List capturedRequests = requestCaptor.getAllValues(); + assertThat(capturedRequests.size(), equalTo(2)); + assertThat(capturedRequests.get(0).getJobId(), equalTo("job-1")); + assertThat(capturedRequests.get(0).isUpdateScheduledEvents(), is(true)); + 
assertThat(capturedRequests.get(1).getJobId(), equalTo("job-2")); + assertThat(capturedRequests.get(1).isUpdateScheduledEvents(), is(true)); + + // Verify updateJobProcessNotifier is not called for calendar updates + verifyNoInteractions(updateJobProcessNotifier); } public void testValidateCategorizationAnalyzer_GivenValid() throws IOException { From d56d390f7bf973c4bbad79b7c2ec25416990e6fc Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 31 Oct 2025 12:20:15 +0100 Subject: [PATCH 13/15] reviewer comments addressed. --- .../ml/integration/ScheduledEventsIT.java | 72 ++++++++++++++++--- .../TransportPostCalendarEventsAction.java | 4 +- .../xpack/ml/job/JobManager.java | 40 ++++++----- .../ml/job/UpdateJobProcessNotifier.java | 6 +- 4 files changed, 93 insertions(+), 29 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java index 046d560815092..569b3e17c5be7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -41,7 +41,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -50,7 +49,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThan; public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase { @@ -513,12 +511,8 @@ public void
testCalendarUpdateCallsUpdateProcessAction() throws Exception { .build() ); - // Post events and verify API completes quickly (async behavior) - long startTime = System.currentTimeMillis(); + // Post events - API should return immediately with async implementation postScheduledEvents(calendarId, events); - long duration = System.currentTimeMillis() - startTime; - - assertThat("API should complete quickly with async implementation", duration, lessThan(5000L)); // Wait for and verify ActionFilter captured the UpdateProcessAction call // We intercept the call to UpdateProcessAction using an ActionFilter to verify the call was made @@ -533,7 +527,7 @@ public void testCalendarUpdateCallsUpdateProcessAction() throws Exception { ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds, contains(jobId) ); - }, 5, TimeUnit.SECONDS); + }); // Verify calendar events were stored correctly verifyCalendarEventsStored(calendarId, 1); @@ -587,6 +581,68 @@ public void testCalendarUpdateWithClosedJobs() throws IOException { } } + /** + * Test calendar updates with mixed open and closed jobs - verify open jobs are updated and closed jobs are skipped + */ + public void testCalendarUpdateWithMixedOpenAndClosedJobs() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + + // Create two jobs + String openJobId = "mixed-test-open-job"; + String closedJobId = "mixed-test-closed-job"; + + // Create and open first job + createJob(openJobId, bucketSpan); + openJob(openJobId); + + // Create and run second job, then close it + Job.Builder closedJob = createJob(closedJobId, bucketSpan); + long startTime = 1514764800000L; + runJob(closedJob, startTime, bucketSpan, 10); + + // Create calendar with both jobs + String calendarId = "mixed-jobs-calendar"; + putCalendar(calendarId, List.of(openJobId, closedJobId), "Calendar with mixed open and closed jobs"); + + // Reset tracker + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset(); + + // Create scheduled event + 
List events = List.of( + new ScheduledEvent.Builder().description("Mixed Jobs Event") + .startTime(Instant.ofEpochMilli(System.currentTimeMillis() + 60000)) + .endTime(Instant.ofEpochMilli(System.currentTimeMillis() + 120000)) + .calendarId(calendarId) + .build() + ); + + // Post events - should update open job and skip closed job + postScheduledEvents(calendarId, events); + + // Wait for ActionFilter to capture the UpdateProcessAction call + // Should only be called for the open job, not the closed one + assertBusy(() -> { + assertThat( + "Should have intercepted UpdateProcessAction call for open job only", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(), + equalTo(1) + ); + assertThat( + "Should have called UpdateProcessAction for the open job only", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds, + contains(openJobId) + ); + assertThat( + "Should not have called UpdateProcessAction for the closed job", + ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds.contains(closedJobId), + is(false) + ); + }); + + // Cleanup + closeJob(openJobId); + } + private Job.Builder createJob(String jobId, TimeValue bucketSpan) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index 7bee769318bd7..a832142f6b39d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -79,7 +79,7 @@ protected void doExecute( List events = request.getScheduledEvents(); ActionListener calendarListener = 
ActionListener.wrap(calendar -> { - logger.info( + logger.debug( "Calendar [{}] accepted for background update: {} jobs with {} events", request.getCalendarId(), calendar.getJobIds().size(), @@ -114,7 +114,7 @@ protected void doExecute( @Override public void onResponse(BulkResponse response) { jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> { - logger.info("Calendar [{}] update initiated successfully", request.getCalendarId()); + logger.debug("Calendar [{}] update initiated successfully", request.getCalendarId()); listener.onResponse(new PostCalendarEventsAction.Response(events)); }, listener::onFailure)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 9f5a38b21a745..0318a04e90a30 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -64,7 +64,6 @@ import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; @@ -456,7 +455,7 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti )); } } else { - logger.debug("[{}] No process update required for job update: {}", jobUpdate.getJobId(), jobUpdate.toString()); + logger.debug(() -> format("[%s] No process update required for job update: %s", jobUpdate.getJobId(), jobUpdate.toString())); auditJobUpdatedIfNotInternal(request); } @@ -620,18 +619,18 @@ private static void appendCommaSeparatedSet(Set items, StringBuilder sb) } public void updateProcessOnCalendarChanged(List calendarJobIds, ActionListener updateListener) { + // Respond immediately to prevent API timeouts + updateListener.onResponse(Boolean.TRUE); + ClusterState clusterState = clusterService.state(); Set 
openJobIds = openJobIds(clusterState); - logger.info("Updating process for calendar change: {} calendar job IDs, {} open jobs", calendarJobIds.size(), openJobIds.size()); + logger.debug("Updating process for calendar change: {} calendar job IDs, {} open jobs", calendarJobIds.size(), openJobIds.size()); if (openJobIds.isEmpty()) { - updateListener.onResponse(Boolean.TRUE); return; } - // Respond immediately to prevent API timeouts - updateListener.onResponse(Boolean.TRUE); // Continue with background processing processCalendarUpdatesAsync(calendarJobIds, openJobIds); } @@ -639,8 +638,8 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi private void processCalendarUpdatesAsync(List calendarJobIds, Set openJobIds) { boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals); if (appliesToAllJobs) { - logger.info("Calendar change applies to all jobs - starting background update for {} jobs", openJobIds.size()); - submitJobEventUpdateAsync(openJobIds); + logger.debug("Calendar change applies to all jobs - starting background update for {} jobs", openJobIds.size()); + submitJobCalendarEventUpdateAsync(openJobIds); return; } // calendarJobIds may be a group or job @@ -648,17 +647,17 @@ private void processCalendarUpdatesAsync(List calendarJobIds, Set { expandedIds.addAll(calendarJobIds); openJobIds.retainAll(expandedIds); - logger.info("Calendar change expanded to {} jobs - starting background update", openJobIds.size()); - submitJobEventUpdateAsync(openJobIds); + logger.debug("Calendar change expanded to {} jobs - starting background update", openJobIds.size()); + submitJobCalendarEventUpdateAsync(openJobIds); }, e -> logger.error("Failed to expand calendar job groups for background update", e))); } - private void submitJobEventUpdateAsync(Collection jobIds) { + private void submitJobCalendarEventUpdateAsync(Set jobIds) { if (jobIds.isEmpty()) { return; } - logger.info("Starting background calendar event updates for [{}] 
jobs", jobIds.size()); + logger.debug("Starting background calendar event updates for [{}] jobs", jobIds.size()); AtomicInteger succeeded = new AtomicInteger(); AtomicInteger failed = new AtomicInteger(); AtomicInteger skipped = new AtomicInteger(); @@ -666,7 +665,7 @@ private void submitJobEventUpdateAsync(Collection jobIds) { ActionListener backgroundListener = ActionListener.wrap(success -> { long duration = System.currentTimeMillis() - startTime; - logger.info( + logger.debug( "Background calendar updates completed in [{}ms]: {} succeeded, {} failed, {} skipped", duration, succeeded.get(), @@ -687,12 +686,12 @@ private void submitJobEventUpdateAsync(Collection jobIds) { // Execute on utility thread pool to avoid blocking transport threads threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - submitJobEventUpdateSync(jobIds, backgroundListener, succeeded, failed, skipped); + submitJobCalendarEventUpdateSync(jobIds, backgroundListener, succeeded, failed, skipped); }); } - private void submitJobEventUpdateSync( - Collection jobIds, + private void submitJobCalendarEventUpdateSync( + Set jobIds, ActionListener updateListener, AtomicInteger succeeded, AtomicInteger failed, @@ -726,7 +725,14 @@ private void submitJobEventUpdateSync( private boolean isExpectedFailure(Exception e) { // Job deleted, closed, etc. 
- not real errors - return ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException || e.getMessage().contains("is not open"); + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof ResourceNotFoundException) { + return true; + } + // Check for the specific error message format from TransportJobTaskAction + // Message format: "Cannot perform requested action because job [jobId] is not open" + String message = e.getMessage(); + return message != null && message.contains("Cannot perform requested action because job [") && message.contains("] is not open"); } public void revertSnapshot( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 4ce4c492a41af..437dd08fff677 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -100,11 +100,13 @@ private void processNextUpdate() { try { orderedJobUpdates.drainTo(updates); if (updates.isEmpty() == false) { - logger.info("Processing [{}] queued job updates", updates.size()); + logger.debug("Draining [{}] queued job updates from queue", updates.size()); long startTime = System.currentTimeMillis(); executeProcessUpdates(new VolatileCursorIterator<>(updates)); long duration = System.currentTimeMillis() - startTime; - logger.info("Processed [{}] job updates in [{}ms]", updates.size(), duration); + // Note: This duration only measures queue draining and request submission initiation, + // not the actual completion of all updates, which happens asynchronously on response threads. 
+ logger.debug("Completed draining and submitting [{}] job update requests in [{}ms]", updates.size(), duration); } } catch (Exception e) { logger.error("Error while processing next job update", e); From 1becbbc82be462ccf2787adcec73de5123b0b888 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 31 Oct 2025 12:22:10 +0100 Subject: [PATCH 14/15] revert debug logging with supplier function --- .../main/java/org/elasticsearch/xpack/ml/job/JobManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 0318a04e90a30..c9671ca8d8369 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -455,7 +455,7 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti )); } } else { - logger.debug(() -> format("[%s] No process update required for job update: %s", jobUpdate.getJobId(), jobUpdate.toString())); + logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString); auditJobUpdatedIfNotInternal(request); } From ca7916631b8d9c1c40dc4ce63dfe09cf6b87651b Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 31 Oct 2025 13:15:53 +0100 Subject: [PATCH 15/15] Refactor debug logging in JobManager to use lambda expression for improved readability --- .../main/java/org/elasticsearch/xpack/ml/job/JobManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index c9671ca8d8369..0318a04e90a30 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -455,7 +455,7 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti )); } } else { - logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString); + logger.debug(() -> format("[%s] No process update required for job update: %s", jobUpdate.getJobId(), jobUpdate.toString())); auditJobUpdatedIfNotInternal(request); }