Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
Expand All @@ -39,6 +41,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DEST = new ParseField("dest");
public static final ParseField QUERY = new ParseField("query");
public static final ParseField MAPPING_OVERRIDE = new ParseField("mapping_override");
// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");

Expand All @@ -47,6 +50,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
private final String dest;
private final QueryConfig queryConfig;
private final PivotConfig pivotConfig;
private final Map<String, String> mappingOverride;

public static final ConstructingObjectParser<DataFrameTransformConfig, String> PARSER =
new ConstructingObjectParser<>("data_frame_transform", true,
Expand All @@ -56,7 +60,9 @@ public class DataFrameTransformConfig implements ToXContentObject {
String dest = (String) args[2];
QueryConfig queryConfig = (QueryConfig) args[3];
PivotConfig pivotConfig = (PivotConfig) args[4];
return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig);
@SuppressWarnings("unchecked")
Map<String, String> mappingOverrides = (Map<String, String>) args[5];
return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig, mappingOverrides);
});

static {
Expand All @@ -65,23 +71,25 @@ public class DataFrameTransformConfig implements ToXContentObject {
PARSER.declareString(constructorArg(), DEST);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p), QUERY);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), MAPPING_OVERRIDE);
}

public static DataFrameTransformConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}


public DataFrameTransformConfig(final String id,
final String source,
final String dest,
final QueryConfig queryConfig,
final PivotConfig pivotConfig) {
final PivotConfig pivotConfig,
final Map<String, String> mappingOverride) {
this.id = id;
this.source = source;
this.dest = dest;
this.queryConfig = queryConfig;
this.pivotConfig = pivotConfig;
this.mappingOverride = mappingOverride == null ? null : Collections.unmodifiableMap(mappingOverride);
}

public String getId() {
Expand All @@ -104,6 +112,10 @@ public QueryConfig getQueryConfig() {
return queryConfig;
}

public Map<String, String> getMappingOverride() {
return mappingOverride;
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
Expand All @@ -120,6 +132,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
if (pivotConfig != null) {
builder.field(PIVOT_TRANSFORM.getPreferredName(), pivotConfig);
}
if (mappingOverride != null) {
builder.field(MAPPING_OVERRIDE.getPreferredName(), mappingOverride);
}
builder.endObject();
return builder;
}
Expand All @@ -140,12 +155,13 @@ public boolean equals(Object other) {
&& Objects.equals(this.source, that.source)
&& Objects.equals(this.dest, that.dest)
&& Objects.equals(this.queryConfig, that.queryConfig)
&& Objects.equals(this.pivotConfig, that.pivotConfig);
&& Objects.equals(this.pivotConfig, that.pivotConfig)
&& Objects.equals(this.mappingOverride, that.mappingOverride);
}

@Override
public int hashCode() {
return Objects.hash(id, source, dest, queryConfig, pivotConfig);
return Objects.hash(id, source, dest, queryConfig, pivotConfig, mappingOverride);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ public void testCreateDelete() throws IOException {
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);

String id = "test-crud";
DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig);
DataFrameTransformConfig transform = new DataFrameTransformConfig(id,
sourceIndex,
"pivot-dest",
queryConfig,
pivotConfig,
Collections.emptyMap());

DataFrameClient client = highLevelClient().dataFrame();
AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
Expand Down Expand Up @@ -182,7 +187,12 @@ public void testStartStop() throws IOException {
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);

String id = "test-stop-start";
DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig);
DataFrameTransformConfig transform = new DataFrameTransformConfig(id,
sourceIndex,
"pivot-dest",
queryConfig,
pivotConfig,
Collections.emptyMap());

DataFrameClient client = highLevelClient().dataFrame();
AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
Expand Down Expand Up @@ -219,7 +229,12 @@ public void testPreview() throws IOException {
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);

DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview", sourceIndex, null, queryConfig, pivotConfig);
DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview",
sourceIndex,
null,
queryConfig,
pivotConfig,
Collections.emptyMap());

DataFrameClient client = highLevelClient().dataFrame();
PreviewDataFrameTransformResponse preview = execute(new PreviewDataFrameTransformRequest(transform),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ public void testValidate() {

// null id and destination is valid
DataFrameTransformConfig config = new DataFrameTransformConfig(null, "source", null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap());

assertFalse(new PreviewDataFrameTransformRequest(config).validate().isPresent());

// null source is not valid
config = new DataFrameTransformConfig(null, null, null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap());

Optional<ValidationException> error = new PreviewDataFrameTransformRequest(config).validate();
assertTrue(error.isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testValidate() {
assertFalse(createTestInstance().validate().isPresent());

DataFrameTransformConfig config = new DataFrameTransformConfig(null, null, null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap());

Optional<ValidationException> error = new PutDataFrameTransformRequest(config).validate();
assertTrue(error.isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,29 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class DataFrameTransformConfigTests extends AbstractXContentTestCase<DataFrameTransformConfig> {

public static DataFrameTransformConfig randomDataFrameTransformConfig() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(),
randomNullableStringMap());
}

public static Map<String, String> randomNullableStringMap() {
Map<String, String> stringStringMap = null;
if (randomBoolean()) {
stringStringMap = new HashMap<>();
int kvCount = randomInt(10);
for (int i = 0; i < kvCount; i++) {
stringStringMap.put(randomAlphaOfLength(10), randomAlphaOfLength(10));
}
}
return stringStringMap;
}
@Override
protected DataFrameTransformConfig createTestInstance() {
return randomDataFrameTransformConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -117,6 +119,10 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
AggregationBuilders.avg("avg_rating").field("stars")); // <1>
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
// end::put-data-frame-transform-agg-config
// tag::put-data-frame-transform-mapping-override
Map<String, String> mappingOverride = new HashMap<>();
mappingOverride.put("avg_rating", "keyword"); // <1>
// end::put-data-frame-transform-mapping-override
// tag::put-data-frame-transform-pivot-config
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
// end::put-data-frame-transform-pivot-config
Expand All @@ -126,7 +132,8 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
"source-index", // <2>
"pivot-destination", // <3>
queryConfig, // <4>
pivotConfig); // <5>
pivotConfig, // <5>
mappingOverride); // <6>
// end::put-data-frame-transform-config

{
Expand All @@ -146,7 +153,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
{
DataFrameTransformConfig configWithDifferentId = new DataFrameTransformConfig("reviewer-avg-rating2",
transformConfig.getSource(), transformConfig.getDestination(), transformConfig.getQueryConfig(),
transformConfig.getPivotConfig());
transformConfig.getPivotConfig(), null);
PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(configWithDifferentId);

// tag::put-data-frame-transform-execute-listener
Expand Down Expand Up @@ -191,7 +198,7 @@ public void testStartStop() throws IOException, InterruptedException {
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);

DataFrameTransformConfig transformConfig = new DataFrameTransformConfig("mega-transform",
"source-data", "pivot-dest", queryConfig, pivotConfig);
"source-data", "pivot-dest", queryConfig, pivotConfig, null);

client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
transformsToClean.add(transformConfig.getId());
Expand Down Expand Up @@ -308,9 +315,9 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);

DataFrameTransformConfig transformConfig1 = new DataFrameTransformConfig("mega-transform",
"source-data", "pivot-dest", queryConfig, pivotConfig);
"source-data", "pivot-dest", queryConfig, pivotConfig, null);
DataFrameTransformConfig transformConfig2 = new DataFrameTransformConfig("mega-transform2",
"source-data", "pivot-dest2", queryConfig, pivotConfig);
"source-data", "pivot-dest2", queryConfig, pivotConfig, null);

client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig1), RequestOptions.DEFAULT);
client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig2), RequestOptions.DEFAULT);
Expand Down Expand Up @@ -378,7 +385,8 @@ public void testPreview() throws IOException, InterruptedException {
"source-data",
null, // <2>
queryConfig,
pivotConfig);
pivotConfig,
Collections.emptyMap());

PreviewDataFrameTransformRequest request =
new PreviewDataFrameTransformRequest(transformConfig); // <3>
Expand Down
14 changes: 14 additions & 0 deletions docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ include-tagged::{doc-tests-file}[{api}-config]
<3> The destination index
<4> Optionally a QueryConfig
<5> The PivotConfig
<6> The provided `Map<String, String>` values to override deduced destination index mappings.

[id="{upid}-{api}-query-config"]
==== QueryConfig
Expand Down Expand Up @@ -84,6 +85,19 @@ include-tagged::{doc-tests-file}[{api}-agg-config]
--------------------------------------------------
<1> Aggregate the average star rating

==== Overriding Deduced Destination Mapping Field Types

When creating the destination index for the {dataframe-transform}, a best
effort mapping is created. This option allows overriding specific mapped
field types.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-mapping-override]
--------------------------------------------------
<1> Override the deduced mapping for field `avg_rating` to be a type of
`keyword`

include::../execution.asciidoc[]

[id="{upid}-{api}-response"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class DataFrameMessages {
"Failed to create composite aggregation from pivot function";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID =
"Data frame transform configuration [{0}] has invalid elements";

public static final String DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]";
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY =
"Failed to parse query for data frame transform";
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY =
Expand Down
Loading