Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,36 @@
import org.elasticsearch.xpack.core.XPackField;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

public class TransformFeatureSetUsage extends Usage {

private static final String FEATURE_COUNTS = "feature_counts";

private final Map<String, Long> transformCountByState;
private final Map<String, Long> transformCountByFeature;
private final TransformIndexerStats accumulatedStats;

public TransformFeatureSetUsage(StreamInput in) throws IOException {
super(in);
this.transformCountByState = in.readMap(StreamInput::readString, StreamInput::readLong);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO: V_7_13_0
this.transformCountByFeature = in.readMap(StreamInput::readString, StreamInput::readLong);
} else {
this.transformCountByFeature = Collections.emptyMap();
}
this.accumulatedStats = new TransformIndexerStats(in);
}

public TransformFeatureSetUsage(Map<String, Long> transformCountByState,
TransformIndexerStats accumulatedStats) {
Map<String, Long> transformCountByFeature,
TransformIndexerStats accumulatedStats) {
super(XPackField.TRANSFORM, true, true);
this.transformCountByState = Objects.requireNonNull(transformCountByState);
this.transformCountByFeature = Objects.requireNonNull(transformCountByFeature);
this.accumulatedStats = Objects.requireNonNull(accumulatedStats);
}

Expand All @@ -48,30 +59,37 @@ public Version getMinimalSupportedVersion() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(transformCountByState, StreamOutput::writeString, StreamOutput::writeLong);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO: V_7_13_0
out.writeMap(transformCountByFeature, StreamOutput::writeString, StreamOutput::writeLong);
}
accumulatedStats.writeTo(out);
}

@Override
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
if (transformCountByState.isEmpty() == false) {
// Transforms by state
builder.startObject(TransformField.TRANSFORMS.getPreferredName());
long all = 0L;
for (Entry<String, Long> entry : transformCountByState.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
all+=entry.getValue();
all += entry.getValue();
}
builder.field(Metadata.ALL, all);
builder.endObject();

// Transform count for each feature
builder.field(FEATURE_COUNTS, transformCountByFeature);

// if there are no transforms, do not show any stats
builder.field(TransformField.STATS_FIELD.getPreferredName(), accumulatedStats);
}
}

@Override
public int hashCode() {
return Objects.hash(enabled, available, transformCountByState, accumulatedStats);
return Objects.hash(enabled, available, transformCountByState, transformCountByFeature, accumulatedStats);
}

@Override
Expand All @@ -85,6 +103,7 @@ public boolean equals(Object obj) {
TransformFeatureSetUsage other = (TransformFeatureSetUsage) obj;
return Objects.equals(name, other.name) && available == other.available && enabled == other.enabled
&& Objects.equals(transformCountByState, other.transformCountByState)
&& Objects.equals(transformCountByFeature, other.transformCountByFeature)
&& Objects.equals(accumulatedStats, other.accumulatedStats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.common.ParseField;

/*
* Utility class to hold common fields and strings for data frame.
* Utility class to hold common fields and strings for transform.
*/
public final class TransformField {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

Expand All @@ -51,10 +52,20 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
/** Version in which {@code FieldCapabilitiesRequest.runtime_fields} field was introduced. */
private static final Version FIELD_CAPS_RUNTIME_MAPPINGS_INTRODUCED_VERSION = Version.V_7_12_0;

// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
public static final ParseField LATEST_TRANSFORM = new ParseField("latest");
/** Specifies all the possible transform functions. */
public enum Function {
PIVOT, LATEST;

private final ParseField parseField;

Function() {
this.parseField = new ParseField(name().toLowerCase(Locale.ROOT));
}

public ParseField getParseField() {
return parseField;
}
}
private static final ConstructingObjectParser<TransformConfig, String> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<TransformConfig, String> LENIENT_PARSER = createParser(true);
static final int MAX_DESCRIPTION_LENGTH = 1_000;
Expand Down Expand Up @@ -149,8 +160,8 @@ private static ConstructingObjectParser<TransformConfig, String> createParser(bo
parser.declareNamedObject(optionalConstructorArg(), (p, c, n) -> p.namedObject(SyncConfig.class, n, c), TransformField.SYNC);
parser.declareString(optionalConstructorArg(), TransformField.INDEX_DOC_TYPE);
parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS);
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
parser.declareObject(optionalConstructorArg(), (p, c) -> LatestConfig.fromXContent(p, lenient), LATEST_TRANSFORM);
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), Function.PIVOT.getParseField());
parser.declareObject(optionalConstructorArg(), (p, c) -> LatestConfig.fromXContent(p, lenient), Function.LATEST.getParseField());
parser.declareString(optionalConstructorArg(), TransformField.DESCRIPTION);
parser.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p, lenient), TransformField.SETTINGS);
parser.declareNamedObject(
Expand Down Expand Up @@ -429,10 +440,10 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.endObject();
}
if (pivotConfig != null) {
builder.field(PIVOT_TRANSFORM.getPreferredName(), pivotConfig);
builder.field(Function.PIVOT.getParseField().getPreferredName(), pivotConfig);
}
if (latestConfig != null) {
builder.field(LATEST_TRANSFORM.getPreferredName(), latestConfig);
builder.field(Function.LATEST.getParseField().getPreferredName(), latestConfig);
}
if (description != null) {
builder.field(TransformField.DESCRIPTION.getPreferredName(), description);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public final class TransformInternalIndexConstants {
public static final String TRANSFORM_PREFIX_DEPRECATED = ".data-frame-";

// version is not a rollover pattern, however padded because sort is string based
public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_12_0;
public static final String INDEX_VERSION = "006";
public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_13_0;
public static final String INDEX_VERSION = "007";
public static final String INDEX_PATTERN = TRANSFORM_PREFIX + "internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,30 @@
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests;

import java.util.HashMap;
import java.util.Arrays;
import java.util.Map;

import static java.util.stream.Collectors.toMap;

public class TransformFeatureSetUsageTests extends AbstractWireSerializingTestCase<TransformFeatureSetUsage> {

@Override
protected TransformFeatureSetUsage createTestInstance() {
Map<String, Long> transformCountByState = new HashMap<>();

if (randomBoolean()) {
transformCountByState.put(randomFrom(IndexerState.values()).toString(), randomLong());
}

return new TransformFeatureSetUsage(transformCountByState, TransformIndexerStatsTests.randomStats());
Map<String, Long> transformCountByState =
randomSubsetOf(Arrays.asList(IndexerState.values())).stream()
.collect(toMap(state -> state.value(), state -> randomLong()));
Map<String, Long> transformCountByFeature =
randomList(10, () -> randomAlphaOfLength(10)).stream()
.collect(toMap(f -> f, f -> randomLong()));
TransformIndexerStats accumulatedStats = TransformIndexerStatsTests.randomStats();
return new TransformFeatureSetUsage(transformCountByState, transformCountByFeature, accumulatedStats);
}

@Override
protected Reader<TransformFeatureSetUsage> instanceReader() {
return TransformFeatureSetUsage::new;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,6 @@ protected void createReviewsIndexNano() throws IOException {

protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {

final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);

String config = "{ \"dest\": {\"index\":\"" + transformIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
// Set frequency high for testing
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
Expand All @@ -266,10 +264,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
+ " } } } }"
+ "}";

createTransformRequest.setJsonEntity(config);

Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
createReviewsTransform(transformId, authHeader, config);
}

protected void createPivotReviewsTransform(
Expand All @@ -280,8 +275,6 @@ protected void createPivotReviewsTransform(
String authHeader,
String sourceIndex
) throws IOException {
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);

String config = "{";

if (pipeline != null) {
Expand Down Expand Up @@ -315,6 +308,25 @@ protected void createPivotReviewsTransform(
+ "\"frequency\":\"1s\""
+ "}";

createReviewsTransform(transformId, authHeader, config);
}

protected void createLatestReviewsTransform(String transformId, String transformIndex) throws IOException {
String config = "{"
+ " \"dest\": {\"index\":\"" + transformIndex + "\"},"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"latest\": {"
+ " \"unique_key\": [ \"user_id\" ],"
+ " \"sort\": \"@timestamp\""
+ " },"
+ "\"frequency\":\"1s\""
+ "}";

createReviewsTransform(transformId, null, config);
}

private void createReviewsTransform(String transformId, String authHeader, String config) throws IOException {
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
createTransformRequest.setJsonEntity(config);

Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,23 @@ public void testUsage() throws Exception {
assertTrue((boolean) XContentMapValues.extractValue("transform.available", usageAsMap));
assertTrue((boolean) XContentMapValues.extractValue("transform.enabled", usageAsMap));
// no transforms, no stats
assertEquals(null, XContentMapValues.extractValue("transform.transforms", usageAsMap));
assertEquals(null, XContentMapValues.extractValue("transform.stats", usageAsMap));
assertNull(XContentMapValues.extractValue("transform.transforms", usageAsMap));
assertNull(XContentMapValues.extractValue("transform.feature_counts", usageAsMap));
assertNull(XContentMapValues.extractValue("transform.stats", usageAsMap));

// create transforms
createPivotReviewsTransform("test_usage", "pivot_reviews", null);
createPivotReviewsTransform("test_usage_no_stats", "pivot_reviews_no_stats", null);
createContinuousPivotReviewsTransform("test_usage_continuous", "pivot_reviews_continuous", null);
createLatestReviewsTransform("test_usage_latest", "latest_reviews");
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
assertEquals(3, XContentMapValues.extractValue("transform.transforms._all", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
assertEquals(4, XContentMapValues.extractValue("transform.transforms._all", usageAsMap));
assertEquals(4, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("transform.feature_counts.pivot", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.latest", usageAsMap));
assertEquals(0, XContentMapValues.extractValue("transform.feature_counts.retention_policy", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.sync", usageAsMap));

startAndWaitForTransform("test_usage", "pivot_reviews");
stopTransform("test_usage", false);
Expand Down Expand Up @@ -100,9 +106,13 @@ public void testUsage() throws Exception {
Response response = client().performRequest(new Request("GET", "_xpack/usage"));
Map<String, Object> statsMap = entityAsMap(response);
// we should see some stats
assertEquals(3, XContentMapValues.extractValue("transform.transforms._all", statsMap));
assertEquals(2, XContentMapValues.extractValue("transform.transforms.stopped", statsMap));
assertEquals(4, XContentMapValues.extractValue("transform.transforms._all", statsMap));
assertEquals(3, XContentMapValues.extractValue("transform.transforms.stopped", statsMap));
assertEquals(1, XContentMapValues.extractValue("transform.transforms.started", statsMap));
assertEquals(3, XContentMapValues.extractValue("transform.feature_counts.pivot", statsMap));
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.latest", statsMap));
assertEquals(0, XContentMapValues.extractValue("transform.feature_counts.retention_policy", statsMap));
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.sync", statsMap));
for (String statName : PROVIDED_STATS) {
// the trigger count can be off: e.g. if the scheduler kicked in before usage has been called,
// or if the scheduler triggered later, but state hasn't been persisted (by design)
Expand Down Expand Up @@ -130,11 +140,15 @@ public void testUsage() throws Exception {
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);

assertEquals(3, XContentMapValues.extractValue("transform.transforms._all", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
assertEquals(4, XContentMapValues.extractValue("transform.transforms._all", usageAsMap));
assertEquals(4, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("transform.feature_counts.pivot", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.latest", usageAsMap));
assertEquals(0, XContentMapValues.extractValue("transform.feature_counts.retention_policy", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.sync", usageAsMap));
}

private double extractStatsAsDouble(Object statsObject) {
private static double extractStatsAsDouble(Object statsObject) {
if (statsObject instanceof Integer) {
return ((Integer) statsObject).doubleValue();
} else if (statsObject instanceof Double) {
Expand Down
Loading