From 53f7f2a6db30ab071f962851eb74e5ab17dcba8d Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 21 Sep 2018 14:14:21 -0500 Subject: [PATCH 1/3] Make certain ML node settings dynamic (#33565) --- .../xpack/ml/MachineLearning.java | 12 ++++- .../ml/job/process/DataCountsReporter.java | 49 +++++++++++++++---- .../process/autodetect/AutodetectBuilder.java | 22 ++++++--- .../autodetect/AutodetectProcessManager.java | 12 +++-- .../NativeAutodetectProcessFactory.java | 17 +++++-- .../job/process/CountingInputStreamTests.java | 34 +++++++++++-- .../job/process/DataCountsReporterTests.java | 45 +++++++++++------ .../job/process/DummyDataCountsReporter.java | 5 +- .../autodetect/AutodetectBuilderTests.java | 14 ++++-- .../AutodetectProcessManagerTests.java | 22 +++++++-- 10 files changed, 178 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index cd13b2c8bb657..538521909a7b8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -292,8 +292,11 @@ public List> getSettings() { MAX_MACHINE_MEMORY_PERCENT, AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING, + AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, + DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING, + DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE, AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE, AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP)); @@ -379,7 +382,12 @@ public Collection createComponents(Client client, ClusterService cluster // This will only only happen when path.home is not set, which is disallowed in production throw new ElasticsearchException("Failed to create native process controller for Machine Learning"); } - autodetectProcessFactory = new NativeAutodetectProcessFactory(environment, settings, nativeController, client); + autodetectProcessFactory = new NativeAutodetectProcessFactory( + environment, + settings, + nativeController, + client, + clusterService); normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController); } catch (IOException e) { // This also should not happen in production, as the MachineLearningFeatureSet should have @@ -397,7 +405,7 @@ public Collection createComponents(Client client, ClusterService cluster threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, xContentRegistry, auditor); + normalizerFactory, xContentRegistry, auditor, clusterService); this.autodetectProcessManager.set(autodetectProcessManager); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index d906ccf2f7a9b..a00c14078eb95 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -42,15 +43,28 @@ public class DataCountsReporter extends AbstractComponent { * The max percentage of date parse errors allowed before * an exception is thrown. */ + @Deprecated public static final Setting ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING = Setting.intSetting("max.percent.date.errors", 25, - Property.NodeScope); - + Property.NodeScope, Property.Deprecated); + public static final Setting MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING = Setting.intSetting( + "xpack.ml.max_percent_date_errors", + ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, + 0, + Property.Dynamic, + Property.NodeScope); /** * The max percentage of out of order records allowed before * an exception is thrown. */ + @Deprecated public static final Setting ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting - .intSetting("max.percent.outoforder.errors", 25, Property.NodeScope); + .intSetting("max.percent.outoforder.errors", 25, Property.NodeScope, Property.Deprecated); + public static final Setting MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING = Setting.intSetting( + "xpack.ml.max_percent_out_of_order_errors", + ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING, + 0, + Property.Dynamic, + Property.NodeScope); private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L); @@ -66,14 +80,15 @@ public class DataCountsReporter extends AbstractComponent { private long logEvery = 1; private long logCount = 0; - private final int acceptablePercentDateParseErrors; - private final int acceptablePercentOutOfOrderErrors; + private volatile int acceptablePercentDateParseErrors; + private volatile int acceptablePercentOutOfOrderErrors; private Function reportingBoundaryFunction; private DataStreamDiagnostics diagnostics; - public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) { + public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister, + ClusterService clusterService) { super(settings); @@ -84,9 +99,12 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData incrementalRecordStats = new DataCounts(job.getId()); diagnostics = new DataStreamDiagnostics(job, counts); - acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings); - acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); - + acceptablePercentDateParseErrors = MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.get(settings); + acceptablePercentOutOfOrderErrors = MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, this::setAcceptablePercentDateParseErrors); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, this::setAcceptablePercentOutOfOrderErrors); reportingBoundaryFunction = this::reportEvery10000Records; } @@ -352,4 +370,17 @@ private void retrieveDiagnosticsIntermediateResults() { diagnostics.resetCounts(); } + + private void setAcceptablePercentDateParseErrors(int acceptablePercentDateParseErrors) { + logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), + this.acceptablePercentDateParseErrors, acceptablePercentDateParseErrors); + this.acceptablePercentDateParseErrors = acceptablePercentDateParseErrors; + } + + private void setAcceptablePercentOutOfOrderErrors(int acceptablePercentOutOfOrderErrors) { + logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), + this.acceptablePercentOutOfOrderErrors, acceptablePercentOutOfOrderErrors); + this.acceptablePercentOutOfOrderErrors = acceptablePercentOutOfOrderErrors; + } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 0094eba97cecb..39230858d37f2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -83,8 +83,15 @@ public class AutodetectBuilder { /** * The maximum number of anomaly records that will be written each bucket */ + @Deprecated public static final Setting MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS, - Setting.Property.NodeScope); + Setting.Property.NodeScope, Setting.Property.Deprecated); + public static final Setting MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting( + "xpack.ml.max_anomaly_records", + MAX_ANOMALY_RECORDS_SETTING, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic); /** * Config setting storing the flag that disables model persistence @@ -168,9 +175,9 @@ public AutodetectBuilder scheduledEvents(List scheduledEvents) { /** * Requests that the controller daemon start an autodetect process. */ - public void build() throws IOException { + public void build(int maxAnomalyRecords) throws IOException { - List command = buildAutodetectCommand(); + List command = buildAutodetectCommand(maxAnomalyRecords); buildLimits(command); buildModelPlotConfig(command); @@ -184,7 +191,7 @@ public void build() throws IOException { /** * Visible for testing */ - List buildAutodetectCommand() { + List buildAutodetectCommand(int maxAnomalyRecords) { List command = new ArrayList<>(); command.add(AUTODETECT_PATH); @@ -212,7 +219,7 @@ List buildAutodetectCommand() { command.add(LENGTH_ENCODED_INPUT_ARG); // Limit the number of output records - command.add(maxAnomalyRecordsArg(settings)); + command.add(maxAnomalyRecordsArg(maxAnomalyRecords)); // always set the time field String timeFieldArg = TIME_FIELD_ARG + getTimeFieldOrDefault(job); @@ -244,9 +251,8 @@ List buildAutodetectCommand() { return command; } - - static String maxAnomalyRecordsArg(Settings settings) { - return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING.get(settings); + static String maxAnomalyRecordsArg(int maxAnomalyRecords) { + return "--maxAnomalyRecords=" + maxAnomalyRecords; } private static String getTimeFieldOrDefault(Job job) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index fa05c2e63ee11..7e6d923bb5192 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; @@ -130,12 +131,13 @@ public class AutodetectProcessManager extends AbstractComponent { private final NamedXContentRegistry xContentRegistry; private final Auditor auditor; + private final ClusterService clusterService; public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister, JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, - NamedXContentRegistry xContentRegistry, Auditor auditor) { + NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) { super(settings); this.environment = environment; this.client = client; @@ -150,6 +152,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie this.jobDataCountsPersister = jobDataCountsPersister; this.auditor = auditor; this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings)); + this.clusterService = clusterService; } public void onNodeStartup() { @@ -493,8 +496,11 @@ AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams Job job = jobManager.getJobOrThrowIfUnknown(jobId); // A TP with no queue, so that we fail immediately if there are no threads available ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); - DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(), - jobDataCountsPersister); + DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, + job, + autodetectParams.dataCounts(), + jobDataCountsPersister, + clusterService); ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider, new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory); ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 01ad0bec85aec..a380a8bae3b1e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -40,12 +41,17 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private final Environment env; private final Settings settings; private final NativeController nativeController; + private volatile int maxAnomalyRecords; - public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client) { + public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client, + ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.settings = Objects.requireNonNull(settings); this.nativeController = Objects.requireNonNull(nativeController); this.client = client; + this.maxAnomalyRecords = AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, this::setMaxAnomalyRecords); } @Override @@ -95,8 +101,7 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro if (autodetectParams.quantiles() != null) { autodetectBuilder.quantiles(autodetectParams.quantiles()); } - - autodetectBuilder.build(); + autodetectBuilder.build(maxAnomalyRecords); processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); } catch (IOException e) { String msg = "Failed to launch autodetect for job " + job.getId(); @@ -104,5 +109,11 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro throw ExceptionsHelper.serverError(msg, e); } } + + private void setMaxAnomalyRecords(int maxAnomalyRecords) { + LOGGER.info("Changing [{}] from [{}] to [{}]", AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(), + this.maxAnomalyRecords, maxAnomalyRecords); + this.maxAnomalyRecords = maxAnomalyRecords; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java index 45a5e57af5ff7..b867a6bbe1e53 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java @@ -5,18 +5,44 @@ */ package org.elasticsearch.xpack.ml.job.process; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Set; + +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.mockito.Mockito.mock; public class CountingInputStreamTests extends ESTestCase { + private ClusterService clusterService; + + @Before + public void setUpMocks() { + Settings settings = Settings.builder().put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), 10) + .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), 10) + .build(); + Set> setOfSettings = new HashSet<>(); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings); + + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } + public void testRead_OneByteAtATime() throws IOException { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); final String TEXT = "123"; InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); @@ -30,7 +56,7 @@ public void testRead_OneByteAtATime() throws IOException { public void testRead_WithBuffer() throws IOException { final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail."; - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); @@ -44,7 +70,7 @@ public void testRead_WithBuffer() throws IOException { public void testRead_WithTinyBuffer() throws IOException { final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail."; - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); @@ -57,7 +83,7 @@ public void testRead_WithTinyBuffer() throws IOException { public void testRead_WithResets() throws IOException { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail."; InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index d3afb7324186f..5f56db164a35a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.ml.job.process; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; @@ -22,11 +25,15 @@ import java.io.IOException; import java.util.Arrays; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.mock.orig.Mockito.when; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -38,12 +45,13 @@ public class DataCountsReporterTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private Settings settings; private TimeValue bucketSpan = TimeValue.timeValueSeconds(300); + private ClusterService clusterService; @Before public void setUpMocks() { settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) - .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS) - .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS) + .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS) + .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS) .build(); AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); @@ -51,6 +59,15 @@ public void setUpMocks() { acBuilder.setLatency(TimeValue.ZERO); acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + + Set> setOfSettings = new HashSet<>(); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings); + + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + Job.Builder builder = new Job.Builder("sr"); builder.setAnalysisConfig(acBuilder); builder.setDataDescription(new DataDescription.Builder()); @@ -61,14 +78,14 @@ public void setUpMocks() { public void testSettingAcceptablePercentages() throws IOException { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS); assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS); } public void testSimpleConstructor() throws Exception { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); @@ -79,7 +96,7 @@ public void testComplexConstructor() throws Exception { new Date(), new Date(), new Date(), new Date(), new Date()); DataCountsReporter dataCountsReporter = - new DataCountsReporter(settings, job, counts, jobDataCountsPersister); + new DataCountsReporter(settings, job, counts, jobDataCountsPersister, clusterService); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); @@ -97,7 +114,7 @@ public void testComplexConstructor() throws Exception { public void testResetIncrementalCounts() throws Exception { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); @@ -150,7 +167,7 @@ public void testResetIncrementalCounts() throws Exception { public void testReportLatestTimeIncrementalStats() throws IOException { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); dataCountsReporter.startNewIncrementalCount(); dataCountsReporter.reportLatestTimeIncrementalStats(5001L); assertEquals(5001L, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); @@ -158,7 +175,7 @@ public void testReportLatestTimeIncrementalStats() throws IOException { public void testReportRecordsWritten() { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); dataCountsReporter.reportRecordWritten(5, 2000); @@ -182,7 +199,7 @@ public void testReportRecordsWritten() { } public void testReportRecordsWritten_Given9999Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 9999; i++) { @@ -199,7 +216,7 @@ public void testReportRecordsWritten_Given9999Records() { } public void testReportRecordsWritten_Given30000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 30001; i++) { @@ -216,7 +233,7 @@ public void testReportRecordsWritten_Given30000Records() { } public void testReportRecordsWritten_Given100_000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 100000; i++) { @@ -233,7 +250,7 @@ public void testReportRecordsWritten_Given100_000Records() { } public void testReportRecordsWritten_Given1_000_000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 1_000_000; i++) { @@ -250,7 +267,7 @@ public void testReportRecordsWritten_Given1_000_000Records() { } public void testReportRecordsWritten_Given2_000_000Records() { - DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 2_000_000; i++) { @@ -269,7 +286,7 @@ public void testReportRecordsWritten_Given2_000_000Records() { public void testFinishReporting() { DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, new DataCounts(job.getId()), - jobDataCountsPersister); + jobDataCountsPersister, clusterService); dataCountsReporter.setAnalysedFieldsPerRecord(3); Date now = new Date(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java index bcf41a994b9e1..6b4c68e1f30ac 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -26,9 +27,9 @@ class DummyDataCountsReporter extends DataCountsReporter { int logStatusCallCount = 0; - DummyDataCountsReporter() { + DummyDataCountsReporter(ClusterService clusterService) { super(Settings.EMPTY, createJob(), new DataCounts("DummyJobId"), - mock(JobDataCountsPersister.class)); + mock(JobDataCountsPersister.class), clusterService); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java index 325ad52864bfa..5e214bf969ddf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java @@ -64,7 +64,8 @@ public void testBuildAutodetectCommand() { dd.setTimeField("tf"); job.setDataDescription(dd); - List command = autodetectBuilder(job.build()).buildAutodetectCommand(); + List command = autodetectBuilder( + job.build()).buildAutodetectCommand(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)); assertEquals(12, command.size()); assertTrue(command.contains(AutodetectBuilder.AUTODETECT_PATH)); assertTrue(command.contains(AutodetectBuilder.BUCKET_SPAN_ARG + "120")); @@ -74,7 +75,8 @@ public void testBuildAutodetectCommand() { assertTrue(command.contains(AutodetectBuilder.MULTIVARIATE_BY_FIELDS_ARG)); assertTrue(command.contains(AutodetectBuilder.LENGTH_ENCODED_INPUT_ARG)); - assertTrue(command.contains(AutodetectBuilder.maxAnomalyRecordsArg(settings))); + assertTrue( + command.contains(AutodetectBuilder.maxAnomalyRecordsArg(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)))); assertTrue(command.contains(AutodetectBuilder.TIME_FIELD_ARG + "tf")); assertTrue(command.contains(AutodetectBuilder.JOB_ID_ARG + "unit-test-job")); @@ -88,7 +90,7 @@ public void testBuildAutodetectCommand() { public void testBuildAutodetectCommand_defaultTimeField() { Job.Builder job = buildJobBuilder("unit-test-job"); - List command = autodetectBuilder(job.build()).buildAutodetectCommand(); + List command = autodetectBuilder(job.build()).buildAutodetectCommand(1); assertTrue(command.contains(AutodetectBuilder.TIME_FIELD_ARG + "time")); } @@ -100,13 +102,15 @@ public void testBuildAutodetectCommand_givenPersistModelState() { int expectedPersistInterval = 10800 + AutodetectBuilder.calculateStaggeringInterval(job.getId()); - List command = autodetectBuilder(job.build()).buildAutodetectCommand(); + List command = autodetectBuilder( + job.build()).buildAutodetectCommand(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)); assertFalse(command.contains(AutodetectBuilder.PERSIST_INTERVAL_ARG + expectedPersistInterval)); settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); env = TestEnvironment.newEnvironment(settings); - command = autodetectBuilder(job.build()).buildAutodetectCommand(); + command = autodetectBuilder( + job.build()).buildAutodetectCommand(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)); assertTrue(command.contains(AutodetectBuilder.PERSIST_INTERVAL_ARG + expectedPersistInterval)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 43cc909e392ea..dd67de41996b3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -8,7 +8,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -41,6 +44,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -106,6 +110,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; private Auditor auditor; + private ClusterService clusterService; private DataCounts dataCounts = new DataCounts("foo"); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); @@ -126,6 +131,15 @@ public void setup() throws Exception { normalizerFactory = mock(NormalizerFactory.class); auditor = mock(Auditor.class); + + Set> setOfSettings = new HashSet<>(); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING); + setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings); + + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -228,7 +242,7 @@ public void testOpenJob_exceedMaxNumJobs() { settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor)); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService)); doReturn(executorService).when(manager).createAutodetectExecutorService(any()); doAnswer(invocationOnMock -> { @@ -583,7 +597,7 @@ public void testCreate_notEnoughThreads() throws IOException { (j, autodetectParams, e, onProcessCrash) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("my_id"); @@ -656,7 +670,7 @@ private AutodetectProcessManager createNonSpyManager(String jobId) { (j, autodetectParams, e, onProcessCrash) -> autodetectProcess; return new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService); } private AutodetectParams buildAutodetectParams() { @@ -682,7 +696,7 @@ private AutodetectProcessManager createManager(AutodetectCommunicator communicat AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, - new NamedXContentRegistry(Collections.emptyList()), auditor); + new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService); manager = spy(manager); doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), any()); return manager; From ca347eb5e357aedbd66449388a75c996a5497502 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 24 Sep 2018 08:28:50 -0500 Subject: [PATCH 2/3] Changing to pull in updating settings and pass to constructor --- .../process/autodetect/AutodetectBuilder.java | 12 +++++----- .../NativeAutodetectProcessFactory.java | 22 +++++++++---------- .../autodetect/AutodetectBuilderTests.java | 14 +++++------- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 39230858d37f2..78da6fe434540 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -175,9 +175,9 @@ public AutodetectBuilder scheduledEvents(List scheduledEvents) { /** * Requests that the controller daemon start an autodetect process. */ - public void build(int maxAnomalyRecords) throws IOException { + public void build() throws IOException { - List command = buildAutodetectCommand(maxAnomalyRecords); + List command = buildAutodetectCommand(); buildLimits(command); buildModelPlotConfig(command); @@ -191,7 +191,7 @@ public void build(int maxAnomalyRecords) throws IOException { /** * Visible for testing */ - List buildAutodetectCommand(int maxAnomalyRecords) { + List buildAutodetectCommand() { List command = new ArrayList<>(); command.add(AUTODETECT_PATH); @@ -219,7 +219,7 @@ List buildAutodetectCommand(int maxAnomalyRecords) { command.add(LENGTH_ENCODED_INPUT_ARG); // Limit the number of output records - command.add(maxAnomalyRecordsArg(maxAnomalyRecords)); + command.add(maxAnomalyRecordsArg(settings)); // always set the time field String timeFieldArg = TIME_FIELD_ARG + getTimeFieldOrDefault(job); @@ -251,8 +251,8 @@ List buildAutodetectCommand(int maxAnomalyRecords) { return command; } - static String maxAnomalyRecordsArg(int maxAnomalyRecords) { - return "--maxAnomalyRecords=" + maxAnomalyRecords; + static String maxAnomalyRecordsArg(Settings settings) { + return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings); } private static String getTimeFieldOrDefault(Job job) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index a380a8bae3b1e..06055476f7642 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -41,7 +41,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private final Environment env; private final Settings settings; private final NativeController nativeController; - private volatile int maxAnomalyRecords; + private final ClusterService clusterService; public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client, ClusterService clusterService) { @@ -49,9 +49,7 @@ public NativeAutodetectProcessFactory(Environment env, Settings settings, Native this.settings = Objects.requireNonNull(settings); this.nativeController = Objects.requireNonNull(nativeController); this.client = client; - this.maxAnomalyRecords = AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, this::setMaxAnomalyRecords); + this.clusterService = clusterService; } @Override @@ -91,8 +89,15 @@ public AutodetectProcess createAutodetectProcess(Job job, private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes, List filesToDelete) { try { + + Settings updatedSettings = Settings.builder() + .put(settings) + .put(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(), + clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC)) + .build(); + AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env, - settings, nativeController, processPipes) + updatedSettings, nativeController, processPipes) .referencedFilters(autodetectParams.filters()) .scheduledEvents(autodetectParams.scheduledEvents()); @@ -101,7 +106,7 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro if (autodetectParams.quantiles() != null) { autodetectBuilder.quantiles(autodetectParams.quantiles()); } - autodetectBuilder.build(maxAnomalyRecords); + autodetectBuilder.build(); processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); } catch (IOException e) { String msg = "Failed to launch autodetect for job " + job.getId(); @@ -110,10 +115,5 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro } } - private void setMaxAnomalyRecords(int maxAnomalyRecords) { - LOGGER.info("Changing [{}] from [{}] to [{}]", AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(), - this.maxAnomalyRecords, maxAnomalyRecords); - this.maxAnomalyRecords = maxAnomalyRecords; - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java index 5e214bf969ddf..325ad52864bfa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java @@ -64,8 +64,7 @@ public void testBuildAutodetectCommand() { dd.setTimeField("tf"); job.setDataDescription(dd); - List command = autodetectBuilder( - job.build()).buildAutodetectCommand(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)); + List command = autodetectBuilder(job.build()).buildAutodetectCommand(); assertEquals(12, command.size()); assertTrue(command.contains(AutodetectBuilder.AUTODETECT_PATH)); assertTrue(command.contains(AutodetectBuilder.BUCKET_SPAN_ARG + "120")); @@ -75,8 +74,7 @@ public void testBuildAutodetectCommand() { assertTrue(command.contains(AutodetectBuilder.MULTIVARIATE_BY_FIELDS_ARG)); assertTrue(command.contains(AutodetectBuilder.LENGTH_ENCODED_INPUT_ARG)); - assertTrue( - command.contains(AutodetectBuilder.maxAnomalyRecordsArg(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)))); + assertTrue(command.contains(AutodetectBuilder.maxAnomalyRecordsArg(settings))); assertTrue(command.contains(AutodetectBuilder.TIME_FIELD_ARG + "tf")); assertTrue(command.contains(AutodetectBuilder.JOB_ID_ARG + "unit-test-job")); @@ -90,7 +88,7 @@ public void testBuildAutodetectCommand() { public void testBuildAutodetectCommand_defaultTimeField() { Job.Builder job = buildJobBuilder("unit-test-job"); - List command = autodetectBuilder(job.build()).buildAutodetectCommand(1); + List command = autodetectBuilder(job.build()).buildAutodetectCommand(); assertTrue(command.contains(AutodetectBuilder.TIME_FIELD_ARG + "time")); } @@ -102,15 +100,13 @@ public void testBuildAutodetectCommand_givenPersistModelState() { int expectedPersistInterval = 10800 + AutodetectBuilder.calculateStaggeringInterval(job.getId()); - List command = autodetectBuilder( - job.build()).buildAutodetectCommand(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)); + List command = autodetectBuilder(job.build()).buildAutodetectCommand(); assertFalse(command.contains(AutodetectBuilder.PERSIST_INTERVAL_ARG + expectedPersistInterval)); settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); env = TestEnvironment.newEnvironment(settings); - command = autodetectBuilder( - job.build()).buildAutodetectCommand(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings)); + command = autodetectBuilder(job.build()).buildAutodetectCommand(); assertTrue(command.contains(AutodetectBuilder.PERSIST_INTERVAL_ARG + expectedPersistInterval)); } From eb57befcb1b7670a4f72b5ec216be1efe79d1533 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 24 Sep 2018 09:32:54 -0500 Subject: [PATCH 3/3] adding note about only newly opened jobs getting updated value --- .../xpack/ml/job/process/autodetect/AutodetectBuilder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 78da6fe434540..4942200606dba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -86,6 +86,7 @@ public class AutodetectBuilder { @Deprecated public static final Setting MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS, Setting.Property.NodeScope, Setting.Property.Deprecated); + // Though this setting is dynamic, it is only set when a new job is opened. So, already runnin jobs will not get the updated value. public static final Setting MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting( "xpack.ml.max_anomaly_records", MAX_ANOMALY_RECORDS_SETTING,