Skip to content

Commit ae7da16

Browse files
[ML] DF Analytics should always display operational stats (#54210)
This commit populates the _stats API response with sensible "empty" `data_counts` and `memory_usage` objects when the job itself has not started reporting them.
1 parent d14dca9 commit ae7da16

File tree

13 files changed

+132
-30
lines changed

13 files changed

+132
-30
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsage.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.client.ml.dataframe.stats.common;
2020

2121
import org.elasticsearch.client.common.TimeUtil;
22+
import org.elasticsearch.common.Nullable;
2223
import org.elasticsearch.common.ParseField;
2324
import org.elasticsearch.common.inject.internal.ToStringBuilder;
2425
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -39,21 +40,23 @@ public class MemoryUsage implements ToXContentObject {
3940
true, a -> new MemoryUsage((Instant) a[0], (long) a[1]));
4041

4142
static {
42-
PARSER.declareField(ConstructingObjectParser.constructorArg(),
43+
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
4344
p -> TimeUtil.parseTimeFieldToInstant(p, TIMESTAMP.getPreferredName()),
4445
TIMESTAMP,
4546
ObjectParser.ValueType.VALUE);
4647
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES);
4748
}
4849

50+
@Nullable
4951
private final Instant timestamp;
5052
private final long peakUsageBytes;
5153

52-
public MemoryUsage(Instant timestamp, long peakUsageBytes) {
53-
this.timestamp = Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli());
54+
public MemoryUsage(@Nullable Instant timestamp, long peakUsageBytes) {
55+
this.timestamp = timestamp == null ? null : Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli());
5456
this.peakUsageBytes = peakUsageBytes;
5557
}
5658

59+
@Nullable
5760
public Instant getTimestamp() {
5861
return timestamp;
5962
}
@@ -65,7 +68,9 @@ public long getPeakUsageBytes() {
6568
@Override
6669
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
6770
builder.startObject();
68-
builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli());
71+
if (timestamp != null) {
72+
builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli());
73+
}
6974
builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
7075
builder.endObject();
7176
return builder;
@@ -89,7 +94,7 @@ public int hashCode() {
8994
@Override
9095
public String toString() {
9196
return new ToStringBuilder(getClass())
92-
.add(TIMESTAMP.getPreferredName(), timestamp.getEpochSecond())
97+
.add(TIMESTAMP.getPreferredName(), timestamp == null ? null : timestamp.getEpochSecond())
9398
.add(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes)
9499
.toString();
95100
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.RecallMetric;
150150
import org.elasticsearch.client.ml.dataframe.explain.FieldSelection;
151151
import org.elasticsearch.client.ml.dataframe.explain.MemoryEstimation;
152+
import org.elasticsearch.client.ml.dataframe.stats.common.DataCounts;
152153
import org.elasticsearch.client.ml.filestructurefinder.FileStructure;
153154
import org.elasticsearch.client.ml.inference.InferenceToXContentCompressor;
154155
import org.elasticsearch.client.ml.inference.MlInferenceNamedXContentProvider;
@@ -1531,7 +1532,8 @@ public void testGetDataFrameAnalyticsStats() throws Exception {
15311532
assertThat(progress.get(1), equalTo(new PhaseProgress("loading_data", 0)));
15321533
assertThat(progress.get(2), equalTo(new PhaseProgress("analyzing", 0)));
15331534
assertThat(progress.get(3), equalTo(new PhaseProgress("writing_results", 0)));
1534-
assertThat(stats.getMemoryUsage(), is(nullValue()));
1535+
assertThat(stats.getMemoryUsage().getPeakUsageBytes(), equalTo(0L));
1536+
assertThat(stats.getDataCounts(), equalTo(new DataCounts(0, 0, 0)));
15351537
}
15361538

15371539
public void testStartDataFrameAnalyticsConfig() throws Exception {

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsageTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.io.IOException;
2525
import java.time.Instant;
2626

27+
import static org.hamcrest.Matchers.equalTo;
28+
2729
public class MemoryUsageTests extends AbstractXContentTestCase<MemoryUsage> {
2830

2931
@Override
@@ -32,7 +34,7 @@ protected MemoryUsage createTestInstance() {
3234
}
3335

3436
public static MemoryUsage createRandom() {
35-
return new MemoryUsage(Instant.now(), randomNonNegativeLong());
37+
return new MemoryUsage(randomBoolean() ? null : Instant.now(), randomNonNegativeLong());
3638
}
3739

3840
@Override
@@ -44,4 +46,9 @@ protected MemoryUsage doParseInstance(XContentParser parser) throws IOException
4446
protected boolean supportsUnknownFields() {
4547
return true;
4648
}
49+
50+
public void testToString_GivenNullTimestamp() {
51+
MemoryUsage memoryUsage = new MemoryUsage(null, 42L);
52+
assertThat(memoryUsage.toString(), equalTo("MemoryUsage[timestamp=null, peak_usage_bytes=42]"));
53+
}
4754
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
3030
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
3131
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats;
32-
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
3332
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
33+
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
3434
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3535
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
3636

@@ -166,10 +166,8 @@ public static class Stats implements ToXContentObject, Writeable {
166166
*/
167167
private final List<PhaseProgress> progress;
168168

169-
@Nullable
170169
private final DataCounts dataCounts;
171170

172-
@Nullable
173171
private final MemoryUsage memoryUsage;
174172

175173
@Nullable
@@ -187,8 +185,8 @@ public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureR
187185
this.state = Objects.requireNonNull(state);
188186
this.failureReason = failureReason;
189187
this.progress = Objects.requireNonNull(progress);
190-
this.dataCounts = dataCounts;
191-
this.memoryUsage = memoryUsage;
188+
this.dataCounts = dataCounts == null ? new DataCounts(id) : dataCounts;
189+
this.memoryUsage = memoryUsage == null ? new MemoryUsage(id) : memoryUsage;
192190
this.analysisStats = analysisStats;
193191
this.node = node;
194192
this.assignmentExplanation = assignmentExplanation;
@@ -204,12 +202,12 @@ public Stats(StreamInput in) throws IOException {
204202
progress = in.readList(PhaseProgress::new);
205203
}
206204
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
207-
dataCounts = in.readOptionalWriteable(DataCounts::new);
205+
dataCounts = new DataCounts(in);
208206
} else {
209207
dataCounts = null;
210208
}
211209
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
212-
memoryUsage = in.readOptionalWriteable(MemoryUsage::new);
210+
memoryUsage = new MemoryUsage(in);
213211
} else {
214212
memoryUsage = null;
215213
}
@@ -276,11 +274,14 @@ public DataCounts getDataCounts() {
276274
return dataCounts;
277275
}
278276

279-
@Nullable
280277
public MemoryUsage getMemoryUsage() {
281278
return memoryUsage;
282279
}
283280

281+
public AnalysisStats getAnalysisStats() {
282+
return analysisStats;
283+
}
284+
284285
public DiscoveryNode getNode() {
285286
return node;
286287
}
@@ -308,12 +309,8 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc
308309
if (progress != null) {
309310
builder.field("progress", progress);
310311
}
311-
if (dataCounts != null) {
312-
builder.field("data_counts", dataCounts);
313-
}
314-
if (memoryUsage != null) {
315-
builder.field("memory_usage", memoryUsage);
316-
}
312+
builder.field("data_counts", dataCounts);
313+
builder.field("memory_usage", memoryUsage);
317314
if (analysisStats != null) {
318315
builder.startObject("analysis_stats");
319316
builder.field(analysisStats.getWriteableName(), analysisStats);
@@ -350,10 +347,10 @@ public void writeTo(StreamOutput out) throws IOException {
350347
out.writeList(progress);
351348
}
352349
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
353-
out.writeOptionalWriteable(dataCounts);
350+
dataCounts.writeTo(out);
354351
}
355352
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
356-
out.writeOptionalWriteable(memoryUsage);
353+
memoryUsage.writeTo(out);
357354
}
358355
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
359356
out.writeOptionalNamedWriteable(analysisStats);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,27 +46,38 @@ private static ConstructingObjectParser<MemoryUsage, Void> createParser(boolean
4646
}
4747

4848
private final String jobId;
49+
/**
50+
* timestamp may only be null when we construct a zero usage object
51+
*/
4952
private final Instant timestamp;
5053
private final long peakUsageBytes;
5154

55+
/**
56+
* Creates a zero usage object
57+
*/
58+
public MemoryUsage(String jobId) {
59+
this(jobId, null, 0);
60+
}
61+
5262
public MemoryUsage(String jobId, Instant timestamp, long peakUsageBytes) {
5363
this.jobId = Objects.requireNonNull(jobId);
5464
// We intend to store this timestamp in millis granularity. Thus we're rounding here to ensure
5565
// internal representation matches toXContent
56-
this.timestamp = Instant.ofEpochMilli(ExceptionsHelper.requireNonNull(timestamp, Fields.TIMESTAMP).toEpochMilli());
66+
this.timestamp = timestamp == null ? null : Instant.ofEpochMilli(
67+
ExceptionsHelper.requireNonNull(timestamp, Fields.TIMESTAMP).toEpochMilli());
5768
this.peakUsageBytes = peakUsageBytes;
5869
}
5970

6071
public MemoryUsage(StreamInput in) throws IOException {
6172
jobId = in.readString();
62-
timestamp = in.readInstant();
73+
timestamp = in.readOptionalInstant();
6374
peakUsageBytes = in.readVLong();
6475
}
6576

6677
@Override
6778
public void writeTo(StreamOutput out) throws IOException {
6879
out.writeString(jobId);
69-
out.writeInstant(timestamp);
80+
out.writeOptionalInstant(timestamp);
7081
out.writeVLong(peakUsageBytes);
7182
}
7283

@@ -77,7 +88,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
7788
builder.field(Fields.TYPE.getPreferredName(), TYPE_VALUE);
7889
builder.field(Fields.JOB_ID.getPreferredName(), jobId);
7990
}
80-
builder.timeField(Fields.TIMESTAMP.getPreferredName(), Fields.TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli());
91+
if (timestamp != null) {
92+
builder.timeField(Fields.TIMESTAMP.getPreferredName(), Fields.TIMESTAMP.getPreferredName() + "_string",
93+
timestamp.toEpochMilli());
94+
}
8195
builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
8296
builder.endObject();
8397
return builder;
@@ -105,6 +119,7 @@ public String toString() {
105119
}
106120

107121
public String documentId(String jobId) {
122+
assert timestamp != null;
108123
return documentIdPrefix(jobId) + timestamp.toEpochMilli();
109124
}
110125

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/common/DataCounts.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ private static ConstructingObjectParser<DataCounts, Void> createParser(boolean i
4646
private final long testDocsCount;
4747
private final long skippedDocsCount;
4848

49+
public DataCounts(String jobId) {
50+
this(jobId, 0, 0, 0);
51+
}
52+
4953
public DataCounts(String jobId, long trainingDocsCount, long testDocsCount, long skippedDocsCount) {
5054
this.jobId = Objects.requireNonNull(jobId);
5155
this.trainingDocsCount = trainingDocsCount;

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,22 @@
1414
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
1515
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats;
1616
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStatsNamedWriteablesProvider;
17-
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
18-
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCountsTests;
1917
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
2018
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsageTests;
2119
import org.elasticsearch.xpack.core.ml.dataframe.stats.classification.ClassificationStatsTests;
20+
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
21+
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCountsTests;
2222
import org.elasticsearch.xpack.core.ml.dataframe.stats.outlierdetection.OutlierDetectionStatsTests;
2323
import org.elasticsearch.xpack.core.ml.dataframe.stats.regression.RegressionStatsTests;
2424
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
2525

2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.stream.IntStream;
2930

31+
import static org.hamcrest.Matchers.equalTo;
32+
3033
public class GetDataFrameAnalyticsStatsActionResponseTests extends AbstractWireSerializingTestCase<Response> {
3134

3235
@Override
@@ -69,4 +72,20 @@ protected Response createTestInstance() {
6972
protected Writeable.Reader<Response> instanceReader() {
7073
return Response::new;
7174
}
75+
76+
public void testStats_GivenNulls() {
77+
Response.Stats stats = new Response.Stats(randomAlphaOfLength(10),
78+
randomFrom(DataFrameAnalyticsState.values()),
79+
null,
80+
Collections.emptyList(),
81+
null,
82+
null,
83+
null,
84+
null,
85+
null
86+
);
87+
88+
assertThat(stats.getDataCounts(), equalTo(new DataCounts(stats.getId())));
89+
assertThat(stats.getMemoryUsage(), equalTo(new MemoryUsage(stats.getId())));
90+
}
7291
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.dataframe.stats;
77

8+
import org.elasticsearch.common.Strings;
89
import org.elasticsearch.common.io.stream.Writeable;
910
import org.elasticsearch.common.xcontent.ToXContent;
1011
import org.elasticsearch.common.xcontent.XContentParser;
@@ -16,6 +17,8 @@
1617
import java.time.Instant;
1718
import java.util.Collections;
1819

20+
import static org.hamcrest.Matchers.equalTo;
21+
1922
public class MemoryUsageTests extends AbstractSerializingTestCase<MemoryUsage> {
2023

2124
private boolean lenient;
@@ -53,4 +56,10 @@ protected Writeable.Reader<MemoryUsage> instanceReader() {
5356
protected MemoryUsage createTestInstance() {
5457
return createRandom();
5558
}
59+
60+
public void testZeroUsage() {
61+
MemoryUsage memoryUsage = new MemoryUsage("zero_usage_job");
62+
String asJson = Strings.toString(memoryUsage);
63+
assertThat(asJson, equalTo("{\"peak_usage_bytes\":0}"));
64+
}
5665
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@
4242
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
4343
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
4444
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats;
45-
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
4645
import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields;
4746
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
4847
import org.elasticsearch.xpack.core.ml.dataframe.stats.classification.ClassificationStats;
48+
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
4949
import org.elasticsearch.xpack.core.ml.dataframe.stats.outlierdetection.OutlierDetectionStats;
5050
import org.elasticsearch.xpack.core.ml.dataframe.stats.regression.RegressionStats;
5151
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -140,6 +140,7 @@ protected void doExecute(Task task, GetDataFrameAnalyticsStatsAction.Request req
140140
runningTasksStatsResponse -> gatherStatsForStoppedTasks(request.getExpandedIds(), runningTasksStatsResponse,
141141
ActionListener.wrap(
142142
finalResponse -> {
143+
143144
// While finalResponse has all the stats objects we need, we should report the count
144145
// from the get response
145146
QueryPage<Stats> finalStats = new QueryPage<>(finalResponse.getResponse().results(),

x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,36 @@ setup:
902902
- match: { data_frame_analytics.0.id: "foo-1" }
903903
- match: { data_frame_analytics.0.state: "stopped" }
904904

905+
---
906+
"Test get stats on newly created congig":
907+
908+
- do:
909+
ml.put_data_frame_analytics:
910+
id: "foo-1"
911+
body: >
912+
{
913+
"source": {
914+
"index": "index-source"
915+
},
916+
"dest": {
917+
"index": "index-foo-1_dest"
918+
},
919+
"analysis": {"outlier_detection":{}}
920+
}
921+
- match: { id: "foo-1" }
922+
923+
- do:
924+
ml.get_data_frame_analytics_stats:
925+
id: "foo-1"
926+
- match: { count: 1 }
927+
- length: { data_frame_analytics: 1 }
928+
- match: { data_frame_analytics.0.id: "foo-1" }
929+
- match: { data_frame_analytics.0.state: "stopped" }
930+
- match: { data_frame_analytics.0.data_counts.training_docs_count: 0 }
931+
- match: { data_frame_analytics.0.data_counts.test_docs_count: 0 }
932+
- match: { data_frame_analytics.0.data_counts.skipped_docs_count: 0 }
933+
- match: { data_frame_analytics.0.memory_usage.peak_usage_bytes: 0 }
934+
905935
---
906936
"Test delete given stopped config":
907937

0 commit comments

Comments
 (0)