Skip to content

Commit 74b4436

Browse files
ashking94dk2k
authored andcommitted
Fix issue of red index on close for remote enabled clusters (opensearch-project#15990)
* Fix red index on close for remote translog Signed-off-by: Ashish Singh <[email protected]> * Add UTs Signed-off-by: Ashish Singh <[email protected]> --------- Signed-off-by: Ashish Singh <[email protected]>
1 parent 4111a07 commit 74b4436

File tree

3 files changed

+163
-4
lines changed

3 files changed

+163
-4
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

+68-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1919
import org.opensearch.action.index.IndexResponse;
2020
import org.opensearch.action.search.SearchPhaseExecutionException;
21+
import org.opensearch.client.Requests;
2122
import org.opensearch.cluster.health.ClusterHealthStatus;
2223
import org.opensearch.cluster.metadata.IndexMetadata;
2324
import org.opensearch.cluster.routing.RecoverySource;
@@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception {
202203

203204
public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
204205
String dataNode = internalCluster().startNode();
205-
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
206+
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1));
206207
int numberOfIterations = randomIntBetween(5, 15);
207208
indexData(numberOfIterations, true, INDEX_NAME);
208209
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
@@ -1011,4 +1012,70 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
10111012
.get()
10121013
);
10131014
}
1015+
1016+
public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
1017+
internalCluster().startNodes(3);
1018+
client().admin()
1019+
.cluster()
1020+
.prepareUpdateSettings()
1021+
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
1022+
.get();
1023+
Settings.Builder settings = Settings.builder()
1024+
.put(remoteStoreIndexSettings(0, 10000L, -1))
1025+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
1026+
createIndex(INDEX_NAME, settings.build());
1027+
CountDownLatch latch = new CountDownLatch(1);
1028+
new Thread(() -> {
1029+
if (randomBoolean()) {
1030+
for (int i = 0; i < randomIntBetween(1, 5); i++) {
1031+
indexSingleDoc(INDEX_NAME);
1032+
}
1033+
flushAndRefresh(INDEX_NAME);
1034+
}
1035+
// Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync.
1036+
indexSingleDoc(INDEX_NAME);
1037+
// Reduce the latch for the main thread to flush after some sleep.
1038+
latch.countDown();
1039+
// Index another doc and in this case the flush would have happened before the sync.
1040+
indexSingleDoc(INDEX_NAME);
1041+
}).start();
1042+
// Wait for atleast one doc to be ingested.
1043+
latch.await();
1044+
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
1045+
// gets indexed, then it goes into the happy case where the close index happens succefully.
1046+
Thread.sleep(1000);
1047+
// Flush so that the subsequent sync or flushes are no-op.
1048+
flush(INDEX_NAME);
1049+
// Closing the index involves translog.sync and shard.flush which are now no-op.
1050+
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
1051+
Thread.sleep(10000);
1052+
ensureGreen(INDEX_NAME);
1053+
}
1054+
1055+
public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
1056+
internalCluster().startNodes(3);
1057+
Settings.Builder settings = Settings.builder()
1058+
.put(remoteStoreIndexSettings(0, 10000L, -1))
1059+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
1060+
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
1061+
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
1062+
createIndex(INDEX_NAME, settings.build());
1063+
CountDownLatch latch = new CountDownLatch(1);
1064+
new Thread(() -> {
1065+
// Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync.
1066+
indexSingleDoc(INDEX_NAME);
1067+
indexSingleDoc(INDEX_NAME);
1068+
indexSingleDoc(INDEX_NAME);
1069+
// Reduce the latch for the main thread to flush after some sleep.
1070+
latch.countDown();
1071+
}).start();
1072+
// Wait for atleast one doc to be ingested.
1073+
latch.await();
1074+
// Flush so that the subsequent sync or flushes are no-op.
1075+
flush(INDEX_NAME);
1076+
// Closing the index involves translog.sync and shard.flush which are now no-op.
1077+
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
1078+
Thread.sleep(10000);
1079+
ensureGreen(INDEX_NAME);
1080+
}
10141081
}

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ public class RemoteFsTranslog extends Translog {
7878
// min generation referred by last uploaded translog
7979
protected volatile long minRemoteGenReferenced;
8080

81+
// the max global checkpoint that has been synced
82+
protected volatile long globalCheckpointSynced;
83+
8184
// clean up translog folder uploaded by previous primaries once
8285
protected final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();
8386

@@ -437,9 +440,10 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
437440
config.getNodeId()
438441
).build()
439442
) {
443+
Checkpoint checkpoint = current.getLastSyncedCheckpoint();
440444
return translogTransferManager.transferSnapshot(
441445
transferSnapshotProvider,
442-
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
446+
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo, checkpoint.globalCheckpoint)
443447
);
444448
} finally {
445449
syncPermit.release(SYNC_PERMIT);
@@ -474,7 +478,10 @@ public void sync() throws IOException {
474478
public boolean syncNeeded() {
475479
try (ReleasableLock lock = readLock.acquire()) {
476480
return current.syncNeeded()
477-
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0);
481+
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0)
482+
// The below condition on GCP exists to handle global checkpoint updates during close index.
483+
// Refer issue - https://github.com/opensearch-project/OpenSearch/issues/15989
484+
|| (current.getLastSyncedCheckpoint().globalCheckpoint > globalCheckpointSynced);
478485
}
479486
}
480487

@@ -674,17 +681,25 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
674681

675682
private final long maxSeqNo;
676683

677-
RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo) {
684+
private final long globalCheckpoint;
685+
686+
RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo, long globalCheckpoint) {
678687
this.generation = generation;
679688
this.primaryTerm = primaryTerm;
680689
this.maxSeqNo = maxSeqNo;
690+
this.globalCheckpoint = globalCheckpoint;
681691
}
682692

683693
@Override
684694
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
685695
maxRemoteTranslogGenerationUploaded = generation;
686696
long previousMinRemoteGenReferenced = minRemoteGenReferenced;
687697
minRemoteGenReferenced = getMinFileGeneration();
698+
// Update the global checkpoint only if the supplied global checkpoint is greater than it
699+
// When a new writer is created the
700+
if (globalCheckpoint > globalCheckpointSynced) {
701+
globalCheckpointSynced = globalCheckpoint;
702+
}
688703
if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
689704
onMinRemoteGenReferencedChange();
690705
}

server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java

+77
Original file line numberDiff line numberDiff line change
@@ -1801,6 +1801,83 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
18011801
assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
18021802
}
18031803

1804+
public void testSyncWithGlobalCheckpointUpdate() throws IOException {
1805+
ArrayList<Translog.Operation> ops = new ArrayList<>();
1806+
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
1807+
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 2 }));
1808+
1809+
// Set a global checkpoint
1810+
long initialGlobalCheckpoint = 1L;
1811+
globalCheckpoint.set(initialGlobalCheckpoint);
1812+
1813+
// Sync the translog
1814+
translog.sync();
1815+
1816+
// Verify that the globalCheckpointSynced is updated
1817+
assertEquals(initialGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
1818+
1819+
// Update global checkpoint
1820+
long newGlobalCheckpoint = 2L;
1821+
globalCheckpoint.set(newGlobalCheckpoint);
1822+
1823+
// Add a new operation and sync
1824+
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 3 }));
1825+
translog.sync();
1826+
1827+
// Verify that the globalCheckpointSynced is updated to the new value
1828+
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
1829+
}
1830+
1831+
public void testSyncNeededWithGlobalCheckpointUpdate() throws IOException {
1832+
ArrayList<Translog.Operation> ops = new ArrayList<>();
1833+
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
1834+
1835+
// Set initial global checkpoint
1836+
long initialGlobalCheckpoint = 0L;
1837+
globalCheckpoint.set(initialGlobalCheckpoint);
1838+
1839+
// Sync the translog
1840+
translog.sync();
1841+
1842+
// Verify that sync is not needed
1843+
assertFalse(translog.syncNeeded());
1844+
1845+
// Update global checkpoint
1846+
long newGlobalCheckpoint = 1L;
1847+
globalCheckpoint.set(newGlobalCheckpoint);
1848+
1849+
// Verify that sync is now needed due to global checkpoint update
1850+
assertTrue(translog.syncNeeded());
1851+
1852+
// Sync again
1853+
translog.sync();
1854+
1855+
// Verify that sync is not needed after syncing
1856+
assertFalse(translog.syncNeeded());
1857+
}
1858+
1859+
public void testGlobalCheckpointUpdateDuringClose() throws IOException {
1860+
ArrayList<Translog.Operation> ops = new ArrayList<>();
1861+
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
1862+
1863+
// Set initial global checkpoint
1864+
long initialGlobalCheckpoint = 0L;
1865+
globalCheckpoint.set(initialGlobalCheckpoint);
1866+
1867+
// Sync the translog
1868+
translog.sync();
1869+
1870+
// Update global checkpoint
1871+
long newGlobalCheckpoint = 1L;
1872+
globalCheckpoint.set(newGlobalCheckpoint);
1873+
1874+
// Close the translog
1875+
translog.close();
1876+
1877+
// Verify that the last synced checkpoint includes the updated global checkpoint
1878+
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
1879+
}
1880+
18041881
public class ThrowingBlobRepository extends FsRepository {
18051882

18061883
private final Environment environment;

0 commit comments

Comments
 (0)