77
88import org .apache .http .entity .ContentType ;
99import org .apache .http .entity .StringEntity ;
10- import org .apache .lucene .util .LuceneTestCase ;
1110import org .elasticsearch .Version ;
1211import org .elasticsearch .client .Request ;
1312import org .elasticsearch .client .Response ;
13+ import org .elasticsearch .client .core .IndexerState ;
1414import org .elasticsearch .client .dataframe .GetDataFrameTransformStatsResponse ;
1515import org .elasticsearch .client .dataframe .transforms .DataFrameTransformConfig ;
1616import org .elasticsearch .client .dataframe .transforms .DataFrameTransformStats ;
2828import org .elasticsearch .common .xcontent .XContentBuilder ;
2929import org .elasticsearch .common .xcontent .XContentParser ;
3030import org .elasticsearch .common .xcontent .XContentType ;
31+ import org .elasticsearch .common .xcontent .support .XContentMapValues ;
3132import org .elasticsearch .search .aggregations .AggregationBuilders ;
3233import org .elasticsearch .search .aggregations .AggregatorFactories ;
3334import org .elasticsearch .xpack .test .rest .XPackRestTestConstants ;
3738import java .util .ArrayList ;
3839import java .util .Collection ;
3940import java .util .List ;
41+ import java .util .Map ;
4042import java .util .concurrent .TimeUnit ;
43+ import java .util .function .Consumer ;
4144import java .util .stream .Collectors ;
4245import java .util .stream .Stream ;
4346
4851import static org .hamcrest .Matchers .hasSize ;
4952import static org .hamcrest .Matchers .oneOf ;
5053
51- @ LuceneTestCase .AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/43662" )
5254public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
5355
5456 private static final Version UPGRADE_FROM_VERSION = Version .fromString (System .getProperty ("tests.upgrade_from_version" ));
@@ -79,12 +81,19 @@ protected static void waitForPendingDataFrameTasks() throws Exception {
7981 * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
8082 */
8183 public void testDataFramesRollingUpgrade () throws Exception {
82- assumeTrue ("Continuous data frames not supported until 7.3" , UPGRADE_FROM_VERSION .onOrAfter (Version .V_7_3_0 ));
84+ assumeTrue ("Continuous data frames time sync not fixed until 7.4" , UPGRADE_FROM_VERSION .onOrAfter (Version .V_7_4_0 ));
85+ Request adjustLoggingLevels = new Request ("PUT" , "/_cluster/settings" );
86+ adjustLoggingLevels .setJsonEntity (
87+ "{\" transient\" : {" +
88+ "\" logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\" : \" trace\" ," +
89+ "\" logger.org.elasticsearch.xpack.dataframe\" : \" trace\" }}" );
90+ client ().performRequest (adjustLoggingLevels );
8391 Request waitForYellow = new Request ("GET" , "/_cluster/health" );
8492 waitForYellow .addParameter ("wait_for_nodes" , "3" );
8593 waitForYellow .addParameter ("wait_for_status" , "yellow" );
8694 switch (CLUSTER_TYPE ) {
8795 case OLD :
96+ client ().performRequest (waitForYellow );
8897 createAndStartContinuousDataFrame ();
8998 break ;
9099 case MIXED :
@@ -113,35 +122,44 @@ private void cleanUpTransforms() throws Exception {
113122
114123 private void createAndStartContinuousDataFrame () throws Exception {
115124 createIndex (CONTINUOUS_DATA_FRAME_SOURCE );
116- long totalDocsWritten = 0 ;
125+ long totalDocsWrittenSum = 0 ;
117126 for (TimeValue bucket : BUCKETS ) {
118127 int docs = randomIntBetween (1 , 25 );
119128 putData (CONTINUOUS_DATA_FRAME_SOURCE , docs , bucket , ENTITIES );
120- totalDocsWritten += docs * ENTITIES .size ();
129+ totalDocsWrittenSum += docs * ENTITIES .size ();
121130 }
122-
131+ long totalDocsWritten = totalDocsWrittenSum ;
123132 DataFrameTransformConfig config = DataFrameTransformConfig .builder ()
124- .setSyncConfig (new TimeSyncConfig ("timestamp" , TimeValue .timeValueSeconds (30 )))
133+ .setSyncConfig (new TimeSyncConfig ("timestamp" , TimeValue .timeValueSeconds (1 )))
125134 .setPivotConfig (PivotConfig .builder ()
126135 .setAggregations (new AggregatorFactories .Builder ().addAggregator (AggregationBuilders .avg ("stars" ).field ("stars" )))
127136 .setGroups (GroupConfig .builder ().groupBy ("user_id" , TermsGroupSource .builder ().setField ("user_id" ).build ()).build ())
128137 .build ())
129138 .setDest (DestConfig .builder ().setIndex (CONTINUOUS_DATA_FRAME_ID + "_idx" ).build ())
130139 .setSource (SourceConfig .builder ().setIndex (CONTINUOUS_DATA_FRAME_SOURCE ).build ())
131140 .setId (CONTINUOUS_DATA_FRAME_ID )
141+ .setFrequency (TimeValue .timeValueSeconds (1 ))
132142 .build ();
133143 putTransform (CONTINUOUS_DATA_FRAME_ID , config );
134144
135145 startTransform (CONTINUOUS_DATA_FRAME_ID );
136146 waitUntilAfterCheckpoint (CONTINUOUS_DATA_FRAME_ID , 0L );
137147
138- DataFrameTransformStats stateAndStats = getTransformStats (CONTINUOUS_DATA_FRAME_ID );
148+ assertBusy (() -> {
149+ DataFrameTransformStats stateAndStats = getTransformStats (CONTINUOUS_DATA_FRAME_ID );
150+ assertThat (stateAndStats .getIndexerStats ().getOutputDocuments (), equalTo ((long )ENTITIES .size ()));
151+ assertThat (stateAndStats .getIndexerStats ().getNumDocuments (), equalTo (totalDocsWritten ));
152+ // Even if we get back to started, we may periodically get set back to `indexing` when triggered.
153+ // Though short lived due to no changes on the source indices, it could result in flaky test behavior
154+ assertThat (stateAndStats .getState (), oneOf (DataFrameTransformStats .State .STARTED , DataFrameTransformStats .State .INDEXING ));
155+ }, 120 , TimeUnit .SECONDS );
139156
140- assertThat ( stateAndStats . getIndexerStats (). getOutputDocuments (), equalTo (( long ) ENTITIES . size ()));
141- assertThat ( stateAndStats . getIndexerStats (). getNumDocuments (), equalTo ( totalDocsWritten ));
142- assertThat ( stateAndStats . getState (), oneOf ( DataFrameTransformStats . State . STARTED , DataFrameTransformStats . State . INDEXING ));
157+
158+ // We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable
159+ awaitWrittenIndexerState ( CONTINUOUS_DATA_FRAME_ID , IndexerState . STARTED . value ( ));
143160 }
144161
162+ @ SuppressWarnings ("unchecked" )
145163 private void verifyContinuousDataFrameHandlesData (long expectedLastCheckpoint ) throws Exception {
146164
147165 // A continuous data frame should automatically become started when it gets assigned to a node
@@ -161,9 +179,9 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
161179 List <String > entities = new ArrayList <>(1 );
162180 entities .add ("user_" + ENTITIES .size () + expectedLastCheckpoint );
163181 int docs = 5 ;
164- // Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin
165- // wait later.
166- putData (CONTINUOUS_DATA_FRAME_SOURCE , docs , TimeValue .timeValueSeconds (1 ), entities );
182+ // Index the data
183+ // The frequency and delay should see the data once its indexed
184+ putData (CONTINUOUS_DATA_FRAME_SOURCE , docs , TimeValue .timeValueSeconds (0 ), entities );
167185
168186 waitUntilAfterCheckpoint (CONTINUOUS_DATA_FRAME_ID , expectedLastCheckpoint );
169187
@@ -176,10 +194,55 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
176194
177195 assertThat (stateAndStats .getState (),
178196 oneOf (DataFrameTransformStats .State .STARTED , DataFrameTransformStats .State .INDEXING ));
179- assertThat (stateAndStats .getIndexerStats ().getOutputDocuments (),
180- greaterThan (previousStateAndStats .getIndexerStats ().getOutputDocuments ()));
181- assertThat (stateAndStats .getIndexerStats ().getNumDocuments (),
182- greaterThanOrEqualTo (docs + previousStateAndStats .getIndexerStats ().getNumDocuments ()));
197+ awaitWrittenIndexerState (CONTINUOUS_DATA_FRAME_ID , (responseBody ) -> {
198+ Map <String , Object > indexerStats = (Map <String ,Object >)((List <?>)XContentMapValues .extractValue ("hits.hits._source.stats" ,
199+ responseBody ))
200+ .get (0 );
201+ assertThat ((Integer )indexerStats .get ("documents_indexed" ),
202+ greaterThan (Long .valueOf (previousStateAndStats .getIndexerStats ().getOutputDocuments ()).intValue ()));
203+ assertThat ((Integer )indexerStats .get ("documents_processed" ),
204+ greaterThan (Long .valueOf (previousStateAndStats .getIndexerStats ().getNumDocuments ()).intValue ()));
205+ });
206+ }
207+
208+ private void awaitWrittenIndexerState (String id , Consumer <Map <?, ?>> responseAssertion ) throws Exception {
209+ Request getStatsDocsRequest = new Request ("GET" , ".data-frame-internal-*/_search" );
210+ getStatsDocsRequest .setJsonEntity ("{\n " +
211+ " \" query\" : {\n " +
212+ " \" bool\" : {\n " +
213+ " \" filter\" : \n " +
214+ " {\" term\" : {\n " +
215+ " \" _id\" : \" data_frame_transform_state_and_stats-" + id + "\" \n " +
216+ " }}\n " +
217+ " }\n " +
218+ " },\n " +
219+ " \" sort\" : [\n " +
220+ " {\n " +
221+ " \" _index\" : {\n " +
222+ " \" order\" : \" desc\" \n " +
223+ " }\n " +
224+ " }\n " +
225+ " ],\n " +
226+ " \" size\" : 1\n " +
227+ "}" );
228+ assertBusy (() -> {
229+ // Want to make sure we get the latest docs
230+ client ().performRequest (new Request ("POST" , ".data-frame-internal-*/_refresh" ));
231+ Response response = client ().performRequest (getStatsDocsRequest );
232+ assertEquals (200 , response .getStatusLine ().getStatusCode ());
233+ Map <String , Object > responseBody = entityAsMap (response );
234+ assertEquals (1 , XContentMapValues .extractValue ("hits.total.value" , responseBody ));
235+ responseAssertion .accept (responseBody );
236+ }, 60 , TimeUnit .SECONDS );
237+ }
238+
239+ private void awaitWrittenIndexerState (String id , String indexerState ) throws Exception {
240+ awaitWrittenIndexerState (id , (responseBody ) -> {
241+ String storedState = ((List <?>)XContentMapValues .extractValue ("hits.hits._source.state.indexer_state" , responseBody ))
242+ .get (0 )
243+ .toString ();
244+ assertThat (storedState , equalTo (indexerState ));
245+ });
183246 }
184247
185248 private void putTransform (String id , DataFrameTransformConfig config ) throws IOException {
@@ -222,7 +285,7 @@ private DataFrameTransformStats getTransformStats(String id) throws IOException
222285 }
223286
224287 private void waitUntilAfterCheckpoint (String id , long currentCheckpoint ) throws Exception {
225- assertBusy (() -> assertThat (getTransformStats (id ).getCheckpointingInfo ().getNext ().getCheckpoint (), greaterThan (currentCheckpoint )),
288+ assertBusy (() -> assertThat (getTransformStats (id ).getCheckpointingInfo ().getLast ().getCheckpoint (), greaterThan (currentCheckpoint )),
226289 60 , TimeUnit .SECONDS );
227290 }
228291
@@ -249,7 +312,7 @@ private void createIndex(String indexName) throws IOException {
249312 final StringEntity entity = new StringEntity (Strings .toString (builder ), ContentType .APPLICATION_JSON );
250313 Request req = new Request ("PUT" , indexName );
251314 req .setEntity (entity );
252- client ().performRequest (req );
315+ assertThat ( client ().performRequest (req ). getStatusLine (). getStatusCode (), equalTo ( 200 ) );
253316 }
254317 }
255318
0 commit comments