Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
return builder.setVersion(Version.CURRENT).build();
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String id;
private SourceConfig source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,23 @@

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.transform.PreviewTransformResponse;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.latest.LatestConfig;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
import org.junit.After;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -38,20 +32,19 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

@SuppressWarnings("removal")
public class LatestIT extends TransformIntegTestCase {

private static final String SOURCE_INDEX_NAME = "basic-crud-latest-reviews";
private static final int NUM_USERS = 28;

private static final String TRANSFORM_NAME = "transform-crud-latest";

private static final Integer getUserIdForRow(int row) {
private static Integer getUserIdForRow(int row) {
int userId = row % (NUM_USERS + 1);
return userId < NUM_USERS ? userId : null;
}

private static final String getDateStringForRow(int row) {
private static String getDateStringForRow(int row) {
int month = 1 + (row / 28);
int day = 1 + (row % 28);
return "2017-" + (month < 10 ? "0" + month : month) + "-" + (day < 10 ? "0" + day : day) + "T12:30:00Z";
Expand All @@ -64,7 +57,7 @@ private static final String getDateStringForRow(int row) {
private static final String STARS = "stars";
private static final String COMMENT = "comment";

private static final Map<String, Object> row(String userId, String businessId, int count, int stars, String timestamp, String comment) {
private static Map<String, Object> row(String userId, String businessId, int count, int stars, String timestamp, String comment) {
return new HashMap<>() {
{
if (userId != null) {
Expand Down Expand Up @@ -113,59 +106,60 @@ private static final Map<String, Object> row(String userId, String businessId, i
row(null, "business_36", 86, 1, "2017-04-03T12:30:00Z", "Great stuff, deserves 1 stars") };

@After
public void cleanTransforms() throws IOException {
public void cleanTransforms() throws Exception {
cleanUp();
}

@SuppressWarnings("unchecked")
public void testLatest() throws Exception {
createReviewsIndex(SOURCE_INDEX_NAME, 100, NUM_USERS, LatestIT::getUserIdForRow, LatestIT::getDateStringForRow);

String destIndexName = "reviews-latest";
TransformConfig transformConfig = createTransformConfigBuilder(
TRANSFORM_NAME,
destIndexName,
QueryBuilders.matchAllQuery(),
QueryConfig.matchAll(),
SOURCE_INDEX_NAME
).setLatestConfig(LatestConfig.builder().setUniqueKey(USER_ID).setSort(TIMESTAMP).build()).build();
assertTrue(putTransform(transformConfig, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(transformConfig.getId(), RequestOptions.DEFAULT).isAcknowledged());
).setLatestConfig(new LatestConfig(List.of(USER_ID), TIMESTAMP)).build();
putTransform(TRANSFORM_NAME, Strings.toString(transformConfig), RequestOptions.DEFAULT);
startTransform(transformConfig.getId(), RequestOptions.DEFAULT);
waitUntilCheckpoint(transformConfig.getId(), 1L);
stopTransform(transformConfig.getId());

try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
restClient.indices().refresh(new RefreshRequest(destIndexName), RequestOptions.DEFAULT);
// Verify destination index mappings
GetMappingsResponse destIndexMapping = restClient.indices()
.getMapping(new GetMappingsRequest().indices(destIndexName), RequestOptions.DEFAULT);
assertThat(destIndexMapping.mappings().get(destIndexName).sourceAsMap(), allOf(hasKey("_meta"), hasKey("properties")));
// Verify destination index contents
SearchResponse searchResponse = restClient.search(
new SearchRequest(destIndexName).source(new SearchSourceBuilder().size(1000)),
RequestOptions.DEFAULT
);
assertThat(searchResponse.getHits().getTotalHits().value, is(equalTo(Long.valueOf(NUM_USERS + 1))));
assertThat(
Stream.of(searchResponse.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(toList()),
containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS)
);
}
refreshIndex(destIndexName, RequestOptions.DEFAULT);
var mappings = getIndexMapping(destIndexName, RequestOptions.DEFAULT);
assertThat(
(Map<String, Object>) XContentMapValues.extractValue(destIndexName + ".mappings", mappings),
allOf(hasKey("_meta"), hasKey("properties"))
);
var searchResponse = search(destIndexName, 1000, RequestOptions.DEFAULT);
assertThat((Integer) XContentMapValues.extractValue("hits.total.value", searchResponse), is(equalTo(NUM_USERS + 1)));
var hits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", searchResponse);
var searchedDocs = hits.stream().map(h -> (Map<String, Object>) h.get("_source")).collect(Collectors.toList());
assertThat(searchedDocs, containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS));
}

private Map<String, Object> search(String index, int size, RequestOptions options) throws IOException {
var r = new Request("GET", index + "/_search?size=" + size);
r.setOptions(options);
return entityAsMap(client().performRequest(r));
}

@SuppressWarnings("unchecked")
public void testLatestPreview() throws Exception {
createReviewsIndex(SOURCE_INDEX_NAME, 100, NUM_USERS, LatestIT::getUserIdForRow, LatestIT::getDateStringForRow);

TransformConfig transformConfig = createTransformConfigBuilder(
TRANSFORM_NAME,
"dummy",
QueryBuilders.matchAllQuery(),
SOURCE_INDEX_NAME
).setLatestConfig(LatestConfig.builder().setUniqueKey(USER_ID).setSort(TIMESTAMP).build()).build();
TransformConfig transformConfig = createTransformConfigBuilder(TRANSFORM_NAME, "dummy", QueryConfig.matchAll(), SOURCE_INDEX_NAME)
.setLatestConfig(new LatestConfig(List.of(USER_ID), TIMESTAMP))
.build();

PreviewTransformResponse previewResponse = previewTransform(transformConfig, RequestOptions.DEFAULT);
var previewResponse = previewTransform(Strings.toString(transformConfig), RequestOptions.DEFAULT);
// Verify preview mappings
assertThat(previewResponse.getMappings(), allOf(hasKey("_meta"), hasEntry("properties", emptyMap())));
var mappings = (Map<String, Object>) XContentMapValues.extractValue("generated_dest_index.mappings", previewResponse);
assertThat(mappings, allOf(hasKey("_meta"), hasEntry("properties", emptyMap())));
// Verify preview contents
assertThat(previewResponse.getDocs(), hasSize(NUM_USERS + 1));
assertThat(previewResponse.getDocs(), containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS));
var docs = (List<Map<String, Object>>) XContentMapValues.extractValue("preview", previewResponse);
assertThat(docs, hasSize(NUM_USERS + 1));
assertThat(docs, containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -61,8 +62,8 @@ public void testTransformFeatureReset() throws Exception {

Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null));
groups.put("by-user", TermsGroupSource.builder().setField("user_id").build());
groups.put("by-business", TermsGroupSource.builder().setField("business_id").build());
groups.put("by-user", new TermsGroupSource("user_id", null, false));
groups.put("by-business", new TermsGroupSource("business_id", null, false));

AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
Expand All @@ -71,21 +72,21 @@ public void testTransformFeatureReset() throws Exception {
TransformConfig config = createTransformConfigBuilder(
transformId,
"reviews-by-user-business-day",
QueryBuilders.matchAllQuery(),
QueryConfig.matchAll(),
indexName
).setPivotConfig(createPivotConfig(groups, aggs)).build();

assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);
startTransform(config.getId(), RequestOptions.DEFAULT);

transformId = "continuous-transform-feature-reset";
config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day-cont", QueryBuilders.matchAllQuery(), indexName)
String continuousTransformId = "continuous-transform-feature-reset";
config = createTransformConfigBuilder(continuousTransformId, "reviews-by-user-business-day-cont", QueryConfig.matchAll(), indexName)
.setPivotConfig(createPivotConfig(groups, aggs))
.setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build())
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.build();

assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
putTransform(continuousTransformId, Strings.toString(config), RequestOptions.DEFAULT);
startTransform(continuousTransformId, RequestOptions.DEFAULT);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_features/_reset"));

Response response = adminClient().performRequest(new Request("GET", "/_cluster/state?metric=metadata"));
Expand All @@ -97,7 +98,7 @@ public void testTransformFeatureReset() throws Exception {
assertThat(transformMetadata, is(nullValue()));

// assert transforms are gone
assertThat(getTransform("_all").getCount(), equalTo(0L));
assertThat((Integer) getTransforms("_all").get("count"), equalTo(0));

// assert transform indices are gone
assertThat(ESRestTestCase.entityAsMap(adminClient().performRequest(new Request("GET", ".transform-*"))), is(anEmptyMap()));
Expand Down
Loading