Skip to content

Commit 79a79c7

Browse files
committed
Simplifying implementation by gating scheduling
1 parent cd751df commit 79a79c7

File tree

18 files changed

+73
-185
lines changed

18 files changed

+73
-185
lines changed

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.xpack.core.monitoring.MonitoringField;
3737
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
3838
import org.elasticsearch.xpack.core.ssl.SSLService;
39-
import org.elasticsearch.xpack.monitoring.MonitoringService;
4039
import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
4140
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
4241
import org.elasticsearch.xpack.monitoring.collector.Collector;
@@ -137,19 +136,16 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
137136
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService, getLicenseState(),
138137
threadPool.getThreadContext());
139138

140-
final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, exporters);
141-
142139
Set<Collector> collectors = new HashSet<>();
143-
collectors.add(new IndexStatsCollector(settings, clusterService, monitoringService, getLicenseState(), client));
144-
collectors.add(new ClusterStatsCollector(settings, clusterService, monitoringService, getLicenseState(), client,
145-
getLicenseService()));
146-
collectors.add(new ShardsCollector(settings, clusterService, monitoringService, getLicenseState()));
147-
collectors.add(new NodeStatsCollector(settings, clusterService, monitoringService, getLicenseState(), client));
148-
collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringService, getLicenseState(), client));
149-
collectors.add(new JobStatsCollector(settings, clusterService, monitoringService, getLicenseState(), client));
150-
collectors.add(new CcrStatsCollector(settings, clusterService, monitoringService, getLicenseState(), client));
151-
152-
monitoringService.addCollectors(collectors);
140+
collectors.add(new IndexStatsCollector(settings, clusterService, getLicenseState(), client));
141+
collectors.add(new ClusterStatsCollector(settings, clusterService, getLicenseState(), client, getLicenseService()));
142+
collectors.add(new ShardsCollector(settings, clusterService, getLicenseState()));
143+
collectors.add(new NodeStatsCollector(settings, clusterService, getLicenseState(), client));
144+
collectors.add(new IndexRecoveryCollector(settings, clusterService, getLicenseState(), client));
145+
collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client));
146+
collectors.add(new CcrStatsCollector(settings, clusterService, getLicenseState(), client));
147+
148+
final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
153149

154150
return Arrays.asList(monitoringService, exporters, cleanerService);
155151
}

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringService.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.io.Closeable;
2525
import java.util.ArrayList;
2626
import java.util.Collection;
27-
import java.util.HashSet;
2827
import java.util.Objects;
2928
import java.util.Set;
3029
import java.util.concurrent.Semaphore;
@@ -44,9 +43,14 @@ public class MonitoringService extends AbstractLifecycleComponent {
4443
*/
4544
public static final TimeValue MIN_INTERVAL = TimeValue.timeValueSeconds(1L);
4645

47-
/**
46+
/*
4847
* Dynamically controls enabling or disabling the collection of Monitoring data only from Elasticsearch.
49-
*/
48+
* <p>
49+
* This should only be used while transitioning to Metricbeat-based data collection for Elasticsearch with
50+
* {@linkplain #ENABLED} set to {@code true}. By setting this to {@code false} and that value to {@code true},
51+
* Kibana, Logstash, Beats, and APM Server can all continue to report their stats through this cluster until they
52+
* are transitioned to being monitored by Metricbeat as well.
53+
*/
5054
public static final Setting<Boolean> ELASTICSEARCH_COLLECTION_ENABLED =
5155
Setting.boolSetting("xpack.monitoring.elasticsearch.collection.enabled", true,
5256
Setting.Property.Dynamic, Setting.Property.NodeScope);
@@ -82,11 +86,12 @@ public class MonitoringService extends AbstractLifecycleComponent {
8286
private volatile TimeValue interval;
8387
private volatile ThreadPool.Cancellable scheduler;
8488

85-
MonitoringService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Exporters exporters) {
89+
MonitoringService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
90+
Set<Collector> collectors, Exporters exporters) {
8691
super(settings);
8792
this.clusterService = Objects.requireNonNull(clusterService);
8893
this.threadPool = Objects.requireNonNull(threadPool);
89-
this.collectors = new HashSet<Collector>();
94+
this.collectors = Objects.requireNonNull(collectors);
9095
this.exporters = Objects.requireNonNull(exporters);
9196
this.elasticsearchCollectionEnabled = ELASTICSEARCH_COLLECTION_ENABLED.get(settings);
9297
this.enabled = ENABLED.get(settings);
@@ -113,11 +118,6 @@ void setInterval(final TimeValue interval) {
113118
scheduleExecution();
114119
}
115120

116-
void addCollectors(Set<Collector> collectors) {
117-
this.collectors.addAll(Objects.requireNonNull(collectors));
118-
scheduleExecution();
119-
}
120-
121121
public TimeValue getInterval() {
122122
return interval;
123123
}
@@ -130,6 +130,10 @@ public boolean isElasticsearchCollectionEnabled() {
130130
return this.elasticsearchCollectionEnabled;
131131
}
132132

133+
public boolean shouldScheduleExecution() {
134+
return isElasticsearchCollectionEnabled() && isMonitoringActive();
135+
}
136+
133137
private String threadPoolName() {
134138
return ThreadPool.Names.GENERIC;
135139
}
@@ -181,7 +185,7 @@ void scheduleExecution() {
181185
if (scheduler != null) {
182186
cancelExecution();
183187
}
184-
if (isMonitoringActive()) {
188+
if (shouldScheduleExecution()) {
185189
scheduler = threadPool.scheduleWithFixedDelay(monitor, interval, threadPoolName());
186190
}
187191
}
@@ -214,7 +218,7 @@ class MonitoringExecution extends AbstractRunnable implements Closeable {
214218

215219
@Override
216220
public void doRun() {
217-
if (isMonitoringActive() == false) {
221+
if (shouldScheduleExecution() == false) {
218222
logger.debug("monitoring execution is skipped");
219223
return;
220224
}
@@ -249,7 +253,7 @@ protected void doRun() throws Exception {
249253
new ParameterizedMessage("monitoring collector [{}] failed to collect data", collector.name()), e);
250254
}
251255
}
252-
if (isMonitoringActive()) {
256+
if (shouldScheduleExecution()) {
253257
exporters.export(results, ActionListener.wrap(r -> semaphore.release(), this::onFailure));
254258
} else {
255259
semaphore.release();

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/Collector.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.license.XPackLicenseState;
2121
import org.elasticsearch.xpack.core.XPackField;
2222
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
23-
import org.elasticsearch.xpack.monitoring.MonitoringService;
2423

2524
import java.util.Collection;
2625
import java.util.List;
@@ -45,18 +44,15 @@ public abstract class Collector extends AbstractComponent {
4544

4645
private final String name;
4746
private final Setting<TimeValue> collectionTimeoutSetting;
48-
private final MonitoringService monitoringService;
4947

5048
protected final ClusterService clusterService;
5149
protected final XPackLicenseState licenseState;
5250

5351
public Collector(final Settings settings, final String name, final ClusterService clusterService,
54-
final MonitoringService monitoringService, final Setting<TimeValue> timeoutSetting,
55-
final XPackLicenseState licenseState) {
52+
final Setting<TimeValue> timeoutSetting, final XPackLicenseState licenseState) {
5653
super(settings);
5754
this.name = name;
5855
this.clusterService = clusterService;
59-
this.monitoringService = monitoringService;
6056
this.collectionTimeoutSetting = timeoutSetting;
6157
this.licenseState = licenseState;
6258
}
@@ -80,13 +76,6 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
8076
logger.trace("collector [{}] can not collect data due to invalid license", name());
8177
return false;
8278
}
83-
84-
// TODO: Check if Elasticsearch collection is enabled
85-
if (this.monitoringService.isElasticsearchCollectionEnabled() == false) {
86-
logger.trace("collector [{}] will not collect data because elasticsearch collection is disabled", name());
87-
return false;
88-
}
89-
9079
return true;
9180
}
9281

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
2121
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
2222
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
23-
import org.elasticsearch.xpack.monitoring.MonitoringService;
2423
import org.elasticsearch.xpack.monitoring.collector.Collector;
2524

2625
import java.util.Collection;
@@ -40,20 +39,18 @@ public class CcrStatsCollector extends Collector {
4039
public CcrStatsCollector(
4140
final Settings settings,
4241
final ClusterService clusterService,
43-
final MonitoringService monitoringService,
4442
final XPackLicenseState licenseState,
4543
final Client client) {
46-
this(settings, clusterService, monitoringService, licenseState, new XPackClient(client).ccr(), client.threadPool().getThreadContext());
44+
this(settings, clusterService, licenseState, new XPackClient(client).ccr(), client.threadPool().getThreadContext());
4745
}
4846

4947
CcrStatsCollector(
5048
final Settings settings,
5149
final ClusterService clusterService,
52-
final MonitoringService monitoringService,
5350
final XPackLicenseState licenseState,
5451
final CcrClient ccrClient,
5552
final ThreadContext threadContext) {
56-
super(settings, TYPE, clusterService, monitoringService, CCR_STATS_TIMEOUT, licenseState);
53+
super(settings, TYPE, clusterService, CCR_STATS_TIMEOUT, licenseState);
5754
this.ccrClient = ccrClient;
5855
this.threadContext = threadContext;
5956
}

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsCollector.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.xpack.core.XPackFeatureSet;
2929
import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder;
3030
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
31-
import org.elasticsearch.xpack.monitoring.MonitoringService;
3231
import org.elasticsearch.xpack.monitoring.collector.Collector;
3332

3433
import java.util.Collection;
@@ -62,22 +61,19 @@ public class ClusterStatsCollector extends Collector {
6261

6362
public ClusterStatsCollector(final Settings settings,
6463
final ClusterService clusterService,
65-
final MonitoringService monitoringService,
6664
final XPackLicenseState licenseState,
6765
final Client client,
6866
final LicenseService licenseService) {
69-
this(settings, clusterService, monitoringService, licenseState, client, licenseService,
70-
new IndexNameExpressionResolver(Settings.EMPTY));
67+
this(settings, clusterService, licenseState, client, licenseService, new IndexNameExpressionResolver(Settings.EMPTY));
7168
}
7269

7370
ClusterStatsCollector(final Settings settings,
7471
final ClusterService clusterService,
75-
final MonitoringService monitoringService,
7672
final XPackLicenseState licenseState,
7773
final Client client,
7874
final LicenseService licenseService,
7975
final IndexNameExpressionResolver indexNameExpressionResolver) {
80-
super(settings, ClusterStatsMonitoringDoc.TYPE, clusterService, monitoringService, CLUSTER_STATS_TIMEOUT, licenseState);
76+
super(settings, ClusterStatsMonitoringDoc.TYPE, clusterService, CLUSTER_STATS_TIMEOUT, licenseState);
8177

8278
this.client = client;
8379
this.licenseService = licenseService;
@@ -87,7 +83,7 @@ public ClusterStatsCollector(final Settings settings,
8783
@Override
8884
protected boolean shouldCollect(final boolean isElectedMaster) {
8985
// This collector can always collect data on the master node
90-
return isElectedMaster && super.shouldCollect(isElectedMaster);
86+
return isElectedMaster;
9187
}
9288

9389
@Override

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexRecoveryCollector.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.common.unit.TimeValue;
1616
import org.elasticsearch.license.XPackLicenseState;
1717
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
18-
import org.elasticsearch.xpack.monitoring.MonitoringService;
1918
import org.elasticsearch.xpack.monitoring.collector.Collector;
2019

2120
import java.util.ArrayList;
@@ -49,11 +48,10 @@ public class IndexRecoveryCollector extends Collector {
4948

5049
public IndexRecoveryCollector(final Settings settings,
5150
final ClusterService clusterService,
52-
final MonitoringService monitoringService,
5351
final XPackLicenseState licenseState,
5452
final Client client) {
5553

56-
super(settings, IndexRecoveryMonitoringDoc.TYPE, clusterService, monitoringService, INDEX_RECOVERY_TIMEOUT, licenseState);
54+
super(settings, IndexRecoveryMonitoringDoc.TYPE, clusterService, INDEX_RECOVERY_TIMEOUT, licenseState);
5755
this.client = Objects.requireNonNull(client);
5856
}
5957

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.common.unit.TimeValue;
1919
import org.elasticsearch.license.XPackLicenseState;
2020
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
21-
import org.elasticsearch.xpack.monitoring.MonitoringService;
2221
import org.elasticsearch.xpack.monitoring.collector.Collector;
2322

2423
import java.util.ArrayList;
@@ -43,10 +42,9 @@ public class IndexStatsCollector extends Collector {
4342

4443
public IndexStatsCollector(final Settings settings,
4544
final ClusterService clusterService,
46-
final MonitoringService monitoringService,
4745
final XPackLicenseState licenseState,
4846
final Client client) {
49-
super(settings, "index-stats", clusterService, monitoringService, INDEX_STATS_TIMEOUT, licenseState);
47+
super(settings, "index-stats", clusterService, INDEX_STATS_TIMEOUT, licenseState);
5048
this.client = client;
5149
}
5250

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsCollector.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
2020
import org.elasticsearch.xpack.core.ml.client.MachineLearningClient;
2121
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
22-
import org.elasticsearch.xpack.monitoring.MonitoringService;
2322
import org.elasticsearch.xpack.monitoring.collector.Collector;
2423

2524
import java.util.List;
@@ -46,15 +45,14 @@ public class JobStatsCollector extends Collector {
4645
private final ThreadContext threadContext;
4746
private final MachineLearningClient client;
4847

49-
public JobStatsCollector(final Settings settings, final ClusterService clusterService, final MonitoringService monitoringService,
48+
public JobStatsCollector(final Settings settings, final ClusterService clusterService,
5049
final XPackLicenseState licenseState, final Client client) {
51-
this(settings, clusterService, monitoringService, licenseState, new XPackClient(client).machineLearning(),
52-
client.threadPool().getThreadContext());
50+
this(settings, clusterService, licenseState, new XPackClient(client).machineLearning(), client.threadPool().getThreadContext());
5351
}
5452

55-
JobStatsCollector(final Settings settings, final ClusterService clusterService, final MonitoringService monitoringService,
53+
JobStatsCollector(final Settings settings, final ClusterService clusterService,
5654
final XPackLicenseState licenseState, final MachineLearningClient client, final ThreadContext threadContext) {
57-
super(settings, JobStatsMonitoringDoc.TYPE, clusterService, monitoringService, JOB_STATS_TIMEOUT, licenseState);
55+
super(settings, JobStatsMonitoringDoc.TYPE, clusterService, JOB_STATS_TIMEOUT, licenseState);
5856
this.client = client;
5957
this.threadContext = threadContext;
6058
}

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.common.unit.TimeValue;
1919
import org.elasticsearch.license.XPackLicenseState;
2020
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
21-
import org.elasticsearch.xpack.monitoring.MonitoringService;
2221
import org.elasticsearch.xpack.monitoring.collector.Collector;
2322

2423
import java.util.Collection;
@@ -52,11 +51,10 @@ public class NodeStatsCollector extends Collector {
5251

5352
public NodeStatsCollector(final Settings settings,
5453
final ClusterService clusterService,
55-
final MonitoringService monitoringService,
5654
final XPackLicenseState licenseState,
5755
final Client client) {
5856

59-
super(settings, NodeStatsMonitoringDoc.TYPE, clusterService, monitoringService, NODE_STATS_TIMEOUT, licenseState);
57+
super(settings, NodeStatsMonitoringDoc.TYPE, clusterService, NODE_STATS_TIMEOUT, licenseState);
6058
this.client = Objects.requireNonNull(client);
6159
}
6260

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/shards/ShardsCollector.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.common.settings.Settings;
1515
import org.elasticsearch.license.XPackLicenseState;
1616
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
17-
import org.elasticsearch.xpack.monitoring.MonitoringService;
1817
import org.elasticsearch.xpack.monitoring.collector.Collector;
1918

2019
import java.util.ArrayList;
@@ -33,10 +32,9 @@ public class ShardsCollector extends Collector {
3332

3433
public ShardsCollector(final Settings settings,
3534
final ClusterService clusterService,
36-
final MonitoringService monitoringService,
3735
final XPackLicenseState licenseState) {
3836

39-
super(settings, ShardMonitoringDoc.TYPE, clusterService, monitoringService, null, licenseState);
37+
super(settings, ShardMonitoringDoc.TYPE, clusterService, null, licenseState);
4038
}
4139

4240
@Override

0 commit comments

Comments
 (0)