|
15 | 15 | import org.elasticsearch.client.ResponseException; |
16 | 16 | import org.elasticsearch.client.RestClient; |
17 | 17 | import org.elasticsearch.cluster.metadata.IndexMetaData; |
| 18 | +import org.elasticsearch.common.CheckedRunnable; |
18 | 19 | import org.elasticsearch.common.Nullable; |
19 | 20 | import org.elasticsearch.common.Strings; |
20 | 21 | import org.elasticsearch.common.settings.Settings; |
@@ -326,63 +327,78 @@ public void testAllocateActionOnlyReplicas() throws Exception { |
326 | 327 | }); |
327 | 328 | } |
328 | 329 |
|
329 | | - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50781") |
330 | 330 | public void testWaitForSnapshot() throws Exception { |
331 | 331 | createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) |
332 | 332 | .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); |
333 | | - String smlPolicy = randomAlphaOfLengthBetween(4, 10); |
334 | | - createNewSingletonPolicy("delete", new WaitForSnapshotAction(smlPolicy)); |
| 333 | + String slmPolicy = randomAlphaOfLengthBetween(4, 10); |
| 334 | + createNewSingletonPolicy("delete", new WaitForSnapshotAction(slmPolicy)); |
335 | 335 | updatePolicy(index, policy); |
336 | | - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot"))); |
337 | | - assertBusy(() -> assertThat(getFailedStepForIndex(index), equalTo("wait-for-snapshot"))); |
| 336 | + assertBusy( () -> { |
| 337 | + Map<String, Object> indexILMState = explainIndex(index); |
| 338 | + assertThat(indexILMState.get("action"), is("wait_for_snapshot")); |
| 339 | + assertThat(indexILMState.get("failed_step"), is("wait-for-snapshot")); |
| 340 | + }, slmPolicy); |
338 | 341 |
|
339 | 342 | String repo = createSnapshotRepo(); |
340 | | - createSlmPolicy(smlPolicy, repo); |
| 343 | + createSlmPolicy(slmPolicy, repo); |
341 | 344 |
|
342 | | - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot"))); |
| 345 | + assertBusy( () -> { |
| 346 | + Map<String, Object> indexILMState = explainIndex(index); |
| 347 | + //wait for step to notice that the slm policy is created and to get out of error |
| 348 | + assertThat(indexILMState.get("failed_step"), nullValue()); |
| 349 | + assertThat(indexILMState.get("action"), is("wait_for_snapshot")); |
| 350 | + assertThat(indexILMState.get("step"), is("wait-for-snapshot")); |
| 351 | + }, slmPolicy); |
343 | 352 |
|
344 | | - Request request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
| 353 | + Request request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute"); |
345 | 354 | assertOK(client().performRequest(request)); |
346 | 355 |
|
347 | | - |
348 | | - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), 2, TimeUnit.MINUTES); |
349 | | - |
350 | | - request = new Request("DELETE", "/_slm/policy/" + smlPolicy); |
351 | | - assertOK(client().performRequest(request)); |
352 | | - |
353 | | - request = new Request("DELETE", "/_snapshot/" + repo); |
354 | | - assertOK(client().performRequest(request)); |
| 356 | + assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), slmPolicy); |
355 | 357 | } |
356 | 358 |
|
357 | 359 | public void testWaitForSnapshotSlmExecutedBefore() throws Exception { |
358 | 360 | createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) |
359 | 361 | .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); |
360 | | - String smlPolicy = randomAlphaOfLengthBetween(4, 10); |
361 | | - createNewSingletonPolicy("delete", new WaitForSnapshotAction(smlPolicy)); |
| 362 | + String slmPolicy = randomAlphaOfLengthBetween(4, 10); |
| 363 | + createNewSingletonPolicy("delete", new WaitForSnapshotAction(slmPolicy)); |
362 | 364 |
|
363 | 365 | String repo = createSnapshotRepo(); |
364 | | - createSlmPolicy(smlPolicy, repo); |
| 366 | + createSlmPolicy(slmPolicy, repo); |
365 | 367 |
|
366 | | - Request request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
| 368 | + Request request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute"); |
367 | 369 | assertOK(client().performRequest(request)); |
368 | 370 |
|
| 371 | + //wait for slm to finish execution |
| 372 | + assertBusy(() -> { |
| 373 | + Response response = client().performRequest(new Request("GET", "/_slm/policy/" + slmPolicy)); |
| 374 | + try (InputStream is = response.getEntity().getContent()) { |
| 375 | + Map<String, Object> responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); |
| 376 | + assertEquals(1, ((Map<?, ?>) ((Map<?, ?>) responseMap.get(slmPolicy)).get("stats")).get("snapshots_taken")); |
| 377 | + } |
| 378 | + }, slmPolicy); |
| 379 | + |
369 | 380 | updatePolicy(index, policy); |
370 | | - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot"))); |
371 | | - assertBusy(() -> assertThat(getStepKeyForIndex(index).getName(), equalTo("wait-for-snapshot"))); |
372 | 381 |
|
373 | | - request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
374 | | - assertOK(client().performRequest(request)); |
| 382 | + assertBusy( () -> { |
| 383 | + Map<String, Object> indexILMState = explainIndex(index); |
| 384 | + assertThat(indexILMState.get("failed_step"), nullValue()); |
| 385 | + assertThat(indexILMState.get("action"), is("wait_for_snapshot")); |
| 386 | + assertThat(indexILMState.get("step"), is("wait-for-snapshot")); |
| 387 | + }, slmPolicy); |
375 | 388 |
|
376 | | - request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute"); |
| 389 | + request = new Request("PUT", "/_slm/policy/" + slmPolicy + "/_execute"); |
377 | 390 | assertOK(client().performRequest(request)); |
378 | 391 |
|
379 | | - assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), 2, TimeUnit.MINUTES); |
380 | | - |
381 | | - request = new Request("DELETE", "/_slm/policy/" + smlPolicy); |
382 | | - assertOK(client().performRequest(request)); |
| 392 | + //wait for slm to finish execution |
| 393 | + assertBusy(() -> { |
| 394 | + Response response = client().performRequest(new Request("GET", "/_slm/policy/" + slmPolicy)); |
| 395 | + try (InputStream is = response.getEntity().getContent()) { |
| 396 | + Map<String, Object> responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); |
| 397 | + assertEquals(2, ((Map<?, ?>) ((Map<?, ?>) responseMap.get(slmPolicy)).get("stats")).get("snapshots_taken")); |
| 398 | + } |
| 399 | + }, slmPolicy); |
383 | 400 |
|
384 | | - request = new Request("DELETE", "/_snapshot/" + repo); |
385 | | - assertOK(client().performRequest(request)); |
| 401 | + assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), slmPolicy); |
386 | 402 | } |
387 | 403 |
|
388 | 404 | public void testDelete() throws Exception { |
@@ -1677,11 +1693,29 @@ private String createSnapshotRepo() throws IOException { |
1677 | 1693 | .field("type", "fs") |
1678 | 1694 | .startObject("settings") |
1679 | 1695 | .field("compress", randomBoolean()) |
1680 | | - .field("location", System.getProperty("tests.path.repo")) |
1681 | | - .field("max_snapshot_bytes_per_sec", "256b") |
| 1696 | + //random location to avoid clash with other snapshots |
| 1697 | + .field("location", System.getProperty("tests.path.repo")+ "/" + randomAlphaOfLengthBetween(4, 10)) |
| 1698 | + .field("max_snapshot_bytes_per_sec", "100m") |
1682 | 1699 | .endObject() |
1683 | 1700 | .endObject())); |
1684 | 1701 | assertOK(client().performRequest(request)); |
1685 | 1702 | return repo; |
1686 | 1703 | } |
| 1704 | + |
| 1705 | + //adds debug information for waitForSnapshot tests |
| 1706 | + private void assertBusy(CheckedRunnable<Exception> runnable, String slmPolicy) throws Exception { |
| 1707 | + assertBusy(() -> { |
| 1708 | + try { |
| 1709 | + runnable.run(); |
| 1710 | + } catch (AssertionError e) { |
| 1711 | + Map<String, Object> slm; |
| 1712 | + try (InputStream is = client().performRequest(new Request("GET", "/_slm/policy/" + slmPolicy)).getEntity().getContent()) { |
| 1713 | + slm = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, false); |
| 1714 | + } catch (Exception ignored) { |
| 1715 | + slm = new HashMap<>(); |
| 1716 | + } |
| 1717 | + throw new AssertionError("Index:" + explainIndex(index) + "\nSLM:" + slm, e); |
| 1718 | + } |
| 1719 | + }); |
| 1720 | + } |
1687 | 1721 | } |
0 commit comments