Skip to content

Commit

Permalink
Add data frame feature (elastic#38934)
Browse files Browse the repository at this point in the history
The data frame plugin allows users to create feature indexes by pivoting a source index. In a
nutshell this can be understood as reindex supporting aggregations or similar to the so called entity
centric indexing.

Full history is provided in: feature/data-frame-transforms
  • Loading branch information
Hendrik Muhs committed Feb 18, 2019
1 parent 408a800 commit 0a9953a
Show file tree
Hide file tree
Showing 93 changed files with 8,671 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1697,7 +1697,7 @@ public void testCRUDIndexTemplateWithTypes() throws Exception {
assertTrue(template2.mappings().containsKey("custom_doc_type"));

List<String> names = randomBoolean()
? Arrays.asList("*-1", "template-2")
? Arrays.asList("*plate-1", "template-2")
: Arrays.asList("template-*");
GetIndexTemplatesRequest getBothRequest = new GetIndexTemplatesRequest(names);
org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse getBoth = execute(
Expand Down Expand Up @@ -1780,7 +1780,7 @@ public void testCRUDIndexTemplate() throws Exception {


List<String> names = randomBoolean()
? Arrays.asList("*-1", "template-2")
? Arrays.asList("*plate-1", "template-2")
: Arrays.asList("template-*");
GetIndexTemplatesRequest getBothRequest = new GetIndexTemplatesRequest(names);
GetIndexTemplatesResponse getBoth = execute(
Expand Down Expand Up @@ -1834,7 +1834,7 @@ public void testIndexTemplatesExist() throws Exception {

{
final List<String> templateNames = randomBoolean()
? Arrays.asList("*-1", "template-2")
? Arrays.asList("*plate-1", "template-2")
: Arrays.asList("template-*");

final IndexTemplatesExistRequest bothRequest = new IndexTemplatesExistRequest(templateNames);
Expand Down
5 changes: 5 additions & 0 deletions docs/reference/rest-api/info.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ Example response:
"available" : true,
"enabled" : true
},
"data_frame" : {
"description" : "Data Frame for the Elastic Stack",
"available" : true,
"enabled" : true
},
"graph" : {
"description" : "Graph Data Exploration for the Elastic Stack",
"available" : true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,15 @@ public static boolean isMachineLearningAllowedForOperationMode(final OperationMo
return isPlatinumOrTrialOperationMode(operationMode);
}

/**
* Data Frame is always available as long as there is a valid license
*
* @return true if the license is active
*/
public synchronized boolean isDataFrameAllowed() {
return status.active;
}

/**
* Rollup is always available as long as there is a valid license
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class ClientHelper {
public static final String DEPRECATION_ORIGIN = "deprecation";
public static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
public static final String ROLLUP_ORIGIN = "rollup";
public static final String DATA_FRAME_ORIGIN = "data_frame";

private ClientHelper() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.xpack.core.beats.BeatsFeatureSetUsage;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.graph.GraphFeatureSetUsage;
import org.elasticsearch.xpack.core.graph.action.GraphExploreAction;
Expand Down Expand Up @@ -439,8 +440,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
);
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
// Data Frame
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_FRAME, DataFrameFeatureSetUsage::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public final class XPackField {
public static final String INDEX_LIFECYCLE = "ilm";
/** Name constant for the CCR feature. */
public static final String CCR = "ccr";
/** Name constant for the data frame feature. */
public static final String DATA_FRAME = "data_frame";

private XPackField() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ private XPackSettings() {
*/
public static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);

/** Setting for enabling or disabling data frame. Defaults to true. */
public static final Setting<Boolean> DATA_FRAME_ENABLED = Setting.boolSetting("xpack.data_frame.enabled", true,
Setting.Property.NodeScope);

/** Setting for enabling or disabling security. Defaults to true. */
public static final Setting<Boolean> SECURITY_ENABLED = Setting.boolSetting("xpack.security.enabled", true, Setting.Property.NodeScope);

Expand Down Expand Up @@ -209,6 +213,7 @@ public static List<Setting<?>> getAllSettings() {
settings.add(ROLLUP_ENABLED);
settings.add(PASSWORD_HASHING_ALGORITHM);
settings.add(INDEX_LIFECYCLE_ENABLED);
settings.add(DATA_FRAME_ENABLED);
return Collections.unmodifiableList(settings);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.transform.DataFrameIndexerTransformStats;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

public class DataFrameFeatureSetUsage extends Usage {

private final Map<String, Long> transformCountByState;
private final DataFrameIndexerTransformStats accumulatedStats;

public DataFrameFeatureSetUsage(StreamInput in) throws IOException {
super(in);
this.transformCountByState = in.readMap(StreamInput::readString, StreamInput::readLong);
this.accumulatedStats = new DataFrameIndexerTransformStats(in);
}

public DataFrameFeatureSetUsage(boolean available, boolean enabled, Map<String, Long> transformCountByState,
DataFrameIndexerTransformStats accumulatedStats) {
super(XPackField.DATA_FRAME, available, enabled);
this.transformCountByState = Objects.requireNonNull(transformCountByState);
this.accumulatedStats = Objects.requireNonNull(accumulatedStats);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(transformCountByState, StreamOutput::writeString, StreamOutput::writeLong);
accumulatedStats.writeTo(out);
}

@Override
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
if (transformCountByState.isEmpty() == false) {
builder.startObject(DataFrameField.TRANSFORMS.getPreferredName());
long all = 0L;
for (Entry<String, Long> entry : transformCountByState.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
all+=entry.getValue();
}
builder.field(MetaData.ALL, all);
builder.endObject();

// if there are no transforms, do not show any stats
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), accumulatedStats);
}
}

@Override
public int hashCode() {
return Objects.hash(enabled, available, transformCountByState, accumulatedStats);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
DataFrameFeatureSetUsage other = (DataFrameFeatureSetUsage) obj;
return Objects.equals(name, other.name) && available == other.available && enabled == other.enabled
&& Objects.equals(transformCountByState, other.transformCountByState)
&& Objects.equals(accumulatedStats, other.accumulatedStats);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import org.elasticsearch.common.ParseField;

/*
* Utility class to hold common fields and strings for data frame.
*/
public final class DataFrameField {

// common parse fields
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField ID = new ParseField("id");
public static final ParseField TRANSFORMS = new ParseField("transforms");
public static final ParseField COUNT = new ParseField("count");
public static final ParseField GROUP_BY = new ParseField("group_by");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField STATS_FIELD = new ParseField("stats");

// common strings
public static final String TASK_NAME = "data_frame/transforms";
public static final String REST_BASE_PATH = "/_data_frame/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH + "transforms/{id}/";

// note: this is used to match tasks
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";

// strings for meta information
public static final String META_FIELDNAME = "_data_frame";
public static final String CREATION_DATE_MILLIS = "creation_date_in_millis";
public static final String VERSION = "version";
public static final String CREATED = "created";
public static final String CREATED_BY = "created_by";
public static final String TRANSFORM = "transform";
public static final String DATA_FRAME_SIGNATURE = "data-frame-transform";

private DataFrameField() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import java.text.MessageFormat;
import java.util.Locale;

public class DataFrameMessages {

public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT =
"Timed out after [{0}] while waiting for data frame transform [{1}] to stop";
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT =
"Interrupted while waiting for data frame transform [{0}] to stop";
public static final String REST_PUT_DATA_FRAME_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
public static final String REST_DATA_FRAME_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION =
"Failed to validate data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS = "Failed to deduce target mappings";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK =
"Failed to start persistent task, configuration has been cleaned up: [{0}]";
public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";

public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
"Failed to load data frame transform configuration for transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION =
"Failed to parse transform configuration for data frame transform [{0}]";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_NO_TRANSFORM =
"Data frame transform configuration must specify exactly 1 function";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY =
"Data frame pivot transform configuration must specify at least 1 group_by";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_AGGREGATION =
"Data frame pivot transform configuration must specify at least 1 aggregation";
public static final String DATA_FRAME_TRANSFORM_PIVOT_FAILED_TO_CREATE_COMPOSITE_AGGREGATION =
"Failed to create composite aggregation from pivot function";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID =
"Data frame transform configuration [{0}] has invalid elements";

public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY =
"Failed to parse query for data frame transform";
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY =
"Failed to parse group_by for data frame pivot transform";
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION =
"Failed to parse aggregation for data frame pivot transform";

private DataFrameMessages() {
}

/**
* Returns the message parameter
*
* @param message Should be one of the statics defined in this class
*/
public static String getMessage(String message) {
return message;
}

/**
* Format the message with the supplied arguments
*
* @param message Should be one of the statics defined in this class
* @param args MessageFormat arguments. See {@linkplain MessageFormat#format(Object)}]
*/
public static String getMessage(String message, Object... args) {
return new MessageFormat(message, Locale.ROOT).format(args);
}
}
Loading

0 comments on commit 0a9953a

Please sign in to comment.