Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,11 @@ public List<Setting<?>> getSettings() {
MAX_MACHINE_MEMORY_PERCENT,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
Expand Down Expand Up @@ -379,7 +382,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// This will only happen when path.home is not set, which is disallowed in production
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
}
autodetectProcessFactory = new NativeAutodetectProcessFactory(environment, settings, nativeController, client);
autodetectProcessFactory = new NativeAutodetectProcessFactory(
environment,
settings,
nativeController,
client,
clusterService);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController);
} catch (IOException e) {
// This also should not happen in production, as the MachineLearningFeatureSet should have
Expand All @@ -397,7 +405,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor);
normalizerFactory, xContentRegistry, auditor, clusterService);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -42,15 +43,28 @@ public class DataCountsReporter extends AbstractComponent {
* The max percentage of date parse errors allowed before
* an exception is thrown.
*/
@Deprecated
public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING = Setting.intSetting("max.percent.date.errors", 25,
Property.NodeScope);

Property.NodeScope, Property.Deprecated);
public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING = Setting.intSetting(
"xpack.ml.max_percent_date_errors",
ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
0,
Property.Dynamic,
Property.NodeScope);
/**
* The max percentage of out of order records allowed before
* an exception is thrown.
*/
@Deprecated
public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting
.intSetting("max.percent.outoforder.errors", 25, Property.NodeScope);
.intSetting("max.percent.outoforder.errors", 25, Property.NodeScope, Property.Deprecated);
public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING = Setting.intSetting(
"xpack.ml.max_percent_out_of_order_errors",
ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
0,
Property.Dynamic,
Property.NodeScope);

private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L);

Expand All @@ -66,14 +80,15 @@ public class DataCountsReporter extends AbstractComponent {
private long logEvery = 1;
private long logCount = 0;

private final int acceptablePercentDateParseErrors;
private final int acceptablePercentOutOfOrderErrors;
private volatile int acceptablePercentDateParseErrors;
private volatile int acceptablePercentOutOfOrderErrors;

private Function<Long, Boolean> reportingBoundaryFunction;

private DataStreamDiagnostics diagnostics;

public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) {
public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister,
ClusterService clusterService) {

super(settings);

Expand All @@ -84,9 +99,12 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData
incrementalRecordStats = new DataCounts(job.getId());
diagnostics = new DataStreamDiagnostics(job, counts);

acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);

acceptablePercentDateParseErrors = MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, this::setAcceptablePercentDateParseErrors);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, this::setAcceptablePercentOutOfOrderErrors);
reportingBoundaryFunction = this::reportEvery10000Records;
}

Expand Down Expand Up @@ -352,4 +370,17 @@ private void retrieveDiagnosticsIntermediateResults() {

diagnostics.resetCounts();
}

/**
 * Consumer registered with {@code ClusterSettings} for the dynamic
 * date-parse-error threshold; logs the transition and applies the new value.
 */
private void setAcceptablePercentDateParseErrors(int newPercent) {
    final int oldPercent = this.acceptablePercentDateParseErrors;
    logger.info("Changing [{}] from [{}] to [{}]",
            MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), oldPercent, newPercent);
    this.acceptablePercentDateParseErrors = newPercent;
}

/**
 * Consumer registered with {@code ClusterSettings} for the dynamic
 * out-of-order-record threshold; logs the transition and applies the new value.
 */
private void setAcceptablePercentOutOfOrderErrors(int newPercent) {
    final int oldPercent = this.acceptablePercentOutOfOrderErrors;
    logger.info("Changing [{}] from [{}] to [{}]",
            MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), oldPercent, newPercent);
    this.acceptablePercentOutOfOrderErrors = newPercent;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,16 @@ public class AutodetectBuilder {
/**
* The maximum number of anomaly records that will be written each bucket
*/
@Deprecated
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS,
Setting.Property.NodeScope);
Setting.Property.NodeScope, Setting.Property.Deprecated);
// Though this setting is dynamic, it is only read when a new job is opened. So, already running jobs will not get the updated value.
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting(
"xpack.ml.max_anomaly_records",
MAX_ANOMALY_RECORDS_SETTING,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/**
* Config setting storing the flag that disables model persistence
Expand Down Expand Up @@ -244,9 +252,8 @@ List<String> buildAutodetectCommand() {
return command;
}


/**
 * Builds the autodetect command-line flag carrying the maximum number of
 * anomaly records, resolved from the dynamic setting in the given settings.
 */
static String maxAnomalyRecordsArg(Settings settings) {
    final Integer maxRecords = MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings);
    return "--maxAnomalyRecords=" + maxRecords;
}

private static String getTimeFieldOrDefault(Job job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
Expand Down Expand Up @@ -130,12 +131,13 @@ public class AutodetectProcessManager extends AbstractComponent {
private final NamedXContentRegistry xContentRegistry;

private final Auditor auditor;
private final ClusterService clusterService;

public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
NamedXContentRegistry xContentRegistry, Auditor auditor) {
NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) {
super(settings);
this.environment = environment;
this.client = client;
Expand All @@ -150,6 +152,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie
this.jobDataCountsPersister = jobDataCountsPersister;
this.auditor = auditor;
this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
this.clusterService = clusterService;
}

public void onNodeStartup() {
Expand Down Expand Up @@ -493,8 +496,11 @@ AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(),
jobDataCountsPersister);
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings,
job,
autodetectParams.dataCounts(),
jobDataCountsPersister,
clusterService);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
Expand Down Expand Up @@ -40,12 +41,15 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
private final Environment env;
private final Settings settings;
private final NativeController nativeController;
private final ClusterService clusterService;

/**
 * Creates a factory for native autodetect processes.
 *
 * @param env              the node environment; must not be {@code null}
 * @param settings         the node settings; must not be {@code null}
 * @param nativeController controller used to launch native processes; must not be {@code null}
 * @param client           client used by the created processes
 * @param clusterService   source of dynamic cluster settings; must not be {@code null}
 */
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
                                      ClusterService clusterService) {
    this.env = Objects.requireNonNull(env);
    this.settings = Objects.requireNonNull(settings);
    this.nativeController = Objects.requireNonNull(nativeController);
    this.client = client;
    // Fail fast here rather than NPE later: createNativeProcess dereferences
    // clusterService to read the dynamic max-anomaly-records setting.
    this.clusterService = Objects.requireNonNull(clusterService);
}

@Override
Expand Down Expand Up @@ -85,8 +89,15 @@ public AutodetectProcess createAutodetectProcess(Job job,
private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
try {

Settings updatedSettings = Settings.builder()
.put(settings)
.put(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(),
clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC))
.build();

AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
settings, nativeController, processPipes)
updatedSettings, nativeController, processPipes)
.referencedFilters(autodetectParams.filters())
.scheduledEvents(autodetectParams.scheduledEvents());

Expand All @@ -95,7 +106,6 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
if (autodetectParams.quantiles() != null) {
autodetectBuilder.quantiles(autodetectParams.quantiles());
}

autodetectBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
} catch (IOException e) {
Expand All @@ -104,5 +114,6 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
throw ExceptionsHelper.serverError(msg, e);
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,44 @@
*/
package org.elasticsearch.xpack.ml.job.process;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

import static org.elasticsearch.mock.orig.Mockito.when;
import static org.mockito.Mockito.mock;

public class CountingInputStreamTests extends ESTestCase {

private ClusterService clusterService;

@Before
public void setUpMocks() {
    // Register both dynamic error-threshold settings, valued at 10, so the
    // reporter under test can resolve them through a mocked cluster service.
    Set<Setting<?>> registeredSettings = new HashSet<>();
    registeredSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING);
    registeredSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING);

    Settings nodeSettings = Settings.builder()
            .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), 10)
            .put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), 10)
            .build();

    clusterService = mock(ClusterService.class);
    when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(nodeSettings, registeredSettings));
}

public void testRead_OneByteAtATime() throws IOException {

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

final String TEXT = "123";
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
Expand All @@ -30,7 +56,7 @@ public void testRead_OneByteAtATime() throws IOException {
public void testRead_WithBuffer() throws IOException {
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));

Expand All @@ -44,7 +70,7 @@ public void testRead_WithBuffer() throws IOException {
public void testRead_WithTinyBuffer() throws IOException {
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));

Expand All @@ -57,7 +83,7 @@ public void testRead_WithTinyBuffer() throws IOException {

public void testRead_WithResets() throws IOException {

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
Expand Down
Loading