diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 38171a7249bd8..c17deadbaf67e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -9,21 +9,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; @@ -33,21 +27,17 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; -import org.elasticsearch.xpack.transform.Transform; import org.elasticsearch.xpack.transform.checkpoint.RemoteClusterResolver.ResolvedIndices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import java.time.Clock; import java.time.Instant; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; @@ -71,9 +61,6 @@ class DefaultCheckpointProvider implements CheckpointProvider { protected final TransformAuditor transformAuditor; protected final TransformConfig transformConfig; - // set of clusters that do not support 8.2+ checkpoint actions - private final Set fallbackToBWC = new HashSet<>(); - DefaultCheckpointProvider( final Clock clock, final ParentTaskAssigningClient client, @@ -182,56 +169,17 @@ private void getCheckpointsFromOneCluster( String[] indices, QueryBuilder query, String cluster, - ActionListener> listener + ActionListener> responseListener ) { - if (fallbackToBWC.contains(cluster)) { - getCheckpointsFromOneClusterBWC(threadContext, client, timeout, headers, indices, cluster, listener); - } else { - getCheckpointsFromOneClusterV2( - threadContext, - client, - timeout, - headers, - indices, - query, - cluster, - ActionListener.wrap(response -> { - logger.debug( - "[{}] Successfully retrieved checkpoints from cluster [{}] using transform checkpoint API", - transformConfig.getId(), - cluster - ); - listener.onResponse(response); - }, e -> { - Throwable unwrappedException = ExceptionsHelper.unwrapCause(e); - if (unwrappedException instanceof ActionNotFoundTransportException) { - // this is an implementation detail, so not necessary to audit or warn, but only report as debug - logger.debug( - "[{}] Cluster [{}] does not support transform checkpoint API, falling back to legacy checkpointing", - transformConfig.getId(), - cluster - ); - - fallbackToBWC.add(cluster); - getCheckpointsFromOneClusterBWC(threadContext, client, timeout, headers, indices, cluster, listener); - } else { - listener.onFailure(e); - } - }) + ActionListener> debugListener = logger.isDebugEnabled() ? responseListener.delegateFailure((l, r) -> { + logger.debug( + "[{}] Successfully retrieved checkpoints from cluster [{}] using transform checkpoint API", + transformConfig.getId(), + cluster ); - } - } + l.onResponse(r); + }) : responseListener; - private static void getCheckpointsFromOneClusterV2( - ThreadContext threadContext, - CheckpointClient client, - TimeValue timeout, - Map headers, - String[] indices, - QueryBuilder query, - String cluster, - ActionListener> listener - ) { GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request( indices, IndicesOptions.LENIENT_EXPAND_OPEN, @@ -241,21 +189,20 @@ private static void getCheckpointsFromOneClusterV2( ); ActionListener checkpointListener; if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) { - checkpointListener = listener.safeMap(GetCheckpointAction.Response::getCheckpoints); + checkpointListener = debugListener.safeMap(GetCheckpointAction.Response::getCheckpoints); } else { - checkpointListener = ActionListener.wrap( - checkpointResponse -> listener.onResponse( + checkpointListener = debugListener.delegateFailure( + (l, checkpointResponse) -> l.onResponse( checkpointResponse.getCheckpoints() .entrySet() .stream() .collect( Collectors.toMap( entry -> cluster + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR + entry.getKey(), - entry -> entry.getValue() + Map.Entry::getValue ) ) - ), - listener::onFailure + ) ); } @@ -269,123 +216,6 @@ private static void getCheckpointsFromOneClusterV2( ); } - /** - * BWC fallback for nodes/cluster older than 8.2 - */ - private static void getCheckpointsFromOneClusterBWC( - ThreadContext threadContext, - CheckpointClient client, - TimeValue timeout, - Map headers, - String[] indices, - String cluster, - ActionListener> listener - ) { - // 1st get index to see the indexes the user has access to - GetIndexRequest getIndexRequest = new GetIndexRequest(Transform.HARD_CODED_TRANSFORM_MASTER_NODE_TIMEOUT).indices(indices) - .features(new GetIndexRequest.Feature[0]) - .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - - ClientHelper.executeWithHeadersAsync( - threadContext, - headers, - ClientHelper.TRANSFORM_ORIGIN, - getIndexRequest, - ActionListener.wrap(getIndexResponse -> { - Set userIndices = getIndexResponse.getIndices() != null - ? new HashSet<>(Arrays.asList(getIndexResponse.getIndices())) - : Collections.emptySet(); - // 2nd get stats request - ClientHelper.executeAsyncWithOrigin( - threadContext, - ClientHelper.TRANSFORM_ORIGIN, - new IndicesStatsRequest().indices(indices).timeout(timeout).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN), - ActionListener.wrap(response -> { - if (response.getFailedShards() != 0) { - for (int i = 0; i < response.getShardFailures().length; ++i) { - int shardNo = i; - logger.warn( - () -> Strings.format( - "Source has [%s] failed shards, shard failure [%s]", - response.getFailedShards(), - shardNo - ), - response.getShardFailures()[i] - ); - } - listener.onFailure( - new CheckpointException( - "Source has [{}] failed shards, first shard failure: {}", - response.getShardFailures()[0], - response.getFailedShards(), - response.getShardFailures()[0].toString() - ) - ); - return; - } - listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, cluster)); - }, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))), - client::getIndicesStats - ); - }, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))), - client::getIndex - ); - } - - static Map extractIndexCheckPoints(ShardStats[] shards, Set userIndices, String cluster) { - Map> checkpointsByIndex = new TreeMap<>(); - - for (ShardStats shard : shards) { - String indexName = shard.getShardRouting().getIndexName(); - - if (userIndices.contains(indexName)) { - // SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case - long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint(); - String fullIndexName = cluster.isEmpty() - ? indexName - : cluster + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR + indexName; - if (checkpointsByIndex.containsKey(fullIndexName)) { - // we have already seen this index, just check/add shards - TreeMap checkpoints = checkpointsByIndex.get(fullIndexName); - // 1st time we see this shard for this index, add the entry for the shard - // or there is already a checkpoint entry for this index/shard combination - // but with a higher global checkpoint. This is by design(not a problem) and - // we take the higher value - if (checkpoints.containsKey(shard.getShardRouting().getId()) == false - || checkpoints.get(shard.getShardRouting().getId()) < globalCheckpoint) { - checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint); - } - } else { - // 1st time we see this index, create an entry for the index and add the shard checkpoint - checkpointsByIndex.put(fullIndexName, new TreeMap<>()); - checkpointsByIndex.get(fullIndexName).put(shard.getShardRouting().getId(), globalCheckpoint); - } - } - } - - // checkpoint extraction is done in 2 steps: - // 1. GetIndexRequest to retrieve the indices the user has access to - // 2. IndicesStatsRequest to retrieve stats about indices - // between 1 and 2 indices could get deleted or created - if (logger.isDebugEnabled()) { - Set userIndicesClone = new HashSet<>(userIndices); - - userIndicesClone.removeAll(checkpointsByIndex.keySet()); - if (userIndicesClone.isEmpty() == false) { - logger.debug("Original set of user indices contained more indexes [{}]", userIndicesClone); - } - } - - // create the final structure - Map checkpointsByIndexReduced = new TreeMap<>(); - - checkpointsByIndex.forEach((indexName, checkpoints) -> { - checkpointsByIndexReduced.put(indexName, checkpoints.values().stream().mapToLong(l -> l).toArray()); - }); - - return checkpointsByIndexReduced; - } - @Override public void getCheckpointingInfo( TransformCheckpoint lastCheckpoint, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 8ed6b49d98753..db6fc6ce61db5 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -31,7 +31,6 @@ import org.elasticsearch.test.MockLog.LoggingExpectation; import org.elasticsearch.test.transport.StubLinkedProjectConfigService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; @@ -59,7 +58,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.startsWith; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -232,51 +230,6 @@ public void testReportSourceIndexChangesAddDeleteMany() { ); } - public void testHandlingShardFailures() throws Exception { - var transformId = getTestName(); - var indexName = "some-index"; - TransformConfig transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSource( - new SourceConfig(indexName) - ).build(); - - var remoteClusterResolver = mock(RemoteClusterResolver.class); - doReturn(new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(indexName))).when( - remoteClusterResolver - ).resolve(transformConfig.getSource().getIndex()); - - mockGetIndexResponse(indexName); - mockIndicesStatsResponse(indexName); - mockGetCheckpointAction(); - - var provider = new DefaultCheckpointProvider( - clock, - parentTaskClient, - remoteClusterResolver, - transformConfigManager, - transformAuditor, - transformConfig - ); - - var latch = new CountDownLatch(1); - provider.createNextCheckpoint( - null, - new LatchedActionListener<>( - ActionListener.wrap( - response -> fail("This test case must fail"), - e -> assertThat( - e.getMessage(), - startsWith( - "Source has [7] failed shards, first shard failure: [some-index][3] failed, " - + "reason [java.lang.Exception: something's wrong" - ) - ) - ), - latch - ) - ); - assertTrue(latch.await(1, TimeUnit.MILLISECONDS)); - } - private void mockGetIndexResponse(String indexName) { GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { indexName }, null, null, null, null, null); doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); @@ -292,14 +245,6 @@ private void mockIndicesStatsResponse(String indexName) { doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); } - private void mockGetCheckpointAction() { - doAnswer(invocationOnMock -> { - ActionListener listener = invocationOnMock.getArgument(2); - listener.onFailure(new ActionNotFoundTransportException("This should fail.")); - return null; - }).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); - } - public void testHandlingNoClusters() throws Exception { var transformId = getTestName(); var indexName = "some-missing-index"; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java index 38f3223d840b2..fb40cc0f6b49f 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java @@ -40,11 +40,9 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import static org.mockito.Mockito.mock; @@ -52,74 +50,6 @@ public class TransformsCheckpointServiceTests extends ESTestCase { - public void testExtractIndexCheckpoints() { - Map expectedCheckpoints = new HashMap<>(); - Set indices = randomUserIndices(); - - ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, false); - - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); - - assertEquals(expectedCheckpoints.size(), checkpoints.size()); - assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); - - // low-level compare - for (Entry entry : expectedCheckpoints.entrySet()) { - assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey())); - } - } - - public void testExtractIndexCheckpointsMissingSeqNoStats() { - Map expectedCheckpoints = new HashMap<>(); - Set indices = randomUserIndices(); - - ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, true); - - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); - - assertEquals(expectedCheckpoints.size(), checkpoints.size()); - assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); - - // low-level compare - for (Entry entry : expectedCheckpoints.entrySet()) { - assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey())); - } - } - - public void testExtractIndexCheckpointsLostPrimaries() { - Map expectedCheckpoints = new HashMap<>(); - Set indices = randomUserIndices(); - - ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false, false); - - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); - - assertEquals(expectedCheckpoints.size(), checkpoints.size()); - assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); - - // low-level compare - for (Entry entry : expectedCheckpoints.entrySet()) { - assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey())); - } - } - - public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() { - Map expectedCheckpoints = new HashMap<>(); - Set indices = randomUserIndices(); - - ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false); - - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); - - assertEquals(expectedCheckpoints.size(), checkpoints.size()); - assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); - - // global checkpoints should be max() of all global checkpoints - for (Entry entry : expectedCheckpoints.entrySet()) { - assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey())); - } - } - public void testTransformCheckpointingInfoWithZeroLastCheckpoint() { var transformState = mock(TransformState.class); when(transformState.getCheckpoint()).thenReturn(0L);