diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 5a831822f695d..0f77cdfa0fdc4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -21,6 +21,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.SetOnce; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; import org.opensearch.index.remote.RemoteSegmentTransferTracker; @@ -269,92 +270,59 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce // - Assert that download stats == upload stats // - Repeat this step for random times (between 5 and 10) - // Create index with 1 pri and 1 replica and refresh interval disabled - createIndex( - INDEX_NAME, - Settings.builder().put(remoteStoreIndexSettings(1, 1)).put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build() - ); - ensureGreen(INDEX_NAME); - - // Manually invoke a refresh - refresh(INDEX_NAME); - - // Get zero state values - // Extract and assert zero state primary stats - RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); - RemoteSegmentTransferTracker.Stats zeroStatePrimaryStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) - .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) - .collect(Collectors.toList()) - .get(0) - .getSegmentStats(); - logger.info( - "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded.", - 
zeroStatePrimaryStats.refreshTimeLagMs, - zeroStatePrimaryStats.bytesLag, - zeroStatePrimaryStats.uploadBytesStarted, - zeroStatePrimaryStats.uploadBytesFailed, - zeroStatePrimaryStats.totalUploadsSucceeded, - zeroStatePrimaryStats.uploadBytesSucceeded - ); - assertTrue( - zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded - && zeroStatePrimaryStats.totalUploadsSucceeded == 1 - ); - assertTrue( - zeroStatePrimaryStats.uploadBytesStarted == zeroStatePrimaryStats.uploadBytesSucceeded - && zeroStatePrimaryStats.uploadBytesSucceeded > 0 - ); - assertTrue(zeroStatePrimaryStats.totalUploadsFailed == 0 && zeroStatePrimaryStats.uploadBytesFailed == 0); + // Prepare settings with single replica + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(1, 1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1); - // Extract and assert zero state replica stats - RemoteSegmentTransferTracker.Stats zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) - .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) - .collect(Collectors.toList()) - .get(0) - .getSegmentStats(); - assertTrue( - zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0 - && zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 - ); + // Retrieve zero state stats + SetOnce<RemoteSegmentTransferTracker.Stats> zeroStatePrimaryStats = prepareZeroStateStats(settings, false); - // Index documents + // Index documents and verify stats after each flush and refresh for (int i = 1; i <= randomIntBetween(5, 10); i++) { indexSingleDoc(INDEX_NAME); - // Running Flush & Refresh manually flushAndRefresh(INDEX_NAME); ensureGreen(INDEX_NAME); + waitForReplication(); - // Poll for RemoteStore Stats assertBusy(() -> { RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); - // Iterate through the response and extract the relevant segment upload and download stats + + // 
Extract the primary and replica shard stats from the response List<RemoteStoreStats> primaryStatsList = Arrays.stream(response.getRemoteStoreStats()) .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) - .collect(Collectors.toList()); + .toList(); assertEquals(1, primaryStatsList.size()); + List<RemoteStoreStats> replicaStatsList = Arrays.stream(response.getRemoteStoreStats()) .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) - .collect(Collectors.toList()); + .toList(); assertEquals(1, replicaStatsList.size()); - RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getSegmentStats(); - RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.get(0).getSegmentStats(); - // Assert Upload syncs - zero state uploads == download syncs + + RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.getFirst().getSegmentStats(); + RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.getFirst().getSegmentStats(); + + // Assert that uploads and downloads occurred and that uploads since zero state cover the downloads assertTrue(primaryStats.totalUploadsStarted > 0); assertTrue(primaryStats.totalUploadsSucceeded > 0); + assertTrue(replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0); + + assertTrue( - replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0 - && primaryStats.uploadBytesStarted - - zeroStatePrimaryStats.uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted + primaryStats.uploadBytesStarted - zeroStatePrimaryStats + .get().uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted ); + + assertTrue(replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0); + + assertTrue( - replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0 - && primaryStats.uploadBytesSucceeded - - zeroStatePrimaryStats.uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded + primaryStats.uploadBytesSucceeded - zeroStatePrimaryStats + 
.get().uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded ); + // Assert zero failures assertEquals(0, primaryStats.uploadBytesFailed); assertEquals(0, replicaStats.directoryFileTransferTrackerStats.transferredBytesFailed); - }, 60, TimeUnit.SECONDS); + }); } } @@ -369,76 +337,42 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr // - Assert that download stats == upload stats // - Repeat this step for random times (between 5 and 10) - // Create index + // Get number of data nodes int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); - createIndex( - INDEX_NAME, - Settings.builder() - .put(remoteStoreIndexSettings(dataNodeCount - 1, 1)) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) - .build() - ); - ensureGreen(INDEX_NAME); - // Manually invoke a refresh - refresh(INDEX_NAME); - - // Get zero state values - // Extract and assert zero state primary stats - RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); - RemoteSegmentTransferTracker.Stats zeroStatePrimaryStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) - .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) - .collect(Collectors.toList()) - .get(0) - .getSegmentStats(); - logger.info( - "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded.", - zeroStatePrimaryStats.refreshTimeLagMs, - zeroStatePrimaryStats.bytesLag, - zeroStatePrimaryStats.uploadBytesStarted, - zeroStatePrimaryStats.uploadBytesFailed, - zeroStatePrimaryStats.totalUploadsSucceeded, - zeroStatePrimaryStats.uploadBytesSucceeded - ); - assertTrue( - zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded - && zeroStatePrimaryStats.totalUploadsSucceeded == 1 - ); - 
assertTrue( - zeroStatePrimaryStats.uploadBytesStarted == zeroStatePrimaryStats.uploadBytesSucceeded - && zeroStatePrimaryStats.uploadBytesSucceeded > 0 - ); - assertTrue(zeroStatePrimaryStats.totalUploadsFailed == 0 && zeroStatePrimaryStats.uploadBytesFailed == 0); + // Prepare settings with multiple replicas + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(dataNodeCount - 1, 1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1); - // Extract and assert zero state replica stats - List<RemoteStoreStats> zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) - .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) - .collect(Collectors.toList()); - zeroStateReplicaStats.forEach(stats -> { - assertTrue( - stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted == 0 - && stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 - ); - }); + // Retrieve zero state stats + SetOnce<RemoteSegmentTransferTracker.Stats> zeroStatePrimaryStats = prepareZeroStateStats(settings, true); + // Get current nodes in cluster int currentNodesInCluster = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); + + // Index documents and verify stats after each flush and refresh for (int i = 0; i < randomIntBetween(5, 10); i++) { indexSingleDoc(INDEX_NAME); - // Running Flush & Refresh manually flushAndRefresh(INDEX_NAME); + ensureGreen(INDEX_NAME); + waitForReplication(); assertBusy(() -> { RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + + // Validate the number of successful shards assertEquals(currentNodesInCluster, response.getSuccessfulShards()); - long uploadsStarted = 0, uploadsSucceeded = 0, uploadsFailed = 0; - long uploadBytesStarted = 0, uploadBytesSucceeded = 0, uploadBytesFailed = 0; - List<Long> downloadBytesStarted = new ArrayList<>(), downloadBytesSucceeded = new ArrayList<>(), downloadBytesFailed = - new ArrayList<>(); - // Assert that stats for 
primary shard and replica shard set are equal - for (RemoteStoreStats eachStatsObject : response.getRemoteStoreStats()) { - RemoteSegmentTransferTracker.Stats stats = eachStatsObject.getSegmentStats(); - if (eachStatsObject.getShardRouting().primary()) { + long uploadBytesStarted = 0, uploadBytesSucceeded = 0, uploadBytesFailed = 0; + List<Long> downloadBytesStarted = new ArrayList<>(); + List<Long> downloadBytesSucceeded = new ArrayList<>(); + List<Long> downloadBytesFailed = new ArrayList<>(); + + // Collect stats for primary and replica shards + for (RemoteStoreStats statsObject : response.getRemoteStoreStats()) { + RemoteSegmentTransferTracker.Stats stats = statsObject.getSegmentStats(); + if (statsObject.getShardRouting().primary()) { uploadBytesStarted = stats.uploadBytesStarted; uploadBytesSucceeded = stats.uploadBytesSucceeded; uploadBytesFailed = stats.uploadBytesFailed; @@ -449,17 +383,78 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr } } - assertEquals(0, uploadsFailed); + // Assert no failures and that uploads since zero state exceed each replica's downloads assertEquals(0, uploadBytesFailed); for (int j = 0; j < response.getSuccessfulShards() - 1; j++) { - assertTrue(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted > downloadBytesStarted.get(j)); - assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded > downloadBytesSucceeded.get(j)); + assertTrue(uploadBytesStarted - zeroStatePrimaryStats.get().uploadBytesStarted > downloadBytesStarted.get(j)); + assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.get().uploadBytesSucceeded > downloadBytesSucceeded.get(j)); assertEquals(0, (long) downloadBytesFailed.get(j)); } - }, 60, TimeUnit.SECONDS); + }); + } + } + + // Helper method to validate zero state primary stats + private void validateZeroStatePrimaryStats(RemoteSegmentTransferTracker.Stats primaryStats) { + logger.info("Zero state primary stats: {}", primaryStats); + assertEquals(primaryStats.totalUploadsStarted, primaryStats.totalUploadsSucceeded); + 
assertTrue(primaryStats.totalUploadsSucceeded >= 1); + assertEquals(primaryStats.uploadBytesStarted, primaryStats.uploadBytesSucceeded); + assertTrue(primaryStats.uploadBytesSucceeded > 0); + assertEquals(0, primaryStats.totalUploadsFailed); + assertEquals(0, primaryStats.uploadBytesFailed); + } + + // Helper method to validate zero state replica stats + private void validateZeroStateReplicaStats(RemoteStoreStatsResponse zeroStateResponse, boolean multipleShardsExpected) { + List<RemoteStoreStats> zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) + .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) + .toList(); + + if (multipleShardsExpected) { + zeroStateReplicaStats.forEach(stats -> { + assertEquals(0, stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted); + assertEquals(0, stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded); + }); + } else { + RemoteSegmentTransferTracker.Stats replicaStats = zeroStateReplicaStats.getFirst().getSegmentStats(); + assertEquals(0, replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted); + assertEquals(0, replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded); } } + // Helper method for common test setup and zero state stats retrieval + private SetOnce<RemoteSegmentTransferTracker.Stats> prepareZeroStateStats( + Settings.Builder additionalSettings, + boolean multipleShardsExpected + ) throws Exception { + SetOnce<RemoteSegmentTransferTracker.Stats> zeroStatePrimaryStats = new SetOnce<>(); + + // Create index with specified settings + createIndex(INDEX_NAME, additionalSettings.build()); + ensureGreen(INDEX_NAME); + + // Manually invoke a refresh + refresh(INDEX_NAME); + + assertBusy(() -> { + RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + + RemoteSegmentTransferTracker.Stats primaryStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) + .filter(remoteStoreStats -> 
remoteStoreStats.getShardRouting().primary()) + .toList() + .getFirst() + .getSegmentStats(); + + validateZeroStatePrimaryStats(primaryStats); + validateZeroStateReplicaStats(zeroStateResponse, multipleShardsExpected); + + zeroStatePrimaryStats.set(primaryStats); + }); + + return zeroStatePrimaryStats; + } + public void testStatsOnShardRelocation() { setup(); // Scenario: diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index a29bd1d840b43..4114a14b455e7 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -580,5 +580,53 @@ public int hashCode() { directoryFileTransferTrackerStats ); } + + @Override + public String toString() { + return "Stats{" + + "shardId=" + + shardId + + ", localRefreshClockTimeMs=" + + localRefreshClockTimeMs + + ", remoteRefreshClockTimeMs=" + + remoteRefreshClockTimeMs + + ", refreshTimeLagMs=" + + refreshTimeLagMs + + ", localRefreshNumber=" + + localRefreshNumber + + ", remoteRefreshNumber=" + + remoteRefreshNumber + + ", uploadBytesStarted=" + + uploadBytesStarted + + ", uploadBytesFailed=" + + uploadBytesFailed + + ", uploadBytesSucceeded=" + + uploadBytesSucceeded + + ", totalUploadsStarted=" + + totalUploadsStarted + + ", totalUploadsFailed=" + + totalUploadsFailed + + ", totalUploadsSucceeded=" + + totalUploadsSucceeded + + ", rejectionCount=" + + rejectionCount + + ", consecutiveFailuresCount=" + + consecutiveFailuresCount + + ", lastSuccessfulRemoteRefreshBytes=" + + lastSuccessfulRemoteRefreshBytes + + ", uploadBytesMovingAverage=" + + uploadBytesMovingAverage + + ", uploadBytesPerSecMovingAverage=" + + uploadBytesPerSecMovingAverage + + ", totalUploadTimeInMs=" + + totalUploadTimeInMs + + ", uploadTimeMovingAverage=" + + uploadTimeMovingAverage + + ", bytesLag=" + + 
bytesLag + + ", directoryFileTransferTrackerStats=" + + directoryFileTransferTrackerStats + + '}'; + } } }