Skip to content

Commit 59d6b12

Browse files
committed
Remove unused environment from anomaly detector classes
1 parent 3c81082 commit 59d6b12

File tree

6 files changed

+28
-37
lines changed

6 files changed

+28
-37
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
613613
}
614614
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
615615
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
616-
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(environment, settings, client, threadPool,
616+
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, client, threadPool,
617617
xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister,
618618
jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory, normalizerFactory,
619619
nativeStorageProvider, indexNameExpressionResolver);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public class JobManager {
8888
private static final Logger logger = LogManager.getLogger(JobManager.class);
8989
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
9090

91-
private final Environment environment;
9291
private final JobResultsProvider jobResultsProvider;
9392
private final JobResultsPersister jobResultsPersister;
9493
private final ClusterService clusterService;
@@ -108,7 +107,6 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider
108107
JobResultsPersister jobResultsPersister, ClusterService clusterService, AnomalyDetectionAuditor auditor,
109108
ThreadPool threadPool, Client client, UpdateJobProcessNotifier updateJobProcessNotifier,
110109
NamedXContentRegistry xContentRegistry) {
111-
this.environment = environment;
112110
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
113111
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
114112
this.clusterService = Objects.requireNonNull(clusterService);
@@ -227,7 +225,7 @@ private Map<String, Job> expandJobsFromClusterState(String expression, boolean a
227225
* The overall structure can be validated at parse time, but the exact names need to be checked separately,
228226
* as plugins that provide the functionality can be installed/uninstalled.
229227
*/
230-
static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegistry analysisRegistry, Environment environment)
228+
static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegistry analysisRegistry)
231229
throws IOException {
232230
CategorizationAnalyzerConfig categorizationAnalyzerConfig = jobBuilder.getAnalysisConfig().getCategorizationAnalyzerConfig();
233231
if (categorizationAnalyzerConfig != null) {
@@ -243,7 +241,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist
243241
ActionListener<PutJobAction.Response> actionListener) throws IOException {
244242

245243
request.getJobBuilder().validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit);
246-
validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry, environment);
244+
validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry);
247245

248246
Job job = request.getJobBuilder().build(new Date());
249247

@@ -258,7 +256,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist
258256
return;
259257
}
260258

261-
ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
259+
ActionListener<Boolean> putJobListener = new ActionListener<>() {
262260
@Override
263261
public void onResponse(Boolean mappingsUpdated) {
264262

@@ -331,9 +329,7 @@ public void onFailure(Exception e) {
331329
);
332330

333331
ActionListener<Boolean> checkNoGroupWithTheJobId = ActionListener.wrap(
334-
ok -> {
335-
jobConfigProvider.groupExists(job.getId(), checkNoJobsWithGroupId);
336-
},
332+
ok -> jobConfigProvider.groupExists(job.getId(), checkNoJobsWithGroupId),
337333
actionListener::onFailure
338334
);
339335

@@ -400,7 +396,7 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti
400396
));
401397
}
402398
} else {
403-
logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> {
399+
logger.debug("[{}] No process update required for job update: {}", request::getJobId, () -> {
404400
try {
405401
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
406402
request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.common.util.concurrent.FutureUtils;
1616
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
1717
import org.elasticsearch.common.xcontent.XContentType;
18-
import org.elasticsearch.env.Environment;
1918
import org.elasticsearch.index.analysis.AnalysisRegistry;
2019
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
2120
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
@@ -47,7 +46,6 @@
4746
import java.time.ZonedDateTime;
4847
import java.util.Collections;
4948
import java.util.Locale;
50-
import java.util.Optional;
5149
import java.util.concurrent.CountDownLatch;
5250
import java.util.concurrent.ExecutionException;
5351
import java.util.concurrent.ExecutorService;
@@ -62,7 +60,6 @@ public class AutodetectCommunicator implements Closeable {
6260
private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);
6361

6462
private final Job job;
65-
private final Environment environment;
6663
private final AutodetectProcess autodetectProcess;
6764
private final StateStreamer stateStreamer;
6865
private final DataCountsReporter dataCountsReporter;
@@ -74,12 +71,11 @@ public class AutodetectCommunicator implements Closeable {
7471
private volatile CategorizationAnalyzer categorizationAnalyzer;
7572
private volatile boolean processKilled;
7673

77-
AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer,
74+
AutodetectCommunicator(Job job, AutodetectProcess process, StateStreamer stateStreamer,
7875
DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor,
7976
BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry,
8077
ExecutorService autodetectWorkerExecutor) {
8178
this.job = job;
82-
this.environment = environment;
8379
this.autodetectProcess = process;
8480
this.stateStreamer = stateStreamer;
8581
this.dataCountsReporter = dataCountsReporter;
@@ -95,9 +91,9 @@ public void restoreState(ModelSnapshot modelSnapshot) {
9591
autodetectProcess.restoreState(stateStreamer, modelSnapshot);
9692
}
9793

98-
private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
94+
private DataToProcessWriter createProcessWriter(DataDescription dataDescription) {
9995
return DataToProcessWriterFactory.create(true, includeTokensField, autodetectProcess,
100-
dataDescription.orElse(job.getDataDescription()), job.getAnalysisConfig(),
96+
dataDescription, job.getAnalysisConfig(),
10197
dataCountsReporter, xContentRegistry);
10298
}
10399

@@ -106,7 +102,7 @@ private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDe
106102
* can be used
107103
*/
108104
public void writeHeader() throws IOException {
109-
createProcessWriter(Optional.empty()).writeHeader();
105+
createProcessWriter(job.getDataDescription()).writeHeader();
110106
}
111107

112108
/**
@@ -120,7 +116,7 @@ public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistr
120116
}
121117

122118
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
123-
DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription());
119+
DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription().orElse(job.getDataDescription()));
124120

125121
if (includeTokensField && categorizationAnalyzer == null) {
126122
createCategorizationAnalyzer(analysisRegistry);
@@ -148,7 +144,7 @@ public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistr
148144
}
149145

150146
@Override
151-
public void close() throws IOException {
147+
public void close() {
152148
close(false, null);
153149
}
154150

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
2828
import org.elasticsearch.common.xcontent.XContentType;
2929
import org.elasticsearch.core.internal.io.IOUtils;
30-
import org.elasticsearch.env.Environment;
3130
import org.elasticsearch.index.analysis.AnalysisRegistry;
3231
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
3332
import org.elasticsearch.rest.RestStatus;
@@ -94,7 +93,6 @@ public class AutodetectProcessManager implements ClusterStateListener {
9493
private static final Logger logger = LogManager.getLogger(AutodetectProcessManager.class);
9594

9695
private final Client client;
97-
private final Environment environment;
9896
private final ThreadPool threadPool;
9997
private final JobManager jobManager;
10098
private final JobResultsProvider jobResultsProvider;
@@ -117,13 +115,12 @@ public class AutodetectProcessManager implements ClusterStateListener {
117115

118116
private volatile boolean upgradeInProgress;
119117

120-
public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
118+
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool,
121119
NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, ClusterService clusterService,
122120
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
123121
JobDataCountsPersister jobDataCountsPersister, AnnotationPersister annotationPersister,
124122
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
125123
NativeStorageProvider nativeStorageProvider, IndexNameExpressionResolver expressionResolver) {
126-
this.environment = environment;
127124
this.client = client;
128125
this.threadPool = threadPool;
129126
this.xContentRegistry = xContentRegistry;
@@ -336,10 +333,16 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
336333
updateProcessMessage.setFilter(filter);
337334

338335
if (updateParams.isUpdateScheduledEvents()) {
339-
jobManager.getJob(jobTask.getJobId(), new ActionListener<Job>() {
336+
jobManager.getJob(jobTask.getJobId(), new ActionListener<>() {
340337
@Override
341338
public void onResponse(Job job) {
342-
DataCounts dataCounts = getStatistics(jobTask).get().v1();
339+
Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> stats = getStatistics(jobTask);
340+
DataCounts dataCounts;
341+
if (stats.isPresent()) {
342+
dataCounts = stats.get().v1();
343+
} else {
344+
dataCounts = new DataCounts(job.getId());
345+
}
343346
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder()
344347
.start(job.earliestValidTimestamp(dataCounts));
345348
jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
@@ -532,7 +535,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
532535
}
533536
throw e;
534537
}
535-
return new AutodetectCommunicator(job, environment, process, new StateStreamer(client), dataCountsReporter, processor, handler,
538+
return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler,
536539
xContentRegistry, autodetectWorkerExecutor);
537540

538541
}
@@ -683,7 +686,7 @@ public Optional<Duration> jobOpenTime(JobTask jobTask) {
683686

684687
void setJobState(JobTask jobTask, JobState state, String reason) {
685688
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason);
686-
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
689+
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<>() {
687690
@Override
688691
public void onResponse(PersistentTask<?> persistentTask) {
689692
logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId());
@@ -702,7 +705,7 @@ void setJobState(JobTask jobTask, JobState state) {
702705

703706
void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
704707
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason);
705-
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
708+
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<>() {
706709
@Override
707710
public void onResponse(PersistentTask<?> persistentTask) {
708711
try {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,13 @@
6464

6565
public class AutodetectCommunicatorTests extends ESTestCase {
6666

67-
private Environment environment;
6867
private AnalysisRegistry analysisRegistry;
6968
private StateStreamer stateStreamer;
7069

7170
@Before
7271
public void setup() throws Exception {
7372
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
74-
environment = TestEnvironment.newEnvironment(settings);
75-
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment);
73+
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings));
7674
stateStreamer = mock(StateStreamer.class);
7775
}
7876

@@ -236,7 +234,7 @@ private AutodetectCommunicator createAutodetectCommunicator(ExecutorService exec
236234
BiConsumer<Exception, Boolean> finishHandler) throws IOException {
237235
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
238236
doNothing().when(dataCountsReporter).finishReporting();
239-
return new AutodetectCommunicator(createJobDetails(), environment, autodetectProcess,
237+
return new AutodetectCommunicator(createJobDetails(), autodetectProcess,
240238
stateStreamer, dataCountsReporter, autodetectResultProcessor, finishHandler,
241239
new NamedXContentRegistry(Collections.emptyList()), executorService);
242240
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@
116116
*/
117117
public class AutodetectProcessManagerTests extends ESTestCase {
118118

119-
private Environment environment;
120119
private Client client;
121120
private ThreadPool threadPool;
122121
private AnalysisRegistry analysisRegistry;
@@ -142,14 +141,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
142141
@Before
143142
public void setup() throws Exception {
144143
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
145-
environment = TestEnvironment.newEnvironment(settings);
146144
client = mock(Client.class);
147145

148146
threadPool = mock(ThreadPool.class);
149147
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
150148
when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
151149

152-
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment);
150+
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings));
153151
jobManager = mock(JobManager.class);
154152
jobResultsProvider = mock(JobResultsProvider.class);
155153
jobResultsPersister = mock(JobResultsPersister.class);
@@ -707,7 +705,7 @@ private AutodetectProcessManager createSpyManager(Settings settings) {
707705
}
708706

709707
private AutodetectProcessManager createManager(Settings settings) {
710-
return new AutodetectProcessManager(environment, settings,
708+
return new AutodetectProcessManager(settings,
711709
client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider,
712710
jobResultsPersister, jobDataCountsPersister, annotationPersister, autodetectFactory, normalizerFactory, nativeStorageProvider,
713711
new IndexNameExpressionResolver());

0 commit comments

Comments
 (0)