Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ tests:
- class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
method: test {csv-spec:spatial.convertFromStringParseError}
issue: https://github.com/elastic/elasticsearch/issues/139213
- class: org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointServiceNodeTests
method: testGetCheckpointStats
issue: https://github.com/elastic/elasticsearch/issues/141112
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecForceStoredLoadingIT
method: test {csv-spec:boolean.convertFromIntAndLong}
issue: https://github.com/elastic/elasticsearch/issues/141375
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,16 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.cache.request.RequestCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.test.transport.StubLinkedProjectConfigService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
Expand All @@ -67,17 +38,12 @@
import org.junit.AfterClass;
import org.junit.Before;

import java.nio.file.Path;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTestCase {

Expand All @@ -92,23 +58,14 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest

private class MockClientForCheckpointing extends NoOpClient {

private final boolean supportTransformCheckpointApi;
private volatile Map<String, long[]> checkpoints;
private volatile String[] indices;

/**
* Mock client for checkpointing
*
* @param supportTransformCheckpointApi whether to mock the checkpoint API, if false throws action not found
*/
MockClientForCheckpointing(ThreadPool threadPool, boolean supportTransformCheckpointApi) {

MockClientForCheckpointing(ThreadPool threadPool) {
super(threadPool);
this.supportTransformCheckpointApi = supportTransformCheckpointApi;
}

void setCheckpoints(Map<String, long[]> checkpoints) {
this.checkpoints = checkpoints;
this.indices = checkpoints.keySet().toArray(new String[0]);
}

@SuppressWarnings("unchecked")
Expand All @@ -118,39 +75,12 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
Request request,
ActionListener<Response> listener
) {

if (request instanceof GetCheckpointAction.Request) {
// throw action not found if checkpoint API is not supported, transform should fallback to legacy checkpointing
if (supportTransformCheckpointApi == false) {
listener.onFailure(new ActionNotFoundTransportException(GetCheckpointAction.NAME));
return;
}

final GetCheckpointAction.Response getCheckpointResponse = new GetCheckpointAction.Response(checkpoints);
listener.onResponse((Response) getCheckpointResponse);
return;
}

if (request instanceof GetIndexRequest) {
// for this test we only need the indices
assert (indices != null);
final GetIndexResponse indexResponse = new GetIndexResponse(indices, null, null, null, null, null);

listener.onResponse((Response) indexResponse);
return;
}

if (request instanceof IndicesStatsRequest) {

// IndicesStatsResponse is package private, therefore using a mock
final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
when(indicesStatsResponse.getShards()).thenReturn(createShardStats(checkpoints));
when(indicesStatsResponse.getFailedShards()).thenReturn(0);

listener.onResponse((Response) indicesStatsResponse);
return;
}

super.doExecute(action, request, listener);
}
}
Expand All @@ -162,7 +92,7 @@ public void createComponents() {
threadPool = new TestThreadPool("TransformCheckpointServiceNodeTests");
}
if (mockClientForCheckpointing == null) {
mockClientForCheckpointing = new MockClientForCheckpointing(threadPool, randomBoolean());
mockClientForCheckpointing = new MockClientForCheckpointing(threadPool);
}
ClusterService clusterService = mock(ClusterService.class);
transformsConfigManager = new IndexBasedTransformConfigManager(
Expand Down Expand Up @@ -354,50 +284,6 @@ private static Map<String, long[]> createCheckPointMap(
return Collections.singletonMap(index, new long[] { checkpointShard1, checkpointShard2, checkpointShard3 });
}

private static ShardStats[] createShardStats(Map<String, long[]> checkpoints) {
List<ShardStats> shardStats = new ArrayList<>();

for (Entry<String, long[]> entry : checkpoints.entrySet()) {

for (int i = 0; i < entry.getValue().length; ++i) {
long checkpoint = entry.getValue()[i];
CommonStats stats = new CommonStats();
stats.fieldData = new FieldDataStats();
stats.queryCache = new QueryCacheStats();
stats.docs = new DocsStats();
stats.store = new StoreStats();
stats.indexing = new IndexingStats();
stats.search = new SearchStats();
stats.segments = new SegmentsStats();
stats.merge = new MergeStats();
stats.refresh = new RefreshStats();
stats.completion = new CompletionStats();
stats.requestCache = new RequestCacheStats();
stats.get = new GetStats();
stats.flush = new FlushStats();
stats.warmer = new WarmerStats();

SeqNoStats seqNoStats = new SeqNoStats(checkpoint, checkpoint, checkpoint);
Index index = new Index(entry.getKey(), UUIDs.randomBase64UUID(random()));
ShardId shardId = new ShardId(index, i);
ShardRouting shardRouting = ShardRouting.newUnassigned(
shardId,
true,
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null),
ShardRouting.Role.DEFAULT
);
Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(i));

shardStats.add(
new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, seqNoStats, null, false, 0)
);
}

}
return shardStats.toArray(new ShardStats[0]);
}

private static void getCheckpoint(
TransformCheckpointService transformCheckpointService,
String transformId,
Expand Down
Loading