diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java
index a4a5025a139fa..24ae1ce992758 100644
--- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java
+++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java
@@ -7,10 +7,10 @@
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
+import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
@@ -28,6 +28,7 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.xpack.test.rest.XPackRestTestConstants;
@@ -37,7 +38,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -48,7 +51,6 @@
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.oneOf;
 
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662")
 public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
 
     private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
@@ -80,11 +82,18 @@ protected static void waitForPendingDataFrameTasks() throws Exception {
      */
     public void testDataFramesRollingUpgrade() throws Exception {
         assumeTrue("Continuous data frames not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0));
+        Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
+        adjustLoggingLevels.setJsonEntity(
+            "{\"transient\": {" +
+                "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
+                "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
+        client().performRequest(adjustLoggingLevels);
         Request waitForYellow = new Request("GET", "/_cluster/health");
         waitForYellow.addParameter("wait_for_nodes", "3");
         waitForYellow.addParameter("wait_for_status", "yellow");
         switch (CLUSTER_TYPE) {
             case OLD:
+                client().performRequest(waitForYellow);
                 createAndStartContinuousDataFrame();
                 break;
             case MIXED:
@@ -113,15 +122,15 @@ private void cleanUpTransforms() throws Exception {
 
     private void createAndStartContinuousDataFrame() throws Exception {
         createIndex(CONTINUOUS_DATA_FRAME_SOURCE);
-        long totalDocsWritten = 0;
+        long totalDocsWrittenSum = 0;
         for (TimeValue bucket : BUCKETS) {
             int docs = randomIntBetween(1, 25);
             putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, bucket, ENTITIES);
-            totalDocsWritten += docs * ENTITIES.size();
+            totalDocsWrittenSum += docs * ENTITIES.size();
         }
-
+        long totalDocsWritten = totalDocsWrittenSum;
         DataFrameTransformConfig config = DataFrameTransformConfig.builder()
-            .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(30)))
+            .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
             .setPivotConfig(PivotConfig.builder()
                 .setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars")))
                 .setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build())
@@ -129,19 +138,28 @@ private void createAndStartContinuousDataFrame() throws Exception {
             .setDest(DestConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_ID + "_idx").build())
             .setSource(SourceConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_SOURCE).build())
             .setId(CONTINUOUS_DATA_FRAME_ID)
+            .setFrequency(TimeValue.timeValueSeconds(1))
             .build();
         putTransform(CONTINUOUS_DATA_FRAME_ID, config);
 
         startTransform(CONTINUOUS_DATA_FRAME_ID);
         waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, 0L);
 
-        DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
+        assertBusy(() -> {
+            DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
+            assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
+            assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
+            // Even if we get back to started, we may periodically get set back to `indexing` when triggered.
+            // Though short lived due to no changes on the source indices, it could result in flaky test behavior
+            assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
+        }, 120, TimeUnit.SECONDS);
 
-        assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
-        assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
-        assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
+
+        // We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable
+        awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, IndexerState.STARTED.value());
     }
 
+    @SuppressWarnings("unchecked")
     private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception {
 
         // A continuous data frame should automatically become started when it gets assigned to a node
@@ -161,9 +179,9 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
         List<String> entities = new ArrayList<>(1);
         entities.add("user_" + ENTITIES.size() + expectedLastCheckpoint);
         int docs = 5;
-        // Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin
-        // wait later.
-        putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities);
+        // Index the data
+        // The frequency and delay should see the data once it's indexed
+        putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(0), entities);
 
         waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint);
 
@@ -176,10 +194,55 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
 
         assertThat(stateAndStats.getState(),
             oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
-        assertThat(stateAndStats.getIndexerStats().getOutputDocuments(),
-            greaterThan(previousStateAndStats.getIndexerStats().getOutputDocuments()));
-        assertThat(stateAndStats.getIndexerStats().getNumDocuments(),
-            greaterThanOrEqualTo(docs + previousStateAndStats.getIndexerStats().getNumDocuments()));
+        awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, (responseBody) -> {
+            Map<String, Object> indexerStats = (Map<String, Object>)((List<?>)XContentMapValues.extractValue("hits.hits._source.stats",
+                responseBody))
+                .get(0);
+            assertThat((Integer)indexerStats.get("documents_indexed"),
+                greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getOutputDocuments()).intValue()));
+            assertThat((Integer)indexerStats.get("documents_processed"),
+                greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getNumDocuments()).intValue()));
+        });
+    }
+
+    private void awaitWrittenIndexerState(String id, Consumer<Map<String, Object>> responseAssertion) throws Exception {
+        Request getStatsDocsRequest = new Request("GET", ".data-frame-internal-*/_search");
+        getStatsDocsRequest.setJsonEntity("{\n" +
+            "   \"query\": {\n" +
+            "      \"bool\": {\n" +
+            "         \"filter\": \n" +
+            "            {\"term\": {\n" +
+            "              \"_id\": \"data_frame_transform_state_and_stats-" + id + "\"\n" +
+            "            }}\n" +
+            "      }\n" +
+            "   },\n" +
+            "   \"sort\": [\n" +
+            "      {\n" +
+            "         \"_index\": {\n" +
+            "            \"order\": \"desc\"\n" +
+            "         }\n" +
+            "      }\n" +
+            "   ],\n" +
+            "   \"size\": 1\n" +
+            "}");
+        assertBusy(() -> {
+            // Want to make sure we get the latest docs
+            client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh"));
+            Response response = client().performRequest(getStatsDocsRequest);
+            assertEquals(200, response.getStatusLine().getStatusCode());
+            Map<String, Object> responseBody = entityAsMap(response);
+            assertEquals(1, XContentMapValues.extractValue("hits.total.value", responseBody));
+            responseAssertion.accept(responseBody);
+        }, 60, TimeUnit.SECONDS);
+    }
+
+    private void awaitWrittenIndexerState(String id, String indexerState) throws Exception {
+        awaitWrittenIndexerState(id, (responseBody) -> {
+            String storedState = ((List<?>)XContentMapValues.extractValue("hits.hits._source.state.indexer_state", responseBody))
+                .get(0)
+                .toString();
+            assertThat(storedState, equalTo(indexerState));
+        });
     }
 
     private void putTransform(String id, DataFrameTransformConfig config) throws IOException {
@@ -222,7 +285,7 @@ private DataFrameTransformStats getTransformStats(String id) throws IOException
     }
 
     private void waitUntilAfterCheckpoint(String id, long currentCheckpoint) throws Exception {
-        assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getNext().getCheckpoint(), greaterThan(currentCheckpoint)),
+        assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getLast().getCheckpoint(), greaterThan(currentCheckpoint)),
             60, TimeUnit.SECONDS);
     }
 
@@ -249,7 +312,7 @@ private void createIndex(String indexName) throws IOException {
         final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
         Request req = new Request("PUT", indexName);
         req.setEntity(entity);
-        client().performRequest(req);
+        assertThat(client().performRequest(req).getStatusLine().getStatusCode(), equalTo(200));
     }
 
 }
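
The reliability fix above hinges on the new awaitWrittenIndexerState helper: before a node is turned off, and again after the follow-up checkpoint, the test refreshes the .data-frame-internal-* indices and polls for the newest persisted data_frame_transform_state_and_stats-<id> document instead of trusting the in-memory stats alone. Below is a condensed sketch of that polling pattern. It reuses only calls that already appear in the diff (Request, client().performRequest, entityAsMap, assertBusy, XContentMapValues.extractValue), collapses the bool/filter wrapper into a bare term query, and the method name awaitPersistedIndexerState is purely illustrative, not part of the actual change.

    // Sketch only: poll the data frame internal index until the persisted indexer state
    // matches the expected value. Assumes the ESRestTestCase helpers used elsewhere in this
    // test class (client(), entityAsMap(), assertBusy()) are in scope.
    private void awaitPersistedIndexerState(String transformId, String expectedState) throws Exception {
        Request search = new Request("GET", ".data-frame-internal-*/_search");
        search.setJsonEntity("{\"size\": 1, \"sort\": [{\"_index\": {\"order\": \"desc\"}}], " +
            "\"query\": {\"term\": {\"_id\": \"data_frame_transform_state_and_stats-" + transformId + "\"}}}");
        assertBusy(() -> {
            // Refresh first so the most recently written state document is searchable
            client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh"));
            Map<String, Object> body = entityAsMap(client().performRequest(search));
            String state = ((List<?>) XContentMapValues.extractValue(
                "hits.hits._source.state.indexer_state", body)).get(0).toString();
            assertEquals(expectedState, state);
        }, 60, TimeUnit.SECONDS);
    }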