Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/44219" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Holds state of the cursors:
*
* indexer_position: the position of the indexer querying the source
* bucket_position: the position used for identifying changes
*/
public class DataFrameIndexerPosition {
public static final ParseField INDEXER_POSITION = new ParseField("indexer_position");
public static final ParseField BUCKET_POSITION = new ParseField("bucket_position");

private final Map<String, Object> indexerPosition;
private final Map<String, Object> bucketPosition;

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameIndexerPosition, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_indexer_position",
true,
args -> new DataFrameIndexerPosition((Map<String, Object>) args[0],(Map<String, Object>) args[1]));

static {
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT);
}

public DataFrameIndexerPosition(Map<String, Object> indexerPosition, Map<String, Object> bucketPosition) {
this.indexerPosition = indexerPosition == null ? null : Collections.unmodifiableMap(indexerPosition);
this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition);
}

public Map<String, Object> getIndexerPosition() {
return indexerPosition;
}

public Map<String, Object> getBucketsPosition() {
return bucketPosition;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

DataFrameIndexerPosition that = (DataFrameIndexerPosition) other;

return Objects.equals(this.indexerPosition, that.indexerPosition) &&
Objects.equals(this.bucketPosition, that.bucketPosition);
}

@Override
public int hashCode() {
return Objects.hash(indexerPosition, bucketPosition);
}

public static DataFrameIndexerPosition fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -39,7 +37,10 @@ public class DataFrameTransformState {

private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField TASK_STATE = new ParseField("task_state");

// 7.3 BWC: current_position only exists in 7.2. In 7.3+ it is replaced by position.
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField POSITION = new ParseField("position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
Expand All @@ -48,18 +49,31 @@ public class DataFrameTransformState {
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
new ConstructingObjectParser<>("data_frame_transform_state", true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4],
(DataFrameTransformProgress) args[5],
(NodeAttributes) args[6]));
args -> {
DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0];
IndexerState indexerState = (IndexerState) args[1];
Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3];

// BWC handling, translate current_position to position iff position isn't set
if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) {
dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null);
}

long checkpoint = (long) args[4];
String reason = (String) args[5];
DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6];
NodeAttributes node = (NodeAttributes) args[7];

return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress,
node);
});

static {
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT);
Expand All @@ -73,21 +87,21 @@ public static DataFrameTransformState fromXContent(XContentParser parser) throws
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long checkpoint;
private final Map<String, Object> currentPosition;
private final DataFrameIndexerPosition position;
private final String reason;
private final DataFrameTransformProgress progress;
private final NodeAttributes node;

public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
@Nullable DataFrameIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.position = position;
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
Expand All @@ -103,8 +117,8 @@ public DataFrameTransformTaskState getTaskState() {
}

@Nullable
public Map<String, Object> getPosition() {
return currentPosition;
public DataFrameIndexerPosition getPosition() {
return position;
}

public long getCheckpoint() {
Expand Down Expand Up @@ -140,7 +154,7 @@ public boolean equals(Object other) {

return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
Objects.equals(this.position, that.position) &&
Objects.equals(this.progress, that.progress) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.node, that.node) &&
Expand All @@ -149,7 +163,7 @@ public boolean equals(Object other) {

@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;

public class DataFrameIndexerPositionTests extends ESTestCase {

public void testFromXContent() throws IOException {
xContentTester(this::createParser,
DataFrameIndexerPositionTests::randomDataFrameIndexerPosition,
DataFrameIndexerPositionTests::toXContent,
DataFrameIndexerPosition::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.equals("indexer_position") ||
field.equals("bucket_position"))
.test();
}

public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap());
}

public static void toXContent(DataFrameIndexerPosition position, XContentBuilder builder) throws IOException {
builder.startObject();
if (position.getIndexerPosition() != null) {
builder.field("indexer_position", position.getIndexerPosition());
}
if (position.getBucketsPosition() != null) {
builder.field("bucket_position", position.getBucketsPosition());
}
builder.endObject();
}

private static Map<String, Object> randomPositionMap() {
if (randomBoolean()) {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new LinkedHashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
value = randomLong();
} else {
value = randomAlphaOfLengthBetween(1, 10);
}
position.put(randomAlphaOfLengthBetween(3, 10), value);
}
return position;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;

Expand All @@ -38,15 +36,16 @@ public void testFromXContent() throws IOException {
DataFrameTransformStateTests::toXContent,
DataFrameTransformState::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.equals("current_position") ||
field.equals("node.attributes"))
.randomFieldsExcludeFilter(field -> field.equals("position.indexer_position") ||
field.equals("position.bucket_position") ||
field.equals("node.attributes"))
.test();
}

public static DataFrameTransformState randomDataFrameTransformState() {
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
randomFrom(IndexerState.values()),
randomPositionMap(),
randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(),
Expand All @@ -58,7 +57,8 @@ public static void toXContent(DataFrameTransformState state, XContentBuilder bui
builder.field("task_state", state.getTaskState().value());
builder.field("indexer_state", state.getIndexerState().value());
if (state.getPosition() != null) {
builder.field("current_position", state.getPosition());
builder.field("position");
DataFrameIndexerPositionTests.toXContent(state.getPosition(), builder);
}
builder.field("checkpoint", state.getCheckpoint());
if (state.getReason() != null) {
Expand All @@ -75,21 +75,4 @@ public static void toXContent(DataFrameTransformState state, XContentBuilder bui
builder.endObject();
}

private static Map<String, Object> randomPositionMap() {
if (randomBoolean()) {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new LinkedHashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
value = randomLong();
} else {
value = randomAlphaOfLengthBetween(1, 10);
}
position.put(randomAlphaOfLengthBetween(3, 10), value);
}
return position;
}
}
Loading