Skip to content

Commit a40ccd9

Browse files
author
David Roberts
authored
[ML] Improve response format of data frame stats endpoint (#44350)
This change adjusts the data frame transforms stats endpoint to return a structure that is easier to understand. This is a breaking change for clients of the data frame transforms stats endpoint, but the feature is in beta so stability is not guaranteed. Closes #43767
1 parent 5cdbefd commit a40ccd9

File tree

59 files changed

+1403
-1132
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1403
-1132
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsResponse.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.action.TaskOperationFailure;
24-
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
24+
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
2525
import org.elasticsearch.common.Nullable;
2626
import org.elasticsearch.common.ParseField;
2727
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -42,11 +42,11 @@ public class GetDataFrameTransformStatsResponse {
4242
@SuppressWarnings("unchecked")
4343
static final ConstructingObjectParser<GetDataFrameTransformStatsResponse, Void> PARSER = new ConstructingObjectParser<>(
4444
"get_data_frame_transform_stats_response", true,
45-
args -> new GetDataFrameTransformStatsResponse((List<DataFrameTransformStateAndStats>) args[0],
45+
args -> new GetDataFrameTransformStatsResponse((List<DataFrameTransformStats>) args[0],
4646
(List<TaskOperationFailure>) args[1], (List<ElasticsearchException>) args[2]));
4747

4848
static {
49-
PARSER.declareObjectArray(constructorArg(), DataFrameTransformStateAndStats.PARSER::apply, TRANSFORMS);
49+
PARSER.declareObjectArray(constructorArg(), DataFrameTransformStats.PARSER::apply, TRANSFORMS);
5050
// Discard the count field which is the size of the transforms array
5151
PARSER.declareInt((a, b) -> {}, COUNT);
5252
PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p),
@@ -59,20 +59,20 @@ public static GetDataFrameTransformStatsResponse fromXContent(final XContentPars
5959
return GetDataFrameTransformStatsResponse.PARSER.apply(parser, null);
6060
}
6161

62-
private final List<DataFrameTransformStateAndStats> transformsStateAndStats;
62+
private final List<DataFrameTransformStats> transformsStats;
6363
private final List<TaskOperationFailure> taskFailures;
6464
private final List<ElasticsearchException> nodeFailures;
6565

66-
public GetDataFrameTransformStatsResponse(List<DataFrameTransformStateAndStats> transformsStateAndStats,
66+
public GetDataFrameTransformStatsResponse(List<DataFrameTransformStats> transformsStats,
6767
@Nullable List<TaskOperationFailure> taskFailures,
6868
@Nullable List<? extends ElasticsearchException> nodeFailures) {
69-
this.transformsStateAndStats = transformsStateAndStats;
69+
this.transformsStats = transformsStats;
7070
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(taskFailures);
7171
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(nodeFailures);
7272
}
7373

74-
public List<DataFrameTransformStateAndStats> getTransformsStateAndStats() {
75-
return transformsStateAndStats;
74+
public List<DataFrameTransformStats> getTransformsStats() {
75+
return transformsStats;
7676
}
7777

7878
public List<ElasticsearchException> getNodeFailures() {
@@ -85,7 +85,7 @@ public List<TaskOperationFailure> getTaskFailures() {
8585

8686
@Override
8787
public int hashCode() {
88-
return Objects.hash(transformsStateAndStats, nodeFailures, taskFailures);
88+
return Objects.hash(transformsStats, nodeFailures, taskFailures);
8989
}
9090

9191
@Override
@@ -99,7 +99,7 @@ public boolean equals(Object other) {
9999
}
100100

101101
final GetDataFrameTransformStatsResponse that = (GetDataFrameTransformStatsResponse) other;
102-
return Objects.equals(this.transformsStateAndStats, that.transformsStateAndStats)
102+
return Objects.equals(this.transformsStats, that.transformsStats)
103103
&& Objects.equals(this.nodeFailures, that.nodeFailures)
104104
&& Objects.equals(this.taskFailures, that.taskFailures);
105105
}

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,86 @@
1919

2020
package org.elasticsearch.client.dataframe.transforms;
2121

22+
import org.elasticsearch.client.core.IndexerState;
2223
import org.elasticsearch.common.ParseField;
23-
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
25+
import org.elasticsearch.common.xcontent.ObjectParser;
2526
import org.elasticsearch.common.xcontent.XContentParser;
2627

2728
import java.io.IOException;
2829
import java.util.Objects;
2930

31+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
32+
3033
public class DataFrameTransformCheckpointStats {
34+
35+
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
36+
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
37+
public static final ParseField POSITION = new ParseField("position");
38+
public static final ParseField CHECKPOINT_PROGRESS = new ParseField("checkpoint_progress");
3139
public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
3240
public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
3341

34-
public static DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, 0L);
42+
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, null, 0L, 0L);
3543

44+
private final long checkpoint;
45+
private final IndexerState indexerState;
46+
private final DataFrameIndexerPosition position;
47+
private final DataFrameTransformProgress checkpointProgress;
3648
private final long timestampMillis;
3749
private final long timeUpperBoundMillis;
3850

3951
public static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
4052
"data_frame_transform_checkpoint_stats", true, args -> {
41-
long timestamp = args[0] == null ? 0L : (Long) args[0];
42-
long timeUpperBound = args[1] == null ? 0L : (Long) args[1];
53+
long checkpoint = args[0] == null ? 0L : (Long) args[0];
54+
IndexerState indexerState = (IndexerState) args[1];
55+
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[2];
56+
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[3];
57+
long timestamp = args[4] == null ? 0L : (Long) args[4];
58+
long timeUpperBound = args[5] == null ? 0L : (Long) args[5];
4359

44-
return new DataFrameTransformCheckpointStats(timestamp, timeUpperBound);
45-
});
60+
return new DataFrameTransformCheckpointStats(checkpoint, indexerState, position, checkpointProgress, timestamp, timeUpperBound);
61+
});
4662

4763
static {
48-
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIMESTAMP_MILLIS);
49-
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
64+
LENIENT_PARSER.declareLong(optionalConstructorArg(), CHECKPOINT);
65+
LENIENT_PARSER.declareField(optionalConstructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE,
66+
ObjectParser.ValueType.STRING);
67+
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameIndexerPosition.PARSER, POSITION);
68+
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameTransformProgress.PARSER, CHECKPOINT_PROGRESS);
69+
LENIENT_PARSER.declareLong(optionalConstructorArg(), TIMESTAMP_MILLIS);
70+
LENIENT_PARSER.declareLong(optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
5071
}
5172

5273
public static DataFrameTransformCheckpointStats fromXContent(XContentParser parser) throws IOException {
5374
return LENIENT_PARSER.parse(parser, null);
5475
}
5576

56-
public DataFrameTransformCheckpointStats(final long timestampMillis, final long timeUpperBoundMillis) {
77+
public DataFrameTransformCheckpointStats(final long checkpoint, final IndexerState indexerState,
78+
final DataFrameIndexerPosition position, final DataFrameTransformProgress checkpointProgress,
79+
final long timestampMillis, final long timeUpperBoundMillis) {
80+
this.checkpoint = checkpoint;
81+
this.indexerState = indexerState;
82+
this.position = position;
83+
this.checkpointProgress = checkpointProgress;
5784
this.timestampMillis = timestampMillis;
5885
this.timeUpperBoundMillis = timeUpperBoundMillis;
5986
}
6087

61-
public DataFrameTransformCheckpointStats(StreamInput in) throws IOException {
62-
this.timestampMillis = in.readLong();
63-
this.timeUpperBoundMillis = in.readLong();
88+
public long getCheckpoint() {
89+
return checkpoint;
90+
}
91+
92+
public IndexerState getIndexerState() {
93+
return indexerState;
94+
}
95+
96+
public DataFrameIndexerPosition getPosition() {
97+
return position;
98+
}
99+
100+
public DataFrameTransformProgress getCheckpointProgress() {
101+
return checkpointProgress;
64102
}
65103

66104
public long getTimestampMillis() {
@@ -73,7 +111,7 @@ public long getTimeUpperBoundMillis() {
73111

74112
@Override
75113
public int hashCode() {
76-
return Objects.hash(timestampMillis, timeUpperBoundMillis);
114+
return Objects.hash(checkpoint, indexerState, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
77115
}
78116

79117
@Override
@@ -88,6 +126,11 @@ public boolean equals(Object other) {
88126

89127
DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;
90128

91-
return this.timestampMillis == that.timestampMillis && this.timeUpperBoundMillis == that.timeUpperBoundMillis;
129+
return this.checkpoint == that.checkpoint
130+
&& Objects.equals(this.indexerState, that.indexerState)
131+
&& Objects.equals(this.position, that.position)
132+
&& Objects.equals(this.checkpointProgress, that.checkpointProgress)
133+
&& this.timestampMillis == that.timestampMillis
134+
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis;
92135
}
93136
}

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfo.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,14 @@
2727

2828
public class DataFrameTransformCheckpointingInfo {
2929

30-
public static final ParseField CURRENT_CHECKPOINT = new ParseField("current");
31-
public static final ParseField IN_PROGRESS_CHECKPOINT = new ParseField("in_progress");
30+
public static final ParseField LAST_CHECKPOINT = new ParseField("last", "current");
31+
public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress");
3232
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
3333

34-
private final DataFrameTransformCheckpointStats current;
35-
private final DataFrameTransformCheckpointStats inProgress;
34+
private final DataFrameTransformCheckpointStats last;
35+
private final DataFrameTransformCheckpointStats next;
3636
private final long operationsBehind;
3737

38-
3938
private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
4039
new ConstructingObjectParser<>(
4140
"data_frame_transform_checkpointing_info", true, a -> {
@@ -48,25 +47,25 @@ public class DataFrameTransformCheckpointingInfo {
4847

4948
static {
5049
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
51-
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), CURRENT_CHECKPOINT);
50+
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), LAST_CHECKPOINT);
5251
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
53-
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), IN_PROGRESS_CHECKPOINT);
52+
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT);
5453
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
5554
}
5655

57-
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats current, DataFrameTransformCheckpointStats inProgress,
56+
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, DataFrameTransformCheckpointStats next,
5857
long operationsBehind) {
59-
this.current = Objects.requireNonNull(current);
60-
this.inProgress = Objects.requireNonNull(inProgress);
58+
this.last = Objects.requireNonNull(last);
59+
this.next = Objects.requireNonNull(next);
6160
this.operationsBehind = operationsBehind;
6261
}
6362

64-
public DataFrameTransformCheckpointStats getCurrent() {
65-
return current;
63+
public DataFrameTransformCheckpointStats getLast() {
64+
return last;
6665
}
6766

68-
public DataFrameTransformCheckpointStats getInProgress() {
69-
return inProgress;
67+
public DataFrameTransformCheckpointStats getNext() {
68+
return next;
7069
}
7170

7271
public long getOperationsBehind() {
@@ -79,7 +78,7 @@ public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p)
7978

8079
@Override
8180
public int hashCode() {
82-
return Objects.hash(current, inProgress, operationsBehind);
81+
return Objects.hash(last, next, operationsBehind);
8382
}
8483

8584
@Override
@@ -94,8 +93,8 @@ public boolean equals(Object other) {
9493

9594
DataFrameTransformCheckpointingInfo that = (DataFrameTransformCheckpointingInfo) other;
9695

97-
return Objects.equals(this.current, that.current) &&
98-
Objects.equals(this.inProgress, that.inProgress) &&
96+
return Objects.equals(this.last, that.last) &&
97+
Objects.equals(this.next, that.next) &&
9998
this.operationsBehind == that.operationsBehind;
10099
}
101100

0 commit comments

Comments
 (0)