Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,15 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
Expand All @@ -33,21 +27,17 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.checkpoint.RemoteClusterResolver.ResolvedIndices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;

import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;
Expand All @@ -71,9 +61,6 @@ class DefaultCheckpointProvider implements CheckpointProvider {
protected final TransformAuditor transformAuditor;
protected final TransformConfig transformConfig;

// set of clusters that do not support 8.2+ checkpoint actions
private final Set<String> fallbackToBWC = new HashSet<>();

DefaultCheckpointProvider(
final Clock clock,
final ParentTaskAssigningClient client,
Expand Down Expand Up @@ -182,56 +169,17 @@ private void getCheckpointsFromOneCluster(
String[] indices,
QueryBuilder query,
String cluster,
ActionListener<Map<String, long[]>> listener
ActionListener<Map<String, long[]>> responseListener
) {
if (fallbackToBWC.contains(cluster)) {
getCheckpointsFromOneClusterBWC(threadContext, client, timeout, headers, indices, cluster, listener);
} else {
getCheckpointsFromOneClusterV2(
threadContext,
client,
timeout,
headers,
indices,
query,
cluster,
ActionListener.wrap(response -> {
logger.debug(
"[{}] Successfully retrieved checkpoints from cluster [{}] using transform checkpoint API",
transformConfig.getId(),
cluster
);
listener.onResponse(response);
}, e -> {
Throwable unwrappedException = ExceptionsHelper.unwrapCause(e);
if (unwrappedException instanceof ActionNotFoundTransportException) {
// this is an implementation detail, so not necessary to audit or warn, but only report as debug
logger.debug(
"[{}] Cluster [{}] does not support transform checkpoint API, falling back to legacy checkpointing",
transformConfig.getId(),
cluster
);

fallbackToBWC.add(cluster);
getCheckpointsFromOneClusterBWC(threadContext, client, timeout, headers, indices, cluster, listener);
} else {
listener.onFailure(e);
}
})
ActionListener<Map<String, long[]>> debugListener = logger.isDebugEnabled() ? responseListener.delegateFailure((l, r) -> {
logger.debug(
"[{}] Successfully retrieved checkpoints from cluster [{}] using transform checkpoint API",
transformConfig.getId(),
cluster
);
}
}
l.onResponse(r);
}) : responseListener;

private static void getCheckpointsFromOneClusterV2(
ThreadContext threadContext,
CheckpointClient client,
TimeValue timeout,
Map<String, String> headers,
String[] indices,
QueryBuilder query,
String cluster,
ActionListener<Map<String, long[]>> listener
) {
GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request(
indices,
IndicesOptions.LENIENT_EXPAND_OPEN,
Expand All @@ -241,21 +189,20 @@ private static void getCheckpointsFromOneClusterV2(
);
ActionListener<GetCheckpointAction.Response> checkpointListener;
if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) {
checkpointListener = listener.safeMap(GetCheckpointAction.Response::getCheckpoints);
checkpointListener = debugListener.safeMap(GetCheckpointAction.Response::getCheckpoints);
} else {
checkpointListener = ActionListener.wrap(
checkpointResponse -> listener.onResponse(
checkpointListener = debugListener.delegateFailure(
(l, checkpointResponse) -> l.onResponse(
checkpointResponse.getCheckpoints()
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> cluster + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR + entry.getKey(),
entry -> entry.getValue()
Map.Entry::getValue
)
)
),
listener::onFailure
)
);
}

Expand All @@ -269,123 +216,6 @@ private static void getCheckpointsFromOneClusterV2(
);
}

/**
* BWC fallback for nodes/cluster older than 8.2
*
* <p>Retrieves checkpoints with two chained asynchronous requests:
* <ol>
* <li>a get-index request, executed with the supplied {@code headers}, whose result is used as the set of
* indices the user has access to;</li>
* <li>an indices-stats request, executed under the transform origin, whose shard statistics are reduced by
* {@link #extractIndexCheckPoints} to one array of global checkpoints per index.</li>
* </ol>
*
* <p>If the stats response reports failed shards, every shard failure is logged at warn level and the listener
* is failed with a {@code CheckpointException} that cites the first shard failure. A failure of either request
* is reported to the listener wrapped in a {@code CheckpointException} with the message
* "Failed to create checkpoint".
*
* @param threadContext thread context handed to the {@code ClientHelper} execution helpers
* @param client client whose {@code getIndex} and {@code getIndicesStats} methods perform the requests
* @param timeout timeout applied to the indices-stats request
* @param headers headers the get-index request is executed with
* @param indices index names or patterns to inspect; resolved with {@code LENIENT_EXPAND_OPEN}
* @param cluster cluster name passed on to {@link #extractIndexCheckPoints} for prefixing index names
* @param listener receives the map of index name to per-shard global checkpoints, or the failure
*/
private static void getCheckpointsFromOneClusterBWC(
ThreadContext threadContext,
CheckpointClient client,
TimeValue timeout,
Map<String, String> headers,
String[] indices,
String cluster,
ActionListener<Map<String, long[]>> listener
) {
// 1st get index to see the indexes the user has access to
// an empty feature array is passed; presumably only the index names are needed here - TODO confirm
GetIndexRequest getIndexRequest = new GetIndexRequest(Transform.HARD_CODED_TRANSFORM_MASTER_NODE_TIMEOUT).indices(indices)
.features(new GetIndexRequest.Feature[0])
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

ClientHelper.executeWithHeadersAsync(
threadContext,
headers,
ClientHelper.TRANSFORM_ORIGIN,
getIndexRequest,
ActionListener.wrap(getIndexResponse -> {
// a null index array in the response is treated as "no accessible indices"
Set<String> userIndices = getIndexResponse.getIndices() != null
? new HashSet<>(Arrays.asList(getIndexResponse.getIndices()))
: Collections.emptySet();
// 2nd get stats request
// unlike the 1st request this one is executed with the transform origin only, the headers are not passed
ClientHelper.executeAsyncWithOrigin(
threadContext,
ClientHelper.TRANSFORM_ORIGIN,
new IndicesStatsRequest().indices(indices).timeout(timeout).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
ActionListener.wrap(response -> {
if (response.getFailedShards() != 0) {
// log every individual shard failure, the exception handed to the listener only carries the first one
for (int i = 0; i < response.getShardFailures().length; ++i) {
// effectively final copy of the loop index, required for capturing it in the message supplier
int shardNo = i;
logger.warn(
() -> Strings.format(
"Source has [%s] failed shards, shard failure [%s]",
response.getFailedShards(),
shardNo
),
response.getShardFailures()[i]
);
}
listener.onFailure(
new CheckpointException(
"Source has [{}] failed shards, first shard failure: {}",
response.getShardFailures()[0],
response.getFailedShards(),
response.getShardFailures()[0].toString()
)
);
return;
}
// no failed shards: reduce the shard stats to checkpoints, restricted to the indices found in step 1
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, cluster));
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))),
client::getIndicesStats
);
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))),
client::getIndex
);
}

/**
 * Reduces shard statistics to the global checkpoints of every index the user has access to.
 *
 * <p>Shards of indices that are not contained in {@code userIndices} are ignored. If the same shard id is
 * reported more than once for an index, the highest global checkpoint wins. A shard without sequence number
 * statistics is recorded with a global checkpoint of {@code -1}.
 *
 * @param shards shard statistics to reduce
 * @param userIndices unqualified names of the indices the user has access to
 * @param cluster cluster the statistics belong to; if not empty, every returned index name is prefixed with
 *                the cluster name and the remote cluster index separator
 * @return map sorted by (qualified) index name, each value holding the global checkpoints ordered by shard id
 */
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices, String cluster) {
    // prefix that turns an index name into the key used in the result, empty if no cluster name is given
    final String prefix = cluster.isEmpty() ? "" : cluster + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR;
    Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();

    for (ShardStats shard : shards) {
        String indexName = shard.getShardRouting().getIndexName();
        if (userIndices.contains(indexName) == false) {
            continue;
        }

        // SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case
        long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint();

        // creates the entry for the index on first sight; a shard that is seen more than once (e.g. with a
        // different global checkpoint) is by design (not a problem), we take the higher value
        checkpointsByIndex.computeIfAbsent(prefix + indexName, name -> new TreeMap<>())
            .merge(shard.getShardRouting().getId(), globalCheckpoint, Long::max);
    }

    // checkpoint extraction is done in 2 steps:
    // 1. GetIndexRequest to retrieve the indices the user has access to
    // 2. IndicesStatsRequest to retrieve stats about indices
    // between 1 and 2 indices could get deleted or created
    if (logger.isDebugEnabled()) {
        // the keys of checkpointsByIndex carry the cluster prefix while the user indices do not,
        // so qualify every user index before looking it up
        Set<String> missingIndices = new HashSet<>();
        for (String userIndex : userIndices) {
            if (checkpointsByIndex.containsKey(prefix + userIndex) == false) {
                missingIndices.add(userIndex);
            }
        }
        if (missingIndices.isEmpty() == false) {
            logger.debug("Original set of user indices contained more indexes [{}]", missingIndices);
        }
    }

    // create the final structure
    Map<String, long[]> checkpointsByIndexReduced = new TreeMap<>();
    checkpointsByIndex.forEach(
        (name, checkpoints) -> checkpointsByIndexReduced.put(name, checkpoints.values().stream().mapToLong(Long::longValue).toArray())
    );

    return checkpointsByIndexReduced;
}

@Override
public void getCheckpointingInfo(
TransformCheckpoint lastCheckpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.test.MockLog.LoggingExpectation;
import org.elasticsearch.test.transport.StubLinkedProjectConfigService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
Expand Down Expand Up @@ -59,7 +58,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -232,51 +230,6 @@ public void testReportSourceIndexChangesAddDeleteMany() {
);
}

/**
 * Creates a checkpoint for a single local index while the mocked responses are in place and expects the
 * listener to fail with the "failed shards" message instead of receiving a checkpoint.
 */
public void testHandlingShardFailures() throws Exception {
    final var id = getTestName();
    final var sourceIndex = "some-index";
    final String expectedMessagePrefix = "Source has [7] failed shards, first shard failure: [some-index][3] failed, "
        + "reason [java.lang.Exception: something's wrong";

    TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(id)).setSource(
        new SourceConfig(sourceIndex)
    ).build();

    // resolve the source to exactly one local index and no remote clusters
    var resolver = mock(RemoteClusterResolver.class);
    var resolvedIndices = new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(sourceIndex));
    doReturn(resolvedIndices).when(resolver).resolve(config.getSource().getIndex());

    mockGetIndexResponse(sourceIndex);
    mockIndicesStatsResponse(sourceIndex);
    mockGetCheckpointAction();

    var checkpointProvider = new DefaultCheckpointProvider(
        clock,
        parentTaskClient,
        resolver,
        transformConfigManager,
        transformAuditor,
        config
    );

    var done = new CountDownLatch(1);
    checkpointProvider.createNextCheckpoint(
        null,
        new LatchedActionListener<>(
            ActionListener.wrap(
                response -> fail("This test case must fail"),
                e -> assertThat(e.getMessage(), startsWith(expectedMessagePrefix))
            ),
            done
        )
    );
    assertTrue(done.await(1, TimeUnit.MILLISECONDS));
}

private void mockGetIndexResponse(String indexName) {
GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { indexName }, null, null, null, null, null);
doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
Expand All @@ -292,14 +245,6 @@ private void mockIndicesStatsResponse(String indexName) {
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
}

/**
 * Stubs the client so that executing {@code GetCheckpointAction} fails the supplied listener with an
 * {@code ActionNotFoundTransportException}.
 */
private void mockGetCheckpointAction() {
    doAnswer(invocation -> {
        // argument index 2 of the stubbed execute call is used as the listener
        invocation.<ActionListener<?>>getArgument(2).onFailure(new ActionNotFoundTransportException("This should fail."));
        return null;
    }).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any());
}

public void testHandlingNoClusters() throws Exception {
var transformId = getTestName();
var indexName = "some-missing-index";
Expand Down
Loading