Skip to content

Commit 33ec660

Browse files
Nagaraj Gnagarajg17
authored andcommitted
Fixing stale cache issue post snapshot restore
Signed-off-by: Nagaraj G <[email protected]>
1 parent 5bc2523 commit 33ec660

File tree

4 files changed

+124
-1
lines changed

4 files changed

+124
-1
lines changed

src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,8 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
870870
}
871871
}
872872
}.toListener());
873+
874+
indexModule.addIndexEventListener(cr);
873875
}
874876
}
875877

src/main/java/org/opensearch/security/configuration/ConfigurationRepository.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,13 @@
7575
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
7676
import org.opensearch.core.action.ActionListener;
7777
import org.opensearch.core.common.Strings;
78+
import org.opensearch.core.index.Index;
79+
import org.opensearch.core.index.shard.ShardId;
7880
import org.opensearch.core.rest.RestStatus;
7981
import org.opensearch.core.xcontent.MediaTypeRegistry;
8082
import org.opensearch.env.Environment;
83+
import org.opensearch.index.shard.IndexEventListener;
84+
import org.opensearch.index.shard.IndexShard;
8185
import org.opensearch.security.auditlog.AuditLog;
8286
import org.opensearch.security.auditlog.config.AuditConfig;
8387
import org.opensearch.security.securityconf.DynamicConfigFactory;
@@ -93,8 +97,9 @@
9397
import org.opensearch.transport.client.Client;
9498

9599
import static org.opensearch.security.support.ConfigConstants.SECURITY_ALLOW_DEFAULT_INIT_USE_CLUSTER_STATE;
100+
import static org.opensearch.security.support.SnapshotRestoreHelper.isSecurityIndexRestoredFromSnapshot;
96101

97-
public class ConfigurationRepository implements ClusterStateListener {
102+
public class ConfigurationRepository implements ClusterStateListener, IndexEventListener {
98103
private static final Logger LOGGER = LogManager.getLogger(ConfigurationRepository.class);
99104

100105
private final String securityIndex;
@@ -668,4 +673,23 @@ public Void run() {
668673
});
669674
}
670675
}
676+
677+
@Override
678+
public void afterIndexShardStarted(IndexShard indexShard) {
679+
final ShardId shardId = indexShard.shardId();
680+
final Index index = shardId.getIndex();
681+
682+
// Check if this is a security index shard
683+
if (securityIndex.equals(index.getName())) {
684+
// Only trigger on primary shard to avoid multiple reloads
685+
if (indexShard.routingEntry() != null && indexShard.routingEntry().primary()) {
686+
threadPool.generic().execute(() -> {
687+
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
688+
LOGGER.info("Security index primary shard {} started - config reloading for snapshot restore", shardId);
689+
reloadConfiguration(CType.values());
690+
}
691+
});
692+
}
693+
}
694+
}
671695
}

src/main/java/org/opensearch/security/support/SnapshotRestoreHelper.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@
3737
import org.opensearch.SpecialPermission;
3838
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
3939
import org.opensearch.action.support.PlainActionFuture;
40+
import org.opensearch.cluster.ClusterState;
41+
import org.opensearch.cluster.RestoreInProgress;
42+
import org.opensearch.cluster.service.ClusterService;
4043
import org.opensearch.common.util.IndexUtils;
44+
import org.opensearch.core.index.Index;
4145
import org.opensearch.repositories.RepositoriesService;
4246
import org.opensearch.repositories.Repository;
4347
import org.opensearch.security.OpenSearchSecurityPlugin;
@@ -89,6 +93,24 @@ public static SnapshotInfo getSnapshotInfo(RestoreSnapshotRequest restoreRequest
8993
return snapshotInfo;
9094
}
9195

96+
public static boolean isSecurityIndexRestoredFromSnapshot(ClusterService clusterService, Index index, String securityIndex) {
97+
try {
98+
ClusterState clusterState = clusterService.state();
99+
RestoreInProgress restoreInProgress = clusterState.custom(RestoreInProgress.TYPE);
100+
101+
if (restoreInProgress != null) {
102+
for (RestoreInProgress.Entry entry : restoreInProgress) {
103+
if (entry.indices().contains(securityIndex)) {
104+
return true;
105+
}
106+
}
107+
}
108+
} catch (Exception e) {
109+
log.warn("Could not determine if index {} was restored from snapshot, assuming new index", index.getName(), e);
110+
}
111+
return false;
112+
}
113+
92114
@SuppressWarnings("removal")
93115
private static void setCurrentThreadName(final String name) {
94116
final SecurityManager sm = System.getSecurityManager();

src/test/java/org/opensearch/security/configuration/ConfigurationRepositoryTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import java.io.IOException;
1616
import java.nio.file.Path;
1717
import java.time.Instant;
18+
import java.util.Collections;
1819
import java.util.Set;
20+
import java.util.concurrent.ExecutorService;
1921
import java.util.concurrent.TimeoutException;
2022

2123
import com.fasterxml.jackson.databind.InjectableValues;
@@ -34,17 +36,22 @@
3436
import org.opensearch.cluster.ClusterChangedEvent;
3537
import org.opensearch.cluster.ClusterState;
3638
import org.opensearch.cluster.ClusterStateUpdateTask;
39+
import org.opensearch.cluster.RestoreInProgress;
3740
import org.opensearch.cluster.block.ClusterBlocks;
3841
import org.opensearch.cluster.metadata.IndexMetadata;
3942
import org.opensearch.cluster.metadata.MappingMetadata;
4043
import org.opensearch.cluster.metadata.Metadata;
4144
import org.opensearch.cluster.node.DiscoveryNode;
4245
import org.opensearch.cluster.node.DiscoveryNodes;
46+
import org.opensearch.cluster.routing.ShardRouting;
4347
import org.opensearch.cluster.service.ClusterService;
4448
import org.opensearch.common.Priority;
4549
import org.opensearch.common.settings.Settings;
4650
import org.opensearch.core.action.ActionListener;
51+
import org.opensearch.core.index.Index;
52+
import org.opensearch.core.index.shard.ShardId;
4753
import org.opensearch.core.rest.RestStatus;
54+
import org.opensearch.index.shard.IndexShard;
4855
import org.opensearch.security.DefaultObjectMapper;
4956
import org.opensearch.security.auditlog.AuditLog;
5057
import org.opensearch.security.securityconf.DynamicConfigFactory;
@@ -63,6 +70,7 @@
6370

6471
import org.mockito.ArgumentCaptor;
6572
import org.mockito.Mock;
73+
import org.mockito.Mockito;
6674
import org.mockito.junit.MockitoJUnitRunner;
6775
import org.mockito.stubbing.OngoingStubbing;
6876

@@ -82,9 +90,11 @@
8290
import static org.mockito.Mockito.anyString;
8391
import static org.mockito.Mockito.doAnswer;
8492
import static org.mockito.Mockito.doCallRealMethod;
93+
import static org.mockito.Mockito.doReturn;
8594
import static org.mockito.Mockito.mock;
8695
import static org.mockito.Mockito.never;
8796
import static org.mockito.Mockito.reset;
97+
import static org.mockito.Mockito.spy;
8898
import static org.mockito.Mockito.times;
8999
import static org.mockito.Mockito.verify;
90100
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -183,6 +193,20 @@ private ConfigurationRepository createConfigurationRepository(Settings settings)
183193
);
184194
}
185195

196+
private ConfigurationRepository createConfigurationRepository(Settings settings, ThreadPool threadPool) {
197+
return new ConfigurationRepository(
198+
settings.get(ConfigConstants.SECURITY_CONFIG_INDEX_NAME, ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX),
199+
settings,
200+
path,
201+
threadPool,
202+
localClient,
203+
clusterService,
204+
auditLog,
205+
securityIndexHandler,
206+
configurationLoaderSecurity7
207+
);
208+
}
209+
186210
@Test
187211
public void create_shouldReturnConfigurationRepository() {
188212
ConfigurationRepository configRepository = createConfigurationRepository(Settings.EMPTY);
@@ -569,6 +593,57 @@ public void getConfigurationsFromIndex_SecurityIndexNotInitiallyReady() throws I
569593
assertThat(result.size(), is(CType.values().size()));
570594
}
571595

596+
@Test
597+
public void afterIndexShardStarted_whenSecurityIndexUpdated() throws InterruptedException, TimeoutException {
598+
Settings settings = Settings.builder().build();
599+
IndexShard indexShard = mock(IndexShard.class);
600+
ShardRouting shardRouting = mock(ShardRouting.class);
601+
ShardId shardId = mock(ShardId.class);
602+
Index index = mock(Index.class);
603+
ClusterState mockClusterState = mock(ClusterState.class);
604+
RestoreInProgress mockRestore = mock(RestoreInProgress.class);
605+
RestoreInProgress.Entry mockEntry = mock(RestoreInProgress.Entry.class);
606+
ExecutorService executorService = mock(ExecutorService.class);
607+
ThreadPool threadPool = mock(ThreadPool.class);
608+
ConfigurationRepository configurationRepository = spy(createConfigurationRepository(settings, threadPool));
609+
610+
// Setup mock behavior
611+
when(indexShard.shardId()).thenReturn(shardId);
612+
when(shardId.getIndex()).thenReturn(index);
613+
when(index.getName()).thenReturn(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX);
614+
when(indexShard.routingEntry()).thenReturn(shardRouting);
615+
when(clusterService.state()).thenReturn(mockClusterState);
616+
when(mockClusterState.custom(RestoreInProgress.TYPE)).thenReturn(mockRestore);
617+
when(threadPool.generic()).thenReturn(executorService);
618+
619+
// when replica shard updated
620+
when(shardRouting.primary()).thenReturn(false);
621+
configurationRepository.afterIndexShardStarted(indexShard);
622+
verify(executorService, never()).execute(any());
623+
verify(configurationRepository, never()).reloadConfiguration(any());
624+
625+
// when primary shard updated
626+
doReturn(true).when(configurationRepository).reloadConfiguration(any());
627+
when(shardRouting.primary()).thenReturn(true);
628+
when(mockRestore.iterator()).thenReturn(Collections.singletonList(mockEntry).iterator());
629+
when(mockEntry.indices()).thenReturn(Collections.singletonList(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX));
630+
ArgumentCaptor<Runnable> successRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
631+
configurationRepository.afterIndexShardStarted(indexShard);
632+
verify(executorService).execute(successRunnableCaptor.capture());
633+
successRunnableCaptor.getValue().run();
634+
verify(configurationRepository).reloadConfiguration(CType.values());
635+
636+
// When there is error in checking if restored from snapshot
637+
Mockito.reset(configurationRepository, executorService);
638+
ArgumentCaptor<Runnable> errorRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
639+
when(clusterService.state()).thenThrow(new RuntimeException("ClusterState exception"));
640+
when(shardRouting.primary()).thenReturn(true);
641+
configurationRepository.afterIndexShardStarted(indexShard);
642+
verify(executorService).execute(errorRunnableCaptor.capture());
643+
errorRunnableCaptor.getValue().run();
644+
verify(configurationRepository, never()).reloadConfiguration(any());
645+
}
646+
572647
void assertClusterState(final ArgumentCaptor<ClusterStateUpdateTask> clusterStateUpdateTaskCaptor) throws Exception {
573648
final var initializedStateUpdate = clusterStateUpdateTaskCaptor.getValue();
574649
assertThat(initializedStateUpdate.priority(), is(Priority.IMMEDIATE));

0 commit comments

Comments
 (0)