Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
Expand All @@ -50,25 +47,14 @@ public class DataFrameTransformState {
new ConstructingObjectParser<>("data_frame_transform_state", true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(HashMap<String, Object>) args[2],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4]));

static {
PARSER.declareField(constructorArg(),
p -> DataFrameTransformTaskState.fromString(p.text()),
TASK_STATE,
ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.START_OBJECT) {
return p.map();
}
if (p.currentToken() == XContentParser.Token.VALUE_NULL) {
return null;
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
}
Expand All @@ -80,7 +66,7 @@ public static DataFrameTransformState fromXContent(XContentParser parser) throws
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long checkpoint;
private final SortedMap<String, Object> currentPosition;
private final Map<String, Object> currentPosition;
private final String reason;

public DataFrameTransformState(DataFrameTransformTaskState taskState,
Expand All @@ -90,7 +76,7 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState,
@Nullable String reason) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.currentPosition = position == null ? null : Collections.unmodifiableMap(position);
this.checkpoint = checkpoint;
this.reason = reason;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public void testGetStats() throws Exception {
assertEquals(null, stateAndStats.getTransformState().getReason());
assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats());
assertNotNull(stateAndStats.getProgress());
assertThat(stateAndStats.getProgress().getPercentComplete(), equalTo(1.0));
assertThat(stateAndStats.getProgress().getPercentComplete(), equalTo(100.0));
assertThat(stateAndStats.getProgress().getTotalDocs(), greaterThan(0L));
assertThat(stateAndStats.getProgress().getRemainingDocs(), equalTo(0L));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testFromXContent() throws IOException {
public static DataFrameTransformProgress randomInstance() {
long totalDocs = randomNonNegativeLong();
Long docsRemaining = randomBoolean() ? null : randomLongBetween(0, totalDocs);
double percentComplete = totalDocs == 0 ? 1.0 : docsRemaining == null ? 0.0 : (double)(totalDocs - docsRemaining)/totalDocs;
double percentComplete = totalDocs == 0 ? 1.0 : docsRemaining == null ? 0.0 : 100.0*(double)(totalDocs - docsRemaining)/totalDocs;
return new DataFrameTransformProgress(totalDocs, docsRemaining, percentComplete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ public DataFrameTransformProgress(StreamInput in) throws IOException {

public Double getPercentComplete() {
if (totalDocs == 0) {
return 1.0;
return 100.0;
}
long docsRead = totalDocs - remainingDocs;
if (docsRead < 0) {
return 1.0;
return 100.0;
}
return (double)docsRead/totalDocs;
return 100.0*(double)docsRead/totalDocs;
}

public long getTotalDocs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.persistent.PersistentTaskState;
Expand All @@ -24,8 +24,6 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
Expand All @@ -38,7 +36,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
private final long checkpoint;

@Nullable
private final SortedMap<String, Object> currentPosition;
private final Map<String, Object> currentPosition;
@Nullable
private final String reason;

Expand All @@ -58,28 +56,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
(String) args[4]));

static {
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return DataFrameTransformTaskState.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, TASK_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return IndexerState.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");

}, INDEXER_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.START_OBJECT) {
return p.map();
}
if (p.currentToken() == XContentParser.Token.VALUE_NULL) {
return null;
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(optionalConstructorArg(), REASON);
}
Expand All @@ -91,15 +70,16 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState,
@Nullable String reason) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.currentPosition = position == null ? null : Collections.unmodifiableMap(position);
this.checkpoint = checkpoint;
this.reason = reason;
}

public DataFrameTransformState(StreamInput in) throws IOException {
taskState = DataFrameTransformTaskState.fromStream(in);
indexerState = IndexerState.fromStream(in);
currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null;
Map<String, Object> position = in.readMap();
currentPosition = position == null ? null : Collections.unmodifiableMap(position);
checkpoint = in.readLong();
reason = in.readOptionalString();
}
Expand Down Expand Up @@ -166,10 +146,7 @@ public String getWriteableName() {
public void writeTo(StreamOutput out) throws IOException {
taskState.writeTo(out);
indexerState.writeTo(out);
out.writeBoolean(currentPosition != null);
if (currentPosition != null) {
out.writeMap(currentPosition);
}
out.writeMap(currentPosition);
out.writeLong(checkpoint);
out.writeOptionalString(reason);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ protected Reader<DataFrameTransformProgress> instanceReader() {

public void testPercentComplete() {
DataFrameTransformProgress progress = new DataFrameTransformProgress(0L, 100L);
assertThat(progress.getPercentComplete(), equalTo(1.0));
assertThat(progress.getPercentComplete(), equalTo(100.0));

progress = new DataFrameTransformProgress(100L, 0L);
assertThat(progress.getPercentComplete(), equalTo(1.0));
assertThat(progress.getPercentComplete(), equalTo(100.0));

progress = new DataFrameTransformProgress(100L, 10000L);
assertThat(progress.getPercentComplete(), equalTo(1.0));
assertThat(progress.getPercentComplete(), equalTo(100.0));

progress = new DataFrameTransformProgress(100L, null);
assertThat(progress.getPercentComplete(), equalTo(0.0));

progress = new DataFrameTransformProgress(100L, 50L);
assertThat(progress.getPercentComplete(), closeTo(0.5, 0.000001));
assertThat(progress.getPercentComplete(), closeTo(50.0, 0.000001));
}

public void testConstructor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testGetAndGetStats() throws Exception {
Map<String, Object> progress = (Map<String, Object>)XContentMapValues.extractValue("progress", transformStats);
assertThat("total_docs is not 1000", progress.get("total_docs"), equalTo(1000));
assertThat("completed_docs is not 1000", progress.get("docs_remaining"), equalTo(0));
assertThat("percent_complete is not 1.0", progress.get("percent_complete"), equalTo(1.0));
assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0));
}

// only pivot_1
Expand Down Expand Up @@ -173,7 +173,7 @@ public void testGetProgressStatsWithPivotQuery() throws Exception {
Map<String, Object> progress = (Map<String, Object>)XContentMapValues.extractValue("progress", transformStats);
assertThat("total_docs is not 1000", progress.get("total_docs"), equalTo(37));
assertThat("completed_docs is not 1000", progress.get("docs_remaining"), equalTo(0));
assertThat("percent_complete is not 1.0", progress.get("percent_complete"), equalTo(1.0));
assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testGetProgress() throws Exception {

assertThat(progress.getTotalDocs(), equalTo(1000L));
assertThat(progress.getRemainingDocs(), equalTo(800L));
assertThat(progress.getPercentComplete(), closeTo(0.20, 0.0000001));
assertThat(progress.getPercentComplete(), closeTo(20.0, 0.0000001));

progressFuture = new PlainActionFuture<>();

Expand All @@ -173,7 +173,7 @@ public void testGetProgress() throws Exception {

assertThat(progress.getTotalDocs(), equalTo(1000L));
assertThat(progress.getRemainingDocs(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(1.0));
assertThat(progress.getPercentComplete(), equalTo(100.0));

DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource("timestamp");
dateHistogramGroupSource.setDateHistogramInterval(DateHistogramInterval.DAY);
Expand Down Expand Up @@ -202,7 +202,7 @@ public void testGetProgress() throws Exception {

assertThat(progress.getTotalDocs(), equalTo(1000L));
assertThat(progress.getRemainingDocs(), equalTo(949L));
assertThat(progress.getPercentComplete(), closeTo(0.051, 0.0000001));
assertThat(progress.getPercentComplete(), closeTo(5.1, 0.0000001));


QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26"));
Expand Down Expand Up @@ -231,7 +231,7 @@ public void testGetProgress() throws Exception {

assertThat(progress.getTotalDocs(), equalTo(37L));
assertThat(progress.getRemainingDocs(), equalTo(17L));
assertThat(progress.getPercentComplete(), closeTo(0.54054054054, 0.00000001));
assertThat(progress.getPercentComplete(), closeTo(54.054054054, 0.00000001));

client().admin().indices().prepareDelete(REVIEWS_INDEX_NAME).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ teardown:
- match: { transforms.0.stats.search_failures: 0 }
- match: { transforms.0.progress.total_docs: 0 }
- match: { transforms.0.progress.docs_remaining: 0 }
- match: { transforms.0.progress.percent_complete: 1.0 }
- match: { transforms.0.progress.percent_complete: 100.0 }

---
"Test get transform stats on missing transform":
Expand Down