Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand All @@ -44,6 +45,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
public static final ParseField ID = new ParseField("id");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DEST = new ParseField("dest");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField VERSION = new ParseField("version");
Expand All @@ -54,6 +56,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
private final String id;
private final SourceConfig source;
private final DestConfig dest;
private final TimeValue frequency;
private final SyncConfig syncConfig;
private final PivotConfig pivotConfig;
private final String description;
Expand All @@ -66,14 +69,16 @@ public class DataFrameTransformConfig implements ToXContentObject {
String id = (String) args[0];
SourceConfig source = (SourceConfig) args[1];
DestConfig dest = (DestConfig) args[2];
SyncConfig syncConfig = (SyncConfig) args[3];
PivotConfig pivotConfig = (PivotConfig) args[4];
String description = (String)args[5];
Instant createTime = (Instant)args[6];
String transformVersion = (String)args[7];
TimeValue frequency = (TimeValue) args[3];
SyncConfig syncConfig = (SyncConfig) args[4];
PivotConfig pivotConfig = (PivotConfig) args[5];
String description = (String)args[6];
Instant createTime = (Instant)args[7];
String transformVersion = (String)args[8];
return new DataFrameTransformConfig(id,
source,
dest,
frequency,
syncConfig,
pivotConfig,
description,
Expand All @@ -85,6 +90,8 @@ public class DataFrameTransformConfig implements ToXContentObject {
PARSER.declareString(constructorArg(), ID);
PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE);
PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()),
FREQUENCY, ObjectParser.ValueType.STRING);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
Expand Down Expand Up @@ -118,12 +125,13 @@ public static DataFrameTransformConfig fromXContent(final XContentParser parser)
* @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
return new DataFrameTransformConfig(null, source, null, null, pivotConfig, null, null, null);
return new DataFrameTransformConfig(null, source, null, null, null, pivotConfig, null, null, null);
}

DataFrameTransformConfig(final String id,
final SourceConfig source,
final DestConfig dest,
final TimeValue frequency,
final SyncConfig syncConfig,
final PivotConfig pivotConfig,
final String description,
Expand All @@ -132,6 +140,7 @@ public static DataFrameTransformConfig forPreview(final SourceConfig source, fin
this.id = id;
this.source = source;
this.dest = dest;
this.frequency = frequency;
this.syncConfig = syncConfig;
this.pivotConfig = pivotConfig;
this.description = description;
Expand All @@ -151,6 +160,10 @@ public DestConfig getDestination() {
return dest;
}

public TimeValue getFrequency() {
return frequency;
}

public SyncConfig getSyncConfig() {
return syncConfig;
}
Expand Down Expand Up @@ -184,6 +197,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
if (dest != null) {
builder.field(DEST.getPreferredName(), dest);
}
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
if (syncConfig != null) {
builder.startObject(SYNC.getPreferredName());
builder.field(syncConfig.getName(), syncConfig);
Expand Down Expand Up @@ -220,6 +236,7 @@ public boolean equals(Object other) {
return Objects.equals(this.id, that.id)
&& Objects.equals(this.source, that.source)
&& Objects.equals(this.dest, that.dest)
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.syncConfig, that.syncConfig)
&& Objects.equals(this.transformVersion, that.transformVersion)
Expand All @@ -229,7 +246,7 @@ public boolean equals(Object other) {

@Override
public int hashCode() {
return Objects.hash(id, source, dest, syncConfig, pivotConfig, description);
return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description);
}

@Override
Expand All @@ -246,6 +263,7 @@ public static class Builder {
private String id;
private SourceConfig source;
private DestConfig dest;
private TimeValue frequency;
private SyncConfig syncConfig;
private PivotConfig pivotConfig;
private String description;
Expand All @@ -265,6 +283,11 @@ public Builder setDest(DestConfig dest) {
return this;
}

public Builder setFrequency(TimeValue frequency) {
this.frequency = frequency;
return this;
}

public Builder setSyncConfig(SyncConfig syncConfig) {
this.syncConfig = syncConfig;
return this;
Expand All @@ -281,7 +304,7 @@ public Builder setDescription(String description) {
}

public DataFrameTransformConfig build() {
return new DataFrameTransformConfig(id, source, dest, syncConfig, pivotConfig, description, null, null);
return new DataFrameTransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
Expand All @@ -43,6 +44,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10),
randomSourceConfig(),
randomDestConfig(),
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1000, 1000000)),
randomBoolean() ? null : randomSyncConfig(),
PivotConfigTests.randomPivotConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
.setId("reviewer-avg-rating") // <1>
.setSource(sourceConfig) // <2>
.setDest(destConfig) // <3>
.setPivotConfig(pivotConfig) // <4>
.setDescription("This is my test transform") // <5>
.setFrequency(TimeValue.timeValueSeconds(15)) // <4>
.setPivotConfig(pivotConfig) // <5>
.setDescription("This is my test transform") // <6>
.build();
// end::put-data-frame-transform-config

Expand Down
5 changes: 3 additions & 2 deletions docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ include-tagged::{doc-tests-file}[{api}-config]
<1> The {dataframe-transform} ID
<2> The source indices and query from which to gather data
<3> The destination index and optional pipeline
<4> The PivotConfig
<5> Optional free text description of the transform
<4> How often to check for updates to the source indices
<5> The PivotConfig
<6> Optional free text description of the transform

[id="{upid}-{api}-query-config"]

Expand Down
6 changes: 6 additions & 0 deletions docs/reference/data-frames/apis/put-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
(object) The destination configuration, which consists of `index` and
optionally a `pipeline` id. See <<data-frame-transform-dest>>.

`frequency` (Optional)::
(time units) The interval between checks for changes in the source indices
when the {dataframe-transform} is running continuously. Defaults to `1m`.
The lowest permitted value is `1s`; the highest `1h`.

`pivot` (Optional)::
(object) Defines the pivot function `group by` fields and the aggregation to
reduce the data. See <<data-frame-transform-pivot>>.
Expand Down Expand Up @@ -90,6 +95,7 @@ PUT _data_frame/transforms/ecommerce_transform
"index": "kibana_sample_data_ecommerce_transform",
"pipeline": "add_timestamp_pipeline"
},
"frequency": "5m",
"pivot": {
"group_by": {
"customer_id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public final class DataFrameField {
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
public static final ParseField FIELD = new ParseField("field");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -24,25 +25,30 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp

public static final String NAME = DataFrameField.TASK_NAME;
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
public static final ParseField FREQUENCY = DataFrameField.FREQUENCY;

private final String transformId;
private final Version version;
private final TimeValue frequency;

public static final ConstructingObjectParser<DataFrameTransform, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new DataFrameTransform((String) a[0], (String) a[1]));
public static final ConstructingObjectParser<DataFrameTransform, Void> PARSER = new ConstructingObjectParser<>(NAME, true,
a -> new DataFrameTransform((String) a[0], (String) a[1], (String) a[2]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
}

private DataFrameTransform(String transformId, String version) {
this(transformId, version == null ? null : Version.fromString(version));
private DataFrameTransform(String transformId, String version, String frequency) {
this(transformId, version == null ? null : Version.fromString(version),
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()));
}

public DataFrameTransform(String transformId, Version version) {
public DataFrameTransform(String transformId, Version version, TimeValue frequency) {
this.transformId = transformId;
this.version = version == null ? Version.V_7_2_0 : version;
this.frequency = frequency;
}

public DataFrameTransform(StreamInput in) throws IOException {
Expand All @@ -52,6 +58,11 @@ public DataFrameTransform(StreamInput in) throws IOException {
} else {
this.version = Version.V_7_2_0;
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.frequency = in.readOptionalTimeValue();
} else {
this.frequency = null;
}
}

@Override
Expand All @@ -70,13 +81,19 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
Version.writeVersion(version, out);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalTimeValue(frequency);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(VERSION.getPreferredName(), version);
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
builder.endObject();
return builder;
}
Expand All @@ -89,6 +106,10 @@ public Version getVersion() {
return version;
}

public TimeValue getFrequency() {
return frequency;
}

public static DataFrameTransform fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
Expand All @@ -105,11 +126,13 @@ public boolean equals(Object other) {

DataFrameTransform that = (DataFrameTransform) other;

return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version);
return Objects.equals(this.transformId, that.transformId)
&& Objects.equals(this.version, that.version)
&& Objects.equals(this.frequency, that.frequency);
}

@Override
public int hashCode() {
return Objects.hash(transformId, version);
return Objects.hash(transformId, version, frequency);
}
}
Loading