From 4addc10b20e011eac8e38d57d026028d2efd7742 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 28 Apr 2022 22:08:30 +0000 Subject: [PATCH 01/11] Intial PR adding classes and tests related to checkpoint publishing Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/engine/Engine.java | 8 + .../shard/CheckpointRefreshListener.java | 45 +++++ .../opensearch/index/shard/IndexShard.java | 29 +++ .../SegmentReplicationReplicaService.java | 30 +++ .../checkpoint/PublishCheckpointAction.java | 173 +++++++++++++++++ .../checkpoint/PublishCheckpointRequest.java | 46 +++++ ...SegmentReplicationCheckpointPublisher.java | 38 ++++ .../copy/PrimaryShardReplicationSource.java | 49 +++++ .../copy/ReplicationCheckpoint.java | 109 +++++++++++ .../PublishCheckpointActionTests.java | 176 ++++++++++++++++++ 10 files changed, 703 insertions(+) create mode 100644 server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index bba1d8c069c68..6c51a5e8502bd 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -234,6 +234,10 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } + public long getProcessedLocalCheckpoint() { + return 0L; + }; + /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling @@ -1119,6 +1123,10 @@ public abstract void forceMerge( */ public abstract GatedCloseable acquireSafeIndexCommit() throws EngineException; + public SegmentInfos getLatestSegmentInfos() { + return null; + }; + /** * @return a summary of the contents of the current safe commit */ diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java new file mode 100644 index 0000000000000..ecfc79a31a15f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.ReferenceManager; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; + +import java.io.IOException; + +/** + * A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas. + * This class is only used with Segment Replication enabled. + */ +public class CheckpointRefreshListener implements ReferenceManager.RefreshListener { + + protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class); + + private final IndexShard shard; + private final SegmentReplicationCheckpointPublisher publisher; + + public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) { + this.shard = shard; + this.publisher = publisher; + } + + @Override + public void beforeRefresh() throws IOException { + // Do nothing + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + if (didRefresh) { + publisher.publish(shard); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7a12952316c67..36d3c8a8ad0b4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -159,6 +159,10 @@ import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; +import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; +import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1357,6 +1361,31 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } + public SegmentInfos getLatestSegmentInfos() { + return getEngine().getLatestSegmentInfos(); + } + + public long getProcessedLocalCheckpoint() { + return getEngine().getProcessedLocalCheckpoint(); + } + + public ReplicationCheckpoint getLatestReplicationCheckpoint() { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + return new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + latestSegmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + latestSegmentInfos.getVersion() + ); + } + + public synchronized void onNewCheckpoint( + final PublishCheckpointRequest request + ) { + //To do + } + /** * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, * without having to worry about the current state of the engine and concurrent flushes. diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java new file mode 100644 index 0000000000000..1691dd3efde71 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +public class SegmentReplicationReplicaService { + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + private final TransportService transportService; + + + public SegmentReplicationReplicaService( + final ThreadPool threadPool, + final RecoverySettings recoverySettings, + final TransportService transportService + ) { + this.threadPool = threadPool; + this.recoverySettings = recoverySettings; + this.transportService = transportService; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java new file mode 100644 index 0000000000000..8a7c820f6acd9 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.checkpoint; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.action.support.replication.ReplicationTask; +import org.opensearch.action.support.replication.TransportReplicationAction; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; +import org.opensearch.node.NodeClosedException; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class PublishCheckpointAction extends TransportReplicationAction< + PublishCheckpointRequest, + PublishCheckpointRequest, + ReplicationResponse> { + + public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; + protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + + private final SegmentReplicationReplicaService replicationService; + private final PrimaryShardReplicationSource source; + + @Inject + public PublishCheckpointAction( + Settings settings, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + SegmentReplicationReplicaService segmentCopyService, + PrimaryShardReplicationSource source + ) { + super( + settings, + ACTION_NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + PublishCheckpointRequest::new, + PublishCheckpointRequest::new, + ThreadPool.Names.REFRESH + ); + this.replicationService = segmentCopyService; + this.source = source; + } + + @Override + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); + } + + @Override + protected void doExecute(Task task, PublishCheckpointRequest request, ActionListener listener) { + assert false : "use PublishCheckpointAction#publish"; + } + + final void publish(IndexShard indexShard) { + String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); + long primaryTerm = indexShard.getPendingPrimaryTerm(); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the sync is authorized + threadContext.markAsSystemContext(); + PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); + transportService.sendChildRequest( + clusterService.localNode(), + transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + } + + @Override + public void handleException(TransportException e) { + task.setPhase("finished"); + taskManager.unregister(task); + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { + // node shutting down + return; + } + if (ExceptionsHelper.unwrap( + e, + IndexNotFoundException.class, + AlreadyClosedException.class, + IndexShardClosedException.class + ) != null) { + // the index was deleted or the shard is closed + return; + } + logger.warn( + new ParameterizedMessage("{} segment replication checkpoint publishing failed", indexShard.shardId()), + e + ); + } + } + ); + } + } + + @Override + protected void shardOperationOnPrimary( + PublishCheckpointRequest request, + IndexShard primary, + ActionListener> listener + ) { + ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse())); + } + + @Override + protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexShard replica, ActionListener listener) { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + ActionListener.completeWith(listener, () -> { + logger.trace("Checkpoint received on replica {}", request); + if (request.getCheckpoint().getShardId().equals(replica.shardId())) { + replica.onNewCheckpoint(request); + } + return new ReplicaResult(); + }); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java new file mode 100644 index 0000000000000..a51a234e3d9de --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.checkpoint; + +import org.opensearch.action.support.replication.ReplicationRequest; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.indices.replication.copy.ReplicationCheckpoint; + +import java.io.IOException; + +public class PublishCheckpointRequest extends ReplicationRequest { + + private final ReplicationCheckpoint checkpoint; + + public PublishCheckpointRequest(ReplicationCheckpoint checkpoint) { + super(checkpoint.getShardId()); + this.checkpoint = checkpoint; + } + + public PublishCheckpointRequest(StreamInput in) throws IOException { + super(in); + this.checkpoint = new ReplicationCheckpoint(in); + } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + checkpoint.writeTo(out); + } + + @Override + public String toString() { + return "PublishCheckpointRequest{" + "checkpoint=" + checkpoint + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java new file mode 100644 index 0000000000000..750213eb0c086 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.checkpoint; + +import org.opensearch.common.inject.Inject; +import org.opensearch.index.shard.IndexShard; + +import java.util.Objects; + +public class SegmentReplicationCheckpointPublisher { + + private final PublishAction publishAction; + + @Inject + public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { + this(publishAction::publish); + } + + public SegmentReplicationCheckpointPublisher(PublishAction publishAction) { + this.publishAction = Objects.requireNonNull(publishAction); + } + + public void publish(IndexShard indexShard) { + publishAction.publish(indexShard); + } + + public interface PublishAction { + void publish(IndexShard indexShard); + } + + public static final SegmentReplicationCheckpointPublisher EMPTY = new SegmentReplicationCheckpointPublisher(indexShard -> {}); +} diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java new file mode 100644 index 0000000000000..b5cc3336f4c76 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.copy; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.RetryableAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.Map; + +public class PrimaryShardReplicationSource { + + private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); + private final TransportService transportService; + private final ClusterService clusterService; + private final IndicesService indicesService; + + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + private final SegmentReplicationReplicaService segmentReplicationReplicaService; + + public PrimaryShardReplicationSource( + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + RecoverySettings recoverySettings, + SegmentReplicationReplicaService segmentReplicationReplicaShardService + ) { + this.transportService = transportService; + this.clusterService = clusterService; + this.indicesService = indicesService; + this.recoverySettings = recoverySettings; + this.threadPool = transportService.getThreadPool(); + this.segmentReplicationReplicaService = segmentReplicationReplicaShardService; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java new file mode 100644 index 0000000000000..38b4099a7755e --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.copy; + +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Objects; + +public class ReplicationCheckpoint implements Writeable { + + private final ShardId shardId; + private final long primaryTerm; + private final long segmentsGen; + private final long seqNo; + private final long version; + + public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long version) { + this.shardId = shardId; + this.primaryTerm = primaryTerm; + this.segmentsGen = segmentsGen; + this.seqNo = seqNo; + this.version = version; + } + + public ReplicationCheckpoint(StreamInput in) throws IOException { + shardId = new ShardId(in); + primaryTerm = in.readLong(); + segmentsGen = in.readLong(); + seqNo = in.readLong(); + version = in.readLong(); + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + public long getSegmentsGen() { + return segmentsGen; + } + + public long getVersion() { + return version; + } + + public long getSeqNo() { + return seqNo; + } + + public ShardId getShardId() { + return shardId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeLong(primaryTerm); + out.writeLong(segmentsGen); + out.writeLong(seqNo); + out.writeLong(version); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReplicationCheckpoint that = (ReplicationCheckpoint) o; + return primaryTerm == that.primaryTerm + && segmentsGen == that.segmentsGen + && seqNo == that.seqNo + && version == that.version + && Objects.equals(shardId, that.shardId); + } + + @Override + public int hashCode() { + return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo); + } + + public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { + return other == null || version > other.getVersion(); + } + + @Override + public String toString() { + return "ReplicationCheckpoint{" + + "shardId=" + + shardId + + ", primaryTerm=" + + primaryTerm + + ", segmentsGen=" + + segmentsGen + + ", seqNo=" + + seqNo + + ", version=" + + version + + '}'; + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java new file mode 100644 index 0000000000000..235c600dd114c --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.checkpoint; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.ActionTestUtils; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.replication.TransportReplicationAction; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.Index; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; +import org.opensearch.indices.replication.copy.ReplicationCheckpoint; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.*; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; + +public class PublishCheckpointActionTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + private CapturingTransport transport; + private ClusterService clusterService; + private TransportService transportService; + private ShardStateAction shardStateAction; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = transport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + Collections.emptySet() + ); + transportService.start(); + transportService.acceptIncomingRequests(); + shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); + } + + @Override + public void tearDown() throws Exception { + try { + IOUtils.close(transportService, clusterService, transport); + } finally { + terminate(threadPool); + } + super.tearDown(); + } + + public void testPublishCheckpointActionOnPrimary() throws InterruptedException{ + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + + final SegmentReplicationReplicaService replicaService = new SegmentReplicationReplicaService(threadPool,recoverySettings,transportService); + + final PrimaryShardReplicationSource source = new PrimaryShardReplicationSource(transportService,clusterService,indicesService,recoverySettings,replicaService); + + final PublishCheckpointAction action = new PublishCheckpointAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + replicaService, + source + ); + + SegmentReplicationCheckpointPublisher checkpointPublisher = new SegmentReplicationCheckpointPublisher(action); + checkpointPublisher.publish(indexShard); + + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + + final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); + + final CountDownLatch latch = new CountDownLatch(1); + action.shardOperationOnPrimary(request, indexShard, new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + }), latch)); + latch.await(); + + } + + public void testPublishCheckpointActionOnReplica() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + + final SegmentReplicationReplicaService replicaService = new SegmentReplicationReplicaService(threadPool,recoverySettings,transportService); + + final PrimaryShardReplicationSource source = new PrimaryShardReplicationSource(transportService,clusterService,indicesService,recoverySettings,replicaService); + + final PublishCheckpointAction action = new PublishCheckpointAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + replicaService, + source + ); + + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + + final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); + + final PlainActionFuture listener = PlainActionFuture.newFuture(); + action.shardOperationOnReplica(request, indexShard, listener); + final TransportReplicationAction.ReplicaResult result = listener.actionGet(); + + // the result should indicate success + final AtomicBoolean success = new AtomicBoolean(); + result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); + assertTrue(success.get()); + + } +} From 37f49d24d80e7fada759ab1bb1258e207a0dfc77 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 29 Apr 2022 17:47:46 +0000 Subject: [PATCH 02/11] Putting a Draft PR with all changes in classes. Testing is still not included in this commit. Signed-off-by: Rishikesh1159 --- .../opensearch/index/shard/IndexShardIT.java | 4 +- .../org/opensearch/index/IndexService.java | 7 ++- .../org/opensearch/index/engine/Engine.java | 8 --- .../opensearch/index/shard/IndexShard.java | 35 ++++++------- .../org/opensearch/indices/IndicesModule.java | 2 + .../opensearch/indices/IndicesService.java | 4 +- .../cluster/IndicesClusterStateService.java | 11 ++++- .../SegmentReplicationReplicaService.java | 30 ------------ .../checkpoint/PublishCheckpointAction.java | 12 +---- .../copy/PrimaryShardReplicationSource.java | 49 ------------------- .../copy/ReplicationCheckpoint.java | 20 ++++---- ...dicesLifecycleListenerSingleNodeTests.java | 3 +- ...actIndicesClusterStateServiceTestCase.java | 2 + ...ClusterStateServiceRandomUpdatesTests.java | 2 + .../PublishCheckpointActionTests.java | 21 +------- .../snapshots/SnapshotResiliencyTests.java | 4 +- .../index/shard/IndexShardTestCase.java | 4 +- 17 files changed, 64 insertions(+), 154 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java delete mode 100644 server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 5f014e89e330e..888881d43eb11 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -84,6 +84,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.DummyShardLock; @@ -673,7 +674,8 @@ public static final IndexShard newIndexShard( Arrays.asList(listeners), () -> {}, RetentionLeaseSyncer.EMPTY, - cbs + cbs, + SegmentReplicationCheckpointPublisher.EMPTY ); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 1b301e85365ba..fdbd0a4cf8471 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -94,6 +94,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; @@ -418,7 +419,8 @@ private long getAvgShardSizeInBytes() throws IOException { public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -520,7 +522,8 @@ public synchronized IndexShard createShard( indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, - circuitBreakerService + circuitBreakerService, + checkpointPublisher ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 6c51a5e8502bd..bba1d8c069c68 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -234,10 +234,6 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } - public long getProcessedLocalCheckpoint() { - return 0L; - }; - /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling @@ -1123,10 +1119,6 @@ public abstract void forceMerge( */ public abstract GatedCloseable acquireSafeIndexCommit() throws EngineException; - public SegmentInfos getLatestSegmentInfos() { - return null; - }; - /** * @return a summary of the contents of the current safe commit */ diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 36d3c8a8ad0b4..f2f17f2879817 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -159,9 +159,8 @@ import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; -import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -298,6 +297,8 @@ Runnable getGlobalCheckpointSyncer() { private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; + private final CheckpointRefreshListener checkpointRefreshListener; + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -318,7 +319,8 @@ public IndexShard( final List listeners, final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final CircuitBreakerService circuitBreakerService + final CircuitBreakerService circuitBreakerService, + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -401,6 +403,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); + this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); } public ThreadPool getThreadPool() { @@ -1361,29 +1364,14 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } - public SegmentInfos getLatestSegmentInfos() { - return getEngine().getLatestSegmentInfos(); - } - - public long getProcessedLocalCheckpoint() { - return getEngine().getProcessedLocalCheckpoint(); - } - public ReplicationCheckpoint getLatestReplicationCheckpoint() { - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - return new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - latestSegmentInfos.getGeneration(), - getProcessedLocalCheckpoint(), - latestSegmentInfos.getVersion() - ); + return new ReplicationCheckpoint(shardId,0,0,0,0); } public synchronized void onNewCheckpoint( final PublishCheckpointRequest request ) { - //To do + //TODO } /** @@ -3124,6 +3112,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } }; + final List internalRefreshListener; + if (indexSettings.isSegRepEnabled() && shardRouting.primary()) { + internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); + } else { + internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + } + return this.engineConfigFactory.newEngineConfig( shardId, threadPool, diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 5e59908e741ba..8d2dbe1cd758a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -73,6 +73,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.store.IndicesStore; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.plugins.MapperPlugin; @@ -276,6 +277,7 @@ protected void configure() { bind(RetentionLeaseSyncAction.class).asEagerSingleton(); bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); bind(RetentionLeaseSyncer.class).asEagerSingleton(); + bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 5caafb0ce60d4..0b9c3852c9803 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -137,6 +137,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.Node; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.PluginsService; @@ -833,6 +834,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada @Override public IndexShard createShard( final ShardRouting shardRouting, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, @@ -847,7 +849,7 @@ public IndexShard createShard( IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer,checkpointPublisher); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 858cd238ad700..fd0706da2960d 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -80,6 +80,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; import org.opensearch.snapshots.SnapshotShardsService; @@ -132,6 +133,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final Consumer globalCheckpointSyncer; private final RetentionLeaseSyncer retentionLeaseSyncer; + private final SegmentReplicationCheckpointPublisher checkpointPublisher; + @Inject public IndicesClusterStateService( final Settings settings, @@ -147,13 +150,15 @@ public IndicesClusterStateService( final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final SegmentReplicationCheckpointPublisher checkpointPublisher ) { this( settings, indicesService, clusterService, threadPool, + checkpointPublisher, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, @@ -173,6 +178,7 @@ public IndicesClusterStateService( final AllocatedIndices> indicesService, final ClusterService clusterService, final ThreadPool threadPool, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final ShardStateAction shardStateAction, final NodeMappingRefreshAction nodeMappingRefreshAction, @@ -185,6 +191,7 @@ public IndicesClusterStateService( final RetentionLeaseSyncer retentionLeaseSyncer ) { this.settings = settings; + this.checkpointPublisher = checkpointPublisher; this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; @@ -618,6 +625,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); indicesService.createShard( shardRouting, + checkpointPublisher, recoveryTargetService, new RecoveryListener(shardRouting, primaryTerm), repositoriesService, @@ -983,6 +991,7 @@ U createIndex(IndexMetadata indexMetadata, List builtInIndex */ T createShard( ShardRouting shardRouting, + SegmentReplicationCheckpointPublisher checkpointPublisher, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java deleted file mode 100644 index 1691dd3efde71..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication; - -import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; - -public class SegmentReplicationReplicaService { - private final ThreadPool threadPool; - private final RecoverySettings recoverySettings; - private final TransportService transportService; - - - public SegmentReplicationReplicaService( - final ThreadPool threadPool, - final RecoverySettings recoverySettings, - final TransportService transportService - ) { - this.threadPool = threadPool; - this.recoverySettings = recoverySettings; - this.transportService = transportService; - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 8a7c820f6acd9..a498218977b48 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -28,8 +28,6 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.SegmentReplicationReplicaService; -import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -48,9 +46,6 @@ public class PublishCheckpointAction extends TransportReplicationAction< public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); - private final SegmentReplicationReplicaService replicationService; - private final PrimaryShardReplicationSource source; - @Inject public PublishCheckpointAction( Settings settings, @@ -59,9 +54,7 @@ public PublishCheckpointAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters, - SegmentReplicationReplicaService segmentCopyService, - PrimaryShardReplicationSource source + ActionFilters actionFilters ) { super( settings, @@ -76,8 +69,6 @@ public PublishCheckpointAction( PublishCheckpointRequest::new, ThreadPool.Names.REFRESH ); - this.replicationService = segmentCopyService; - this.source = source; } @Override @@ -91,6 +82,7 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList } final void publish(IndexShard indexShard) { + System.out.println(indexShard.routingEntry()); String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java deleted file mode 100644 index b5cc3336f4c76..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.copy; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.support.RetryableAction; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.util.concurrent.ConcurrentCollections; -import org.opensearch.indices.IndicesService; -import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.SegmentReplicationReplicaService; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; - -import java.util.Map; - -public class PrimaryShardReplicationSource { - - private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); - private final TransportService transportService; - private final ClusterService clusterService; - private final IndicesService indicesService; - - private final ThreadPool threadPool; - private final RecoverySettings recoverySettings; - private final SegmentReplicationReplicaService segmentReplicationReplicaService; - - public PrimaryShardReplicationSource( - TransportService transportService, - ClusterService clusterService, - IndicesService indicesService, - RecoverySettings recoverySettings, - SegmentReplicationReplicaService segmentReplicationReplicaShardService - ) { - this.transportService = transportService; - this.clusterService = clusterService; - this.indicesService = indicesService; - this.recoverySettings = recoverySettings; - this.threadPool = transportService.getThreadPool(); - this.segmentReplicationReplicaService = segmentReplicationReplicaShardService; - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java index 38b4099a7755e..ecdb64fdb02da 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java @@ -23,14 +23,14 @@ public class ReplicationCheckpoint implements Writeable { private final long primaryTerm; private final long segmentsGen; private final long seqNo; - private final long version; + private final long segmentInfosVersion; - public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long version) { + public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) { this.shardId = shardId; this.primaryTerm = primaryTerm; this.segmentsGen = segmentsGen; this.seqNo = seqNo; - this.version = version; + this.segmentInfosVersion = segmentInfosVersion; } public ReplicationCheckpoint(StreamInput in) throws IOException { @@ -38,7 +38,7 @@ public ReplicationCheckpoint(StreamInput in) throws IOException { primaryTerm = in.readLong(); segmentsGen = in.readLong(); seqNo = in.readLong(); - version = in.readLong(); + segmentInfosVersion = in.readLong(); } public long getPrimaryTerm() { @@ -49,8 +49,8 @@ public long getSegmentsGen() { return segmentsGen; } - public long getVersion() { - return version; + public long getSegmentInfosVersion() { + return segmentInfosVersion; } public long getSeqNo() { @@ -67,7 +67,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(primaryTerm); out.writeLong(segmentsGen); out.writeLong(seqNo); - out.writeLong(version); + out.writeLong(segmentInfosVersion); } @Override @@ -78,7 +78,7 @@ public boolean equals(Object o) { return primaryTerm == that.primaryTerm && segmentsGen == that.segmentsGen && seqNo == that.seqNo - && version == that.version + && segmentInfosVersion == that.segmentInfosVersion && Objects.equals(shardId, that.shardId); } @@ -88,7 +88,7 @@ public int hashCode() { } public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || version > other.getVersion(); + return other == null || segmentInfosVersion > other.getSegmentInfosVersion(); } @Override @@ -103,7 +103,7 @@ public String toString() { + ", seqNo=" + seqNo + ", version=" - + version + + segmentInfosVersion + '}'; } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 5f3d03f85f324..2ecef2824d738 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -49,6 +49,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.util.Arrays; @@ -148,7 +149,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY); + IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode( diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 9d83071c177f5..f3710c14a0294 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -57,6 +57,7 @@ import org.opensearch.indices.cluster.IndicesClusterStateService.Shard; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -252,6 +253,7 @@ public MockIndexService indexService(Index index) { @Override public MockIndexShard createShard( final ShardRouting shardRouting, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 7789054cfdc16..cd3fee60014a7 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -66,6 +66,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -562,6 +563,7 @@ private IndicesClusterStateService createIndicesClusterStateService( indicesService, clusterService, threadPool, + SegmentReplicationCheckpointPublisher.EMPTY, recoveryTargetService, shardStateAction, null, diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 235c600dd114c..c4c1839426023 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -24,8 +24,6 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.SegmentReplicationReplicaService; -import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; @@ -92,13 +90,8 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException{ final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); - final SegmentReplicationReplicaService replicaService = new SegmentReplicationReplicaService(threadPool,recoverySettings,transportService); - - final PrimaryShardReplicationSource source = new PrimaryShardReplicationSource(transportService,clusterService,indicesService,recoverySettings,replicaService); - final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, transportService, @@ -106,14 +99,9 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException{ indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()), - replicaService, - source + new ActionFilters(Collections.emptySet()) ); - SegmentReplicationCheckpointPublisher checkpointPublisher = new SegmentReplicationCheckpointPublisher(action); - checkpointPublisher.publish(indexShard); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); @@ -143,9 +131,6 @@ public void testPublishCheckpointActionOnReplica() { final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); - final SegmentReplicationReplicaService replicaService = new SegmentReplicationReplicaService(threadPool,recoverySettings,transportService); - - final PrimaryShardReplicationSource source = new PrimaryShardReplicationSource(transportService,clusterService,indicesService,recoverySettings,replicaService); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -154,9 +139,7 @@ public void testPublishCheckpointActionOnReplica() { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()), - replicaService, - source + new ActionFilters(Collections.emptySet()) ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index a896aab0f70c9..ab9a455399366 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -182,6 +182,7 @@ import org.opensearch.indices.recovery.PeerRecoverySourceService; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.ResponseCollectorService; @@ -1860,7 +1861,8 @@ public void onFailure(final Exception e) { shardStateAction, actionFilters ), - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + SegmentReplicationCheckpointPublisher.EMPTY ); Map actions = new HashMap<>(); final SystemIndices systemIndices = new SystemIndices(emptyMap()); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 509edfd1b9103..8f43bb23db234 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -93,6 +93,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; @@ -477,7 +478,8 @@ protected IndexShard newShard( Arrays.asList(listeners), globalCheckpointSyncer, retentionLeaseSyncer, - breakerService + breakerService, + SegmentReplicationCheckpointPublisher.EMPTY ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; From 730b601438adc27373632e4b041b16285dd20140 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 5 May 2022 15:54:23 +0000 Subject: [PATCH 03/11] Wiring up index shard to new engine, spotless apply and removing unnecessary tests and logs Signed-off-by: Rishikesh1159 --- .../opensearch/index/shard/IndexShard.java | 57 +++++++++++++++++-- .../opensearch/indices/IndicesService.java | 2 +- .../checkpoint/PublishCheckpointAction.java | 1 - ...dicesLifecycleListenerSingleNodeTests.java | 7 ++- .../PublishCheckpointActionTests.java | 17 +++--- 5 files changed, 66 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f2f17f2879817..1be01418b8853 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -406,6 +406,53 @@ public boolean shouldCache(Query query) { this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); } + public IndexShard( + final ShardRouting shardRouting, + final IndexSettings indexSettings, + final ShardPath path, + final Store store, + final Supplier indexSortSupplier, + final IndexCache indexCache, + final MapperService mapperService, + final SimilarityService similarityService, + final EngineFactory engineFactory, + final EngineConfigFactory engineConfigFactory, + final IndexEventListener indexEventListener, + final CheckedFunction indexReaderWrapper, + final ThreadPool threadPool, + final BigArrays bigArrays, + final Engine.Warmer warmer, + final List searchOperationListener, + final List listeners, + final Runnable globalCheckpointSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer, + final CircuitBreakerService circuitBreakerService + ) throws IOException { + this( + shardRouting, + indexSettings, + path, + store, + indexSortSupplier, + indexCache, + mapperService, + similarityService, + engineFactory, + engineConfigFactory, + indexEventListener, + indexReaderWrapper, + threadPool, + bigArrays, + warmer, + searchOperationListener, + listeners, + globalCheckpointSyncer, + retentionLeaseSyncer, + circuitBreakerService, + null + ); + } + public ThreadPool getThreadPool() { return this.threadPool; } @@ -1365,13 +1412,11 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } public ReplicationCheckpoint getLatestReplicationCheckpoint() { - return new ReplicationCheckpoint(shardId,0,0,0,0); + return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); } - public synchronized void onNewCheckpoint( - final PublishCheckpointRequest request - ) { - //TODO + public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { + // TODO } /** @@ -3135,7 +3180,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, refreshPendingLocationListener), - Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), + internalRefreshListener, indexSort, circuitBreakerService, globalCheckpointSupplier, diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 0b9c3852c9803..f7d7bf032c5cc 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -849,7 +849,7 @@ public IndexShard createShard( IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer,checkpointPublisher); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index a498218977b48..ba05a410d4df0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -82,7 +82,6 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList } final void publish(IndexShard indexShard) { - System.out.println(indexShard.routingEntry()); String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 2ecef2824d738..0989bf869f18e 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -149,7 +149,12 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY); + IndexShard shard = index.createShard( + newRouting, + s -> {}, + RetentionLeaseSyncer.EMPTY, + SegmentReplicationCheckpointPublisher.EMPTY + ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode( diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index c4c1839426023..2842c3f260d90 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication.checkpoint; import org.opensearch.action.ActionListener; -import org.opensearch.action.LatchedActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; @@ -32,7 +31,6 @@ import org.opensearch.transport.TransportService; import java.util.Collections; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.sameInstance; @@ -76,7 +74,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException{ + public void testPublishCheckpointActionOnPrimary() throws InterruptedException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -106,12 +104,10 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException{ final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); - final CountDownLatch latch = new CountDownLatch(1); - action.shardOperationOnPrimary(request, indexShard, new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { - // we should forward the request containing the current retention leases to the replica + action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { + // we should forward the request containing the current publish checkpoint to the replica assertThat(result.replicaRequest(), sameInstance(request)); - }), latch)); - latch.await(); + })); } @@ -131,7 +127,6 @@ public void testPublishCheckpointActionOnReplica() { final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); - final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, transportService, @@ -150,10 +145,14 @@ public void testPublishCheckpointActionOnReplica() { action.shardOperationOnReplica(request, indexShard, listener); final TransportReplicationAction.ReplicaResult result = listener.actionGet(); + // onNewCheckpoint should be called on shard with checkpoint request + verify(indexShard).onNewCheckpoint(request); + // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); assertTrue(success.get()); } + } From 4928270bea6e562b25a9efd5116b1bc0ad9a74d8 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 6 May 2022 17:03:03 +0000 Subject: [PATCH 04/11] Adding Unit test for checkpointRefreshListener Signed-off-by: Rishikesh1159 --- .../opensearch/index/shard/IndexShardIT.java | 4 +- .../index/shard/IndexShardTests.java | 33 ++++------ .../index/shard/IndexShardTestCase.java | 64 ++++++++++++++++++- 3 files changed, 78 insertions(+), 23 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 888881d43eb11..5f014e89e330e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -84,7 +84,6 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.DummyShardLock; @@ -674,8 +673,7 @@ public static final IndexShard newIndexShard( Arrays.asList(listeners), () -> {}, RetentionLeaseSyncer.EMPTY, - cbs, - SegmentReplicationCheckpointPublisher.EMPTY + cbs ); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 3bc5218e2f61f..dda8aa3ba47ae 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -133,6 +133,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -178,25 +179,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.either; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.everyItem; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.hasToString; -import static org.hamcrest.Matchers.in; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.oneOf; -import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.*; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS; @@ -3424,6 +3408,17 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept closeShards(newShard); } + /** + * here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List + */ + public void testCheckpointRefreshListener() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(p -> newShard(mock), true); + shard.refresh("test"); + assertEquals(shard.getEngine().config().getInternalRefreshListener().get(1).getClass(), CheckpointRefreshListener.class); + closeShards(shard); + } + public void testIndexCheckOnStartup() throws Exception { final IndexShard indexShard = newStartedShard(true); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 8f43bb23db234..2e85c024ee635 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -409,6 +409,66 @@ protected IndexShard newShard( ); } + /** + * creates a new initializing shard. The shard will will be put in its proper path under the + * current node id the shard is assigned to. + * @param routing shard routing to use + * @param shardPath path to use for shard data + * @param indexMetadata indexMetadata for the shard, including any mapping + * @param storeProvider an optional custom store provider to use. If null a default file based store will be created + * @param indexReaderWrapper an optional wrapper to be used during search + * @param globalCheckpointSyncer callback for syncing global checkpoints + * @param indexEventListener index event listener + * @param listeners an optional set of listeners to add to the shard + */ + protected IndexShard newShard( + ShardRouting routing, + ShardPath shardPath, + IndexMetadata indexMetadata, + @Nullable CheckedFunction storeProvider, + @Nullable CheckedFunction indexReaderWrapper, + @Nullable EngineFactory engineFactory, + @Nullable EngineConfigFactory engineConfigFactory, + Runnable globalCheckpointSyncer, + RetentionLeaseSyncer retentionLeaseSyncer, + IndexEventListener indexEventListener, + IndexingOperationListener... listeners + ) throws IOException { + return newShard(routing, shardPath, indexMetadata, storeProvider, indexReaderWrapper, engineFactory, engineConfigFactory, globalCheckpointSyncer, retentionLeaseSyncer, indexEventListener, SegmentReplicationCheckpointPublisher.EMPTY, listeners); + } + + /** + * creates a new initializing shard. The shard will will be put in its proper path under the + * current node id the shard is assigned to. + * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint + */ + protected IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(10), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); + ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); + + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT") + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) + .put(Settings.EMPTY) + .build(); + IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName()) + .settings(indexSettings) + .primaryTerm(0, primaryTerm) + .putMapping("{ \"properties\": {} }").build(); + return newShard(shardRouting, shardPath, metadata, null, null, new InternalEngineFactory(), new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), () -> {}, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER, checkpointPublisher); + } + /** * creates a new initializing shard. * @param routing shard routing to use @@ -418,6 +478,7 @@ protected IndexShard newShard( * @param indexReaderWrapper an optional wrapper to be used during search * @param globalCheckpointSyncer callback for syncing global checkpoints * @param indexEventListener index event listener + * @param checkpointPublisher segment Replication Checkpoint Publisher to publish checkpoint * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard( @@ -431,6 +492,7 @@ protected IndexShard newShard( Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, + SegmentReplicationCheckpointPublisher checkpointPublisher, IndexingOperationListener... listeners ) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); @@ -479,7 +541,7 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - SegmentReplicationCheckpointPublisher.EMPTY + checkpointPublisher ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; From fa39d477259af19816cafff627771604f4dc1b5a Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 6 May 2022 18:07:38 +0000 Subject: [PATCH 05/11] Applying spotless check Signed-off-by: Rishikesh1159 --- .../index/shard/IndexShardTestCase.java | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 2e85c024ee635..cb78ddc17702a 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -434,7 +434,20 @@ protected IndexShard newShard( IndexEventListener indexEventListener, IndexingOperationListener... listeners ) throws IOException { - return newShard(routing, shardPath, indexMetadata, storeProvider, indexReaderWrapper, engineFactory, engineConfigFactory, globalCheckpointSyncer, retentionLeaseSyncer, indexEventListener, SegmentReplicationCheckpointPublisher.EMPTY, listeners); + return newShard( + routing, + shardPath, + indexMetadata, + storeProvider, + indexReaderWrapper, + engineFactory, + engineConfigFactory, + globalCheckpointSyncer, + retentionLeaseSyncer, + indexEventListener, + SegmentReplicationCheckpointPublisher.EMPTY, + listeners + ); } /** @@ -465,8 +478,21 @@ protected IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPu IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName()) .settings(indexSettings) .primaryTerm(0, primaryTerm) - .putMapping("{ \"properties\": {} }").build(); - return newShard(shardRouting, shardPath, metadata, null, null, new InternalEngineFactory(), new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), () -> {}, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER, checkpointPublisher); + .putMapping("{ \"properties\": {} }") + .build(); + return newShard( + shardRouting, + shardPath, + metadata, + null, + null, + new InternalEngineFactory(), + new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), + () -> {}, + RetentionLeaseSyncer.EMPTY, + EMPTY_EVENT_LISTENER, + checkpointPublisher + ); } /** From e40dc41f8dcabc664fc88313cea8de8f2f83df51 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 6 May 2022 19:09:11 +0000 Subject: [PATCH 06/11] Fixing import statements * Signed-off-by: Rishikesh1159 --- .../index/shard/IndexShardTests.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 3f7fa30892927..49b84414588da 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -180,7 +180,25 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.oneOf; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; From 46557bba6036a30aafa8d7434289aab6140157ba Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 11 May 2022 17:53:11 +0000 Subject: [PATCH 07/11] removing unused constructor in index shard Signed-off-by: Rishikesh1159 --- .../opensearch/index/shard/IndexShardIT.java | 4 +- .../opensearch/index/shard/IndexShard.java | 47 ------------------- 2 files changed, 3 insertions(+), 48 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 5f014e89e330e..888881d43eb11 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -84,6 +84,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.DummyShardLock; @@ -673,7 +674,8 @@ public static final IndexShard newIndexShard( Arrays.asList(listeners), () -> {}, RetentionLeaseSyncer.EMPTY, - cbs + cbs, + SegmentReplicationCheckpointPublisher.EMPTY ); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 88e25e17f6de6..c119d2f9d4a3d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -411,53 +411,6 @@ public boolean shouldCache(Query query) { this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); } - public IndexShard( - final ShardRouting shardRouting, - final IndexSettings indexSettings, - final ShardPath path, - final Store store, - final Supplier indexSortSupplier, - final IndexCache indexCache, - final MapperService mapperService, - final SimilarityService similarityService, - final EngineFactory engineFactory, - final EngineConfigFactory engineConfigFactory, - final IndexEventListener indexEventListener, - final CheckedFunction indexReaderWrapper, - final ThreadPool threadPool, - final BigArrays bigArrays, - final Engine.Warmer warmer, - final List searchOperationListener, - final List listeners, - final Runnable globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer, - final CircuitBreakerService circuitBreakerService - ) throws IOException { - this( - shardRouting, - indexSettings, - path, - store, - indexSortSupplier, - indexCache, - mapperService, - similarityService, - engineFactory, - engineConfigFactory, - indexEventListener, - indexReaderWrapper, - threadPool, - bigArrays, - warmer, - searchOperationListener, - listeners, - globalCheckpointSyncer, - retentionLeaseSyncer, - circuitBreakerService, - null - ); - } - public ThreadPool getThreadPool() { return this.threadPool; } From 7ac0b90d36b37070570a8b1aba8ef7bb160eb757 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 13 May 2022 00:18:58 +0000 Subject: [PATCH 08/11] Addressing comments from last commit Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/IndexService.java | 4 +- .../shard/CheckpointRefreshListener.java | 2 + .../opensearch/index/shard/IndexShard.java | 12 +++-- .../org/opensearch/indices/IndicesModule.java | 5 +- .../checkpoint/PublishCheckpointAction.java | 9 ++++ .../checkpoint/PublishCheckpointRequest.java | 8 +++ ...SegmentReplicationCheckpointPublisher.java | 11 +++++ .../copy/ReplicationCheckpoint.java | 27 ++++++++++ .../index/shard/IndexShardTests.java | 49 ++++++++++++++++++- .../index/shard/IndexShardTestCase.java | 45 ----------------- 10 files changed, 120 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 73946b0c3446c..fc96ca08d45b5 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -528,7 +528,9 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - checkpointPublisher + this.indexSettings.isSegRepEnabled() && routing.primary() + ? checkpointPublisher + : SegmentReplicationCheckpointPublisher.EMPTY ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index ecfc79a31a15f..ac6754bf6a74a 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -18,6 +18,8 @@ /** * A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas. * This class is only used with Segment Replication enabled. + * + * @opensearch.internal */ public class CheckpointRefreshListener implements ReferenceManager.RefreshListener { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c119d2f9d4a3d..82bb4a8b6c2d4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -301,8 +301,7 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; - - private final CheckpointRefreshListener checkpointRefreshListener; + private final ReferenceManager.RefreshListener checkpointRefreshListener; public IndexShard( final ShardRouting shardRouting, @@ -408,7 +407,11 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); + if (indexSettings.isSegRepEnabled() == true) { + this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); + } else { + this.checkpointRefreshListener = null; + } } public ThreadPool getThreadPool() { @@ -1374,6 +1377,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { } public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { + assert shardRouting.primary() == false; // TODO } @@ -3116,7 +3120,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { }; final List internalRefreshListener; - if (indexSettings.isSegRepEnabled() && shardRouting.primary()) { + if (this.checkpointRefreshListener != null) { internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); } else { internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index ae9427b898c0e..0cb2ff958c787 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -41,6 +41,7 @@ import org.opensearch.common.inject.AbstractModule; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.index.mapper.BinaryFieldMapper; import org.opensearch.index.mapper.BooleanFieldMapper; @@ -279,7 +280,9 @@ protected void configure() { bind(RetentionLeaseSyncAction.class).asEagerSingleton(); bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); bind(RetentionLeaseSyncer.class).asEagerSingleton(); - bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); + if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { + bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); + } } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index ba05a410d4df0..b74a69971ebd5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -38,6 +38,12 @@ import java.io.IOException; import java.util.Objects; +/** + * Replication action responsible for publishing checkpoint to a replica shard. + * + * @opensearch.internal + */ + public class PublishCheckpointAction extends TransportReplicationAction< PublishCheckpointRequest, PublishCheckpointRequest, @@ -81,6 +87,9 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList assert false : "use PublishCheckpointAction#publish"; } + /** + * Publish checkpoint request to shard + */ final void publish(IndexShard indexShard) { String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java index a51a234e3d9de..356ba9969644d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java @@ -15,6 +15,11 @@ import java.io.IOException; +/** + * Replication request responsible for publishing checkpoint request to a replica shard. + * + * @opensearch.internal + */ public class PublishCheckpointRequest extends ReplicationRequest { private final ReplicationCheckpoint checkpoint; @@ -29,6 +34,9 @@ public PublishCheckpointRequest(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); } + /** + * Returns Replication Checkpoint + */ public ReplicationCheckpoint getCheckpoint() { return checkpoint; } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 750213eb0c086..2b09901a947fe 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -13,6 +13,11 @@ import java.util.Objects; +/** + * Publish Segment Replication Checkpoint. + * + * @opensearch.internal + */ public class SegmentReplicationCheckpointPublisher { private final PublishAction publishAction; @@ -30,9 +35,15 @@ public void publish(IndexShard indexShard) { publishAction.publish(indexShard); } + /** + * Represents an action that is invoked to publish segment replication checkpoint to replica shard + */ public interface PublishAction { void publish(IndexShard indexShard); } + /** + * NoOp Checkpoint publisher + */ public static final SegmentReplicationCheckpointPublisher EMPTY = new SegmentReplicationCheckpointPublisher(indexShard -> {}); } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java index ecdb64fdb02da..ba7da34596666 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java @@ -17,6 +17,11 @@ import java.io.IOException; import java.util.Objects; +/** + * Represents a Replication Checkpoint which is sent to a replica shard. + * + * @opensearch.internal + */ public class ReplicationCheckpoint implements Writeable { private final ShardId shardId; @@ -41,22 +46,41 @@ public ReplicationCheckpoint(StreamInput in) throws IOException { segmentInfosVersion = in.readLong(); } + /** + * The primary term of this Replication Checkpoint. + * + * @return the primary term + */ public long getPrimaryTerm() { return primaryTerm; } + /** + * @return the Segments Gen number + */ public long getSegmentsGen() { return segmentsGen; } + /** + * @return the Segment Info version + */ public long getSegmentInfosVersion() { return segmentInfosVersion; } + /** + * @return the Seq number + */ public long getSeqNo() { return seqNo; } + /** + * Shard Id of primary shard. + * + * @return the Shard Id + */ public ShardId getShardId() { return shardId; } @@ -87,6 +111,9 @@ public int hashCode() { return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo); } + /** + * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null + */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { return other == null || segmentInfosVersion > other.getSegmentInfosVersion(); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 49b84414588da..ff5cf8393cec9 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; @@ -3434,10 +3435,56 @@ public void testCheckpointRefreshListener() throws IOException { final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); IndexShard shard = newStartedShard(p -> newShard(mock), true); shard.refresh("test"); - assertEquals(shard.getEngine().config().getInternalRefreshListener().get(1).getClass(), CheckpointRefreshListener.class); + List refreshListeners = shard.getEngine().config().getInternalRefreshListener(); + assertTrue(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener)); closeShards(shard); } + /** + * creates a new initializing shard. The shard will will be put in its proper path under the + * current node id the shard is assigned to. + * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint + */ + private IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(10), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); + ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); + + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT") + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) + .put(Settings.EMPTY) + .build(); + IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName()) + .settings(indexSettings) + .primaryTerm(0, primaryTerm) + .putMapping("{ \"properties\": {} }") + .build(); + return newShard( + shardRouting, + shardPath, + metadata, + null, + null, + new InternalEngineFactory(), + new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), + () -> {}, + RetentionLeaseSyncer.EMPTY, + EMPTY_EVENT_LISTENER, + checkpointPublisher + ); + } + public void testIndexCheckOnStartup() throws Exception { final IndexShard indexShard = newStartedShard(true); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index cb78ddc17702a..ef44d47b5688d 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -450,51 +450,6 @@ protected IndexShard newShard( ); } - /** - * creates a new initializing shard. The shard will will be put in its proper path under the - * current node id the shard is assigned to. - * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint - */ - protected IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException { - final ShardId shardId = new ShardId("index", "_na_", 0); - final ShardRouting shardRouting = TestShardRouting.newShardRouting( - shardId, - randomAlphaOfLength(10), - true, - ShardRoutingState.INITIALIZING, - RecoverySource.EmptyStoreRecoverySource.INSTANCE - ); - final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); - ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - - Settings indexSettings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT") - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) - .put(Settings.EMPTY) - .build(); - IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName()) - .settings(indexSettings) - .primaryTerm(0, primaryTerm) - .putMapping("{ \"properties\": {} }") - .build(); - return newShard( - shardRouting, - shardPath, - metadata, - null, - null, - new InternalEngineFactory(), - new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), - () -> {}, - RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER, - checkpointPublisher - ); - } - /** * creates a new initializing shard. * @param routing shard routing to use From 9bbfe20b2eb1ef01e8371aa1c051d8be4cdbb50c Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 13 May 2022 20:14:54 +0000 Subject: [PATCH 09/11] Adding package-info.java files for two new packages Signed-off-by: Rishikesh1159 --- .../indices/replication/checkpoint/package-info.java | 10 ++++++++++ .../indices/replication/copy/package-info.java | 10 ++++++++++ 2 files changed, 20 insertions(+) create mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/package-info.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/copy/package-info.java diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/package-info.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/package-info.java new file mode 100644 index 0000000000000..a30154ea9206a --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Package containing classes to implement a replication checkpoint */ +package org.opensearch.indices.replication.checkpoint; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java b/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java new file mode 100644 index 0000000000000..8d810f88dde00 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Package containing classes to implement a copy of replication */ +package org.opensearch.indices.replication.copy; From 6c75f6d2b52c287f461a5718bc982ee69d5f034d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 23 May 2022 18:52:05 +0000 Subject: [PATCH 10/11] Adding test for null checkpoint publisher and addreesing PR comments Signed-off-by: Rishikesh1159 --- .../main/java/org/opensearch/index/IndexService.java | 4 +--- .../java/org/opensearch/index/shard/IndexShard.java | 12 +++++++++--- .../checkpoint/PublishCheckpointRequest.java | 1 - .../{copy => checkpoint}/ReplicationCheckpoint.java | 2 +- .../indices/replication/copy/package-info.java | 10 ---------- .../org/opensearch/index/shard/IndexShardTests.java | 11 +++++++++++ .../checkpoint/PublishCheckpointActionTests.java | 1 - 7 files changed, 22 insertions(+), 19 deletions(-) rename server/src/main/java/org/opensearch/indices/replication/{copy => checkpoint}/ReplicationCheckpoint.java (98%) delete mode 100644 server/src/main/java/org/opensearch/indices/replication/copy/package-info.java diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 37508ea040d73..0a6d1501f2bea 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -533,9 +533,7 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - this.indexSettings.isSegRepEnabled() && routing.primary() - ? checkpointPublisher - : SegmentReplicationCheckpointPublisher.EMPTY + this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 87436094bcc9b..8bccaadd697d6 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -161,7 +161,7 @@ import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -324,7 +324,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final SegmentReplicationCheckpointPublisher checkpointPublisher + @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -407,7 +407,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - if (indexSettings.isSegRepEnabled() == true) { + if (checkpointPublisher != null) { this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); } else { this.checkpointRefreshListener = null; @@ -1372,10 +1372,16 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } + /** + * Returns the lastest Replication Checkpoint that shard received + */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); } + /** + * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. + */ public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { assert shardRouting.primary() == false; // TODO diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java index 356ba9969644d..740fd3bccb7c4 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java @@ -11,7 +11,6 @@ import org.opensearch.action.support.replication.ReplicationRequest; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java similarity index 98% rename from server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java rename to server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index ba7da34596666..98ab9cc4c1708 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices.replication.copy; +package org.opensearch.indices.replication.checkpoint; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java b/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java deleted file mode 100644 index 8d810f88dde00..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** Package containing classes to implement a copy of replication */ -package org.opensearch.indices.replication.copy; diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index ff5cf8393cec9..0ce571b80cdc8 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3440,6 +3440,17 @@ public void testCheckpointRefreshListener() throws IOException { closeShards(shard); } + /** + * here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List + */ + public void testCheckpointRefreshListenerWithNull() throws IOException { + IndexShard shard = newStartedShard(p -> newShard(null), true); + shard.refresh("test"); + List refreshListeners = shard.getEngine().config().getInternalRefreshListener(); + assertFalse(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener)); + closeShards(shard); + } + /** * creates a new initializing shard. The shard will will be put in its proper path under the * current node id the shard is assigned to. diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 2842c3f260d90..074b5ff613b08 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -23,7 +23,6 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; From cf7c92e21723a13da20a6e987a6c868a10a0b31b Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 23 May 2022 22:05:37 +0000 Subject: [PATCH 11/11] Add docs for indexshardtests and remove shard.refresh Signed-off-by: Rishikesh1159 --- .../java/org/opensearch/index/shard/IndexShardTests.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 0ce571b80cdc8..bf9671964a210 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3429,23 +3429,21 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept } /** - * here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List + * here we are mocking a SegmentReplicationcheckpointPublisher and testing on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List */ public void testCheckpointRefreshListener() throws IOException { final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); IndexShard shard = newStartedShard(p -> newShard(mock), true); - shard.refresh("test"); List refreshListeners = shard.getEngine().config().getInternalRefreshListener(); assertTrue(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener)); closeShards(shard); } /** - * here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List + * here we are passing null in place of SegmentReplicationCheckpointPublisher and testing on index shard if CheckpointRefreshListener is not added to the InternalrefreshListerners List */ public void testCheckpointRefreshListenerWithNull() throws IOException { IndexShard shard = newStartedShard(p -> newShard(null), true); - shard.refresh("test"); List refreshListeners = shard.getEngine().config().getInternalRefreshListener(); assertFalse(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener)); closeShards(shard);