diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 9cba0b20c51b9..1a928a8bdba11 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -10,21 +10,19 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.io.BufferedReader; import java.io.InputStream; @@ -37,51 +35,56 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction { private final Client client; - private final ClusterService clusterService; + private final JobConfigProvider jobConfigProvider; + private final DatafeedConfigProvider datafeedConfigProvider; @Inject public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client, ClusterService clusterService) { + Client client, JobConfigProvider jobConfigProvider, + DatafeedConfigProvider datafeedConfigProvider) { super(settings, PreviewDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PreviewDatafeedAction.Request::new); this.client = client; - this.clusterService = clusterService; + this.jobConfigProvider = jobConfigProvider; + this.datafeedConfigProvider = datafeedConfigProvider; } @Override protected void doExecute(PreviewDatafeedAction.Request request, ActionListener listener) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); - if (datafeed == null) { - throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); - } - Job job = mlMetadata.getJobs().get(datafeed.getJobId()); - if (job == null) { - throw ExceptionsHelper.missingJobException(datafeed.getJobId()); - } - DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeed); - Map headers = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - previewDatafeed.setHeaders(headers); - // NB: this is using the client from the transport layer, NOT the internal client. - // This is important because it means the datafeed search will fail if the user - // requesting the preview doesn't have permission to search the relevant indices. - DataExtractorFactory.create(client, previewDatafeed.build(), job, new ActionListener() { - @Override - public void onResponse(DataExtractorFactory dataExtractorFactory) { - DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE); - threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( + datafeedConfigBuilder -> { + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap( + jobBuilder -> { + DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig); + Map headers = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + previewDatafeed.setHeaders(headers); + // NB: this is using the client from the transport layer, NOT the internal client. + // This is important because it means the datafeed search will fail if the user + // requesting the preview doesn't have permission to search the relevant indices. + DataExtractorFactory.create(client, previewDatafeed.build(), jobBuilder.build(), + new ActionListener() { + @Override + public void onResponse(DataExtractorFactory dataExtractorFactory) { + DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE); + threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener)); + } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + }, + listener::onFailure + )); + }, + listener::onFailure + )); } /** Visible for testing */ diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index aa84d519df64e..4c5d6940bf45a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -23,19 +23,17 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import java.io.IOException; import java.util.ArrayList; @@ -50,35 +48,36 @@ public class TransportStopDatafeedAction extends TransportTasksAction { private final PersistentTasksService persistentTasksService; + private final DatafeedConfigProvider datafeedConfigProvider; @Inject public TransportStopDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, PersistentTasksService persistentTasksService) { + ClusterService clusterService, PersistentTasksService persistentTasksService, + DatafeedConfigProvider datafeedConfigProvider) { super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, StopDatafeedAction.Request::new, StopDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME); this.persistentTasksService = persistentTasksService; + this.datafeedConfigProvider = datafeedConfigProvider; + } /** - * Resolve the requested datafeeds and add their IDs to one of the list - * arguments depending on datafeed state. + * Sort the datafeed IDs the their task state and add to one + * of the list arguments depending on the state. * - * @param request The stop datafeed request - * @param mlMetadata ML Metadata + * @param expandedDatafeedIds The expanded set of IDs * @param tasks Persistent task meta data * @param startedDatafeedIds Started datafeed ids are added to this list * @param stoppingDatafeedIds Stopping datafeed ids are added to this list */ - static void resolveDataFeedIds(StopDatafeedAction.Request request, MlMetadata mlMetadata, - PersistentTasksCustomMetaData tasks, - List startedDatafeedIds, - List stoppingDatafeedIds) { + static void sortDatafeedIdsByTaskState(Set expandedDatafeedIds, + PersistentTasksCustomMetaData tasks, + List startedDatafeedIds, + List stoppingDatafeedIds) { - Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); for (String expandedDatafeedId : expandedDatafeedIds) { - validateDatafeedTask(expandedDatafeedId, mlMetadata); addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks), startedDatafeedIds, stoppingDatafeedIds); } @@ -102,20 +101,6 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId, } } - /** - * Validate the stop request. - * Throws an {@code ResourceNotFoundException} if there is no datafeed - * with id {@code datafeedId} - * @param datafeedId The datafeed Id - * @param mlMetadata ML meta data - */ - static void validateDatafeedTask(String datafeedId, MlMetadata mlMetadata) { - DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); - if (datafeed == null) { - throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId)); - } - } - @Override protected void doExecute(Task task, StopDatafeedAction.Request request, ActionListener listener) { final ClusterState state = clusterService.state(); @@ -130,23 +115,27 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new)); } } else { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + expandedIds -> { + PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - List startedDatafeeds = new ArrayList<>(); - List stoppingDatafeeds = new ArrayList<>(); - resolveDataFeedIds(request, mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds); - if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) { - listener.onResponse(new StopDatafeedAction.Response(true)); - return; - } - request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()])); + List startedDatafeeds = new ArrayList<>(); + List stoppingDatafeeds = new ArrayList<>(); + sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds); + if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) { + listener.onResponse(new StopDatafeedAction.Response(true)); + return; + } + request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()])); - if (request.isForce()) { - forceStopDatafeed(request, listener, tasks, startedDatafeeds); - } else { - normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds); - } + if (request.isForce()) { + forceStopDatafeed(request, listener, tasks, startedDatafeeds); + } else { + normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds); + } + }, + listener::onFailure + )); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java index d8b1d28153688..da390b6106245 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java @@ -5,112 +5,59 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; -import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.Date; +import java.util.HashSet; import java.util.List; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; -import static org.hamcrest.Matchers.equalTo; - public class TransportStopDatafeedActionTests extends ESTestCase { - public void testValidate() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME, - new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); - tasksBuilder.updateTaskState(MlTasks.datafeedTaskId("foo"), DatafeedState.STARTED); - tasksBuilder.build(); - - Job job = createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); - Exception e = expectThrows(ResourceNotFoundException.class, - () -> TransportStopDatafeedAction.validateDatafeedTask("foo", mlMetadata1)); - assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists")); - - DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) - .putDatafeed(datafeedConfig, Collections.emptyMap()) - .build(); - TransportStopDatafeedAction.validateDatafeedTask("foo", mlMetadata2); - } - - public void testResolveDataFeedIds_GivenDatafeedId() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + public void testSortDatafeedIdsByTaskState_GivenDatafeedId() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); - Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); - addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); - job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); - datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - MlMetadata mlMetadata = mlMetadataBuilder.build(); List startedDatafeeds = new ArrayList<>(); List stoppingDatafeeds = new ArrayList<>(); - TransportStopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("datafeed_1"), mlMetadata, tasks, startedDatafeeds, - stoppingDatafeeds); + TransportStopDatafeedAction.sortDatafeedIdsByTaskState( + Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds); assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds); assertEquals(Collections.emptyList(), stoppingDatafeeds); startedDatafeeds.clear(); stoppingDatafeeds.clear(); - TransportStopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("datafeed_2"), mlMetadata, tasks, startedDatafeeds, - stoppingDatafeeds); + TransportStopDatafeedAction.sortDatafeedIdsByTaskState( + Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds); assertEquals(Collections.emptyList(), startedDatafeeds); assertEquals(Collections.emptyList(), stoppingDatafeeds); } - public void testResolveDataFeedIds_GivenAll() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + public void testSortDatafeedIdsByTaskState_GivenAll() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); - Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); - addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); - job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); - datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); - addTask("datafeed_3", 0L, "node-1", DatafeedState.STOPPING, tasksBuilder); - job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); - datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - MlMetadata mlMetadata = mlMetadataBuilder.build(); List startedDatafeeds = new ArrayList<>(); List stoppingDatafeeds = new ArrayList<>(); - TransportStopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("_all"), mlMetadata, tasks, startedDatafeeds, - stoppingDatafeeds); + TransportStopDatafeedAction.sortDatafeedIdsByTaskState( + new HashSet<>(Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3")), tasks, startedDatafeeds, stoppingDatafeeds); assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds); assertEquals(Collections.singletonList("datafeed_3"), stoppingDatafeeds); startedDatafeeds.clear(); stoppingDatafeeds.clear(); - TransportStopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("datafeed_2"), mlMetadata, tasks, startedDatafeeds, + TransportStopDatafeedAction.sortDatafeedIdsByTaskState(Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds); assertEquals(Collections.emptyList(), startedDatafeeds); assertEquals(Collections.emptyList(), stoppingDatafeeds);