Skip to content

Commit 7b51ccb

Browse files
committed
Fix bug in replication lag computation.
This change fixes a bug in the replication lag computation so that it correctly uses an epoch-based reference point, obtained via Instant.now() and DateUtils, instead of System.nanoTime(). This change also fixes the pruning logic to correctly remove the latest synced-to checkpoint from tracking. Previously we would only prune up to, but not including, the latest synced-to checkpoint. This ensures that when a new checkpoint is eventually received we aren't incorrectly computing lag from the synced-to checkpoint. Signed-off-by: Marc Handalian <[email protected]>
1 parent 6bf1a6d commit 7b51ccb

File tree

3 files changed

+25
-10
lines changed

3 files changed

+25
-10
lines changed

server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.index.CorruptIndexException;
1515
import org.opensearch.OpenSearchCorruptionException;
1616
import org.opensearch.common.SetOnce;
17+
import org.opensearch.common.time.DateUtils;
1718
import org.opensearch.common.unit.TimeValue;
1819
import org.opensearch.common.util.concurrent.AbstractRunnable;
1920
import org.opensearch.common.util.concurrent.ConcurrentCollections;
@@ -30,11 +31,12 @@
3031
import org.opensearch.threadpool.ThreadPool;
3132

3233
import java.io.IOException;
34+
import java.time.Duration;
35+
import java.time.Instant;
3336
import java.util.Map;
3437
import java.util.concurrent.ConcurrentMap;
3538
import java.util.concurrent.ConcurrentNavigableMap;
3639
import java.util.concurrent.ConcurrentSkipListMap;
37-
import java.util.concurrent.TimeUnit;
3840

3941
/**
4042
* This class is responsible for managing segment replication events on replicas.
@@ -49,7 +51,7 @@ public class SegmentReplicator {
4951

5052
private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
5153
private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();
52-
private final ConcurrentMap<ShardId, ConcurrentNavigableMap<Long, ReplicationCheckpointStats>> replicationCheckpointStats =
54+
protected final ConcurrentMap<ShardId, ConcurrentNavigableMap<Long, ReplicationCheckpointStats>> replicationCheckpointStats =
5355
ConcurrentCollections.newConcurrentMap();
5456
private final ConcurrentMap<ShardId, ReplicationCheckpoint> primaryCheckpoint = ConcurrentCollections.newConcurrentMap();
5557

@@ -130,9 +132,8 @@ public ReplicationStats getSegmentReplicationStats(final ShardId shardId) {
130132

131133
long bytesBehind = highestEntry.getValue().getBytesBehind();
132134
long replicationLag = bytesBehind > 0L
133-
? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lowestEntry.getValue().getTimestamp())
135+
? Duration.ofNanos(DateUtils.toLong(Instant.now()) - lowestEntry.getValue().getTimestamp()).toMillis()
134136
: 0;
135-
136137
return new ReplicationStats(bytesBehind, bytesBehind, replicationLag);
137138
}
138139

@@ -180,7 +181,7 @@ protected void pruneCheckpointsUpToLastSync(final IndexShard indexShard) {
180181
);
181182

182183
if (existingCheckpointStats != null && !existingCheckpointStats.isEmpty()) {
183-
existingCheckpointStats.keySet().removeIf(key -> key < segmentInfoVersion);
184+
existingCheckpointStats.keySet().removeIf(key -> key <= segmentInfoVersion);
184185
Map.Entry<Long, ReplicationCheckpointStats> lastEntry = existingCheckpointStats.lastEntry();
185186
if (lastEntry != null) {
186187
lastEntry.getValue().setBytesBehind(calculateBytesBehind(latestCheckpoint, indexReplicationCheckPoint));

server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.Version;
1212
import org.opensearch.common.Nullable;
1313
import org.opensearch.common.annotation.PublicApi;
14+
import org.opensearch.common.time.DateUtils;
1415
import org.opensearch.core.common.io.stream.StreamInput;
1516
import org.opensearch.core.common.io.stream.StreamOutput;
1617
import org.opensearch.core.common.io.stream.Writeable;
@@ -19,6 +20,7 @@
1920
import org.opensearch.index.store.StoreFileMetadata;
2021

2122
import java.io.IOException;
23+
import java.time.Instant;
2224
import java.util.Collections;
2325
import java.util.Map;
2426
import java.util.Objects;
@@ -56,11 +58,11 @@ private ReplicationCheckpoint(ShardId shardId, String codec) {
5658
length = 0L;
5759
this.codec = codec;
5860
this.metadataMap = Collections.emptyMap();
59-
this.createdTimeStamp = System.nanoTime();
61+
this.createdTimeStamp = DateUtils.toLong(Instant.now());
6062
}
6163

6264
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
63-
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), System.nanoTime());
65+
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), DateUtils.toLong(Instant.now()));
6466
}
6567

6668
public ReplicationCheckpoint(
@@ -79,7 +81,7 @@ public ReplicationCheckpoint(
7981
this.length = length;
8082
this.codec = codec;
8183
this.metadataMap = metadataMap;
82-
this.createdTimeStamp = System.nanoTime();
84+
this.createdTimeStamp = DateUtils.toLong(Instant.now());
8385
}
8486

8587
public ReplicationCheckpoint(

server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.cluster.service.ClusterService;
2222
import org.opensearch.common.lucene.Lucene;
2323
import org.opensearch.common.settings.Settings;
24+
import org.opensearch.common.time.DateUtils;
2425
import org.opensearch.core.action.ActionListener;
2526
import org.opensearch.core.index.shard.ShardId;
2627
import org.opensearch.index.ReplicationStats;
@@ -38,6 +39,7 @@
3839

3940
import java.io.IOException;
4041
import java.io.UncheckedIOException;
42+
import java.time.Instant;
4143
import java.util.ArrayList;
4244
import java.util.HashMap;
4345
import java.util.List;
@@ -232,7 +234,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
232234
1000,
233235
"",
234236
stringStoreFileMetadataMapOne,
235-
System.nanoTime() - TimeUnit.MINUTES.toNanos(1)
237+
DateUtils.toLong(Instant.now())
236238
);
237239

238240
IndexShard replicaShard = mock(IndexShard.class);
@@ -260,7 +262,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
260262
200,
261263
"",
262264
stringStoreFileMetadataMapTwo,
263-
System.nanoTime() - TimeUnit.MINUTES.toNanos(1)
265+
DateUtils.toLong(Instant.now())
264266
);
265267

266268
segmentReplicator.updateReplicationCheckpointStats(thirdReplicationCheckpoint, replicaShard);
@@ -276,6 +278,16 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
276278
assertEquals(200, replicationStatsSecond.totalBytesBehind);
277279
assertEquals(200, replicationStatsSecond.maxBytesBehind);
278280
assertTrue(replicationStatsSecond.maxReplicationLag > 0);
281+
282+
// shard finished syncing to last checkpoint (sis 3)
283+
when(replicaShard.getLatestReplicationCheckpoint()).thenReturn(thirdReplicationCheckpoint);
284+
segmentReplicator.pruneCheckpointsUpToLastSync(replicaShard);
285+
ReplicationStats finalStats = segmentReplicator.getSegmentReplicationStats(shardId);
286+
assertEquals(0, finalStats.totalBytesBehind);
287+
assertEquals(0, finalStats.maxBytesBehind);
288+
assertEquals(0, finalStats.maxReplicationLag);
289+
// shard is up to date, should not have any tracked stats
290+
assertTrue(segmentReplicator.replicationCheckpointStats.get(shardId).isEmpty());
279291
}
280292

281293
public void testGetSegmentReplicationStats_WhenCheckPointReceivedOutOfOrder() {

0 commit comments

Comments
 (0)