diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 9ab2132b61912..4a3fec07b5abd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.Transports; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.ml.MachineLearning; @@ -66,10 +67,16 @@ private void deleteExpiredData(ActionListener private void deleteExpiredData(Iterator mlDataRemoversIterator, ActionListener listener) { + // Removing expired ML data and artifacts requires multiple operations. + // These are queued up and executed sequentially in the action listener, + // the chained calls must all run the ML utility thread pool NOT the thread + // the previous action returned in which in the case of a transport_client_boss + // thread is a disaster. if (mlDataRemoversIterator.hasNext()) { MlDataRemover remover = mlDataRemoversIterator.next(); remover.remove(ActionListener.wrap( - booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), + booleanResponse -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> + deleteExpiredData(mlDataRemoversIterator, listener)), listener::onFailure)); } else { logger.info("Completed deletion of expired data"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java index d50a7c3f8c2ad..e98870771dc7a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java @@ -15,6 +15,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.transport.Transports; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils; diff --git a/x-pack/qa/ml-native-multi-node-tests/build.gradle b/x-pack/qa/ml-native-multi-node-tests/build.gradle new file mode 100644 index 0000000000000..286d4daee8aa5 --- /dev/null +++ b/x-pack/qa/ml-native-multi-node-tests/build.gradle @@ -0,0 +1,85 @@ +import org.elasticsearch.gradle.LoggedExec + +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'runtime') + testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') + testCompile project(path: xpackModule('ml'), configuration: 'runtime') + testCompile project(path: xpackModule('ml'), configuration: 'testArtifacts') +} + +integTestRunner { + /* + * We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each + * other if we allow them to set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} + +// location of generated keystores and certificates +File keystoreDir = new File(project.buildDir, 'keystore') + +// Generate the node's keystore +File nodeKeystore = new File(keystoreDir, 'test-node.jks') +task createNodeKeyStore(type: LoggedExec) { + doFirst { + if (nodeKeystore.parentFile.exists() == false) { + nodeKeystore.parentFile.mkdirs() + } + if (nodeKeystore.exists()) { + delete nodeKeystore + } + } + executable = new File(project.runtimeJavaHome, 'bin/keytool') + standardInput = new ByteArrayInputStream('FirstName LastName\nUnit\nOrganization\nCity\nState\nNL\nyes\n\n'.getBytes('UTF-8')) + args '-genkey', + '-alias', 'test-node', + '-keystore', nodeKeystore, + '-keyalg', 'RSA', + '-keysize', '2048', + '-validity', '712', + '-dname', 'CN=smoke-test-plugins-ssl', + '-keypass', 'keypass', + '-storepass', 'keypass' +} + +// Add keystores to test classpath: it expects it there +sourceSets.test.resources.srcDir(keystoreDir) +processTestResources.dependsOn(createNodeKeyStore) + +integTestCluster { + dependsOn createNodeKeyStore + setting 'xpack.security.enabled', 'true' + setting 'xpack.ml.enabled', 'true' + setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE' + setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.security.authc.token.enabled', 'true' + setting 'xpack.security.transport.ssl.enabled', 'true' + setting 'xpack.security.transport.ssl.keystore.path', nodeKeystore.name + setting 'xpack.security.transport.ssl.verification_mode', 'certificate' + setting 'xpack.security.audit.enabled', 'true' + setting 'xpack.license.self_generated.type', 'trial' + + keystoreSetting 'bootstrap.password', 'x-pack-test-password' + keystoreSetting 'xpack.security.transport.ssl.keystore.secure_password', 'keypass' + + numNodes = 3 + + setupCommand 'setupDummyUser', + 'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'superuser' + + extraConfigFile nodeKeystore.name, nodeKeystore + + waitCondition = { node, ant -> + File tmpFile = new File(node.cwd, 'wait.success') + ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow", + dest: tmpFile.toString(), + username: 'x_pack_rest_user', + password: 'x-pack-test-password', + ignoreerrors: true, + retries: 10) + return tmpFile.exists() + } +} diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java similarity index 99% rename from x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java rename to x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 23bd5c7f7ddf1..e5aaf5f4fdb10 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -37,7 +37,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; diff --git a/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java new file mode 100644 index 0000000000000..f70efc72506d3 --- /dev/null +++ b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -0,0 +1,510 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackClientPlugin; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.CloseJobAction; +import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.core.ml.action.FlushJobAction; +import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction; +import org.elasticsearch.xpack.core.ml.action.GetJobsAction; +import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; +import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; +import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; +import org.elasticsearch.xpack.core.ml.action.PostDataAction; +import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; +import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.PutFilterAction; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.action.util.PageParams; +import org.elasticsearch.xpack.core.ml.calendars.Calendar; +import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; +import org.elasticsearch.xpack.core.ml.job.results.Forecast; +import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; +import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.security.SecurityField; +import org.elasticsearch.xpack.core.security.authc.TokenMetaData; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.elasticsearch.test.XContentTestUtils.convertToMap; +import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Base class of ML integration tests that use a native autodetect process + */ +abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { + + private List jobs = new ArrayList<>(); + private List datafeeds = new ArrayList<>(); + @Override + protected Collection> nodePlugins() { + return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class); + } + + @Override + protected Settings externalClusterClientSettings() { + Path keyStore; + try { + keyStore = PathUtils.get(getClass().getResource("/test-node.jks").toURI()); + } catch (URISyntaxException e) { + throw new IllegalStateException("error trying to get keystore path", e); + } + Settings.Builder builder = Settings.builder(); + builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4); + builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); + builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true); + builder.put("xpack.security.transport.ssl.enabled", true); + builder.put("xpack.security.transport.ssl.keystore.path", keyStore.toAbsolutePath().toString()); + builder.put("xpack.security.transport.ssl.keystore.password", "keypass"); + builder.put("xpack.security.transport.ssl.verification_mode", "certificate"); + return builder.build(); + } + + protected void cleanUp() { + cleanUpDatafeeds(); + cleanUpJobs(); + waitForPendingTasks(); + } + + private void cleanUpDatafeeds() { + for (DatafeedConfig datafeed : datafeeds) { + try { + stopDatafeed(datafeed.getId()); + } catch (Exception e) { + // ignore + } + try { + deleteDatafeed(datafeed.getId()); + } catch (Exception e) { + // ignore + } + } + } + + private void cleanUpJobs() { + for (Job.Builder job : jobs) { + try { + closeJob(job.getId()); + } catch (Exception e) { + // ignore + } + try { + deleteJob(job.getId()); + } catch (Exception e) { + // ignore + } + } + } + + private void waitForPendingTasks() { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setWaitForCompletion(true); + listTasksRequest.setDetailed(true); + listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10)); + try { + admin().cluster().listTasks(listTasksRequest).get(); + } catch (Exception e) { + throw new AssertionError("Failed to wait for pending tasks to complete", e); + } + } + + protected void registerJob(Job.Builder job) { + if (jobs.add(job) == false) { + throw new IllegalArgumentException("job [" + job.getId() + "] is already registered"); + } + } + + protected void registerDatafeed(DatafeedConfig datafeed) { + if (datafeeds.add(datafeed) == false) { + throw new IllegalArgumentException("datafeed [" + datafeed.getId() + "] is already registered"); + } + } + + protected List getJobs() { + return jobs; + } + + protected PutJobAction.Response putJob(Job.Builder job) { + PutJobAction.Request request = new PutJobAction.Request(job); + return client().execute(PutJobAction.INSTANCE, request).actionGet(); + } + + protected OpenJobAction.Response openJob(String jobId) { + OpenJobAction.Request request = new OpenJobAction.Request(jobId); + return client().execute(OpenJobAction.INSTANCE, request).actionGet(); + } + + protected CloseJobAction.Response closeJob(String jobId) { + CloseJobAction.Request request = new CloseJobAction.Request(jobId); + return client().execute(CloseJobAction.INSTANCE, request).actionGet(); + } + + protected FlushJobAction.Response flushJob(String jobId, boolean calcInterim) { + FlushJobAction.Request request = new FlushJobAction.Request(jobId); + request.setCalcInterim(calcInterim); + return client().execute(FlushJobAction.INSTANCE, request).actionGet(); + } + + protected PutJobAction.Response updateJob(String jobId, JobUpdate update) { + UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, update); + return client().execute(UpdateJobAction.INSTANCE, request).actionGet(); + } + + protected DeleteJobAction.Response deleteJob(String jobId) { + DeleteJobAction.Request request = new DeleteJobAction.Request(jobId); + return client().execute(DeleteJobAction.INSTANCE, request).actionGet(); + } + + protected PutDatafeedAction.Response putDatafeed(DatafeedConfig datafeed) { + PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeed); + return client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); + } + + protected StopDatafeedAction.Response stopDatafeed(String datafeedId) { + StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); + return client().execute(StopDatafeedAction.INSTANCE, request).actionGet(); + } + + protected DeleteDatafeedAction.Response deleteDatafeed(String datafeedId) { + DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId); + return client().execute(DeleteDatafeedAction.INSTANCE, request).actionGet(); + } + + protected StartDatafeedAction.Response startDatafeed(String datafeedId, long start, Long end) { + StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedId, start); + request.getParams().setEndTime(end); + return client().execute(StartDatafeedAction.INSTANCE, request).actionGet(); + } + + protected void waitUntilJobIsClosed(String jobId) throws Exception { + waitUntilJobIsClosed(jobId, TimeValue.timeValueSeconds(30)); + } + + protected void waitUntilJobIsClosed(String jobId, TimeValue waitTime) throws Exception { + assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)), + waitTime.getMillis(), TimeUnit.MILLISECONDS); + } + + protected List getJob(String jobId) { + GetJobsAction.Request request = new GetJobsAction.Request(jobId); + return client().execute(GetJobsAction.INSTANCE, request).actionGet().getResponse().results(); + } + + protected List getJobStats(String jobId) { + GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet(); + return response.getResponse().results(); + } + + protected List getBuckets(String jobId) { + GetBucketsAction.Request request = new GetBucketsAction.Request(jobId); + return getBuckets(request); + } + + protected List getBuckets(GetBucketsAction.Request request) { + GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).actionGet(); + return response.getBuckets().results(); + } + + protected List getRecords(String jobId) { + GetRecordsAction.Request request = new GetRecordsAction.Request(jobId); + return getRecords(request); + } + + protected List getRecords(GetRecordsAction.Request request) { + GetRecordsAction.Response response = client().execute(GetRecordsAction.INSTANCE, request).actionGet(); + return response.getRecords().results(); + } + + protected List getModelSnapshots(String jobId) { + GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null); + GetModelSnapshotsAction.Response response = client().execute(GetModelSnapshotsAction.INSTANCE, request).actionGet(); + return response.getPage().results(); + } + + protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId) { + RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId, snapshotId); + return client().execute(RevertModelSnapshotAction.INSTANCE, request).actionGet(); + } + + protected List getCategories(String jobId) { + GetCategoriesAction.Request getCategoriesRequest = + new GetCategoriesAction.Request(jobId); + getCategoriesRequest.setPageParams(new PageParams()); + GetCategoriesAction.Response categoriesResponse = client().execute(GetCategoriesAction.INSTANCE, getCategoriesRequest).actionGet(); + return categoriesResponse.getResult().results(); + } + + protected DataCounts postData(String jobId, String data) { + logger.debug("Posting data to job [{}]:\n{}", jobId, data); + PostDataAction.Request request = new PostDataAction.Request(jobId); + request.setContent(new BytesArray(data), XContentType.JSON); + return client().execute(PostDataAction.INSTANCE, request).actionGet().getDataCounts(); + } + + protected String forecast(String jobId, TimeValue duration, TimeValue expiresIn) { + ForecastJobAction.Request request = new ForecastJobAction.Request(jobId); + if (duration != null) { + request.setDuration(duration.getStringRep()); + } + if (expiresIn != null) { + request.setExpiresIn(expiresIn.getStringRep()); + } + return client().execute(ForecastJobAction.INSTANCE, request).actionGet().getForecastId(); + } + + protected void waitForecastToFinish(String jobId, String forecastId) throws Exception { + assertBusy(() -> { + ForecastRequestStats forecastRequestStats = getForecastStats(jobId, forecastId); + assertThat(forecastRequestStats, is(notNullValue())); + assertThat(forecastRequestStats.getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.FINISHED)); + }, 30, TimeUnit.SECONDS); + } + + protected ForecastRequestStats getForecastStats(String jobId, String forecastId) { + SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)) + .setQuery(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE)) + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + .filter(QueryBuilders.termQuery(ForecastRequestStats.FORECAST_ID.getPreferredName(), forecastId))) + .execute().actionGet(); + SearchHits hits = searchResponse.getHits(); + if (hits.getTotalHits() == 0) { + return null; + } + assertThat(hits.getTotalHits(), equalTo(1L)); + try { + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + hits.getHits()[0].getSourceRef().streamInput()); + return ForecastRequestStats.STRICT_PARSER.apply(parser, null); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + protected List getForecastStats() { + List forecastStats = new ArrayList<>(); + + SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*") + .setSize(1000) + .setQuery(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))) + .execute().actionGet(); + SearchHits hits = searchResponse.getHits(); + for (SearchHit hit : hits) { + try { + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, hit.getSourceRef().streamInput()); + forecastStats.add(ForecastRequestStats.STRICT_PARSER.apply(parser, null)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + return forecastStats; + } + + protected long countForecastDocs(String jobId, String forecastId) { + SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*") + .setQuery(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Forecast.RESULT_TYPE_VALUE)) + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + .filter(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastId))) + .execute().actionGet(); + return searchResponse.getHits().getTotalHits(); + } + + protected List getForecasts(String jobId, ForecastRequestStats forecastRequestStats) { + List forecasts = new ArrayList<>(); + + SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*") + .setSize((int) forecastRequestStats.getRecordCount()) + .setQuery(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Forecast.RESULT_TYPE_VALUE)) + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + .filter(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastRequestStats.getForecastId()))) + .addSort(SortBuilders.fieldSort(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC)) + .execute().actionGet(); + SearchHits hits = searchResponse.getHits(); + for (SearchHit hit : hits) { + try { + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + hit.getSourceRef().streamInput()); + forecasts.add(Forecast.STRICT_PARSER.apply(parser, null)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + return forecasts; + } + + protected boolean putMlFilter(MlFilter filter) { + PutFilterAction.Response response = client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet(); + return response.isAcknowledged(); + } + + protected PutCalendarAction.Response putCalendar(String calendarId, List jobIds, String description) { + PutCalendarAction.Request request = new PutCalendarAction.Request(new Calendar(calendarId, jobIds, description)); + return client().execute(PutCalendarAction.INSTANCE, request).actionGet(); + } + + protected PostCalendarEventsAction.Response postScheduledEvents(String calendarId, List events) { + PostCalendarEventsAction.Request request = new PostCalendarEventsAction.Request(calendarId, events); + return client().execute(PostCalendarEventsAction.INSTANCE, request).actionGet(); + } + + protected PersistJobAction.Response persistJob(String jobId) { + PersistJobAction.Request request = new PersistJobAction.Request(jobId); + return client().execute(PersistJobAction.INSTANCE, request).actionGet(); + } + + @Override + protected void ensureClusterStateConsistency() throws IOException { + if (cluster() != null && cluster().size() > 0) { + List entries = new ArrayList<>(ClusterModule.getNamedWriteables()); + entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables()); + entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME, + StartDatafeedAction.DatafeedParams::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, + OpenJobAction.JobParams::new)); + entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new)); + entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream)); + entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new)); + final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); + ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); + byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState); + // remove local node reference + masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry); + Map masterStateMap = convertToMap(masterClusterState); + int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length; + String masterId = masterClusterState.nodes().getMasterNodeId(); + for (Client client : cluster().getClients()) { + ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState(); + byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState); + // remove local node reference + localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry); + final Map localStateMap = convertToMap(localClusterState); + final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length; + // Check that the non-master node has the same version of the cluster state as the master and + // that the master node matches the master (otherwise there is no requirement for the cluster state to match) + if (masterClusterState.version() == localClusterState.version() && + masterId.equals(localClusterState.nodes().getMasterNodeId())) { + try { + assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID()); + // We cannot compare serialization bytes since serialization order of maps is not guaranteed + // but we can compare serialization sizes - they should be the same + assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize); + // Compare JSON serialization + assertNull("clusterstate JSON serialization does not match", + differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)); + } catch (AssertionError error) { + logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}", + masterClusterState.toString(), localClusterState.toString()); + throw error; + } + } + } + } + } + + protected List generateData(long timestamp, TimeValue bucketSpan, int bucketCount, + Function timeToCountFunction) throws IOException { + List data = new ArrayList<>(); + long now = timestamp; + for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) { + for (int count = 0; count < timeToCountFunction.apply(bucketIndex); count++) { + Map record = new HashMap<>(); + record.put("time", now); + data.add(createJsonRecord(record)); + } + now += bucketSpan.getMillis(); + } + return data; + } + + protected static String createJsonRecord(Map keyValueMap) throws IOException { + return Strings.toString(JsonXContent.contentBuilder().map(keyValueMap)) + "\n"; + } +}