Skip to content

Commit

Permalink
Addresed comment on PR
Browse files Browse the repository at this point in the history
Signed-off-by: Shubh Sahu <[email protected]>
  • Loading branch information
Shubh Sahu committed Apr 18, 2024
1 parent efd3e17 commit 312c58a
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
Expand Down Expand Up @@ -61,6 +62,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.index.shard.IndexShardTestCase.getTranslog;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -803,21 +805,31 @@ public void testFlushOnTooManyRemoteTranslogFiles() throws Exception {
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

IndexShard indexShard = getIndexShard(datanode, INDEX_NAME);

Path translogLocation = getTranslog(indexShard).location();
assertFalse(indexShard.shouldPeriodicallyFlush());
assertEquals(0, indexShard.getNumberofTranslogReaders());

try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 1L);
}

// indexing 100 documents (100 bulk requests), no flush will be triggered yet
for (int i = 0; i < 100; i++) {
indexBulk(INDEX_NAME, 1);
}

assertEquals(100, indexShard.getNumberofTranslogReaders());

try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 101L);
}
// Will flush and trim the translog readers
indexBulk(INDEX_NAME, 1);

assertBusy(() -> assertEquals(0, indexShard.getNumberofTranslogReaders()), 30, TimeUnit.SECONDS);
assertFalse(indexShard.shouldPeriodicallyFlush());
assertBusy(() -> {
try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 1L);
}
}, 30, TimeUnit.SECONDS);
}
}
15 changes: 1 addition & 14 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4484,23 +4484,10 @@ public Durability getTranslogDurability() {
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

// For testing purpose
public int getNumberofTranslogReaders() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
return engine.translogManager().getNumberofTranslogReaders();
} catch (final AlreadyClosedException e) {
// we are already closed
}
}
return -1;
}

/**
* Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
* executed asynchronously on the flush thread pool.
* Also schedules a refresh if required, decided by translog manager
* Can also schedule a flush if decided by translog manager
*/
public void afterWriteOperation() {
if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,9 @@ public String getTranslogUUID() {
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
/*
* This triggers flush if number of translog files have breached a threshold.
* each translog type can have it's own decider.
* This can trigger flush depending upon translog's implementation
*/
if (translog.shouldFlushOnMaxTranslogFiles()) {
if (translog.shouldFlush()) {
return true;
}
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
Expand Down Expand Up @@ -476,12 +475,4 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
public void close() throws IOException {
IOUtils.closeWhileHandlingException(translog);
}

/**
* Retrieves the number of translog readers
* @return number of translog readers
*/
public int getNumberofTranslogReaders() {
return translog.getNumberofTranslogReaders();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,4 @@ public Releasable drainSync() {
public Translog.TranslogGeneration getTranslogGeneration() {
return null;
}

@Override
public int getNumberofTranslogReaders() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ int availablePermits() {
* @return {@code true} if the shard should be flushed
*/
@Override
public boolean shouldFlushOnMaxTranslogFiles() {
public boolean shouldFlush() {
return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2056,13 +2056,9 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi
/**
* Checks whether or not the shard should be flushed based on translog files.
* each translog type can have it's own decider
* @return {@code true} if the shard should be refreshed
* @return {@code true} if the shard should be flushed
*/
public boolean shouldFlushOnMaxTranslogFiles() {
public boolean shouldFlush() {
return false;
}

public int getNumberofTranslogReaders() {
return readers.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,4 @@ public interface TranslogManager {
Releasable drainSync();

Translog.TranslogGeneration getTranslogGeneration();

int getNumberofTranslogReaders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class RemoteStoreSettings {
);

/**
* Controls the maximum referenced remote translog files. If breached the shard will be Refreshed.
* Controls the maximum referenced remote translog files. If breached the shard will be flushed.
*/
public static final Setting<Integer> CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting(
"cluster.remote_store.translog.max_readers",
Expand Down

0 comments on commit 312c58a

Please sign in to comment.