diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java index c6ba295f156d9..b99b7c32807b9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java @@ -659,6 +659,10 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) { return builder.setVersion(Version.CURRENT).build(); } + public static Builder builder() { + return new Builder(); + } + public static class Builder { private String id; private SourceConfig source; diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/LatestIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/LatestIT.java index 43458d25ffef1..53f6bddc632bc 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/LatestIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/LatestIT.java @@ -7,29 +7,23 @@ package org.elasticsearch.xpack.transform.integration; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.indices.GetMappingsRequest; -import org.elasticsearch.client.indices.GetMappingsResponse; -import org.elasticsearch.client.transform.PreviewTransformResponse; -import org.elasticsearch.client.transform.transforms.TransformConfig; -import org.elasticsearch.client.transform.transforms.latest.LatestConfig; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig; import org.junit.After; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.stream.Stream; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; -import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -38,7 +32,6 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -@SuppressWarnings("removal") public class LatestIT extends TransformIntegTestCase { private static final String SOURCE_INDEX_NAME = "basic-crud-latest-reviews"; @@ -46,12 +39,12 @@ public class LatestIT extends TransformIntegTestCase { private static final String TRANSFORM_NAME = "transform-crud-latest"; - private static final Integer getUserIdForRow(int row) { + private static Integer getUserIdForRow(int row) { int userId = row % (NUM_USERS + 1); return userId < NUM_USERS ? userId : null; } - private static final String getDateStringForRow(int row) { + private static String getDateStringForRow(int row) { int month = 1 + (row / 28); int day = 1 + (row % 28); return "2017-" + (month < 10 ? "0" + month : month) + "-" + (day < 10 ? "0" + day : day) + "T12:30:00Z"; @@ -64,7 +57,7 @@ private static final String getDateStringForRow(int row) { private static final String STARS = "stars"; private static final String COMMENT = "comment"; - private static final Map row(String userId, String businessId, int count, int stars, String timestamp, String comment) { + private static Map row(String userId, String businessId, int count, int stars, String timestamp, String comment) { return new HashMap<>() { { if (userId != null) { @@ -113,10 +106,11 @@ private static final Map row(String userId, String businessId, i row(null, "business_36", 86, 1, "2017-04-03T12:30:00Z", "Great stuff, deserves 1 stars") }; @After - public void cleanTransforms() throws IOException { + public void cleanTransforms() throws Exception { cleanUp(); } + @SuppressWarnings("unchecked") public void testLatest() throws Exception { createReviewsIndex(SOURCE_INDEX_NAME, 100, NUM_USERS, LatestIT::getUserIdForRow, LatestIT::getDateStringForRow); @@ -124,48 +118,48 @@ public void testLatest() throws Exception { TransformConfig transformConfig = createTransformConfigBuilder( TRANSFORM_NAME, destIndexName, - QueryBuilders.matchAllQuery(), + QueryConfig.matchAll(), SOURCE_INDEX_NAME - ).setLatestConfig(LatestConfig.builder().setUniqueKey(USER_ID).setSort(TIMESTAMP).build()).build(); - assertTrue(putTransform(transformConfig, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(transformConfig.getId(), RequestOptions.DEFAULT).isAcknowledged()); + ).setLatestConfig(new LatestConfig(List.of(USER_ID), TIMESTAMP)).build(); + putTransform(TRANSFORM_NAME, Strings.toString(transformConfig), RequestOptions.DEFAULT); + startTransform(transformConfig.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(transformConfig.getId(), 1L); stopTransform(transformConfig.getId()); - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - restClient.indices().refresh(new RefreshRequest(destIndexName), RequestOptions.DEFAULT); - // Verify destination index mappings - GetMappingsResponse destIndexMapping = restClient.indices() - .getMapping(new GetMappingsRequest().indices(destIndexName), RequestOptions.DEFAULT); - assertThat(destIndexMapping.mappings().get(destIndexName).sourceAsMap(), allOf(hasKey("_meta"), hasKey("properties"))); - // Verify destination index contents - SearchResponse searchResponse = restClient.search( - new SearchRequest(destIndexName).source(new SearchSourceBuilder().size(1000)), - RequestOptions.DEFAULT - ); - assertThat(searchResponse.getHits().getTotalHits().value, is(equalTo(Long.valueOf(NUM_USERS + 1)))); - assertThat( - Stream.of(searchResponse.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(toList()), - containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS) - ); - } + refreshIndex(destIndexName, RequestOptions.DEFAULT); + var mappings = getIndexMapping(destIndexName, RequestOptions.DEFAULT); + assertThat( + (Map) XContentMapValues.extractValue(destIndexName + ".mappings", mappings), + allOf(hasKey("_meta"), hasKey("properties")) + ); + var searchResponse = search(destIndexName, 1000, RequestOptions.DEFAULT); + assertThat((Integer) XContentMapValues.extractValue("hits.total.value", searchResponse), is(equalTo(NUM_USERS + 1))); + var hits = (List>) XContentMapValues.extractValue("hits.hits", searchResponse); + var searchedDocs = hits.stream().map(h -> (Map) h.get("_source")).collect(Collectors.toList()); + assertThat(searchedDocs, containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS)); } + private Map search(String index, int size, RequestOptions options) throws IOException { + var r = new Request("GET", index + "/_search?size=" + size); + r.setOptions(options); + return entityAsMap(client().performRequest(r)); + } + + @SuppressWarnings("unchecked") public void testLatestPreview() throws Exception { createReviewsIndex(SOURCE_INDEX_NAME, 100, NUM_USERS, LatestIT::getUserIdForRow, LatestIT::getDateStringForRow); - TransformConfig transformConfig = createTransformConfigBuilder( - TRANSFORM_NAME, - "dummy", - QueryBuilders.matchAllQuery(), - SOURCE_INDEX_NAME - ).setLatestConfig(LatestConfig.builder().setUniqueKey(USER_ID).setSort(TIMESTAMP).build()).build(); + TransformConfig transformConfig = createTransformConfigBuilder(TRANSFORM_NAME, "dummy", QueryConfig.matchAll(), SOURCE_INDEX_NAME) + .setLatestConfig(new LatestConfig(List.of(USER_ID), TIMESTAMP)) + .build(); - PreviewTransformResponse previewResponse = previewTransform(transformConfig, RequestOptions.DEFAULT); + var previewResponse = previewTransform(Strings.toString(transformConfig), RequestOptions.DEFAULT); // Verify preview mappings - assertThat(previewResponse.getMappings(), allOf(hasKey("_meta"), hasEntry("properties", emptyMap()))); + var mappings = (Map) XContentMapValues.extractValue("generated_dest_index.mappings", previewResponse); + assertThat(mappings, allOf(hasKey("_meta"), hasEntry("properties", emptyMap()))); // Verify preview contents - assertThat(previewResponse.getDocs(), hasSize(NUM_USERS + 1)); - assertThat(previewResponse.getDocs(), containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS)); + var docs = (List>) XContentMapValues.extractValue("preview", previewResponse); + assertThat(docs, hasSize(NUM_USERS + 1)); + assertThat(docs, containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS)); } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java index 31dc881dfd1f9..129f4e3d01d2e 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java @@ -10,16 +10,17 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; -import org.elasticsearch.client.transform.transforms.TimeSyncConfig; -import org.elasticsearch.client.transform.transforms.TransformConfig; -import org.elasticsearch.client.transform.transforms.pivot.SingleGroupSource; -import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource; +import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; +import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; import org.junit.After; import org.junit.Before; @@ -61,8 +62,8 @@ public void testTransformFeatureReset() throws Exception { Map groups = new HashMap<>(); groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); - groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); - groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + groups.put("by-user", new TermsGroupSource("user_id", null, false)); + groups.put("by-business", new TermsGroupSource("business_id", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) @@ -71,21 +72,21 @@ public void testTransformFeatureReset() throws Exception { TransformConfig config = createTransformConfigBuilder( transformId, "reviews-by-user-business-day", - QueryBuilders.matchAllQuery(), + QueryConfig.matchAll(), indexName ).setPivotConfig(createPivotConfig(groups, aggs)).build(); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); - transformId = "continuous-transform-feature-reset"; - config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day-cont", QueryBuilders.matchAllQuery(), indexName) + String continuousTransformId = "continuous-transform-feature-reset"; + config = createTransformConfigBuilder(continuousTransformId, "reviews-by-user-business-day-cont", QueryConfig.matchAll(), indexName) .setPivotConfig(createPivotConfig(groups, aggs)) - .setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build()) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) .build(); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(continuousTransformId, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(continuousTransformId, RequestOptions.DEFAULT); client().performRequest(new Request(HttpPost.METHOD_NAME, "/_features/_reset")); Response response = adminClient().performRequest(new Request("GET", "/_cluster/state?metric=metadata")); @@ -97,7 +98,7 @@ public void testTransformFeatureReset() throws Exception { assertThat(transformMetadata, is(nullValue())); // assert transforms are gone - assertThat(getTransform("_all").getCount(), equalTo(0L)); + assertThat((Integer) getTransforms("_all").get("count"), equalTo(0)); // assert transform indices are gone assertThat(ESRestTestCase.entityAsMap(adminClient().performRequest(new Request("GET", ".transform-*"))), is(anEmptyMap())); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index 3bea7753a7622..f670a39725676 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -7,34 +7,25 @@ package org.elasticsearch.xpack.transform.integration; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.elasticsearch.Version; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.transform.transforms.DestConfig; -import org.elasticsearch.client.transform.transforms.SettingsConfig; -import org.elasticsearch.client.transform.transforms.TimeSyncConfig; -import org.elasticsearch.client.transform.transforms.TransformConfig; -import org.elasticsearch.client.transform.transforms.TransformConfigUpdate; -import org.elasticsearch.client.transform.transforms.TransformStats; -import org.elasticsearch.client.transform.transforms.pivot.SingleGroupSource; -import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; +import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; +import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; import org.junit.After; import org.junit.Before; @@ -81,7 +72,7 @@ public void setClusterSettings() throws IOException { } @After - public void cleanTransforms() throws IOException { + public void cleanTransforms() throws Exception { cleanUp(); } @@ -92,8 +83,8 @@ public void testTransformCrud() throws Exception { Map groups = new HashMap<>(); groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); - groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); - groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + groups.put("by-user", new TermsGroupSource("user_id", null, false)); + groups.put("by-business", new TermsGroupSource("business_id", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) @@ -102,24 +93,23 @@ public void testTransformCrud() throws Exception { TransformConfig config = createTransformConfigBuilder( transformId, "reviews-by-user-business-day", - QueryBuilders.matchAllQuery(), + QueryConfig.matchAll(), indexName ).setPivotConfig(createPivotConfig(groups, aggs)).build(); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(config.getId(), 1L); stopTransform(config.getId()); - assertBusy( - () -> { assertEquals(TransformStats.State.STOPPED, getTransformStats(config.getId()).getTransformsStats().get(0).getState()); } - ); + assertBusy(() -> { assertEquals("stopped", getTransformStats(config.getId()).get("state")); }); - TransformConfig storedConfig = getTransform(config.getId()).getTransformConfigurations().get(0); - assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT)); + var storedConfig = getTransform(config.getId()); + assertThat(storedConfig.get("version"), equalTo(Version.CURRENT.toString())); Instant now = Instant.now(); - assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now)); + long createTime = (long) storedConfig.get("create_time"); + assertTrue("[create_time] is not before current time", Instant.ofEpochMilli(createTime).isBefore(now)); deleteTransform(config.getId()); } @@ -130,8 +120,8 @@ public void testContinuousTransformCrud() throws Exception { Map groups = new HashMap<>(); groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); - groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); - groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + groups.put("by-user", new TermsGroupSource("user_id", null, false)); + groups.put("by-business", new TermsGroupSource("business_id", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) @@ -140,25 +130,27 @@ public void testContinuousTransformCrud() throws Exception { TransformConfig config = createTransformConfigBuilder( transformId, "reviews-by-user-business-day", - QueryBuilders.matchAllQuery(), + QueryConfig.matchAll(), indexName ).setPivotConfig(createPivotConfig(groups, aggs)) - .setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build()) - .setSettings(SettingsConfig.builder().setAlignCheckpoints(false).build()) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .setSettings(new SettingsConfig(null, null, null, false, null, null)) .build(); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(config.getId(), 1L); - assertThat(getTransformStats(config.getId()).getTransformsStats().get(0).getState(), equalTo(TransformStats.State.STARTED)); + var transformStats = getTransformStats(config.getId()); + assertThat(transformStats.get("state"), equalTo("started")); - long docsIndexed = getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed(); + int docsIndexed = (Integer) XContentMapValues.extractValue("stats.documents_indexed", transformStats); - TransformConfig storedConfig = getTransform(config.getId()).getTransformConfigurations().get(0); - assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT)); + var storedConfig = getTransform(config.getId()); + assertThat(storedConfig.get("version"), equalTo(Version.CURRENT.toString())); Instant now = Instant.now(); - assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now)); + long createTime = (long) storedConfig.get("create_time"); + assertTrue("[create_time] is not before current time", Instant.ofEpochMilli(createTime).isBefore(now)); // index some more docs long timeStamp = Instant.now().toEpochMilli() - 1_000; @@ -167,8 +159,9 @@ public void testContinuousTransformCrud() throws Exception { waitUntilCheckpoint(config.getId(), 2L); // Assert that we wrote the new docs + assertThat( - getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed(), + (Integer) XContentMapValues.extractValue("stats.documents_indexed", getTransformStats(config.getId())), greaterThan(docsIndexed) ); @@ -181,7 +174,7 @@ public void testContinuousTransformUpdate() throws Exception { createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); Map groups = new HashMap<>(); - groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); + groups.put("by-user", new TermsGroupSource("user_id", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) @@ -189,72 +182,83 @@ public void testContinuousTransformUpdate() throws Exception { String id = "transform-to-update"; String dest = "reviews-by-user-business-day-to-update"; - TransformConfig config = createTransformConfigBuilder(id, dest, QueryBuilders.matchAllQuery(), indexName).setPivotConfig( + TransformConfig config = createTransformConfigBuilder(id, dest, QueryConfig.matchAll(), indexName).setPivotConfig( createPivotConfig(groups, aggs) - ).setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build()).build(); + ).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build(); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(id, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(config.getId(), 1L); - assertThat( - getTransformStats(config.getId()).getTransformsStats().get(0).getState(), - oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING) - ); + assertThat(getTransformState(config.getId()), oneOf("started", "indexing")); - long docsIndexed = getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed(); + int docsIndexed = (Integer) XContentMapValues.extractValue("stats.documents_indexed", getTransformStats(config.getId())); - TransformConfig storedConfig = getTransform(config.getId()).getTransformConfigurations().get(0); - assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT)); + var storedConfig = getTransform(config.getId()); + assertThat(storedConfig.get("version"), equalTo(Version.CURRENT.toString())); Instant now = Instant.now(); - assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now)); + long createTime = (long) storedConfig.get("create_time"); + assertTrue("[create_time] is not before current time", Instant.ofEpochMilli(createTime).isBefore(now)); String pipelineId = "add_forty_two"; - TransformConfigUpdate update = TransformConfigUpdate.builder() - .setDescription("updated config") - .setDest(DestConfig.builder().setIndex(dest).setPipeline(pipelineId).build()) - .build(); + final XContentBuilder pipelineBuilder = jsonBuilder().startObject() + .startArray("processors") + .startObject() + .startObject("set") + .field("field", "static_forty_two") + .field("value", 42) + .endObject() + .endObject() + .endArray() + .endObject(); + Request putPipeline = new Request("PUT", "/_ingest/pipeline/" + pipelineId); + putPipeline.setEntity(new StringEntity(Strings.toString(pipelineBuilder), ContentType.APPLICATION_JSON)); + assertOK(client().performRequest(putPipeline)); + + String update = """ + { + "description": "updated config", + "dest": { + "index": "%s", + "pipeline": "%s" + } + } + """.formatted(dest, pipelineId); + updateConfig(id, update); - try (RestHighLevelClient hlrc = new TestRestHighLevelClient()) { - final XContentBuilder pipelineBuilder = jsonBuilder().startObject() - .startArray("processors") - .startObject() - .startObject("set") - .field("field", "static_forty_two") - .field("value", 42) - .endObject() - .endObject() - .endArray() - .endObject(); - hlrc.ingest() - .putPipeline( - new PutPipelineRequest(pipelineId, BytesReference.bytes(pipelineBuilder), XContentType.JSON), - RequestOptions.DEFAULT - ); - - updateConfig(id, update); - - // index some more docs - long timeStamp = Instant.now().toEpochMilli() - 1_000; - long user = 42; - indexMoreDocs(timeStamp, user, indexName); + // index some more docs + long timeStamp = Instant.now().toEpochMilli() - 1_000; + long user = 42; + indexMoreDocs(timeStamp, user, indexName); - // Since updates are loaded on checkpoint start, we should see the updated config on this next run - waitUntilCheckpoint(config.getId(), 2L); - long numDocsAfterCp2 = getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed(); - assertThat(numDocsAfterCp2, greaterThan(docsIndexed)); + // Since updates are loaded on checkpoint start, we should see the updated config on this next run + waitUntilCheckpoint(config.getId(), 2L); + int numDocsAfterCp2 = (Integer) XContentMapValues.extractValue("stats.documents_indexed", getTransformStats(config.getId())); + assertThat(numDocsAfterCp2, greaterThan(docsIndexed)); + + Request searchRequest = new Request("GET", dest + "/_search"); + searchRequest.addParameter("track_total_hits", "true"); + searchRequest.setJsonEntity(""" + { + "query": { + "term": { + "static_forty_two": { + "value": 42 + } + } + } + } + """); + + // assert that we have the new field and its value is 42 in at least some docs + assertBusy(() -> { + final Response searchResponse = client().performRequest(searchRequest); + assertOK(searchResponse); + var responseMap = entityAsMap(searchResponse); + assertThat((Integer) XContentMapValues.extractValue("hits.total.value", responseMap), greaterThan(0)); + refreshIndex(dest, RequestOptions.DEFAULT); + }, 30, TimeUnit.SECONDS); - final SearchRequest searchRequest = new SearchRequest(dest).source( - new SearchSourceBuilder().trackTotalHits(true) - .query(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("static_forty_two", 42))) - ); - // assert that we have the new field and its value is 42 in at least some docs - assertBusy(() -> { - final SearchResponse searchResponse = hlrc.search(searchRequest, RequestOptions.DEFAULT); - assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L)); - hlrc.indices().refresh(new RefreshRequest(dest), RequestOptions.DEFAULT); - }, 30, TimeUnit.SECONDS); - } stopTransform(config.getId()); deleteTransform(config.getId()); } @@ -266,8 +270,8 @@ public void testStopWaitForCheckpoint() throws Exception { Map groups = new HashMap<>(); groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); - groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); - groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + groups.put("by-user", new TermsGroupSource("user_id", null, false)); + groups.put("by-business", new TermsGroupSource("business_id", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) @@ -276,27 +280,27 @@ public void testStopWaitForCheckpoint() throws Exception { TransformConfig config = createTransformConfigBuilder( transformId, "reviews-by-user-business-day", - QueryBuilders.matchAllQuery(), + QueryConfig.matchAll(), indexName ).setPivotConfig(createPivotConfig(groups, aggs)) - .setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build()) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) .build(); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + startTransform(config.getId(), RequestOptions.DEFAULT); // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop - assertTrue(stopTransform(transformId, false, null, true).isAcknowledged()); + stopTransform(transformId, false, null, true); // Wait until the first checkpoint waitUntilCheckpoint(config.getId(), 1L); // Even though we are continuous, we should be stopped now as we needed to stop at the first checkpoint assertBusy(() -> { - TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); - assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); - assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), equalTo(1000L)); + var stateAndStats = getTransformStats(config.getId()); + assertThat(stateAndStats.get("state"), equalTo("stopped")); + assertThat((Integer) XContentMapValues.extractValue("stats.documents_indexed", stateAndStats), equalTo(1000)); }); int additionalRuns = randomIntBetween(1, 10); @@ -306,25 +310,23 @@ public void testStopWaitForCheckpoint() throws Exception { long timeStamp = Instant.now().toEpochMilli() - 1_000; long user = 42 + i; indexMoreDocs(timeStamp, user, indexName); - assertTrue(startTransformWithRetryOnConflict(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + startTransformWithRetryOnConflict(config.getId(), RequestOptions.DEFAULT); boolean waitForCompletion = randomBoolean(); - assertTrue(stopTransform(transformId, waitForCompletion, null, true).isAcknowledged()); + stopTransform(transformId, waitForCompletion, null, true); assertBusy(() -> { - TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); - assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); + var stateAndStats = getTransformStats(config.getId()); + assertThat(stateAndStats.get("state"), equalTo("stopped")); }); - TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); - assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); } - TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); - assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); + var stateAndStats = getTransformStats(config.getId()); + assertThat(stateAndStats.get("state"), equalTo("stopped")); // Despite indexing new documents into the source index, the number of documents in the destination index stays the same. - assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), equalTo(1000L)); + assertThat((Integer) XContentMapValues.extractValue("stats.documents_indexed", stateAndStats), equalTo(1000)); - assertTrue(stopTransform(transformId).isAcknowledged()); + stopTransform(transformId); deleteTransform(config.getId()); } @@ -336,8 +338,8 @@ public void testContinuousTransformRethrottle() throws Exception { Map groups = new HashMap<>(); groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); - groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); - groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + groups.put("by-user", new TermsGroupSource("user_id", null, false)); + groups.put("by-business", new TermsGroupSource("business_id", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) @@ -346,44 +348,48 @@ public void testContinuousTransformRethrottle() throws Exception { TransformConfig config = createTransformConfigBuilder( transformId, "reviews-by-user-business-day", - QueryBuilders.matchAllQuery(), + QueryConfig.matchAll(), indexName ).setPivotConfig(createPivotConfig(groups, aggs)) - .setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build()) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) // set requests per second and page size low enough to fail the test if update does not succeed, - .setSettings(SettingsConfig.builder().setRequestsPerSecond(1F).setMaxPageSearchSize(10).setAlignCheckpoints(false).build()) + .setSettings(new SettingsConfig(10, 1F, null, false, null, null)) .build(); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); assertBusy(() -> { - TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); - assertThat(stateAndStats.getState(), equalTo(TransformStats.State.INDEXING)); + var stateAndStats = getTransformStats(config.getId()); + assertThat(stateAndStats.get("state"), equalTo("indexing")); }); - TransformConfigUpdate update = TransformConfigUpdate.builder() - // test randomly: with explicit settings and reset to default - .setSettings( - SettingsConfig.builder() - .setRequestsPerSecond(randomBoolean() ? 1000F : null) - .setMaxPageSearchSize(randomBoolean() ? 1000 : null) - .build() - ) - .build(); + // test randomly: with explicit settings and reset to default + String reqsPerSec = randomBoolean() ? "1000" : "null"; + String maxPageSize = randomBoolean() ? "1000" : "null"; + String update = """ + { + "settings" : { + "docs_per_second": %s, + "max_page_search_size": %s + } + } + """.formatted(reqsPerSec, maxPageSize); updateConfig(config.getId(), update); waitUntilCheckpoint(config.getId(), 1L); - assertThat(getTransformStats(config.getId()).getTransformsStats().get(0).getState(), equalTo(TransformStats.State.STARTED)); + assertThat(getTransformState(config.getId()), equalTo("started")); - long docsIndexed = getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed(); - long pagesProcessed = getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getPagesProcessed(); + var transformStats = getTransformStats(config.getId()); + int docsIndexed = (Integer) XContentMapValues.extractValue("stats.documents_indexed", transformStats); + int pagesProcessed = (Integer) XContentMapValues.extractValue("stats.pages_processed", transformStats); - TransformConfig storedConfig = getTransform(config.getId()).getTransformConfigurations().get(0); - assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT)); + var storedConfig = getTransform(config.getId()); + assertThat(storedConfig.get("version"), equalTo(Version.CURRENT.toString())); Instant now = Instant.now(); - assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now)); + long createTime = (long) storedConfig.get("create_time"); + assertTrue("[create_time] is not before current time", Instant.ofEpochMilli(createTime).isBefore(now)); // index some more docs long timeStamp = Instant.now().toEpochMilli() - 1_000; @@ -393,29 +399,32 @@ public void testContinuousTransformRethrottle() throws Exception { // Assert that we wrote the new docs assertThat( - getTransformStats(config.getId()).getTransformsStats().get(0).getIndexerStats().getDocumentsIndexed(), + (Integer) XContentMapValues.extractValue("stats.documents_indexed", getTransformStats(config.getId())), greaterThan(docsIndexed) ); // Assert less than 500 pages processed, so update worked - assertThat(pagesProcessed, lessThan(1000L)); + assertThat(pagesProcessed, lessThan(1000)); stopTransform(config.getId()); deleteTransform(config.getId()); } private void indexMoreDocs(long timestamp, long userId, String index) throws Exception { - BulkRequest bulk = new BulkRequest(index); + StringBuilder bulkBuilder = new StringBuilder(); for (int i = 0; i < 25; i++) { + bulkBuilder.append(""" + {"create":{"_index":"%s"}} + """.formatted(index)); + int stars = (i + 20) % 5; long business = (i + 100) % 50; String source = """ {"user_id":"user_%s","count":%s,"business_id":"business_%s","stars":%s,"timestamp":%s} """.formatted(userId, i, business, stars, timestamp); - bulk.add(new IndexRequest().source(source, XContentType.JSON)); + bulkBuilder.append(source); } - bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - bulkIndexDocs(bulk); + doBulk(bulkBuilder.toString(), true); } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java index 5a6551774e02e..53dcab6038d96 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java @@ -8,64 +8,44 @@ package org.elasticsearch.xpack.transform.integration; import org.apache.http.client.methods.HttpGet; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.apache.logging.log4j.Level; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.core.AcknowledgedResponse; -import org.elasticsearch.client.indices.CreateIndexRequest; -import org.elasticsearch.client.indices.CreateIndexResponse; -import org.elasticsearch.client.transform.DeleteTransformRequest; -import org.elasticsearch.client.transform.GetTransformRequest; -import org.elasticsearch.client.transform.GetTransformResponse; -import org.elasticsearch.client.transform.GetTransformStatsRequest; -import org.elasticsearch.client.transform.GetTransformStatsResponse; -import org.elasticsearch.client.transform.PreviewTransformRequest; -import org.elasticsearch.client.transform.PreviewTransformResponse; -import org.elasticsearch.client.transform.PutTransformRequest; -import org.elasticsearch.client.transform.StartTransformRequest; -import org.elasticsearch.client.transform.StartTransformResponse; -import org.elasticsearch.client.transform.StopTransformRequest; -import org.elasticsearch.client.transform.StopTransformResponse; -import org.elasticsearch.client.transform.UpdateTransformRequest; -import org.elasticsearch.client.transform.transforms.DestConfig; -import org.elasticsearch.client.transform.transforms.QueryConfig; -import org.elasticsearch.client.transform.transforms.SourceConfig; -import org.elasticsearch.client.transform.transforms.TransformConfig; -import org.elasticsearch.client.transform.transforms.TransformConfigUpdate; -import org.elasticsearch.client.transform.transforms.pivot.AggregationConfig; -import org.elasticsearch.client.transform.transforms.pivot.DateHistogramGroupSource; -import org.elasticsearch.client.transform.transforms.pivot.GroupConfig; -import org.elasticsearch.client.transform.transforms.pivot.PivotConfig; -import org.elasticsearch.client.transform.transforms.pivot.SingleGroupSource; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.xcontent.DeprecationHandler; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.transforms.DestConfig; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource; +import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -73,105 +53,145 @@ import java.time.ZoneId; import java.util.Base64; import java.util.Collections; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.Is.is; -@SuppressWarnings("removal") -abstract class TransformIntegTestCase extends ESRestTestCase { +public abstract class TransformIntegTestCase extends ESRestTestCase { - private Map transformConfigs = new HashMap<>(); + protected static String TRANSFORM_ENDPOINT = "/_transform/"; - protected void cleanUp() throws IOException { + private final Set createdTransformIds = new HashSet<>(); + + protected void cleanUp() throws Exception { logAudits(); cleanUpTransforms(); waitForPendingTasks(); } - private void logAudits() throws IOException { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - - // using '*' to make this lenient and do not fail if the audit index does not exist - SearchRequest searchRequest = new SearchRequest(".transform-notifications-*"); - searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(100).sort("timestamp", SortOrder.ASC)); - - restClient.indices().refresh(new RefreshRequest(searchRequest.indices()), RequestOptions.DEFAULT); - - SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT); + @SuppressWarnings("unchecked") + private void logAudits() throws Exception { + logger.info("writing audit messages to the log"); + Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true"); + searchRequest.setJsonEntity(""" + { + "size": 100, + "sort": [ { "timestamp": { "order": "asc" } } ] + }"""); + + assertBusy(() -> { + try { + refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN, RequestOptions.DEFAULT); + Response searchResponse = client().performRequest(searchRequest); - for (SearchHit hit : searchResponse.getHits()) { - Map source = hit.getSourceAsMap(); - String level = (String) source.getOrDefault("level", "info"); - logger.log( - Level.getLevel(level.toUpperCase(Locale.ROOT)), - "Transform audit: [{}] [{}] [{}] [{}]", - Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)), - source.getOrDefault("transform_id", "n/a"), - source.getOrDefault("message", "n/a"), - source.getOrDefault("node_name", "n/a") + Map searchResult = entityAsMap(searchResponse); + List> searchHits = (List>) XContentMapValues.extractValue( + "hits.hits", + searchResult ); + + for (Map hit : searchHits) { + Map source = (Map) XContentMapValues.extractValue("_source", hit); + String level = (String) source.getOrDefault("level", "info"); + logger.log( + Level.getLevel(level.toUpperCase(Locale.ROOT)), + "Transform audit: [{}] [{}] [{}] [{}]", + Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)), + source.getOrDefault("transform_id", "n/a"), + source.getOrDefault("message", "n/a"), + source.getOrDefault("node_name", "n/a") + ); + } + } catch (ResponseException e) { + // see gh#54810, wrap temporary 503's as assertion error for retry + if (e.getResponse().getStatusLine().getStatusCode() != 503) { + throw e; + } + throw new AssertionError("Failed to retrieve audit logs", e); } - } + }, 5, TimeUnit.SECONDS); } protected void cleanUpTransforms() throws IOException { - for (TransformConfig config : transformConfigs.values()) { + for (String id : createdTransformIds) { try { - stopTransform(config.getId()); - deleteTransform(config.getId()); + stopTransform(id); + deleteTransform(id); } catch (ElasticsearchStatusException ex) { if (ex.status().equals(RestStatus.NOT_FOUND)) { - logger.info("tried to cleanup already deleted transform [{}]", config.getId()); + logger.info("tried to cleanup already deleted transform [{}]", id); } else { throw ex; } } } - transformConfigs.clear(); + createdTransformIds.clear(); + } + + protected void refreshIndex(String index, RequestOptions options) throws IOException { + var r = new Request("POST", index + "/_refresh"); + r.setOptions(options); + assertOK(client().performRequest(r)); + } + + protected Map getIndexMapping(String index, RequestOptions options) throws IOException { + var r = new Request("GET", "/" + index + "/_mapping"); + r.setOptions(options); + return entityAsMap(client().performRequest(r)); } - protected StopTransformResponse stopTransform(String id) throws IOException { - return stopTransform(id, true, null, false); + protected void stopTransform(String id) throws IOException { + stopTransform(id, true, null, false); } - protected StopTransformResponse stopTransform(String id, boolean waitForCompletion, TimeValue timeout, boolean waitForCheckpoint) + protected void stopTransform(String id, boolean waitForCompletion, @Nullable TimeValue timeout, boolean waitForCheckpoint) throws IOException { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - return restClient.transform() - .stopTransform(new StopTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), RequestOptions.DEFAULT); + + final Request stopTransformRequest = new Request("POST", TRANSFORM_ENDPOINT + id + "/_stop"); + stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(waitForCompletion)); + stopTransformRequest.addParameter(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint)); + if (timeout != null) { + stopTransformRequest.addParameter(TransformField.TIMEOUT.getPreferredName(), timeout.getStringRep()); } + Map stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest)); + assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } - protected StartTransformResponse startTransform(String id, RequestOptions options) throws IOException { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - return restClient.transform().startTransform(new StartTransformRequest(id), options); - } + protected void startTransform(String id, RequestOptions options) throws IOException { + Request startTransformRequest = new Request("POST", TRANSFORM_ENDPOINT + id + "/_start"); + startTransformRequest.setOptions(options); + Map startTransformResponse = entityAsMap(client().performRequest(startTransformRequest)); + assertThat(startTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } // workaround for https://github.com/elastic/elasticsearch/issues/62204 - protected StartTransformResponse startTransformWithRetryOnConflict(String id, RequestOptions options) throws Exception { + protected void startTransformWithRetryOnConflict(String id, RequestOptions options) throws Exception { final int totalRetries = 10; long totalSleepTime = 0; - ElasticsearchStatusException lastConflict = null; + ResponseException lastConflict = null; for (int retries = totalRetries; retries > 0; --retries) { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - return restClient.transform().startTransform(new StartTransformRequest(id), options); - } catch (ElasticsearchStatusException e) { + try { + startTransform(id, options); + return; + } catch (ResponseException e) { logger.warn( "Failed to start transform [{}], remaining retries [{}], error: [{}], status: [{}]", id, retries, - e.getDetailedMessage(), - e.status() + e.getMessage(), + e.getResponse().getStatusLine().getStatusCode() ); - if (RestStatus.CONFLICT.equals(e.status()) == false) { + if ((RestStatus.CONFLICT.getStatus() == e.getResponse().getStatusLine().getStatusCode()) == false) { throw e; } @@ -187,46 +207,58 @@ protected StartTransformResponse startTransformWithRetryOnConflict(String id, Re throw new AssertionError("startTransformWithRetryOnConflict timed out after " + totalSleepTime + "ms", lastConflict); } - protected AcknowledgedResponse deleteTransform(String id) throws IOException { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - AcknowledgedResponse response = restClient.transform().deleteTransform(new DeleteTransformRequest(id), RequestOptions.DEFAULT); - if (response.isAcknowledged()) { - transformConfigs.remove(id); - } - return response; - } + protected void deleteTransform(String id) throws IOException { + Request request = new Request("DELETE", TRANSFORM_ENDPOINT + id); + assertOK(adminClient().performRequest(request)); } - protected AcknowledgedResponse putTransform(TransformConfig config, RequestOptions options) throws IOException { - if (transformConfigs.keySet().contains(config.getId())) { - throw new IllegalArgumentException("transform [" + config.getId() + "] is already registered"); + protected void putTransform(String id, String config, RequestOptions options) throws IOException { + if (createdTransformIds.contains(id)) { + throw new IllegalArgumentException("transform [" + id + "] is already registered"); } - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - AcknowledgedResponse response = restClient.transform().putTransform(new PutTransformRequest(config), options); - if (response.isAcknowledged()) { - transformConfigs.put(config.getId(), config); - } - return response; - } + Request put = new Request("PUT", TRANSFORM_ENDPOINT + id); + put.setJsonEntity(config); + put.setOptions(options); + assertOK(client().performRequest(put)); } - protected PreviewTransformResponse previewTransform(TransformConfig config, RequestOptions options) throws IOException { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - return restClient.transform().previewTransform(new PreviewTransformRequest(config), options); - } + protected Map previewTransform(String transformConfig, RequestOptions options) throws IOException { + var request = new Request("POST", TRANSFORM_ENDPOINT + "_preview"); + request.setJsonEntity(transformConfig); + request.setOptions(options); + return entityAsMap(client().performRequest(request)); } - protected GetTransformStatsResponse getTransformStats(String id) throws IOException { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - return restClient.transform().getTransformStats(new GetTransformStatsRequest(id), RequestOptions.DEFAULT); - } + @SuppressWarnings("unchecked") + protected Map getTransformStats(String id) throws IOException { + var request = new Request("GET", TRANSFORM_ENDPOINT + id + "/_stats"); + request.setOptions(RequestOptions.DEFAULT); + Response response = client().performRequest(request); + List> stats = (List>) XContentMapValues.extractValue("transforms", entityAsMap(response)); + assertThat(stats, hasSize(1)); + return stats.get(0); } - protected GetTransformResponse getTransform(String id) throws IOException { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - return restClient.transform().getTransform(new GetTransformRequest(id), RequestOptions.DEFAULT); - } + protected String getTransformState(String id) throws IOException { + return (String) getTransformStats(id).get("state"); + } + + @SuppressWarnings("unchecked") + protected Map getTransform(String id) throws IOException { + Request request = new Request("GET", TRANSFORM_ENDPOINT + id); + Response response = client().performRequest(request); + List> transformConfigs = (List>) XContentMapValues.extractValue( + "transforms", + entityAsMap(response) + ); + assertThat(transformConfigs, hasSize(1)); + return transformConfigs.get(0); + } + + protected Map getTransforms(String id) throws IOException { + Request request = new Request("GET", TRANSFORM_ENDPOINT + id); + return entityAsMap(client().performRequest(request)); } protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception { @@ -237,7 +269,7 @@ protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTim assertBusy( () -> assertEquals( checkpoint, - getTransformStats(id).getTransformsStats().get(0).getCheckpointingInfo().getLast().getCheckpoint() + ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", getTransformStats(id))).longValue() ), waitTime.getMillis(), TimeUnit.MILLISECONDS @@ -249,11 +281,7 @@ protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterv DateHistogramInterval interval, ZoneId zone ) { - DateHistogramGroupSource.Builder builder = DateHistogramGroupSource.builder() - .setField(field) - .setInterval(new DateHistogramGroupSource.FixedInterval(interval)) - .setTimeZone(zone); - return builder.build(); + return new DateHistogramGroupSource(field, null, false, new DateHistogramGroupSource.FixedInterval(interval), zone); } protected DateHistogramGroupSource createDateHistogramGroupSourceWithCalendarInterval( @@ -261,59 +289,68 @@ protected DateHistogramGroupSource createDateHistogramGroupSourceWithCalendarInt DateHistogramInterval interval, ZoneId zone ) { - DateHistogramGroupSource.Builder builder = DateHistogramGroupSource.builder() - .setField(field) - .setInterval(new DateHistogramGroupSource.CalendarInterval(interval)) - .setTimeZone(zone); - return builder.build(); + return new DateHistogramGroupSource(field, null, false, new DateHistogramGroupSource.CalendarInterval(interval), zone); } - protected GroupConfig createGroupConfig(Map groups) throws Exception { - GroupConfig.Builder builder = GroupConfig.builder(); - for (Map.Entry sgs : groups.entrySet()) { - builder.groupBy(sgs.getKey(), sgs.getValue()); - } - return builder.build(); - } + protected GroupConfig createGroupConfig(Map groups) throws IOException { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.startObject(); + for (Map.Entry entry : groups.entrySet()) { + builder.startObject(entry.getKey()); + builder.field(entry.getValue().getType().value(), entry.getValue()); + builder.endObject(); + } + builder.endObject(); - protected QueryConfig createQueryConfig(QueryBuilder queryBuilder) throws Exception { - return new QueryConfig(queryBuilder); + try ( + XContentParser sourceParser = XContentType.JSON.xContent() + .createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput()) + ) { + return GroupConfig.fromXContent(sourceParser, false); + } + } } - protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregations) throws Exception { - return new AggregationConfig(aggregations); + protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregations) throws IOException { + + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + aggregations.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + try ( + XContentParser sourceParser = XContentType.JSON.xContent() + .createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + BytesReference.bytes(xContentBuilder).streamInput() + ) + ) { + return AggregationConfig.fromXContent(sourceParser, false); + } + } } protected PivotConfig createPivotConfig(Map groups, AggregatorFactories.Builder aggregations) throws Exception { - return PivotConfig.builder().setGroups(createGroupConfig(groups)).setAggregationConfig(createAggConfig(aggregations)).build(); + return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), null); } protected TransformConfig.Builder createTransformConfigBuilder( String id, String destinationIndex, - QueryBuilder queryBuilder, + QueryConfig queryConfig, String... sourceIndices ) throws Exception { return TransformConfig.builder() .setId(id) - .setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build()) - .setDest(DestConfig.builder().setIndex(destinationIndex).build()) + .setSource(new SourceConfig(sourceIndices, queryConfig, Collections.emptyMap())) + .setDest(new DestConfig(destinationIndex, null)) .setFrequency(TimeValue.timeValueSeconds(10)) .setDescription("Test transform config id: " + id); } - protected void bulkIndexDocs(BulkRequest request) throws Exception { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - BulkResponse response = restClient.bulk(request, RequestOptions.DEFAULT); - assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); - } - } - - protected void updateConfig(String id, TransformConfigUpdate update) throws Exception { - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - restClient.transform().updateTransform(new UpdateTransformRequest(update, id), RequestOptions.DEFAULT); - } + protected void updateConfig(String id, String update) throws Exception { + Request updateRequest = new Request("POST", "_transform/" + id + "/_update"); + updateRequest.setJsonEntity(update); + assertOK(client().performRequest(updateRequest)); } protected void createReviewsIndex( @@ -324,92 +361,105 @@ protected void createReviewsIndex( Function dateStringProvider ) throws Exception { assert numUsers > 0; - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - - // create mapping - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - { - builder.startObject("properties") - .startObject("timestamp") - .field("type", "date") - .endObject() - .startObject("user_id") - .field("type", "keyword") - .endObject() - .startObject("count") - .field("type", "integer") - .endObject() - .startObject("business_id") - .field("type", "keyword") - .endObject() - .startObject("stars") - .field("type", "integer") - .endObject() - .startObject("regular_object") - .field("type", "object") - .endObject() - .startObject("nested_object") - .field("type", "nested") - .endObject() - .startObject("comment") - .field("type", "text") - .startObject("fields") - .startObject("keyword") - .field("type", "keyword") - .endObject() - .endObject() - .endObject() - .endObject(); - } - builder.endObject(); - CreateIndexResponse response = restClient.indices() - .create(new CreateIndexRequest(indexName).mapping(builder), RequestOptions.DEFAULT); - assertThat(response.isAcknowledged(), is(true)); + + // create mapping + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("mappings") + .startObject("properties") + .startObject("timestamp") + .field("type", "date") + .endObject() + .startObject("user_id") + .field("type", "keyword") + .endObject() + .startObject("count") + .field("type", "integer") + .endObject() + .startObject("business_id") + .field("type", "keyword") + .endObject() + .startObject("stars") + .field("type", "integer") + .endObject() + .startObject("regular_object") + .field("type", "object") + .endObject() + .startObject("nested_object") + .field("type", "nested") + .endObject() + .startObject("comment") + .field("type", "text") + .startObject("fields") + .startObject("keyword") + .field("type", "keyword") + .endObject() + .endObject() + .endObject() + .endObject() + .endObject(); } + builder.endObject(); - // create index - BulkRequest bulk = new BulkRequest(indexName); - for (int i = 0; i < numDocs; i++) { - Integer user = userIdProvider.apply(i); - int stars = i % 5; - long business = i % 50; - String dateString = dateStringProvider.apply(i); - - StringBuilder sourceBuilder = new StringBuilder().append("{"); - if (user != null) { - sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\","); - } - sourceBuilder.append(""" - "count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\ - {"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"} - """.formatted(i, business, stars, stars, dateString)); - bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); - - if (i % 100 == 0) { - BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT); - assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); - bulk = new BulkRequest(indexName); - } + final StringEntity indexMappings = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); + Request req = new Request("PUT", indexName); + req.setEntity(indexMappings); + req.setOptions(RequestOptions.DEFAULT); + assertOK(client().performRequest(req)); + } + + // create index + StringBuilder sourceBuilder = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + Integer user = userIdProvider.apply(i); + int stars = i % 5; + long business = i % 50; + String dateString = dateStringProvider.apply(i); + + sourceBuilder.append(""" + {"create":{"_index":"%s"}} + """.formatted(indexName)); + + sourceBuilder.append("{"); + if (user != null) { + sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\","); + } + sourceBuilder.append(""" + "count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\ + {"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"} + """.formatted(i, business, stars, stars, dateString)); + + if (i % 100 == 0) { + sourceBuilder.append("\r\n"); + doBulk(sourceBuilder.toString(), false); + sourceBuilder.setLength(0); } - BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT); - assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); - restClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT); } + sourceBuilder.append("\r\n"); + doBulk(sourceBuilder.toString(), true); } - protected Map toLazy(ToXContent parsedObject) throws Exception { - BytesReference bytes = XContentHelper.toXContent(parsedObject, XContentType.JSON, false); - try ( - XContentParser parser = XContentHelper.createParser( - xContentRegistry(), - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - bytes, - XContentType.JSON - ) - ) { - return parser.mapOrdered(); + protected void doBulk(String bulkDocuments, boolean refresh) throws IOException { + Request bulkRequest = new Request("POST", "/_bulk"); + if (refresh) { + bulkRequest.addParameter("refresh", "true"); } + bulkRequest.setJsonEntity(bulkDocuments); + bulkRequest.setOptions(RequestOptions.DEFAULT); + Response bulkResponse = client().performRequest(bulkRequest); + assertOK(bulkResponse); + var bulkMap = entityAsMap(bulkResponse); + assertThat((boolean) bulkMap.get("errors"), is(equalTo(false))); + } + + protected Map matchAllSearch(String index, int size, RequestOptions options) throws IOException { + Request request = new Request("GET", index + "/_search"); + request.addParameter("size", Integer.toString(size)); + request.setOptions(options); + Response response = client().performRequest(request); + assertOK(response); + return entityAsMap(response); } private void waitForPendingTasks() { @@ -442,13 +492,4 @@ protected Settings restClientSettings() { + Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8)); return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); } - - protected static class TestRestHighLevelClient extends RestHighLevelClient { - private static final List X_CONTENT_ENTRIES = new SearchModule(Settings.EMPTY, Collections.emptyList()) - .getNamedXContents(); - - TestRestHighLevelClient() { - super(client(), restClient -> {}, X_CONTENT_ENTRIES); - } - } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsingSearchRuntimeFieldsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsingSearchRuntimeFieldsIT.java index cff4dd112a483..7c82c25579bf0 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsingSearchRuntimeFieldsIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsingSearchRuntimeFieldsIT.java @@ -8,39 +8,32 @@ package org.elasticsearch.xpack.transform.integration; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.indices.GetMappingsRequest; -import org.elasticsearch.client.indices.GetMappingsResponse; -import org.elasticsearch.client.transform.PreviewTransformResponse; -import org.elasticsearch.client.transform.transforms.SourceConfig; -import org.elasticsearch.client.transform.transforms.TransformConfig; -import org.elasticsearch.client.transform.transforms.TransformStats; -import org.elasticsearch.client.transform.transforms.latest.LatestConfig; -import org.elasticsearch.client.transform.transforms.pivot.SingleGroupSource; -import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; +import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; +import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.junit.After; import org.junit.Before; -import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.stream.Stream; +import java.util.stream.Collectors; import static java.util.Collections.singletonMap; -import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -56,11 +49,11 @@ public class TransformUsingSearchRuntimeFieldsIT extends TransformIntegTestCase private static final String REVIEWS_INDEX_NAME = "basic-crud-reviews"; private static final int NUM_USERS = 28; - private static final Integer getUserIdForRow(int row) { + private static Integer getUserIdForRow(int row) { return row % NUM_USERS; } - private static final String getDateStringForRow(int row) { + private static String getDateStringForRow(int row) { int day = (11 + (row / 100)) % 28; int hour = 10 + (row % 13); int min = 10 + (row % 49); @@ -116,16 +109,17 @@ public void createReviewsIndex() throws Exception { } @After - public void cleanTransforms() throws IOException { + public void cleanTransforms() throws Exception { cleanUp(); } + @SuppressWarnings("unchecked") public void testPivotTransform() throws Exception { String destIndexName = "reviews-by-user-pivot"; String transformId = "transform-with-st-rt-fields-pivot"; Map runtimeMappings = createRuntimeMappings(); - Map groups = singletonMap("by-user", TermsGroupSource.builder().setField("user-upper").build()); + Map groups = singletonMap("by-user", new TermsGroupSource("user-upper", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) .addAggregator(AggregationBuilders.max("review_score_max").field("stars")) @@ -133,15 +127,11 @@ public void testPivotTransform() throws Exception { .addAggregator(AggregationBuilders.max("review_score_rt_max").field("stars-x2")) .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")) .addAggregator(AggregationBuilders.max("timestamp_rt").field("timestamp-5m")); - TransformConfig config = createTransformConfigBuilder(transformId, destIndexName, QueryBuilders.matchAllQuery(), "dummy").setSource( - SourceConfig.builder() - .setIndex(REVIEWS_INDEX_NAME) - .setQuery(QueryBuilders.matchAllQuery()) - .setRuntimeMappings(runtimeMappings) - .build() + TransformConfig config = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), "dummy").setSource( + new SourceConfig(new String[] { REVIEWS_INDEX_NAME }, QueryConfig.matchAll(), runtimeMappings) ).setPivotConfig(createPivotConfig(groups, aggs)).build(); - PreviewTransformResponse previewResponse = previewTransform(config, RequestOptions.DEFAULT); + var previewResponse = previewTransform(Strings.toString(config), RequestOptions.DEFAULT); // Verify preview mappings Map expectedMappingProperties = new HashMap<>() { { @@ -154,10 +144,12 @@ public void testPivotTransform() throws Exception { put("timestamp_rt", singletonMap("type", "date")); } }; - assertThat(previewResponse.getMappings(), allOf(hasKey("_meta"), hasEntry("properties", expectedMappingProperties))); + var generatedMappings = (Map) XContentMapValues.extractValue("generated_dest_index.mappings", previewResponse); + assertThat(generatedMappings, allOf(hasKey("_meta"), hasEntry("properties", expectedMappingProperties))); // Verify preview contents - assertThat(previewResponse.getDocs(), hasSize(NUM_USERS)); - previewResponse.getDocs().forEach(doc -> { + var previewDocs = (List>) XContentMapValues.extractValue("preview", previewResponse); + assertThat(previewDocs, hasSize(NUM_USERS)); + previewDocs.forEach(doc -> { assertThat((String) doc.get("by-user"), isUpperCase()); assertThat(doc.get("review_score_rt_avg"), is(equalTo(2 * (double) doc.get("review_score")))); assertThat(doc.get("review_score_rt_max"), is(equalTo(2 * (int) doc.get("review_score_max")))); @@ -167,33 +159,31 @@ public void testPivotTransform() throws Exception { ); }); - assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(config.getId(), Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(config.getId(), 1L); stopTransform(config.getId()); - assertBusy( - () -> { assertEquals(TransformStats.State.STOPPED, getTransformStats(config.getId()).getTransformsStats().get(0).getState()); } - ); + assertBusy(() -> { + var stats = getTransformStats(config.getId()); + assertEquals("stopped", stats.get("state")); + }); - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - restClient.indices().refresh(new RefreshRequest(destIndexName), RequestOptions.DEFAULT); - // Verify destination index mappings - GetMappingsResponse destIndexMapping = restClient.indices() - .getMapping(new GetMappingsRequest().indices(destIndexName), RequestOptions.DEFAULT); - assertThat(destIndexMapping.mappings().get(destIndexName).sourceAsMap(), allOf(hasKey("_meta"), hasKey("properties"))); - // Verify destination index contents - SearchResponse searchResponse = restClient.search( - new SearchRequest(destIndexName).source(new SearchSourceBuilder().size(1000)), - RequestOptions.DEFAULT - ); - assertThat(searchResponse.getHits().getTotalHits().value, is(equalTo(Long.valueOf(NUM_USERS)))); - assertThat( - Stream.of(searchResponse.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(toList()), - is(equalTo(previewResponse.getDocs())) - ); - } + refreshIndex(destIndexName, RequestOptions.DEFAULT); + // Verify destination index mappings + var mappings = (Map) XContentMapValues.extractValue( + destIndexName + ".mappings", + getIndexMapping(destIndexName, RequestOptions.DEFAULT) + ); + assertThat(mappings, allOf(hasKey("_meta"), hasKey("properties"))); + // Verify destination index contents + var searchResponse = matchAllSearch(destIndexName, 1000, RequestOptions.DEFAULT); + assertThat((Integer) XContentMapValues.extractValue("hits.total.value", searchResponse), is(equalTo(NUM_USERS))); + + var hits = (List>) XContentMapValues.extractValue("hits.hits", searchResponse); + var docs = hits.stream().map(h -> (Map) h.get("_source")).collect(Collectors.toList()); + assertThat(docs, is(equalTo(previewDocs))); } public void testPivotTransform_BadRuntimeFieldScript() throws Exception { @@ -214,104 +204,92 @@ public void testPivotTransform_BadRuntimeFieldScript() throws Exception { } }; - Map groups = singletonMap("by-user", TermsGroupSource.builder().setField("user-upper").build()); + Map groups = singletonMap("by-user", new TermsGroupSource("user-upper", null, false)); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.avg("review_score").field("stars")) .addAggregator(AggregationBuilders.avg("review_score_rt").field("stars-x2")) .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")) .addAggregator(AggregationBuilders.max("timestamp_rt").field("timestamp-5m")); - TransformConfig config = createTransformConfigBuilder(transformId, destIndexName, QueryBuilders.matchAllQuery(), "dummy").setSource( - SourceConfig.builder() - .setIndex(REVIEWS_INDEX_NAME) - .setQuery(QueryBuilders.matchAllQuery()) - .setRuntimeMappings(runtimeMappings) - .build() + TransformConfig config = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), "dummy").setSource( + new SourceConfig(new String[] { REVIEWS_INDEX_NAME }, QueryConfig.matchAll(), runtimeMappings) ).setPivotConfig(createPivotConfig(groups, aggs)).build(); - Exception e = expectThrows(Exception.class, () -> previewTransform(config, RequestOptions.DEFAULT)); + Exception e = expectThrows(Exception.class, () -> previewTransform(Strings.toString(config), RequestOptions.DEFAULT)); assertThat( ExceptionsHelper.stackTrace(e), allOf(containsString("script_exception"), containsString("dynamic method [java.lang.String, toUperCase/0] not found")) ); - e = expectThrows(Exception.class, () -> putTransform(config, RequestOptions.DEFAULT)); + e = expectThrows(Exception.class, () -> putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT)); assertThat( ExceptionsHelper.stackTrace(e), allOf(containsString("script_exception"), containsString("dynamic method [java.lang.String, toUperCase/0] not found")) ); } + @SuppressWarnings("unchecked") public void testLatestTransform() throws Exception { String destIndexName = "reviews-by-user-latest"; String transformId = "transform-with-st-rt-fields-latest"; Map runtimeMappings = createRuntimeMappings(); - SourceConfig sourceConfig = SourceConfig.builder() - .setIndex(REVIEWS_INDEX_NAME) - .setQuery(QueryBuilders.matchAllQuery()) - .setRuntimeMappings(runtimeMappings) + SourceConfig sourceConfig = new SourceConfig(new String[] { REVIEWS_INDEX_NAME }, QueryConfig.matchAll(), runtimeMappings); + TransformConfig configWithOrdinaryFields = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), "dummy") + .setSource(sourceConfig) + .setLatestConfig(new LatestConfig(List.of("user_id"), "timestamp")) .build(); - TransformConfig configWithOrdinaryFields = createTransformConfigBuilder( - transformId, - destIndexName, - QueryBuilders.matchAllQuery(), - "dummy" - ).setSource(sourceConfig).setLatestConfig(LatestConfig.builder().setUniqueKey("user_id").setSort("timestamp").build()).build(); - - PreviewTransformResponse previewWithOrdinaryFields = previewTransform(configWithOrdinaryFields, RequestOptions.DEFAULT); + + var previewWithOrdinaryFields = previewTransform(Strings.toString(configWithOrdinaryFields), RequestOptions.DEFAULT); // Verify preview mappings - assertThat(previewWithOrdinaryFields.getMappings(), allOf(hasKey("_meta"), hasKey("properties"))); + var generatedMappings = (Map) XContentMapValues.extractValue( + "generated_dest_index.mappings", + previewWithOrdinaryFields + ); + assertThat(generatedMappings, allOf(hasKey("_meta"), hasKey("properties"))); // Verify preview contents - assertThat("Got preview: " + previewWithOrdinaryFields, previewWithOrdinaryFields.getDocs(), hasSize(NUM_USERS)); - previewWithOrdinaryFields.getDocs().forEach(doc -> { + var docsWithOrdinaryFields = (List>) previewWithOrdinaryFields.get("preview"); + assertThat("Got preview: " + previewWithOrdinaryFields, docsWithOrdinaryFields, hasSize(NUM_USERS)); + docsWithOrdinaryFields.forEach(doc -> { assertThat(doc, hasKey("user_id")); assertThat(doc, not(hasKey("user-upper"))); }); - TransformConfig configWithRuntimeFields = createTransformConfigBuilder( - transformId, - destIndexName, - QueryBuilders.matchAllQuery(), - "dummy" - ).setSource(sourceConfig) - .setLatestConfig(LatestConfig.builder().setUniqueKey("user-upper").setSort("timestamp-5m").build()) + TransformConfig configWithRuntimeFields = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), "dummy") + .setSource(sourceConfig) + .setLatestConfig(new LatestConfig(List.of("user-upper"), "timestamp-5m")) .build(); - PreviewTransformResponse previewWithRuntimeFields = previewTransform(configWithRuntimeFields, RequestOptions.DEFAULT); - assertThat(previewWithRuntimeFields.getDocs(), is(equalTo(previewWithOrdinaryFields.getDocs()))); + var previewWithRuntimeFields = previewTransform(Strings.toString(configWithRuntimeFields), RequestOptions.DEFAULT); + var docsWithRuntimeFields = (List>) previewWithRuntimeFields.get("preview"); + assertThat(docsWithRuntimeFields, is(equalTo(docsWithOrdinaryFields))); - assertTrue(putTransform(configWithRuntimeFields, RequestOptions.DEFAULT).isAcknowledged()); - assertTrue(startTransform(configWithRuntimeFields.getId(), RequestOptions.DEFAULT).isAcknowledged()); + putTransform(configWithRuntimeFields.getId(), Strings.toString(configWithRuntimeFields), RequestOptions.DEFAULT); + startTransform(configWithRuntimeFields.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(configWithRuntimeFields.getId(), 1L); stopTransform(configWithRuntimeFields.getId()); - assertBusy( - () -> { - assertEquals( - TransformStats.State.STOPPED, - getTransformStats(configWithRuntimeFields.getId()).getTransformsStats().get(0).getState() - ); - } + assertBusy(() -> { assertEquals("stopped", getTransformState(configWithRuntimeFields.getId())); }); + + refreshIndex(destIndexName, RequestOptions.DEFAULT); + // Verify destination index mappings + var destIndexMapping = getIndexMapping(destIndexName, RequestOptions.DEFAULT); + + assertThat( + (Map) XContentMapValues.extractValue(destIndexName + ".mappings", destIndexMapping), + allOf(hasKey("_meta"), hasKey("properties")) ); - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - restClient.indices().refresh(new RefreshRequest(destIndexName), RequestOptions.DEFAULT); - // Verify destination index mappings - GetMappingsResponse destIndexMapping = restClient.indices() - .getMapping(new GetMappingsRequest().indices(destIndexName), RequestOptions.DEFAULT); - assertThat(destIndexMapping.mappings().get(destIndexName).sourceAsMap(), allOf(hasKey("_meta"), hasKey("properties"))); - // Verify destination index contents - SearchResponse searchResponse = restClient.search( - new SearchRequest(destIndexName).source(new SearchSourceBuilder().size(1000)), - RequestOptions.DEFAULT - ); - assertThat(searchResponse.getHits().getTotalHits().value, is(equalTo(Long.valueOf(NUM_USERS)))); - assertThat( - Stream.of(searchResponse.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(toList()), - is(equalTo(previewWithOrdinaryFields.getDocs())) - ); - } + // Verify destination index contents + Request searchRequest = new Request("GET", destIndexName + "/_search"); + searchRequest.addParameter("size", "1000"); + Response searchResponse = client().performRequest(searchRequest); + assertOK(searchResponse); + var searchMap = entityAsMap(searchResponse); + assertThat((Integer) XContentMapValues.extractValue("hits.total.value", searchMap), is(equalTo(NUM_USERS))); + var hits = (List>) XContentMapValues.extractValue("hits.hits", searchMap); + var searchDocs = hits.stream().map(h -> (Map) h.get("_source")).collect(Collectors.toList()); + assertThat(searchDocs, is(equalTo(docsWithOrdinaryFields))); } public void testLatestTransform_BadRuntimeFieldScript() throws Exception { @@ -332,25 +310,20 @@ public void testLatestTransform_BadRuntimeFieldScript() throws Exception { } }; - SourceConfig sourceConfig = SourceConfig.builder() - .setIndex(REVIEWS_INDEX_NAME) - .setQuery(QueryBuilders.matchAllQuery()) - .setRuntimeMappings(runtimeMappings) + SourceConfig sourceConfig = new SourceConfig(new String[] { REVIEWS_INDEX_NAME }, QueryConfig.matchAll(), runtimeMappings); + TransformConfig configWithRuntimeFields = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), "dummy") + .setSource(sourceConfig) + .setLatestConfig(new LatestConfig(List.of("user-upper"), "timestamp")) .build(); - TransformConfig configWithRuntimeFields = createTransformConfigBuilder( - transformId, - destIndexName, - QueryBuilders.matchAllQuery(), - "dummy" - ).setSource(sourceConfig).setLatestConfig(LatestConfig.builder().setUniqueKey("user-upper").setSort("timestamp").build()).build(); - - Exception e = expectThrows(Exception.class, () -> previewTransform(configWithRuntimeFields, RequestOptions.DEFAULT)); + + var stringConfig = Strings.toString(configWithRuntimeFields); + Exception e = expectThrows(Exception.class, () -> previewTransform(stringConfig, RequestOptions.DEFAULT)); assertThat( ExceptionsHelper.stackTrace(e), allOf(containsString("script_exception"), containsString("dynamic method [java.lang.String, toUperCase/0] not found")) ); - e = expectThrows(Exception.class, () -> putTransform(configWithRuntimeFields, RequestOptions.DEFAULT)); + e = expectThrows(Exception.class, () -> putTransform(transformId, stringConfig, RequestOptions.DEFAULT)); assertThat( ExceptionsHelper.stackTrace(e), allOf(containsString("script_exception"), containsString("dynamic method [java.lang.String, toUperCase/0] not found"))