From 9a5c7b2eae405d08c4d5d64f0fef087821c18655 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 30 Sep 2024 22:16:52 +0530 Subject: [PATCH 1/7] Add default implementation to new finalizeSnapshot() in Repository (#16128) Signed-off-by: Sachin Kale --- .../main/java/org/opensearch/repositories/Repository.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index d4520beb5b570..138bc13140aea 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -180,7 +180,7 @@ void finalizeSnapshot( * @param repositoryUpdatePriority priority for the cluster state update task * @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot */ - void finalizeSnapshot( + default void finalizeSnapshot( ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata, @@ -189,7 +189,9 @@ void finalizeSnapshot( Function stateTransformer, Priority repositoryUpdatePriority, ActionListener listener - ); + ) { + throw new UnsupportedOperationException(); + } /** * Deletes snapshots From d131d58d842d18e62e523d5c6833d8c86652ce81 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 30 Sep 2024 14:24:41 -0400 Subject: [PATCH 2/7] Bump org.apache.logging.log4j:log4j-core from 2.24.0 to 2.24.1 in /buildSrc/src/testKit/thirdPartyAudit/sample_jars (#16134) * Bump org.apache.logging.log4j:log4j-core Bumps org.apache.logging.log4j:log4j-core from 2.24.0 to 2.24.1. --- updated-dependencies: - dependency-name: org.apache.logging.log4j:log4j-core dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 2 +- buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b01badbe3fd2e..da14948781849 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) - Bump `protobuf` from 3.22.3 to 3.25.4 ([#15684](https://github.com/opensearch-project/OpenSearch/pull/15684)) -- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858)) +- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.1 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858), [#16134](https://github.com/opensearch-project/OpenSearch/pull/16134)) - Bump `peter-evans/create-pull-request` from 6 to 7 ([#15863](https://github.com/opensearch-project/OpenSearch/pull/15863)) - Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862)) - Bump `com.microsoft.azure:msal4j` from 1.17.0 to 1.17.1 ([#15945](https://github.com/opensearch-project/OpenSearch/pull/15945)) diff --git a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle index 59063b3789c70..4d425964c77af 100644 --- a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle +++ b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle @@ -17,7 +17,7 @@ repositories { } dependencies { - implementation 
"org.apache.logging.log4j:log4j-core:2.24.0" + implementation "org.apache.logging.log4j:log4j-core:2.24.1" } ["0.0.1", "0.0.2"].forEach { v -> From 210228fcbe5c6d5efad417de64ae01c83927ed87 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 30 Sep 2024 21:11:50 -0400 Subject: [PATCH 3/7] Add 2.17.1 release notes (#16104) (#16105) (#16132) (cherry picked from commit dff2184279d7392f44cb52888916bdc9020a5e61) (cherry picked from commit af105b1bb547fc8843fab0986f1aa60454529d54) Signed-off-by: Andriy Redko --- release-notes/opensearch.release-notes-2.17.1.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 release-notes/opensearch.release-notes-2.17.1.md diff --git a/release-notes/opensearch.release-notes-2.17.1.md b/release-notes/opensearch.release-notes-2.17.1.md new file mode 100644 index 0000000000000..2ff2bd44c3b1c --- /dev/null +++ b/release-notes/opensearch.release-notes-2.17.1.md @@ -0,0 +1,16 @@ +## 2024-10-01 Version 2.17.1 Release Notes + +## [2.17.1] +### Added +- Add path prefix support to hashed prefix snapshots ([#15664](https://github.com/opensearch-project/OpenSearch/pull/15664)) +- Memory optimisations in _cluster/health API ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492)) + +### Dependencies + +### Changed + +### Deprecated + +### Removed + +### Fixed From 43e7597cdc5ba2c1852ec1796628f948633f0c57 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi <157457166+ltaragi@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:13:15 +0530 Subject: [PATCH 4/7] Add assertBusy to failing snapshot tests (#16146) Signed-off-by: Lakshya Taragi --- .../snapshots/SnapshotStatusApisIT.java | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java index c3214022df663..8b6869aa1d81a 100644 --- 
a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java @@ -116,7 +116,7 @@ public void testStatusApiConsistency() { assertEquals(snapshotStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime()); } - public void testStatusAPICallForShallowCopySnapshot() { + public void testStatusAPICallForShallowCopySnapshot() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used for the test"); internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); @@ -136,15 +136,24 @@ public void testStatusAPICallForShallowCopySnapshot() { final String snapshot = "snapshot"; createFullSnapshot(snapshotRepoName, snapshot); - final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot); - assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); + assertBusy(() -> { + final SnapshotStatus snapshotStatus = client().admin() + .cluster() + .prepareSnapshotStatus(snapshotRepoName) + .setSnapshots(snapshot) + .execute() + .actionGet() + .getSnapshots() + .get(0); + assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); - final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); - assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); - assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); - assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); + final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); + assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); + 
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); + assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); + }, 20, TimeUnit.SECONDS); } public void testStatusAPICallInProgressSnapshot() throws Exception { @@ -193,7 +202,7 @@ public void testExceptionOnMissingSnapBlob() throws IOException { ); } - public void testExceptionOnMissingShardLevelSnapBlob() throws IOException { + public void testExceptionOnMissingShardLevelSnapBlob() throws Exception { disableRepoConsistencyCheck("This test intentionally corrupts the repository"); final Path repoPath = randomRepoPath(); @@ -216,11 +225,12 @@ public void testExceptionOnMissingShardLevelSnapBlob() throws IOException { repoPath.resolve(resolvePath(indexId, "0")) .resolve(BlobStoreRepository.SNAPSHOT_PREFIX + snapshotInfo.snapshotId().getUUID() + ".dat") ); - - expectThrows( - SnapshotMissingException.class, - () -> client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").execute().actionGet() - ); + assertBusy(() -> { + expectThrows( + SnapshotMissingException.class, + () -> client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").execute().actionGet() + ); + }, 20, TimeUnit.SECONDS); } public void testGetSnapshotsWithoutIndices() throws Exception { From a767e92f3eeaf57c066deb9ad075d40ed00f4a58 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Tue, 1 Oct 2024 14:48:10 +0530 Subject: [PATCH 5/7] Optimize checksum creation for remote cluster state (#16046) * Support parallelisation in remote publication checksum computation Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 150 ++++++++++++------ .../remote/RemoteClusterStateService.java | 14 +- .../org/opensearch/threadpool/ThreadPool.java | 7 + 
.../remote/ClusterMetadataManifestTests.java | 15 +- .../remote/ClusterStateChecksumTests.java | 32 ++-- .../RemoteClusterStateServiceTests.java | 28 ++-- 6 files changed, 164 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index d6739c4572d1a..aa007f5da15b3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -12,8 +12,10 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.DiffableStringMap; +import org.opensearch.common.CheckedFunction; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -22,11 +24,15 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParseException; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import com.jcraft.jzlib.JZlib; @@ -37,6 +43,7 @@ */ public class ClusterStateChecksum implements ToXContentFragment, Writeable { + public static final int COMPONENT_SIZE = 11; static final String ROUTING_TABLE_CS = "routing_table"; static final String NODES_CS = "discovery_nodes"; static final String BLOCKS_CS = "blocks"; @@ -65,62 +72,103 @@ public class ClusterStateChecksum implements 
ToXContentFragment, Writeable { long indicesChecksum; long clusterStateChecksum; - public ClusterStateChecksum(ClusterState clusterState) { - try ( - BytesStreamOutput out = new BytesStreamOutput(); - BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) - ) { - clusterState.routingTable().writeVerifiableTo(checksumOut); - routingTableChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.nodes().writeVerifiableTo(checksumOut); - nodesChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.coordinationMetadata().writeVerifiableTo(checksumOut); - coordinationMetadataChecksum = checksumOut.getChecksum(); - - // Settings create sortedMap by default, so no explicit sorting required here. - checksumOut.reset(); - Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), checksumOut); - settingMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), checksumOut); - transientSettingsMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.metadata().templatesMetadata().writeVerifiableTo(checksumOut); - templatesMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeStringCollection(clusterState.metadata().customs().keySet()); - customMetadataMapChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(checksumOut); - hashesOfConsistentSettingsChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeMapValues( + public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) { + long start = threadpool.relativeTimeInNanos(); + ExecutorService executorService = threadpool.executor(ThreadPool.Names.REMOTE_STATE_CHECKSUM); + CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE); + + 
executeChecksumTask((stream) -> { + clusterState.routingTable().writeVerifiableTo(stream); + return null; + }, checksum -> routingTableChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.nodes().writeVerifiableTo(stream); + return null; + }, checksum -> nodesChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.coordinationMetadata().writeVerifiableTo(stream); + return null; + }, checksum -> coordinationMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream); + return null; + }, checksum -> settingMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream); + return null; + }, checksum -> transientSettingsMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.metadata().templatesMetadata().writeVerifiableTo(stream); + return null; + }, checksum -> templatesMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + stream.writeStringCollection(clusterState.metadata().customs().keySet()); + return null; + }, checksum -> customMetadataMapChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream); + return null; + }, checksum -> hashesOfConsistentSettingsChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + stream.writeMapValues( clusterState.metadata().indices(), - (stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream) + (checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream) ); - indicesChecksum = checksumOut.getChecksum(); - - 
checksumOut.reset(); - clusterState.blocks().writeVerifiableTo(checksumOut); - blocksChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeStringCollection(clusterState.customs().keySet()); - clusterStateCustomsChecksum = checksumOut.getChecksum(); - } catch (IOException e) { - logger.error("Failed to create checksum for cluster state.", e); + return null; + }, checksum -> indicesChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + clusterState.blocks().writeVerifiableTo(stream); + return null; + }, checksum -> blocksChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + stream.writeStringCollection(clusterState.customs().keySet()); + return null; + }, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch); + + try { + latch.await(); + } catch (InterruptedException e) { throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); } createClusterStateChecksum(); + logger.debug("Checksum execution time {}", TimeValue.nsecToMSec(threadpool.relativeTimeInNanos() - start)); + } + + private void executeChecksumTask( + CheckedFunction checksumTask, + Consumer checksumConsumer, + ExecutorService executorService, + CountDownLatch latch + ) { + executorService.execute(() -> { + try { + checksumConsumer.accept(createChecksum(checksumTask)); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to execute checksum task", e); + } finally { + latch.countDown(); // always release the constructor's latch.await(), even when this task fails + } + }); + } + + private long createChecksum(CheckedFunction task) throws IOException { + try ( + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) + ) { + task.apply(checksumOut); + return checksumOut.getChecksum(); + } } private void createClusterStateChecksum() { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ece29180f9cf5..ce5e57b79dadb 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -332,7 +332,9 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, false, codecVersion ); @@ -539,7 +541,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, false, previousManifest.getCodecVersion() ); @@ -1010,7 +1014,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? 
new ClusterStateChecksum(clusterState, threadpool) + : null, true, previousManifest.getCodecVersion() ); @@ -1631,7 +1637,7 @@ void validateClusterStateFromChecksum( String localNodeId, boolean isFullStateDownload ) { - ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState); + ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState, threadpool); List failedValidation = newClusterStateChecksum.getMismatchEntities(manifest.getClusterStateChecksum()); if (failedValidation.isEmpty()) { return; diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 81220ab171b34..d795fd252b7fc 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -53,6 +53,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.gateway.remote.ClusterStateChecksum; import org.opensearch.node.Node; import java.io.IOException; @@ -118,6 +119,7 @@ public static class Names { public static final String REMOTE_RECOVERY = "remote_recovery"; public static final String REMOTE_STATE_READ = "remote_state_read"; public static final String INDEX_SEARCHER = "index_searcher"; + public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum"; } /** @@ -191,6 +193,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING); map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING); map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); + map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -307,6 +310,10 @@ public ThreadPool( runnableTaskListener ) ); + builders.put( + Names.REMOTE_STATE_CHECKSUM, + new 
FixedExecutorBuilder(settings, Names.REMOTE_STATE_CHECKSUM, ClusterStateChecksum.COMPONENT_SIZE, 1000) + ); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 3f9aa1245cab3..09c2933680be3 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -34,6 +34,9 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; import java.io.IOException; import java.util.ArrayList; @@ -64,6 +67,14 @@ public class ClusterMetadataManifestTests extends OpenSearchTestCase { + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @After + public void teardown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + public void testClusterMetadataManifestXContentV0() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path", CODEC_V0); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() @@ -214,7 +225,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "indicesRoutingDiffPath" ) ) - .checksum(new ClusterStateChecksum(createClusterState())) + .checksum(new ClusterStateChecksum(createClusterState(), threadPool)) .build(); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( @@ -647,7 +658,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException { 
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); final StringKeyDiffProvider routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class); - ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState(), threadPool); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) .stateVersion(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java index 0203e56dd2d5c..9b98187053a39 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java @@ -34,6 +34,9 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; import java.io.IOException; import java.util.EnumSet; @@ -41,14 +44,21 @@ import java.util.Map; public class ClusterStateChecksumTests extends OpenSearchTestCase { + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @After + public void teardown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } public void testClusterStateChecksumEmptyClusterState() { - ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE); + ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE, threadPool); assertNotNull(checksum); } public void testClusterStateChecksum() { - 
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); assertNotNull(checksum); assertTrue(checksum.routingTableChecksum != 0); assertTrue(checksum.nodesChecksum != 0); @@ -65,8 +75,8 @@ public void testClusterStateChecksum() { } public void testClusterStateMatchChecksum() { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); - ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); + ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState(), threadPool); assertNotNull(checksum); assertNotNull(newChecksum); assertEquals(checksum.routingTableChecksum, newChecksum.routingTableChecksum); @@ -84,7 +94,7 @@ public void testClusterStateMatchChecksum() { } public void testXContentConversion() throws IOException { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); checksum.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -97,7 +107,7 @@ public void testXContentConversion() throws IOException { } public void testSerialization() throws IOException { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); BytesStreamOutput output = new BytesStreamOutput(); checksum.writeTo(output); @@ -109,10 +119,10 @@ public void testSerialization() throws IOException { public void testGetMismatchEntities() { ClusterState clsState1 = generateClusterState(); - ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1); + ClusterStateChecksum checksum 
= new ClusterStateChecksum(clsState1, threadPool); assertTrue(checksum.getMismatchEntities(checksum).isEmpty()); - ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1); + ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1, threadPool); assertTrue(checksum.getMismatchEntities(checksum2).isEmpty()); ClusterState clsState2 = ClusterState.builder(ClusterName.DEFAULT) @@ -122,7 +132,7 @@ public void testGetMismatchEntities() { .customs(Map.of()) .metadata(Metadata.EMPTY_METADATA) .build(); - ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2); + ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2, threadPool); List mismatches = checksum.getMismatchEntities(checksum3); assertFalse(mismatches.isEmpty()); assertEquals(11, mismatches.size()); @@ -151,8 +161,8 @@ public void testGetMismatchEntitiesUnorderedInput() { ClusterState state2 = ClusterState.builder(state1).nodes(nodes1).build(); ClusterState state3 = ClusterState.builder(state1).nodes(nodes2).build(); - ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2); - ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3); + ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2, threadPool); + ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3, threadPool); assertEquals(checksum2, checksum1); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 56857285fa8d3..35a8ae16cacf7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -3123,7 +3123,7 @@ public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws I .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1L) 
.indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3193,7 +3193,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indices(Collections.emptyList()) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3219,7 +3219,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3245,7 +3245,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indices(Collections.emptyList()) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3271,7 +3271,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3349,7 +3349,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabled() 
throws initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3382,7 +3382,7 @@ public void testGetClusterStateForManifestWithChecksumValidationModeNone() throw initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3415,7 +3415,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMisma initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3465,7 +3465,7 @@ public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatc ); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new 
ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3505,7 +3505,7 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException { initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3547,7 +3547,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3589,7 +3589,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.DEBUG); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3630,7 +3630,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I 
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.TRACE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3692,7 +3692,7 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); From 1ef6444cfa1ab805ba5d68a3078274cbbb2b9a09 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 1 Oct 2024 09:14:28 -0400 Subject: [PATCH 6/7] Bump com.azure:azure-core-http-netty from 1.15.3 to 1.15.4 in /plugins/repository-azure (#16133) * Bump com.azure:azure-core-http-netty in /plugins/repository-azure Bumps [com.azure:azure-core-http-netty](https://github.com/Azure/azure-sdk-for-java) from 1.15.3 to 1.15.4. - [Release notes](https://github.com/Azure/azure-sdk-for-java/releases) - [Commits](https://github.com/Azure/azure-sdk-for-java/compare/azure-core-http-netty_1.15.3...azure-core-http-netty_1.15.4) --- updated-dependencies: - dependency-name: com.azure:azure-core-http-netty dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] * Updating SHAs Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + plugins/repository-azure/build.gradle | 2 +- .../licenses/azure-core-http-netty-1.15.3.jar.sha1 | 1 - .../licenses/azure-core-http-netty-1.15.4.jar.sha1 | 1 + 4 files changed, 3 insertions(+), 2 deletions(-) delete mode 100644 plugins/repository-azure/licenses/azure-core-http-netty-1.15.3.jar.sha1 create mode 100644 plugins/repository-azure/licenses/azure-core-http-netty-1.15.4.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index da14948781849..038cc407d582b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `dnsjava:dnsjava` from 3.6.1 to 3.6.2 ([#16041](https://github.com/opensearch-project/OpenSearch/pull/16041)) - Bump `com.maxmind.geoip2:geoip2` from 4.2.0 to 4.2.1 ([#16042](https://github.com/opensearch-project/OpenSearch/pull/16042)) - Bump `com.maxmind.db:maxmind-db` from 3.1.0 to 3.1.1 ([#16137](https://github.com/opensearch-project/OpenSearch/pull/16137)) +- Bump `com.azure:azure-core-http-netty` from 1.15.3 to 1.15.4 ([#16133](https://github.com/opensearch-project/OpenSearch/pull/16133)) ### Changed - Add support for docker compose v2 in TestFixturesPlugin ([#16049](https://github.com/opensearch-project/OpenSearch/pull/16049)) diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 2892bdba51ba6..4baf79e619be9 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -48,7 +48,7 @@ dependencies { api 'com.azure:azure-json:1.1.0' api 'com.azure:azure-xml:1.1.0' api 'com.azure:azure-storage-common:12.25.1' - api 'com.azure:azure-core-http-netty:1.15.3' + api 
'com.azure:azure-core-http-netty:1.15.4' api "io.netty:netty-codec-dns:${versions.netty}" api "io.netty:netty-codec-socks:${versions.netty}" api "io.netty:netty-codec-http2:${versions.netty}" diff --git a/plugins/repository-azure/licenses/azure-core-http-netty-1.15.3.jar.sha1 b/plugins/repository-azure/licenses/azure-core-http-netty-1.15.3.jar.sha1 deleted file mode 100644 index 3cea52ba67ce5..0000000000000 --- a/plugins/repository-azure/licenses/azure-core-http-netty-1.15.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -03b5bd5f5c16eea71f130119dbfb1fe5239f806a \ No newline at end of file diff --git a/plugins/repository-azure/licenses/azure-core-http-netty-1.15.4.jar.sha1 b/plugins/repository-azure/licenses/azure-core-http-netty-1.15.4.jar.sha1 new file mode 100644 index 0000000000000..97e6fad264294 --- /dev/null +++ b/plugins/repository-azure/licenses/azure-core-http-netty-1.15.4.jar.sha1 @@ -0,0 +1 @@ +489a38c9e6efb5ce01fbd276d8cb6c0e89000459 \ No newline at end of file From be9f94258315405aee203a0737decd4e19343942 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi <157457166+ltaragi@users.noreply.github.com> Date: Tue, 1 Oct 2024 19:07:29 +0530 Subject: [PATCH 7/7] [SnapshotV2] Add timestamp of last successful fetch of pinned timestamps in node stats (#15611) --------- Signed-off-by: Lakshya Taragi --- CHANGELOG.md | 1 + .../RemoteStorePinnedTimestampsIT.java | 41 ++++++++++ .../admin/cluster/node/stats/NodeStats.java | 26 +++++- .../cluster/node/stats/NodesStatsRequest.java | 3 +- .../node/stats/TransportNodesStatsAction.java | 3 +- .../stats/TransportClusterStatsAction.java | 1 + .../java/org/opensearch/node/NodeService.java | 7 +- .../remotestore/RemoteStoreNodeStats.java | 79 +++++++++++++++++++ .../cluster/node/stats/NodeStatsTests.java | 22 +++++- .../cluster/stats/ClusterStatsNodesTests.java | 1 + .../opensearch/cluster/DiskUsageTests.java | 6 ++ .../MockInternalClusterInfoService.java | 3 +- .../opensearch/test/InternalTestCluster.java | 1 + 13 files changed, 187 
insertions(+), 7 deletions(-) create mode 100644 server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 038cc407d582b..1a0859a7d5af8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923)) - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718)) - Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976)) +- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java index 2fcda8c2d2f27..024e0e952eea5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java @@ -9,6 +9,8 @@ package org.opensearch.remotestore; import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -20,6 +22,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; +import static 
org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE; + @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase { static final String INDEX_NAME = "remote-store-test-idx-1"; @@ -180,4 +184,41 @@ public void onFailure(Exception e) { assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())); remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3)); } + + public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception { + logger.info("Starting up cluster manager"); + logger.info("cluster.remote_store.pinned_timestamps.enabled set to true"); + logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute"); + Settings pinnedTimestampEnabledSettings = Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m") + .build(); + internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings); + String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0); + ensureStableCluster(2); + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + remoteNodeName + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + assertBusy(() -> { + long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1(); + assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L); + NodesStatsResponse nodesStatsResponse = internalCluster().client() + .admin() + .cluster() + 
.prepareNodesStats() + .addMetric(REMOTE_STORE.metricName()) + .execute() + .actionGet(); + for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { + long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps(); + assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps); + } + }); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3)); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 0917a0baff1ab..c91260778f037 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -59,6 +59,7 @@ import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; import org.opensearch.node.NodesResourceUsageStats; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; import org.opensearch.repositories.RepositoriesStats; import org.opensearch.script.ScriptCacheStats; @@ -162,6 +163,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private NodeCacheStats nodeCacheStats; + @Nullable + private RemoteStoreNodeStats remoteStoreNodeStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -243,6 +247,12 @@ public NodeStats(StreamInput in) throws IOException { } else { nodeCacheStats = null; } + // TODO: change version to V_2_18_0 + if (in.getVersion().onOrAfter(Version.CURRENT)) { + remoteStoreNodeStats = in.readOptionalWriteable(RemoteStoreNodeStats::new); + } else { + remoteStoreNodeStats = null; + } } public NodeStats( @@ -274,7 +284,8 @@ public NodeStats( @Nullable 
SegmentReplicationRejectionStats segmentReplicationRejectionStats, @Nullable RepositoriesStats repositoriesStats, @Nullable AdmissionControlStats admissionControlStats, - @Nullable NodeCacheStats nodeCacheStats + @Nullable NodeCacheStats nodeCacheStats, + @Nullable RemoteStoreNodeStats remoteStoreNodeStats ) { super(node); this.timestamp = timestamp; @@ -305,6 +316,7 @@ public NodeStats( this.repositoriesStats = repositoriesStats; this.admissionControlStats = admissionControlStats; this.nodeCacheStats = nodeCacheStats; + this.remoteStoreNodeStats = remoteStoreNodeStats; } public long getTimestamp() { @@ -467,6 +479,11 @@ public NodeCacheStats getNodeCacheStats() { return nodeCacheStats; } + @Nullable + public RemoteStoreNodeStats getRemoteStoreNodeStats() { + return remoteStoreNodeStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -525,6 +542,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_14_0)) { out.writeOptionalWriteable(nodeCacheStats); } + // TODO: change version to V_2_18_0 + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(remoteStoreNodeStats); + } } @Override @@ -631,6 +652,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getNodeCacheStats() != null) { getNodeCacheStats().toXContent(builder, params); } + if (getRemoteStoreNodeStats() != null) { + getRemoteStoreNodeStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index f1f9f93afdad2..a5b00ed82d3cb 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -220,7 +220,8 @@ 
public enum Metric { SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"), REPOSITORIES("repositories"), ADMISSION_CONTROL("admission_control"), - CACHE_STATS("caches"); + CACHE_STATS("caches"), + REMOTE_STORE("remote_store"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 2c808adc97c7a..a98d245af872b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -129,7 +129,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics), NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics), NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics), - NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics) + NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics), + NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index a49ca2035783c..c4b3524cf6da5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -174,6 +174,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 
1eb38ea63ad5a..9671fda14375d 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -54,6 +54,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; import org.opensearch.plugins.PluginsService; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.repositories.RepositoriesService; @@ -241,7 +242,8 @@ public NodeStats stats( boolean segmentReplicationTrackerStats, boolean repositoriesStats, boolean admissionControl, - boolean cacheService + boolean cacheService, + boolean remoteStoreNodeStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -274,7 +276,8 @@ public NodeStats stats( segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, admissionControl ? this.admissionControlService.stats() : null, - cacheService ? this.cacheService.stats(indices) : null + cacheService ? this.cacheService.stats(indices) : null, + remoteStoreNodeStats ? new RemoteStoreNodeStats() : null ); } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java new file mode 100644 index 0000000000000..8da8a17e21839 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.node.remotestore; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Node level remote store stats + * @opensearch.internal + */ +public class RemoteStoreNodeStats implements Writeable, ToXContentFragment { + + public static final String STATS_NAME = "remote_store"; + public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps"; + + /** + * Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService} + */ + private final long lastSuccessfulFetchOfPinnedTimestamps; + + public RemoteStoreNodeStats() { + this.lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1(); + } + + public long getLastSuccessfulFetchOfPinnedTimestamps() { + return this.lastSuccessfulFetchOfPinnedTimestamps; + } + + public RemoteStoreNodeStats(StreamInput in) throws IOException { + this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(STATS_NAME); + builder.field(LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS, this.lastSuccessfulFetchOfPinnedTimestamps); + return builder.endObject(); + } + + @Override + public String toString() { + return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}"; + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + 
if (o.getClass() != RemoteStoreNodeStats.class) { + return false; + } + RemoteStoreNodeStats other = (RemoteStoreNodeStats) o; + return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps; + } + + @Override + public int hashCode() { + return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 11902728eed07..34065daff2b8a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -95,6 +95,7 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.node.remotestore.RemoteStoreNodeStats; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; @@ -614,6 +615,14 @@ public void testSerialization() throws IOException { } else { assertEquals(nodeCacheStats, deserializedNodeCacheStats); } + + RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats(); + RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats(); + if (remoteStoreNodeStats == null) { + assertNull(deserializedRemoteStoreNodeStats); + } else { + assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats); + } } } } @@ -996,6 +1005,16 @@ public void apply(String action, AdmissionControlActionType admissionControlActi nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags); } + RemoteStoreNodeStats remoteStoreNodeStats = null; + if 
(frequently()) { + remoteStoreNodeStats = new RemoteStoreNodeStats() { + @Override + public long getLastSuccessfulFetchOfPinnedTimestamps() { + return 123456L; + } + }; + } + // TODO: Only remote_store based aspects of NodeIndicesStats are being tested here. // It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now return new NodeStats( @@ -1027,7 +1046,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi segmentReplicationRejectionStats, null, admissionControlStats, - nodeCacheStats + nodeCacheStats, + remoteStoreNodeStats ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 1c4a77905d73f..823661ba14abf 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -349,6 +349,7 @@ private ClusterStatsNodeResponse createClusterStatsNodeResponse( null, null, null, + null, null ); if (defaultBehavior) { diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index 5539dd26dd52d..cd050fb346563 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -195,6 +195,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -226,6 +227,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -257,6 +259,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -319,6 +322,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -350,6 +354,7 @@ public void testFillDiskUsageSomeInvalidValues() { 
null, null, null, + null, null ), new NodeStats( @@ -381,6 +386,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 35ca5d80aeb4e..ded457601c0ae 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -125,7 +125,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getSegmentReplicationRejectionStats(), nodeStats.getRepositoriesStats(), nodeStats.getAdmissionControlStats(), - nodeStats.getNodeCacheStats() + nodeStats.getNodeCacheStats(), + nodeStats.getRemoteStoreNodeStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 7adff82e72245..fa5fb736f518f 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2752,6 +2752,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(