-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add system data streams to feature state snapshots #75902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
e5ecfd0
46871d1
1df9725
de6cd83
faeac56
9083f5d
bb6385f
8e8eb55
f1984b5
6df6980
2461699
7ecba14
33daff6
59fb331
a44a442
b578ed2
231d094
f573027
9c09345
2d1bee5
2a84b43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,7 +70,7 @@ | |
| import org.elasticsearch.core.Tuple; | ||
| import org.elasticsearch.index.Index; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
| import org.elasticsearch.indices.AssociatedIndexDescriptor; | ||
| import org.elasticsearch.indices.SystemDataStreamDescriptor; | ||
| import org.elasticsearch.indices.SystemIndices; | ||
| import org.elasticsearch.repositories.IndexId; | ||
| import org.elasticsearch.repositories.RepositoriesService; | ||
|
|
@@ -306,44 +306,53 @@ public ClusterState execute(ClusterState currentState) { | |
| // Store newSnapshot here to be processed in clusterStateProcessed | ||
| List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); | ||
|
|
||
| final List<SnapshotFeatureInfo> featureStates; | ||
| final List<SnapshotFeatureInfo> featureStates = new ArrayList<>(); | ||
| final List<String> systemDataStreamNames = new ArrayList<>(); | ||
| // if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't | ||
| // been requested by the request directly | ||
| if (featureStatesSet.isEmpty()) { | ||
| featureStates = Collections.emptyList(); | ||
| } else { | ||
| final Set<String> indexNames = new HashSet<>(indices); | ||
| featureStates = featureStatesSet.stream() | ||
| .map( | ||
| feature -> new SnapshotFeatureInfo( | ||
| feature, | ||
| systemIndexDescriptorMap.get(feature) | ||
| .getIndexDescriptors() | ||
| .stream() | ||
| .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) | ||
| .collect(Collectors.toList()) | ||
| ) | ||
| ) | ||
| .filter(featureInfo -> featureInfo.getIndices().isEmpty() == false) // Omit any empty featureStates | ||
| final Set<String> indexNames = new HashSet<>(indices); | ||
| for (String featureName : featureStatesSet) { | ||
| SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName); | ||
|
|
||
| List<String> featureSystemIndices = feature.getIndexDescriptors() | ||
| .stream() | ||
| .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) | ||
| .collect(Collectors.toList()); | ||
| List<String> featureAssociatedIndices = feature.getAssociatedIndexDescriptors() | ||
| .stream() | ||
| .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) | ||
| .collect(Collectors.toList()); | ||
| for (SnapshotFeatureInfo featureState : featureStates) { | ||
| indexNames.addAll(featureState.getIndices()); | ||
| } | ||
|
|
||
| // Add all resolved indices from the feature states to the list of indices | ||
| for (String feature : featureStatesSet) { | ||
| for (AssociatedIndexDescriptor aid : systemIndexDescriptorMap.get(feature).getAssociatedIndexDescriptors()) { | ||
| indexNames.addAll(aid.getMatchingIndices(currentState.metadata())); | ||
| List<String> featureSystemDataStreams = new ArrayList<>(); | ||
| List<String> featureDataStreamBackingIndices = new ArrayList<>(); | ||
| for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) { | ||
| List<String> backingIndexNames = sdd.getBackingIndexNames(currentState.metadata()); | ||
| if (backingIndexNames.size() > 0) { | ||
| featureDataStreamBackingIndices.addAll(backingIndexNames); | ||
| featureSystemDataStreams.add(sdd.getDataStreamName()); | ||
| } | ||
| } | ||
|
|
||
| if (featureSystemIndices.size() > 0 | ||
| || featureAssociatedIndices.size() > 0 | ||
| || featureDataStreamBackingIndices.size() > 0) { | ||
|
|
||
| featureStates.add(new SnapshotFeatureInfo(featureName, featureSystemIndices)); | ||
|
||
| indexNames.addAll(featureSystemIndices); | ||
| indexNames.addAll(featureAssociatedIndices); | ||
| indexNames.addAll(featureDataStreamBackingIndices); | ||
| systemDataStreamNames.addAll(featureSystemDataStreams); | ||
| } | ||
| indices = List.copyOf(indexNames); | ||
| } | ||
|
|
||
| // need feature state data streams... | ||
williamrandolph marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| final List<String> dataStreams = indexNameExpressionResolver.dataStreamNames( | ||
| currentState, | ||
| request.indicesOptions(), | ||
| request.indices() | ||
| ); | ||
| dataStreams.addAll(systemDataStreamNames); | ||
|
|
||
| logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,8 @@ | |
| import org.elasticsearch.action.DocWriteRequest; | ||
| import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; | ||
| import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; | ||
| import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; | ||
| import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; | ||
| import org.elasticsearch.action.admin.indices.get.GetIndexResponse; | ||
| import org.elasticsearch.action.index.IndexResponse; | ||
| import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
|
|
@@ -36,7 +38,11 @@ | |
|
|
||
| import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME; | ||
| import static org.hamcrest.Matchers.arrayWithSize; | ||
| import static org.hamcrest.Matchers.empty; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.greaterThan; | ||
| import static org.hamcrest.Matchers.hasSize; | ||
| import static org.hamcrest.Matchers.not; | ||
| import static org.hamcrest.Matchers.oneOf; | ||
|
|
||
| public class SystemDataStreamSnapshotIT extends AbstractSnapshotIntegTestCase { | ||
|
|
@@ -88,6 +94,7 @@ public void testSystemDataStreamSnapshotIT() throws Exception { | |
| .setWaitForCompletion(true) | ||
| .setIncludeGlobalState(false) | ||
| .get(); | ||
| assertSnapshotSuccess(createSnapshotResponse); | ||
|
|
||
| // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet | ||
| // See https://github.com/elastic/elasticsearch/issues/75818 | ||
|
|
@@ -118,6 +125,97 @@ public void testSystemDataStreamSnapshotIT() throws Exception { | |
| } | ||
| } | ||
|
|
||
| public void testSystemDataStreamInFeatureState() throws Exception { | ||
| Path location = randomRepoPath(); | ||
| createRepository(REPO, "fs", location); | ||
|
|
||
| { | ||
| CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME); | ||
| final AcknowledgedResponse response = client().execute(CreateDataStreamAction.INSTANCE, request).get(); | ||
| assertTrue(response.isAcknowledged()); | ||
| } | ||
|
|
||
| // Index a doc so that a concrete backing index will be created | ||
| IndexResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME) | ||
| .setId("42") | ||
| .setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON) | ||
| .setOpType(DocWriteRequest.OpType.CREATE) | ||
| .execute() | ||
| .actionGet(); | ||
| assertThat(indexToDataStreamResponse.status().getStatus(), oneOf(200, 201)); | ||
|
|
||
| // Index a doc so that a concrete backing index will be created | ||
| IndexResponse indexResponse = client().prepareIndex("my-index") | ||
| .setId("42") | ||
| .setSource("{ \"name\": \"my-name\" }", XContentType.JSON) | ||
| .setOpType(DocWriteRequest.OpType.CREATE) | ||
| .execute() | ||
| .actionGet(); | ||
williamrandolph marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| assertThat(indexResponse.status().getStatus(), oneOf(200, 201)); | ||
|
|
||
| { | ||
| GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME }); | ||
| GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get(); | ||
| assertThat(response.getDataStreams(), hasSize(1)); | ||
| assertTrue(response.getDataStreams().get(0).getDataStream().isSystem()); | ||
| } | ||
|
|
||
| CreateSnapshotResponse createSnapshotResponse = client().admin() | ||
| .cluster() | ||
| .prepareCreateSnapshot(REPO, SNAPSHOT) | ||
| .setIndices("my-index") | ||
| .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName()) | ||
| .setWaitForCompletion(true) | ||
| .setIncludeGlobalState(false) | ||
| .get(); | ||
| assertSnapshotSuccess(createSnapshotResponse); | ||
|
|
||
| assertThat(createSnapshotResponse.getSnapshotInfo().dataStreams(), not(empty())); | ||
|
|
||
| // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet | ||
| // See https://github.com/elastic/elasticsearch/issues/75818 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a comment to the linked issue so we remember to clean up the places we have to work around this in tests. No action required here, just noting. |
||
| { | ||
| DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME }); | ||
| AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get(); | ||
| assertTrue(response.isAcknowledged()); | ||
| } | ||
|
|
||
| { | ||
| DeleteIndexRequest request = new DeleteIndexRequest("my-index"); | ||
williamrandolph marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| AcknowledgedResponse response = client().execute(DeleteIndexAction.INSTANCE, request).get(); | ||
| assertTrue(response.isAcknowledged()); | ||
| } | ||
|
|
||
| { | ||
| GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get(); | ||
| assertThat(indicesRemaining.indices(), arrayWithSize(0)); | ||
| } | ||
|
|
||
| RestoreSnapshotResponse restoreSnapshotResponse = client().admin() | ||
| .cluster() | ||
| .prepareRestoreSnapshot(REPO, SNAPSHOT) | ||
| .setWaitForCompletion(true) | ||
| .setIndices("my-index") | ||
| .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName()) | ||
| .get(); | ||
| assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards()); | ||
|
|
||
| { | ||
| GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME }); | ||
| GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get(); | ||
| assertThat(response.getDataStreams(), hasSize(1)); | ||
| assertTrue(response.getDataStreams().get(0).getDataStream().isSystem()); | ||
| } | ||
| } | ||
|
|
||
| private void assertSnapshotSuccess(CreateSnapshotResponse createSnapshotResponse) { | ||
|
||
| assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); | ||
| assertThat( | ||
| createSnapshotResponse.getSnapshotInfo().successfulShards(), | ||
| equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) | ||
| ); | ||
| } | ||
|
|
||
| public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin { | ||
|
|
||
| static final String SYSTEM_DATA_STREAM_NAME = ".test-data-stream"; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.