diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7a548cccac6f9..6b7c623144d9c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Cannot communicate with HTTP/2 when reactor-netty is enabled ([#18599](https://github.com/opensearch-project/OpenSearch/pull/18599))
 - Fix the visit of sub queries for HasParentQuery and HasChildQuery ([#18621](https://github.com/opensearch-project/OpenSearch/pull/18621))
 - Fix the backward compatibility regression with COMPLEMENT for Regexp queries introduced in OpenSearch 3.0 ([#18640](https://github.com/opensearch-project/OpenSearch/pull/18640))
+- Fix Replication lag computation ([#18602](https://github.com/opensearch-project/OpenSearch/pull/18602))
 
 ### Security
 
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java
index d1683f57315f4..72abd4f33e465 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java
@@ -15,6 +15,7 @@
 import org.opensearch.OpenSearchCorruptionException;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.SetOnce;
+import org.opensearch.common.time.DateUtils;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.common.util.concurrent.ConcurrentCollections;
@@ -31,12 +32,13 @@
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
 
 import reactor.util.annotation.NonNull;
 
@@ -54,7 +56,7 @@ public class SegmentReplicator {
     private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
     private final ReplicationCollection<MergedSegmentReplicationTarget> onGoingMergedSegmentReplications;
     private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();
-    private final ConcurrentMap<ShardId, ConcurrentNavigableMap<Long, ReplicationCheckpointStats>> replicationCheckpointStats =
+    protected final ConcurrentMap<ShardId, ConcurrentNavigableMap<Long, ReplicationCheckpointStats>> replicationCheckpointStats =
         ConcurrentCollections.newConcurrentMap();
     private final ConcurrentMap<ShardId, ReplicationCheckpoint> primaryCheckpoint = ConcurrentCollections.newConcurrentMap();
 
@@ -167,9 +169,8 @@ public ReplicationStats getSegmentReplicationStats(final ShardId shardId) {
 
         long bytesBehind = highestEntry.getValue().getBytesBehind();
         long replicationLag = bytesBehind > 0L
-            ? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lowestEntry.getValue().getTimestamp())
+            ? Duration.ofNanos(DateUtils.toLong(Instant.now()) - lowestEntry.getValue().getTimestamp()).toMillis()
             : 0;
-
         return new ReplicationStats(bytesBehind, bytesBehind, replicationLag);
     }
 
@@ -217,7 +218,7 @@ protected void pruneCheckpointsUpToLastSync(final IndexShard indexShard) {
             );
 
             if (existingCheckpointStats != null && !existingCheckpointStats.isEmpty()) {
-                existingCheckpointStats.keySet().removeIf(key -> key < segmentInfoVersion);
+                existingCheckpointStats.keySet().removeIf(key -> key <= segmentInfoVersion);
                 Map.Entry<Long, ReplicationCheckpointStats> lastEntry = existingCheckpointStats.lastEntry();
                 if (lastEntry != null) {
                     lastEntry.getValue().setBytesBehind(calculateBytesBehind(latestCheckpoint, indexReplicationCheckPoint));
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
index 8380187a288ba..39c2039191def 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
@@ -11,6 +11,7 @@
 import org.opensearch.Version;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.annotation.PublicApi;
+import org.opensearch.common.time.DateUtils;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.common.io.stream.Writeable;
@@ -19,6 +20,7 @@
 import org.opensearch.index.store.StoreFileMetadata;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -56,11 +58,11 @@ private ReplicationCheckpoint(ShardId shardId, String codec) {
         length = 0L;
         this.codec = codec;
         this.metadataMap = Collections.emptyMap();
-        this.createdTimeStamp = System.nanoTime();
+        this.createdTimeStamp = DateUtils.toLong(Instant.now());
     }
 
     public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
-        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), System.nanoTime());
+        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), DateUtils.toLong(Instant.now()));
     }
 
     public ReplicationCheckpoint(
@@ -79,7 +81,7 @@ public ReplicationCheckpoint(
         this.length = length;
         this.codec = codec;
         this.metadataMap = metadataMap;
-        this.createdTimeStamp = System.nanoTime();
+        this.createdTimeStamp = DateUtils.toLong(Instant.now());
     }
 
     public ReplicationCheckpoint(
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java
index 38f1c59bd5b68..a487a0f9a6032 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java
@@ -21,6 +21,7 @@
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.time.DateUtils;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.ReplicationStats;
@@ -38,6 +39,7 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -215,10 +217,10 @@ public void testGetSegmentReplicationStats_WhenNoReplication() {
         assertEquals(0, replicationStats.maxBytesBehind);
     }
 
-    public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefreshedToNewCheckPoint() {
+    public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefreshedToNewCheckPoint() throws InterruptedException {
         ShardId shardId = new ShardId("index", "uuid", 0);
         ReplicationCheckpoint firstReplicationCheckpoint = ReplicationCheckpoint.empty(shardId);
-
+        long baseTime = DateUtils.toLong(Instant.now());
         StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("test-1", 500, "1", Version.LATEST, new BytesRef(500));
         StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("test-2", 500, "1", Version.LATEST, new BytesRef(500));
         Map<String, StoreFileMetadata> stringStoreFileMetadataMapOne = new HashMap<>();
@@ -232,7 +234,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
             1000,
             "",
             stringStoreFileMetadataMapOne,
-            System.nanoTime() - TimeUnit.MINUTES.toNanos(1)
+            baseTime - 5_000_000
         );
 
         IndexShard replicaShard = mock(IndexShard.class);
@@ -260,7 +262,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
             200,
             "",
             stringStoreFileMetadataMapTwo,
-            System.nanoTime() - TimeUnit.MINUTES.toNanos(1)
+            baseTime - 1_000_000
         );
 
         segmentReplicator.updateReplicationCheckpointStats(thirdReplicationCheckpoint, replicaShard);
@@ -276,6 +278,16 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
         assertEquals(200, replicationStatsSecond.totalBytesBehind);
         assertEquals(200, replicationStatsSecond.maxBytesBehind);
         assertTrue(replicationStatsSecond.maxReplicationLag > 0);
+
+        // shard finished syncing to last checkpoint (sis 3)
+        when(replicaShard.getLatestReplicationCheckpoint()).thenReturn(thirdReplicationCheckpoint);
+        segmentReplicator.pruneCheckpointsUpToLastSync(replicaShard);
+        ReplicationStats finalStats = segmentReplicator.getSegmentReplicationStats(shardId);
+        assertEquals(0, finalStats.totalBytesBehind);
+        assertEquals(0, finalStats.maxBytesBehind);
+        assertEquals(0, finalStats.maxReplicationLag);
+        // shard is up to date, should not have any tracked stats
+        assertTrue(segmentReplicator.replicationCheckpointStats.get(shardId).isEmpty());
     }
 
     public void testGetSegmentReplicationStats_WhenCheckPointReceivedOutOfOrder() {
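
For context on the core of the fix: `System.nanoTime()` is a monotonic clock whose origin is arbitrary and only meaningful inside a single JVM, so a checkpoint timestamp taken on the primary cannot be meaningfully subtracted from another node's clock. The patch stamps checkpoints with epoch-based nanoseconds via `DateUtils.toLong(Instant.now())` and derives the lag with `Duration`. The sketch below is a minimal, self-contained illustration of that computation, not the OpenSearch implementation: `toEpochNanos` is a stand-in assumption for what `DateUtils.toLong(Instant)` provides, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Minimal sketch of the lag computation after this change (illustrative only).
public class ReplicationLagSketch {

    // Stand-in for DateUtils.toLong(Instant): an Instant encoded as nanoseconds since the epoch,
    // which is comparable across nodes, unlike System.nanoTime().
    static long toEpochNanos(Instant instant) {
        return instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
    }

    // Lag is the wall-clock age, in milliseconds, of the oldest checkpoint the replica has not
    // yet caught up to, and is only reported while the replica is actually behind.
    static long replicationLagMillis(long bytesBehind, long oldestCheckpointCreatedEpochNanos) {
        return bytesBehind > 0L
            ? Duration.ofNanos(toEpochNanos(Instant.now()) - oldestCheckpointCreatedEpochNanos).toMillis()
            : 0L;
    }

    public static void main(String[] args) {
        // A checkpoint stamped ~5 ms ago, possibly on another node, since the timestamp is epoch-based.
        long createdAt = toEpochNanos(Instant.now().minusMillis(5));
        System.out.println(replicationLagMillis(200, createdAt)); // ~5
        System.out.println(replicationLagMillis(0, createdAt));   // 0 once the replica has caught up
    }
}
```

The `<=` change in `pruneCheckpointsUpToLastSync` works toward the same end: once the replica has synced to the latest segment infos version, its checkpoint entry is pruned as well, so bytes behind and lag both drop to zero, as exercised by the new assertions in the test.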