Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,14 @@ public boolean isSegRepEnabled() {
return ReplicationType.SEGMENT.equals(replicationType);
}

public boolean isSegRepLocalEnabled() {
return isSegRepEnabled() && !isSegRepWithRemoteEnabled();
}

public boolean isSegRepWithRemoteEnabled() {
return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
}

/**
* Returns if remote store is enabled for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3546,9 +3546,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
internalRefreshListener.add(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY
)
);
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -96,7 +98,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres

private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry;

public RemoteStoreRefreshListener(IndexShard indexShard) {
private final SegmentReplicationCheckpointPublisher checkpointPublisher;

public RemoteStoreRefreshListener(IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Expand All @@ -111,6 +115,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) {
}
}
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
}

@Override
Expand Down Expand Up @@ -151,6 +156,10 @@ private synchronized void syncSegments(boolean isRetry) {
deleteStaleCommits();
}

// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Expand Down Expand Up @@ -190,9 +199,11 @@ private synchronized void syncSegments(boolean isRetry) {
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
OnSuccessfulSegmentsSync();
onSuccessfulSegmentsSync();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);

checkpointPublisher.publish(indexShard, checkpoint);
} else {
shouldRetry = true;
}
Expand Down Expand Up @@ -229,7 +240,7 @@ private void beforeSegmentsSync(boolean isRetry) {
}
}

private void OnSuccessfulSegmentsSync() {
private void onSuccessfulSegmentsSync() {
// Reset the backoffDelayIterator for the future failures
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
indexDocs(1, numberOfDocs);
indexShard.refresh("test");

remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY);
}

private void indexDocs(int startDocId, int numberOfDocs) throws IOException {
Expand Down Expand Up @@ -316,7 +317,7 @@ private void mockIndexShardWithRetryAndScheduleRefresh(
return indexShard.getEngine();
}).when(shard).getEngine();

RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard);
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY);
refreshListener.afterRefresh(false);
}

Expand Down