Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136886.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136886
summary: Fix ML calendar event update scalability issues
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
Expand All @@ -28,8 +38,10 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
Expand All @@ -45,6 +57,13 @@ public void cleanUpTest() {
cleanUp();
}

/**
 * Returns the plugins inherited from the parent test case plus the
 * {@code UpdateProcessActionTrackerPlugin}, so that the tracking action filter
 * is installed on the test nodes.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> inherited = super.nodePlugins();
    // One extra slot for the tracker plugin appended below.
    final List<Class<? extends Plugin>> combined = new ArrayList<>(inherited.size() + 1);
    combined.addAll(inherited);
    combined.add(ScheduledEventsIT.UpdateProcessActionTrackerPlugin.class);
    return combined;
}

public void testScheduledEvents() throws IOException {

TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
Expand Down Expand Up @@ -464,6 +483,166 @@ public void testNewJobWithGlobalCalendar() throws Exception {
assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened"));
}

/**
 * Verifies that posting scheduled events to a calendar results in exactly one
 * {@link UpdateProcessAction} request with {@code updateScheduledEvents} set,
 * addressed to the single open job attached to that calendar.
 * <p>
 * The requests are observed through the action filter installed by
 * {@code UpdateProcessActionTrackerPlugin}, which records into static state;
 * that state is reset at the start of the test.
 */
public void testCalendarUpdateCallsUpdateProcessAction() throws Exception {
    // Reset tracker so calls recorded by earlier tests do not leak into this one
    ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset();

    TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
    String jobId = "calendar-update-test";
    String calendarId = "test-calendar";

    // Create and open single job
    createJob(jobId, bucketSpan);
    openJob(jobId);

    // Create calendar with the job
    putCalendar(calendarId, List.of(jobId), "Update process action test");

    // Create scheduled event; sample the clock once so the event window is exactly 60s wide
    long now = System.currentTimeMillis();
    List<ScheduledEvent> events = List.of(
        new ScheduledEvent.Builder().description("Direct Test Event")
            .startTime(Instant.ofEpochMilli(now + 60000))
            .endTime(Instant.ofEpochMilli(now + 120000))
            .calendarId(calendarId)
            .build()
    );

    // Post events - the process update may be observed only after this call returns,
    // which is why the verification below polls with assertBusy
    postScheduledEvents(calendarId, events);

    // Wait for and verify ActionFilter captured the UpdateProcessAction call
    assertBusy(() -> {
        assertThat(
            "Should have intercepted UpdateProcessAction call",
            ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(),
            equalTo(1)
        );
        // Assert on a snapshot: the matcher iterates the list, and iterating the live
        // synchronized list while the filter appends from another thread could throw
        // ConcurrentModificationException, which assertBusy does not retry.
        List<String> recordedJobIds = List.copyOf(ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds);
        assertThat("Should have called UpdateProcessAction for the correct job", recordedJobIds, contains(jobId));
    });

    // Verify calendar events were stored correctly
    verifyCalendarEventsStored(calendarId, 1);

    // Cleanup
    closeJob(jobId);

    logger.info("Successfully verified UpdateProcessAction call with updateScheduledEvents=true for job [{}]", jobId);
}

/**
 * Verifies that posting scheduled events to a calendar whose only job is closed
 * does not fail, and that the results already written by that job carry no
 * scheduled events.
 */
public void testCalendarUpdateWithClosedJobs() throws IOException {
    TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
    String jobId = "closed-job-test";

    // Create and run job; runJob closes the job when it has finished
    Job.Builder job = createJob(jobId, bucketSpan);
    long startTime = 1514764800000L;
    runJob(job, startTime, bucketSpan, 10);

    // Create calendar with the closed job
    String calendarId = "closed-job-calendar";
    putCalendar(calendarId, Collections.singletonList(jobId), "Calendar with closed job");

    // Create a scheduled event that overlaps the time range the job already analysed
    List<ScheduledEvent> events = new ArrayList<>();
    long eventStartTime = startTime + (bucketSpan.millis() * 5);
    long eventEndTime = eventStartTime + (bucketSpan.millis() * 2);
    events.add(
        new ScheduledEvent.Builder().description("Closed Job Event")
            .startTime(Instant.ofEpochMilli(eventStartTime))
            .endTime(Instant.ofEpochMilli(eventEndTime))
            .calendarId(calendarId)
            .build()
    );

    // This should not fail even though the job is closed
    postScheduledEvents(calendarId, events);

    // Verify the existing buckets don't have the new event
    // (since the job was closed when the event was added)
    GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
    List<Bucket> buckets = getBuckets(getBucketsRequest);

    // Guard against a vacuous pass: with no buckets the loop below would assert nothing
    assertThat("Closed job should have produced buckets to inspect", buckets.isEmpty(), is(false));

    // All buckets should be empty of scheduled events since job was closed when event was added
    for (Bucket bucket : buckets) {
        assertThat("Closed job buckets should not contain new scheduled events", bucket.getScheduledEvents(), empty());
    }
}

/**
 * Verifies that when a calendar references both an open and a closed job,
 * posting scheduled events produces exactly one {@link UpdateProcessAction}
 * request with {@code updateScheduledEvents} set, addressed to the open job,
 * and none for the closed job.
 * <p>
 * The requests are observed through the action filter installed by
 * {@code UpdateProcessActionTrackerPlugin}.
 */
public void testCalendarUpdateWithMixedOpenAndClosedJobs() throws Exception {
    TimeValue bucketSpan = TimeValue.timeValueMinutes(30);

    // Create two jobs
    String openJobId = "mixed-test-open-job";
    String closedJobId = "mixed-test-closed-job";

    // Create and open first job
    createJob(openJobId, bucketSpan);
    openJob(openJobId);

    // Create and run second job; runJob closes the job when it has finished
    Job.Builder closedJob = createJob(closedJobId, bucketSpan);
    long startTime = 1514764800000L;
    runJob(closedJob, startTime, bucketSpan, 10);

    // Create calendar with both jobs
    String calendarId = "mixed-jobs-calendar";
    putCalendar(calendarId, List.of(openJobId, closedJobId), "Calendar with mixed open and closed jobs");

    // Reset tracker immediately before the action under test
    ScheduledEventsIT.UpdateProcessActionTrackerPlugin.reset();

    // Create scheduled event; sample the clock once so the event window is exactly 60s wide
    long now = System.currentTimeMillis();
    List<ScheduledEvent> events = List.of(
        new ScheduledEvent.Builder().description("Mixed Jobs Event")
            .startTime(Instant.ofEpochMilli(now + 60000))
            .endTime(Instant.ofEpochMilli(now + 120000))
            .calendarId(calendarId)
            .build()
    );

    // Post events - should update open job and skip closed job
    postScheduledEvents(calendarId, events);

    // Wait for ActionFilter to capture the UpdateProcessAction call
    // Should only be called for the open job, not the closed one
    assertBusy(() -> {
        assertThat(
            "Should have intercepted UpdateProcessAction call for open job only",
            ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updateProcessCallCount.get(),
            equalTo(1)
        );
        // Assert on a snapshot: the matcher iterates the list, and iterating the live
        // synchronized list while the filter appends from another thread could throw
        // ConcurrentModificationException, which assertBusy does not retry.
        List<String> recordedJobIds = List.copyOf(ScheduledEventsIT.UpdateProcessActionTrackerPlugin.updatedJobIds);
        assertThat("Should have called UpdateProcessAction for the open job only", recordedJobIds, contains(openJobId));
        assertThat(
            "Should not have called UpdateProcessAction for the closed job",
            recordedJobIds.contains(closedJobId),
            is(false)
        );
    });

    // Cleanup
    closeJob(openJobId);
}

private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
Detector.Builder detector = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
Expand All @@ -486,4 +665,62 @@ private void runJob(Job.Builder job, long startTime, TimeValue bucketSpan, int b
);
closeJob(job.getId());
}

/**
 * Fetches the events of the given calendar through {@link GetCalendarEventsAction}
 * and asserts that the number of returned events equals {@code expectedEventCount}.
 *
 * @param calendarId         id of the calendar whose events are fetched
 * @param expectedEventCount number of events the response must contain
 */
private void verifyCalendarEventsStored(String calendarId, int expectedEventCount) {
    GetCalendarEventsAction.Response eventsResponse = client().execute(
        GetCalendarEventsAction.INSTANCE,
        new GetCalendarEventsAction.Request(calendarId)
    ).actionGet();

    int storedEventCount = eventsResponse.getResources().results().size();
    assertThat("Calendar should have the expected number of events", storedEventCount, equalTo(expectedEventCount));
}

/**
 * Test-only plugin that installs an {@link ActionFilter} recording every
 * {@link UpdateProcessAction} request whose {@code isUpdateScheduledEvents()} flag is set.
 * <p>
 * Recorded calls are kept in static state so tests can inspect them; call
 * {@link #reset()} before the action under test to discard earlier recordings.
 */
public static class UpdateProcessActionTrackerPlugin extends Plugin implements ActionPlugin {

    /** Number of recorded requests since the last {@link #reset()}. */
    public static final AtomicInteger updateProcessCallCount = new AtomicInteger(0);

    /** Job ids of the recorded requests, in the order they were added. */
    public static final List<String> updatedJobIds = Collections.synchronizedList(new ArrayList<>());

    /** Clears the call counter and the recorded job ids. */
    public static void reset() {
        updateProcessCallCount.set(0);
        updatedJobIds.clear();
    }

    @Override
    public List<ActionFilter> getActionFilters() {
        return List.of(new ScheduledEventsUpdateRecorder());
    }

    /**
     * Filter that records matching requests and then always hands the request
     * on to the rest of the chain unchanged.
     */
    private static final class ScheduledEventsUpdateRecorder implements ActionFilter {

        @Override
        public int order() {
            return 0;
        }

        @Override
        public <Req extends ActionRequest, Resp extends ActionResponse> void apply(
            Task task,
            String action,
            Req request,
            ActionListener<Resp> listener,
            ActionFilterChain<Req, Resp> chain
        ) {
            recordIfScheduledEventsUpdate(action, request);
            chain.proceed(task, action, request, listener);
        }

        /** Records the request when it is an UpdateProcessAction request that updates scheduled events. */
        private static void recordIfScheduledEventsUpdate(String action, ActionRequest request) {
            if (UpdateProcessAction.NAME.equals(action) == false) {
                return;
            }
            if ((request instanceof UpdateProcessAction.Request) == false) {
                return;
            }
            UpdateProcessAction.Request processUpdate = (UpdateProcessAction.Request) request;
            if (processUpdate.isUpdateScheduledEvents() == false) {
                return;
            }
            updateProcessCallCount.incrementAndGet();
            updatedJobIds.add(processUpdate.getJobId());
        }
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -42,6 +44,8 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<
PostCalendarEventsAction.Request,
PostCalendarEventsAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportPostCalendarEventsAction.class);

private final Client client;
private final JobResultsProvider jobResultsProvider;
private final JobManager jobManager;
Expand Down Expand Up @@ -75,6 +79,13 @@ protected void doExecute(
List<ScheduledEvent> events = request.getScheduledEvents();

ActionListener<Calendar> calendarListener = ActionListener.wrap(calendar -> {
logger.debug(
"Calendar [{}] accepted for background update: {} jobs with {} events",
request.getCalendarId(),
calendar.getJobIds().size(),
events.size()
);

BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();

for (ScheduledEvent event : events) {
Expand Down Expand Up @@ -102,13 +113,10 @@ protected void doExecute(
new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
jobManager.updateProcessOnCalendarChanged(
calendar.getJobIds(),
ActionListener.wrap(
r -> listener.onResponse(new PostCalendarEventsAction.Response(events)),
listener::onFailure
)
);
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(r -> {
logger.debug("Calendar [{}] update initiated successfully", request.getCalendarId());
listener.onResponse(new PostCalendarEventsAction.Response(events));
}, listener::onFailure));
}

@Override
Expand Down
Loading