Skip to content

Commit 38c7ddd

Browse files
authored
Add refresh .security index call between security migrations (#114879)
* Add refresh .security index call between security migrations
1 parent 353b5e4 commit 38c7ddd

File tree

3 files changed

+113
-30
lines changed

3 files changed

+113
-30
lines changed

docs/changelog/114879.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114879
2+
summary: Add refresh `.security` index call between security migrations
3+
area: Security
4+
type: enhancement
5+
issues: []

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrationExecutor.java

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
14+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1315
import org.elasticsearch.action.support.ThreadedActionListener;
1416
import org.elasticsearch.client.internal.Client;
1517
import org.elasticsearch.core.TimeValue;
@@ -20,10 +22,14 @@
2022
import org.elasticsearch.xpack.core.security.action.UpdateIndexMigrationVersionAction;
2123
import org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams;
2224

25+
import java.util.Arrays;
2326
import java.util.Map;
2427
import java.util.TreeMap;
2528
import java.util.concurrent.Executor;
2629

30+
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
31+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
32+
2733
public class SecurityMigrationExecutor extends PersistentTasksExecutor<SecurityMigrationTaskParams> {
2834

2935
private static final Logger logger = LogManager.getLogger(SecurityMigrationExecutor.class);
@@ -55,20 +61,29 @@ protected void nodeOperation(AllocatedPersistentTask task, SecurityMigrationTask
5561
updateMigrationVersion(
5662
params.getMigrationVersion(),
5763
securityIndexManager.getConcreteIndexName(),
58-
ActionListener.wrap(response -> {
64+
listener.delegateFailureAndWrap((l, response) -> {
5965
logger.info("Security migration not needed. Setting current version to: [" + params.getMigrationVersion() + "]");
60-
listener.onResponse(response);
61-
}, listener::onFailure)
66+
l.onResponse(response);
67+
})
6268
);
6369
return;
6470
}
6571

66-
applyOutstandingMigrations(task, params.getMigrationVersion(), listener);
72+
refreshSecurityIndex(
73+
new ThreadedActionListener<>(
74+
this.getExecutor(),
75+
listener.delegateFailureIgnoreResponseAndWrap(l -> applyOutstandingMigrations(task, params.getMigrationVersion(), l))
76+
)
77+
);
6778
}
6879

69-
private void applyOutstandingMigrations(AllocatedPersistentTask task, int currentMigrationVersion, ActionListener<Void> listener) {
80+
private void applyOutstandingMigrations(
81+
AllocatedPersistentTask task,
82+
int currentMigrationVersion,
83+
ActionListener<Void> migrationsListener
84+
) {
7085
if (task.isCancelled()) {
71-
listener.onFailure(new TaskCancelledException("Security migration task cancelled"));
86+
migrationsListener.onFailure(new TaskCancelledException("Security migration task cancelled"));
7287
return;
7388
}
7489
Map.Entry<Integer, SecurityMigrations.SecurityMigration> migrationEntry = migrationByVersion.higherEntry(currentMigrationVersion);
@@ -79,34 +94,56 @@ private void applyOutstandingMigrations(AllocatedPersistentTask task, int curren
7994
.migrate(
8095
securityIndexManager,
8196
client,
82-
ActionListener.wrap(
83-
response -> updateMigrationVersion(
97+
migrationsListener.delegateFailureIgnoreResponseAndWrap(
98+
updateVersionListener -> updateMigrationVersion(
8499
migrationEntry.getKey(),
85100
securityIndexManager.getConcreteIndexName(),
86101
new ThreadedActionListener<>(
87102
this.getExecutor(),
88-
ActionListener.wrap(
89-
updateResponse -> applyOutstandingMigrations(task, migrationEntry.getKey(), listener),
90-
listener::onFailure
91-
)
103+
updateVersionListener.delegateFailureIgnoreResponseAndWrap(refreshListener -> {
104+
refreshSecurityIndex(
105+
new ThreadedActionListener<>(
106+
this.getExecutor(),
107+
refreshListener.delegateFailureIgnoreResponseAndWrap(
108+
l -> applyOutstandingMigrations(task, migrationEntry.getKey(), l)
109+
)
110+
)
111+
);
112+
})
92113
)
93-
),
94-
listener::onFailure
114+
)
95115
)
96116
);
97117
} else {
98118
logger.info("Security migrations applied until version: [" + currentMigrationVersion + "]");
99-
listener.onResponse(null);
119+
migrationsListener.onResponse(null);
100120
}
101121
}
102122

123+
/**
124+
* Refresh security index to make sure that docs that were migrated are visible to the next migration and to prevent version conflicts
125+
* or unexpected behaviour by APIs relying on migrated docs.
126+
*/
127+
private void refreshSecurityIndex(ActionListener<Void> listener) {
128+
RefreshRequest refreshRequest = new RefreshRequest(securityIndexManager.getConcreteIndexName());
129+
executeAsyncWithOrigin(client, SECURITY_ORIGIN, RefreshAction.INSTANCE, refreshRequest, ActionListener.wrap(response -> {
130+
if (response.getFailedShards() != 0) {
131+
// Log a warning but do not stop migration, since this is not a critical operation
132+
logger.warn("Failed to refresh security index during security migration {}", Arrays.toString(response.getShardFailures()));
133+
}
134+
listener.onResponse(null);
135+
}, exception -> {
136+
// Log a warning but do not stop migration, since this is not a critical operation
137+
logger.warn("Failed to refresh security index during security migration", exception);
138+
listener.onResponse(null);
139+
}));
140+
}
141+
103142
private void updateMigrationVersion(int migrationVersion, String indexName, ActionListener<Void> listener) {
104143
client.execute(
105144
UpdateIndexMigrationVersionAction.INSTANCE,
106145
new UpdateIndexMigrationVersionAction.Request(TimeValue.MAX_VALUE, migrationVersion, indexName),
107-
ActionListener.wrap((response) -> {
108-
listener.onResponse(null);
109-
}, listener::onFailure)
146+
listener.delegateFailureIgnoreResponseAndWrap(l -> l.onResponse(null))
110147
);
111148
}
112149
}

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityMigrationExecutorTests.java

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.elasticsearch.action.ActionRequest;
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
14+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1315
import org.elasticsearch.client.internal.Client;
1416
import org.elasticsearch.common.settings.Settings;
1517
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -20,10 +22,12 @@
2022
import org.elasticsearch.test.ESTestCase;
2123
import org.elasticsearch.test.client.NoOpClient;
2224
import org.elasticsearch.threadpool.ThreadPool;
25+
import org.elasticsearch.xpack.core.security.action.UpdateIndexMigrationVersionAction;
2326
import org.elasticsearch.xpack.core.security.action.UpdateIndexMigrationVersionResponse;
2427
import org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams;
2528
import org.junit.Before;
2629

30+
import java.util.List;
2731
import java.util.Map;
2832
import java.util.Set;
2933
import java.util.TreeMap;
@@ -40,8 +44,11 @@ public class SecurityMigrationExecutorTests extends ESTestCase {
4044
private SecurityIndexManager securityIndexManager;
4145

4246
private int updateIndexMigrationVersionActionInvocations;
47+
private int refreshActionInvocations;
4348

44-
private boolean clientShouldThrowException = false;
49+
private boolean updateVersionShouldThrowException = false;
50+
51+
private boolean refreshIndexShouldThrowException = false;
4552

4653
private AllocatedPersistentTask mockTask = mock(AllocatedPersistentTask.class);
4754

@@ -51,6 +58,7 @@ public void setUpMocks() {
5158
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
5259
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
5360
updateIndexMigrationVersionActionInvocations = 0;
61+
refreshActionInvocations = 0;
5462
client = new NoOpClient(threadPool) {
5563
@Override
5664
@SuppressWarnings("unchecked")
@@ -59,12 +67,27 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
5967
Request request,
6068
ActionListener<Response> listener
6169
) {
62-
if (clientShouldThrowException) {
63-
listener.onFailure(new IllegalStateException("Bad client"));
64-
return;
70+
if (request instanceof RefreshRequest) {
71+
if (refreshIndexShouldThrowException) {
72+
if (randomBoolean()) {
73+
listener.onFailure(new IllegalStateException("Refresh index failed"));
74+
} else {
75+
listener.onResponse((Response) new BroadcastResponse(1, 0, 1, List.of()));
76+
}
77+
} else {
78+
refreshActionInvocations++;
79+
listener.onResponse((Response) new BroadcastResponse(1, 1, 0, List.of()));
80+
}
81+
} else if (request instanceof UpdateIndexMigrationVersionAction.Request) {
82+
if (updateVersionShouldThrowException) {
83+
listener.onFailure(new IllegalStateException("Update version failed"));
84+
} else {
85+
updateIndexMigrationVersionActionInvocations++;
86+
listener.onResponse((Response) new UpdateIndexMigrationVersionResponse());
87+
}
88+
} else {
89+
fail("Unexpected client request");
6590
}
66-
updateIndexMigrationVersionActionInvocations++;
67-
listener.onResponse((Response) new UpdateIndexMigrationVersionResponse());
6891

6992
}
7093
};
@@ -85,6 +108,7 @@ public void testSuccessfulMigration() {
85108
verify(mockTask, times(1)).markAsCompleted();
86109
verify(mockTask, times(0)).markAsFailed(any());
87110
assertEquals(2, updateIndexMigrationVersionActionInvocations);
111+
assertEquals(3, refreshActionInvocations);
88112
assertEquals(2, migrateInvocations[0]);
89113
}
90114

@@ -111,6 +135,7 @@ public void testNoMigrationMeetsRequirements() {
111135
verify(mockTask, times(1)).markAsCompleted();
112136
verify(mockTask, times(0)).markAsFailed(any());
113137
assertEquals(0, updateIndexMigrationVersionActionInvocations);
138+
assertEquals(1, refreshActionInvocations);
114139
assertEquals(0, migrateInvocationsCounter[0]);
115140
}
116141

@@ -140,6 +165,7 @@ public void testPartialMigration() {
140165
securityMigrationExecutor.nodeOperation(mockTask, new SecurityMigrationTaskParams(0, true), mock(PersistentTaskState.class));
141166
verify(mockTask, times(1)).markAsCompleted();
142167
verify(mockTask, times(0)).markAsFailed(any());
168+
assertEquals(3, refreshActionInvocations);
143169
assertEquals(2, updateIndexMigrationVersionActionInvocations);
144170
assertEquals(2, migrateInvocations[0]);
145171
}
@@ -158,6 +184,7 @@ public void testNoMigrationNeeded() {
158184
verify(mockTask, times(1)).markAsCompleted();
159185
verify(mockTask, times(0)).markAsFailed(any());
160186
assertEquals(0, updateIndexMigrationVersionActionInvocations);
187+
assertEquals(1, refreshActionInvocations);
161188
assertEquals(0, migrateInvocations[0]);
162189
}
163190

@@ -186,14 +213,13 @@ public int minMappingVersion() {
186213
}))
187214
);
188215

189-
assertThrows(
190-
IllegalStateException.class,
191-
() -> securityMigrationExecutor.nodeOperation(
216+
securityMigrationExecutor.nodeOperation(
192217
mockTask,
193218
new SecurityMigrationTaskParams(0, true),
194219
mock(PersistentTaskState.class)
195-
)
196-
);
220+
);
221+
verify(mockTask, times(1)).markAsFailed(any());
222+
verify(mockTask, times(0)).markAsCompleted();
197223
}
198224

199225
public void testUpdateMigrationVersionThrowsException() {
@@ -205,12 +231,27 @@ public void testUpdateMigrationVersionThrowsException() {
205231
client,
206232
new TreeMap<>(Map.of(1, generateMigration(migrateInvocations, true), 2, generateMigration(migrateInvocations, true)))
207233
);
208-
clientShouldThrowException = true;
234+
updateVersionShouldThrowException = true;
209235
securityMigrationExecutor.nodeOperation(mockTask, new SecurityMigrationTaskParams(0, true), mock(PersistentTaskState.class));
210236
verify(mockTask, times(1)).markAsFailed(any());
211237
verify(mockTask, times(0)).markAsCompleted();
212238
}
213239

240+
public void testRefreshSecurityIndexThrowsException() {
241+
final int[] migrateInvocations = new int[1];
242+
SecurityMigrationExecutor securityMigrationExecutor = new SecurityMigrationExecutor(
243+
"test-task",
244+
threadPool.generic(),
245+
securityIndexManager,
246+
client,
247+
new TreeMap<>(Map.of(1, generateMigration(migrateInvocations, true), 2, generateMigration(migrateInvocations, true)))
248+
);
249+
refreshIndexShouldThrowException = true;
250+
securityMigrationExecutor.nodeOperation(mockTask, new SecurityMigrationTaskParams(0, true), mock(PersistentTaskState.class));
251+
verify(mockTask, times(0)).markAsFailed(any());
252+
verify(mockTask, times(1)).markAsCompleted();
253+
}
254+
214255
private SecurityMigrations.SecurityMigration generateMigration(int[] migrateInvocationsCounter, boolean isEligible) {
215256
SecurityMigrations.SecurityMigration migration = new SecurityMigrations.SecurityMigration() {
216257
@Override

0 commit comments

Comments
 (0)