diff --git a/docs/reference/repositories-metering-api/apis/clear-repositories-metering-archive.asciidoc b/docs/reference/repositories-metering-api/apis/clear-repositories-metering-archive.asciidoc
new file mode 100644
index 0000000000000..33441ee5efa31
--- /dev/null
+++ b/docs/reference/repositories-metering-api/apis/clear-repositories-metering-archive.asciidoc
@@ -0,0 +1,35 @@
+[role="xpack"]
+[testenv="basic"]
+[[clear-repositories-metering-archive-api]]
+=== Clear repositories metering archive
+++++
+Clear repositories metering archive
+++++
+
+Removes the archived repositories metering information present in the cluster.
+
+[[clear-repositories-metering-archive-api-request]]
+==== {api-request-title}
+
+`DELETE /_nodes/<node_id>/_repositories_metering/<max_version_to_clear>`
+
+[[clear-repositories-metering-archive-api-desc]]
+==== {api-description-title}
+
+You can use this API to clear the archived repositories metering information in the cluster.
+
+[[clear-repositories-metering-archive-api-path-params]]
+==== {api-path-parms-title}
+
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=node-id]
+
+`<max_version_to_clear>`::
+ (long) Specifies the maximum <<get-repositories-metering-api-response-body,archive_version>> to be cleared from the archive.
+
+All the nodes selective options are explained <<cluster-nodes,here>>.
+[role="child_attributes"]
+[[clear-repositories-metering-archive-api-response-body]]
+==== {api-response-body-title}
+Returns the deleted archived repositories metering information.
+
+include::{es-repo-dir}/repositories-metering-api/apis/repositories-meterings-body.asciidoc[tag=repositories-metering-body]
diff --git a/docs/reference/repositories-metering-api/apis/get-repositories-metering.asciidoc b/docs/reference/repositories-metering-api/apis/get-repositories-metering.asciidoc
new file mode 100644
index 0000000000000..409d74cbde295
--- /dev/null
+++ b/docs/reference/repositories-metering-api/apis/get-repositories-metering.asciidoc
@@ -0,0 +1,35 @@
+[role="xpack"]
+[testenv="basic"]
+[[get-repositories-metering-api]]
+=== Get repositories metering information
+++++
+Get repositories metering information
+++++
+
+Returns cluster repositories metering information.
+
+[[get-repositories-metering-api-request]]
+==== {api-request-title}
+
+`GET /_nodes/<node_id>/_repositories_metering`
+
+[[get-repositories-metering-api-desc]]
+==== {api-description-title}
+
+You can use the cluster repositories metering API to retrieve repositories metering information in a cluster.
+
+This API exposes monotonically non-decreasing counters and it's expected that clients would durably store
+the information needed to compute aggregations over a period of time. Additionally, the information
+exposed by this API is volatile, meaning that it won't be present after node restarts.
+
+[[get-repositories-metering-api-path-params]]
+==== {api-path-parms-title}
+
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=node-id]
+
+All the nodes selective options are explained <<cluster-nodes,here>>.
+
+[role="child_attributes"]
+[[get-repositories-metering-api-response-body]]
+==== {api-response-body-title}
+include::{es-repo-dir}/repositories-metering-api/apis/repositories-meterings-body.asciidoc[tag=repositories-metering-body]
diff --git a/docs/reference/repositories-metering-api/apis/repositories-meterings-body.asciidoc b/docs/reference/repositories-metering-api/apis/repositories-meterings-body.asciidoc
new file mode 100644
index 0000000000000..fa37bb6ba1853
--- /dev/null
+++ b/docs/reference/repositories-metering-api/apis/repositories-meterings-body.asciidoc
@@ -0,0 +1,178 @@
+tag::repositories-metering-body[]
+`_nodes`::
+(object)
+Contains statistics about the number of nodes selected by the request.
++
+.Properties of `_nodes`
+[%collapsible%open]
+====
+`total`::
+(integer)
+Total number of nodes selected by the request.
+
+`successful`::
+(integer)
+Number of nodes that responded successfully to the request.
+
+`failed`::
+(integer)
+Number of nodes that rejected the request or failed to respond. If this value
+is not `0`, a reason for the rejection or failure is included in the response.
+====
+
+`cluster_name`::
+(string)
+Name of the cluster. Based on the <<cluster-name>> setting.
+
+`nodes`::
+(object)
+Contains repositories metering information for the nodes selected by the request.
++
+.Properties of `nodes`
+[%collapsible%open]
+====
+`<node_id>`::
+(array)
+An array of repository metering information for the node.
++
+.Properties of objects in `node_id`
+[%collapsible%open]
+=====
+`repository_name`::
+(string)
+Repository name.
+
+`repository_type`::
+(string)
+Repository type.
+
+`repository_location`::
+(object)
+Represents a unique location within the repository.
++
+.Properties of `repository_location` for repository type `Azure`
+[%collapsible%open]
+======
+`base_path`::
+(string)
+The path within the container where the repository stores data.
+
+`container`::
+(string)
+Container name.
+======
++
+.Properties of `repository_location` for repository type `GCP`
+[%collapsible%open]
+======
+`base_path`::
+(string)
+The path within the bucket where the repository stores data.
+
+`bucket`::
+(string)
+Bucket name.
+======
++
+.Properties of `repository_location` for repository type `S3`
+[%collapsible%open]
+======
+`base_path`::
+(string)
+The path within the bucket where the repository stores data.
+
+`bucket`::
+(string)
+Bucket name.
+======
+`repository_ephemeral_id`::
+(string)
+An identifier that changes every time the repository is updated.
+
+`repository_started_at`::
+(long)
+Time the repository was created or updated. Recorded in milliseconds
+since the https://en.wikipedia.org/wiki/Unix_time[Unix Epoch].
+
+`repository_stopped_at`::
+(Optional, long)
+Time the repository was deleted or updated. Recorded in milliseconds
+since the https://en.wikipedia.org/wiki/Unix_time[Unix Epoch].
+
+`archived`::
+(boolean)
+A flag that tells whether or not this object has been archived.
+When a repository is closed or updated the repository metering information
+is archived and kept for a certain period of time. This allows retrieving
+the repository metering information of previous repository instantiations.
+
+`archive_version`::
+(Optional, long)
+The cluster state version when this object was archived, this field
+can be used as a logical timestamp to delete all the archived metrics up
+to an observed version. This field is only present for archived
+repository metering information objects. The main purpose of this
+field is to avoid possible race conditions during repository metering
+information deletions, i.e. deleting archived repositories metering
+information that we haven't observed yet.
+
+`request_counts`::
+(object)
+An object with the number of requests performed against the repository
+grouped by request type.
++
+.Properties of `request_counts` for repository type `Azure`
+[%collapsible%open]
+======
+`GetBlobProperties`::
+(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties[Get Blob Properties] requests.
+`GetBlob`::
+(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob[Get Blob] requests.
+`ListBlobs`::
+(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs[List Blobs] requests.
+`PutBlob`::
+(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob[Put Blob] requests.
+`PutBlock`::
+(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-block[Put Block].
+`PutBlockList`::
+(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list[Put Block List] requests.
+
+Azure storage https://azure.microsoft.com/en-us/pricing/details/storage/blobs/[pricing].
+======
++
+.Properties of `request_counts` for repository type `GCP`
+[%collapsible%open]
+======
+`GetObject`::
+(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/get[get object] requests.
+`ListObjects`::
+(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/list[list objects] requests.
+`InsertObject`::
+(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/insert[insert object] requests,
+including https://cloud.google.com/storage/docs/uploading-objects[simple], https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload[multipart] and
+https://cloud.google.com/storage/docs/resumable-uploads[resumable] uploads. Resumable uploads can perform multiple http requests to
+insert a single object but they are considered as a single request since they are https://cloud.google.com/storage/docs/resumable-uploads#introduction[billed] as an individual operation.
+
+Google Cloud storage https://cloud.google.com/storage/pricing[pricing].
+======
++
+.Properties of `request_counts` for repository type `S3`
+[%collapsible%open]
+======
+`GetObject`::
+(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html[GetObject] requests.
+`ListObjects`::
+(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html[ListObjects] requests.
+`PutObject`::
+(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html[PutObject] requests.
+`PutMultipartObject`::
+(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html[Multipart] requests,
+including https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html[CreateMultipartUpload],
+https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html[UploadPart] and https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html[CompleteMultipartUpload]
+requests.
+
+Amazon Web Services Simple Storage Service https://aws.amazon.com/s3/pricing/[pricing].
+======
+=====
+====
+end::repositories-metering-body[]
diff --git a/docs/reference/repositories-metering-api/repositories-metering-apis.asciidoc b/docs/reference/repositories-metering-api/repositories-metering-apis.asciidoc
new file mode 100644
index 0000000000000..7d6a0b8da0a2c
--- /dev/null
+++ b/docs/reference/repositories-metering-api/repositories-metering-apis.asciidoc
@@ -0,0 +1,16 @@
+[role="xpack"]
+[testenv="basic"]
+[[repositories-metering-apis]]
+== Repositories metering APIs
+
+experimental[]
+
+You can use the following APIs to retrieve repositories metering information.
+
+This is an API used by Elastic's commercial offerings.
+
+* <<get-repositories-metering-api,Get repositories metering information>>
+* <<clear-repositories-metering-archive-api,Clear repositories metering archive>>
+
+include::apis/get-repositories-metering.asciidoc[]
+include::apis/clear-repositories-metering-archive.asciidoc[]
diff --git a/docs/reference/rest-api/index.asciidoc b/docs/reference/rest-api/index.asciidoc
index 99ae417cff87b..cfe21493308de 100644
--- a/docs/reference/rest-api/index.asciidoc
+++ b/docs/reference/rest-api/index.asciidoc
@@ -30,6 +30,7 @@ endif::[]
* <>
* <>
* <>
+* <<repositories-metering-apis,Repositories metering APIs>>
* <>
* <>
ifdef::permanently-unreleased-branch[]
@@ -63,6 +64,7 @@ include::{es-repo-dir}/ml/anomaly-detection/apis/index.asciidoc[]
include::{es-repo-dir}/ml/df-analytics/apis/index.asciidoc[]
include::{es-repo-dir}/migration/migration.asciidoc[]
include::{es-repo-dir}/indices/apis/reload-analyzers.asciidoc[]
+include::{es-repo-dir}/repositories-metering-api/repositories-metering-apis.asciidoc[]
include::{es-repo-dir}/rollup/rollup-api.asciidoc[]
include::{es-repo-dir}/search.asciidoc[]
ifdef::permanently-unreleased-branch[]
diff --git a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
index 7896af44a40e1..3e305c1cb1baf 100644
--- a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
+++ b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
@@ -40,7 +40,6 @@
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -77,11 +76,6 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
return new AzureErroneousHttpHandler(delegate, randomIntBetween(2, 3));
}
- @Override
- protected List requestTypesTracked() {
- return List.of("GET", "LIST", "HEAD", "PUT", "PUT_BLOCK");
- }
-
@Override
protected Settings nodeSettings(int nodeOrdinal) {
final String key = Base64.getEncoder().encodeToString(randomAlphaOfLength(10).getBytes(StandardCharsets.UTF_8));
@@ -181,23 +175,28 @@ private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
@Override
protected void maybeTrack(String request, Headers headers) {
if (Regex.simpleMatch("GET /*/*", request)) {
- trackRequest("GET");
+ trackRequest("GetBlob");
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
- trackRequest("HEAD");
+ trackRequest("GetBlobProperties");
} else if (listPattern.test(request)) {
- trackRequest("LIST");
- } else if (isBlockUpload(request)) {
- trackRequest("PUT_BLOCK");
+ trackRequest("ListBlobs");
+ } else if (isPutBlock(request)) {
+ trackRequest("PutBlock");
+ } else if (isPutBlockList(request)) {
+ trackRequest("PutBlockList");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
- trackRequest("PUT");
+ trackRequest("PutBlob");
}
}
- // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
- private boolean isBlockUpload(String request) {
- return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request)
- || (Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid="));
+ private boolean isPutBlock(String request) {
+ return Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid=");
+ }
+
+ // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
+ private boolean isPutBlockList(String request) {
+ return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request);
}
}
}
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index 6f59f94dad8c5..aa8df87b99612 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -118,19 +118,19 @@ public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service,
this.uploadMetricsCollector = (httpURLConnection -> {
assert httpURLConnection.getRequestMethod().equals("PUT");
String queryParams = httpURLConnection.getURL().getQuery();
- if (queryParams != null && isBlockUpload(queryParams)) {
- stats.putBlockOperations.incrementAndGet();
- } else {
+ if (queryParams == null) {
stats.putOperations.incrementAndGet();
+ return;
}
- });
- }
- private boolean isBlockUpload(String queryParams) {
- // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
- // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
- return (queryParams.contains("comp=block") && queryParams.contains("blockid="))
- || queryParams.contains("comp=blocklist");
+ // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
+ // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
+ if (queryParams.contains("comp=block") && queryParams.contains("blockid=")) {
+ stats.putBlockOperations.incrementAndGet();
+ } else if (queryParams.contains("comp=blocklist")) {
+ stats.putBlockListOperations.incrementAndGet();
+ }
+ });
}
@Override
@@ -383,12 +383,15 @@ private static class Stats {
private final AtomicLong putBlockOperations = new AtomicLong();
+ private final AtomicLong putBlockListOperations = new AtomicLong();
+
private Map toMap() {
- return Map.of("GET", getOperations.get(),
- "LIST", listOperations.get(),
- "HEAD", headOperations.get(),
- "PUT", putOperations.get(),
- "PUT_BLOCK", putBlockOperations.get());
+ return Map.of("GetBlob", getOperations.get(),
+ "ListBlobs", listOperations.get(),
+ "GetBlobProperties", headOperations.get(),
+ "PutBlob", putOperations.get(),
+ "PutBlock", putBlockOperations.get(),
+ "PutBlockList", putBlockListOperations.get());
}
}
}
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java
index 024d52f40e9ba..50b2d04ad2420 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java
@@ -33,9 +33,10 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
-import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import java.util.Locale;
+import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.repositories.azure.AzureStorageService.MAX_CHUNK_SIZE;
@@ -52,7 +53,7 @@
* {@code compress}If set to true metadata files will be stored compressed. Defaults to false.
*
*/
-public class AzureRepository extends BlobStoreRepository {
+public class AzureRepository extends MeteredBlobStoreRepository {
private static final Logger logger = LogManager.getLogger(AzureRepository.class);
public static final String TYPE = "azure";
@@ -82,7 +83,12 @@ public AzureRepository(
final AzureStorageService storageService,
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
- super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildBasePath(metadata));
+ super(metadata,
+ namedXContentRegistry,
+ clusterService,
+ recoverySettings,
+ buildBasePath(metadata),
+ buildLocation(metadata));
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;
@@ -110,6 +116,11 @@ private static BlobPath buildBasePath(RepositoryMetadata metadata) {
}
}
+ private static Map buildLocation(RepositoryMetadata metadata) {
+ return Map.of("base_path", Repository.BASE_PATH_SETTING.get(metadata.settings()),
+ "container", Repository.CONTAINER_SETTING.get(metadata.settings()));
+ }
+
@Override
protected BlobStore getBlobStore() {
return super.getBlobStore();
diff --git a/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java
index f73a8668fd82c..5d193706c8d8a 100644
--- a/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java
+++ b/plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java
@@ -59,7 +59,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -105,11 +104,6 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3));
}
- @Override
- protected List requestTypesTracked() {
- return List.of("GET", "LIST", "POST", "PUT");
- }
-
@Override
protected Settings nodeSettings(int nodeOrdinal) {
final Settings.Builder settings = Settings.builder();
@@ -310,15 +304,18 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
@Override
public void maybeTrack(final String request, Headers requestHeaders) {
if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) {
- trackRequest("GET");
+ trackRequest("GetObject");
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
- trackRequest("LIST");
+ trackRequest("ListObjects");
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
- trackRequest("GET");
- } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*", request) && isLastPart(requestHeaders)) {
- trackRequest("PUT");
+ trackRequest("GetObject");
+ } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) {
+ // Resumable uploads are billed as a single operation, that's the reason we're tracking
+ // the request only when it's the last part.
+ // See https://cloud.google.com/storage/docs/resumable-uploads#introduction
+ trackRequest("InsertObject");
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) {
- trackRequest("POST");
+ trackRequest("InsertObject");
}
}
diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java
index 4a0de249a3e78..2264f71c29f4a 100644
--- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java
+++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java
@@ -58,10 +58,9 @@ String getTrackedBucket() {
Map toMap() {
final Map results = new HashMap<>();
- results.put("GET", getCount.get());
- results.put("LIST", listCount.get());
- results.put("PUT", putCount.get());
- results.put("POST", postCount.get());
+ results.put("GetObject", getCount.get());
+ results.put("ListObjects", listCount.get());
+ results.put("InsertObject", postCount.get() + putCount.get());
return results;
}
}
diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java
index a50e7b617b774..6cea3446a26a6 100644
--- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java
+++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java
@@ -31,15 +31,16 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
-import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
+import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
import static org.elasticsearch.common.settings.Setting.simpleString;
-class GoogleCloudStorageRepository extends BlobStoreRepository {
+class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRepository.class);
// package private for testing
@@ -72,7 +73,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
final GoogleCloudStorageService storageService,
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
- super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildBasePath(metadata));
+ super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildBasePath(metadata), buildLocation(metadata));
this.storageService = storageService;
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
@@ -95,6 +96,11 @@ private static BlobPath buildBasePath(RepositoryMetadata metadata) {
}
}
+ private static Map buildLocation(RepositoryMetadata metadata) {
+ return Map.of("base_path", BASE_PATH.get(metadata.settings()),
+ "bucket", getSetting(BUCKET, metadata));
+ }
+
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize);
diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java
index 3bf452cdb949b..d96cf762e0d60 100644
--- a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java
+++ b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java
@@ -119,11 +119,6 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
return new S3StatsCollectorHttpHandler(new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3)));
}
- @Override
- protected List requestTypesTracked() {
- return List.of("GET", "LIST", "PUT", "POST");
- }
-
@Override
protected Settings nodeSettings(int nodeOrdinal) {
final MockSecureSettings secureSettings = new MockSecureSettings();
@@ -287,13 +282,13 @@ private static class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandl
@Override
public void maybeTrack(final String request, Headers requestHeaders) {
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
- trackRequest("LIST");
+ trackRequest("ListObjects");
} else if (Regex.simpleMatch("GET /*/*", request)) {
- trackRequest("GET");
+ trackRequest("GetObject");
} else if (isMultiPartUpload(request)) {
- trackRequest("POST");
+ trackRequest("PutMultipartObject");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
- trackRequest("PUT");
+ trackRequest("PutObject");
}
}
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
index e32a7dd6fd2cc..cd9b4b5763cb2 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
@@ -210,10 +210,10 @@ static class Stats {
Map toMap() {
final Map results = new HashMap<>();
- results.put("GET", getCount.get());
- results.put("LIST", listCount.get());
- results.put("PUT", putCount.get());
- results.put("POST", postCount.get());
+ results.put("GetObject", getCount.get());
+ results.put("ListObjects", listCount.get());
+ results.put("PutObject", putCount.get());
+ results.put("PutMultipartObject", postCount.get());
return results;
}
}
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
index 51032981a9eef..802957e66818e 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
@@ -41,7 +41,7 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
-import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
@@ -49,6 +49,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -66,7 +67,7 @@
* {@code compress}If set to true metadata files will be stored compressed. Defaults to false.
*
*/
-class S3Repository extends BlobStoreRepository {
+class S3Repository extends MeteredBlobStoreRepository {
private static final Logger logger = LogManager.getLogger(S3Repository.class);
static final String TYPE = "s3";
@@ -194,7 +195,12 @@ class S3Repository extends BlobStoreRepository {
final S3Service service,
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
- super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildBasePath(metadata));
+ super(metadata,
+ namedXContentRegistry,
+ clusterService,
+ recoverySettings,
+ buildBasePath(metadata),
+ buildLocation(metadata));
this.service = service;
// Parse and validate the user's S3 Storage Class setting
@@ -229,6 +235,11 @@ class S3Repository extends BlobStoreRepository {
storageClass);
}
+ private static Map buildLocation(RepositoryMetadata metadata) {
+ return Map.of("base_path", BASE_PATH_SETTING.get(metadata.settings()),
+ "bucket", BUCKET_SETTING.get(metadata.settings()));
+ }
+
/**
* Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
* closed concurrently.
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
index 1bbd8b19aaea6..4da9ecfcf7b8d 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
@@ -44,9 +44,12 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -57,6 +60,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.Set;
/**
@@ -66,6 +71,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
private static final Logger logger = LogManager.getLogger(RepositoriesService.class);
+ public static final Setting REPOSITORIES_STATS_ARCHIVE_RETENTION_PERIOD =
+ Setting.positiveTimeSetting("repositories.stats.archive.retention_period", TimeValue.timeValueHours(2), Setting.Property.NodeScope);
+
+ public static final Setting REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS =
+ Setting.intSetting("repositories.stats.archive.max_archived_stats", 100, 0, Setting.Property.NodeScope);
+
private final Map typesRegistry;
private final Map internalTypesRegistry;
@@ -77,6 +88,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
private final Map internalRepositories = ConcurrentCollections.newConcurrentMap();
private volatile Map repositories = Collections.emptyMap();
+ private final RepositoriesStatsArchive repositoriesStatsArchive;
+
public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
Map typesRegistry, Map internalTypesRegistry,
@@ -93,6 +106,9 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra
}
}
this.verifyAction = new VerifyNodeRepositoryAction(transportService, clusterService, this);
+ this.repositoriesStatsArchive = new RepositoriesStatsArchive(REPOSITORIES_STATS_ARCHIVE_RETENTION_PERIOD.get(settings),
+ REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS.get(settings),
+ threadPool::relativeTimeInMillis);
}
/**
@@ -316,7 +332,9 @@ public void applyClusterState(ClusterChangedEvent event) {
for (Map.Entry entry : repositories.entrySet()) {
if (newMetadata == null || newMetadata.repository(entry.getKey()) == null) {
logger.debug("unregistering repository [{}]", entry.getKey());
- closeRepository(entry.getValue());
+ Repository repository = entry.getValue();
+ closeRepository(repository);
+ archiveRepositoryStats(repository, state.version());
} else {
survivors.put(entry.getKey(), entry.getValue());
}
@@ -335,6 +353,7 @@ public void applyClusterState(ClusterChangedEvent event) {
// Previous version is different from the version in settings
logger.debug("updating repository [{}]", repositoryMetadata.name());
closeRepository(repository);
+ archiveRepositoryStats(repository, state.version());
repository = null;
try {
repository = createRepository(repositoryMetadata, typesRegistry);
@@ -405,6 +424,27 @@ public Repository repository(String repositoryName) {
throw new RepositoryMissingException(repositoryName);
}
+ public List repositoriesStats() {
+ List archivedRepoStats = repositoriesStatsArchive.getArchivedStats();
+ List activeRepoStats = getRepositoryStatsForActiveRepositories();
+
+ List repositoriesStats = new ArrayList<>(archivedRepoStats);
+ repositoriesStats.addAll(activeRepoStats);
+ return repositoriesStats;
+ }
+
+ private List getRepositoryStatsForActiveRepositories() {
+ return Stream.concat(repositories.values().stream(), internalRepositories.values().stream())
+ .filter(r -> r instanceof MeteredBlobStoreRepository)
+ .map(r -> (MeteredBlobStoreRepository) r)
+ .map(MeteredBlobStoreRepository::statsSnapshot)
+ .collect(Collectors.toList());
+ }
+
+ public List clearRepositoriesStatsArchive(long maxVersionToClear) {
+ return repositoriesStatsArchive.clear(maxVersionToClear);
+ }
+
public void registerInternalRepository(String name, String type) {
RepositoryMetadata metadata = new RepositoryMetadata(name, type, Settings.EMPTY);
Repository repository = internalRepositories.computeIfAbsent(name, (n) -> {
@@ -435,6 +475,15 @@ private void closeRepository(Repository repository) {
repository.close();
}
    /**
     * Archives the final stats of a metered repository that is being closed.
     * Repositories that are not metered are ignored. If the archive has no room
     * the stats are dropped with a warning rather than failing the close.
     */
    private void archiveRepositoryStats(Repository repository, long clusterStateVersion) {
        if (repository instanceof MeteredBlobStoreRepository) {
            RepositoryStatsSnapshot stats = ((MeteredBlobStoreRepository) repository).statsSnapshotForArchival(clusterStateVersion);
            // archive(...) returns false when the archive is at capacity
            if (repositoriesStatsArchive.archive(stats) == false) {
                logger.warn("Unable to archive the repository stats [{}] as the archive is full.", stats);
            }
        }
    }
+
/**
* Creates repository holder. This method starts the repository
*/
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesStatsArchive.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesStatsArchive.java
new file mode 100644
index 0000000000000..d0af94b4155fd
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesStatsArchive.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public final class RepositoriesStatsArchive {
+ private static final Logger logger = LogManager.getLogger(RepositoriesStatsArchive.class);
+
+ private final TimeValue retentionPeriod;
+ private final int maxCapacity;
+ private final LongSupplier relativeTimeSupplier;
+ private final Deque archive = new ArrayDeque<>();
+
+ public RepositoriesStatsArchive(TimeValue retentionPeriod,
+ int maxCapacity,
+ LongSupplier relativeTimeSupplier) {
+ this.retentionPeriod = retentionPeriod;
+ this.maxCapacity = maxCapacity;
+ this.relativeTimeSupplier = relativeTimeSupplier;
+ }
+
+ /**
+ * Archives the specified repository stats snapshot into the archive
+ * if it's possible without violating the capacity constraints.
+ *
+ * @return {@code true} if the repository stats were archived, {@code false} otherwise.
+ */
+ synchronized boolean archive(final RepositoryStatsSnapshot repositoryStats) {
+ assert containsRepositoryStats(repositoryStats) == false
+ : "A repository with ephemeral id " + repositoryStats.getRepositoryInfo().ephemeralId + " is already archived";
+ assert repositoryStats.isArchived();
+
+ evict();
+
+ if (archive.size() >= maxCapacity) {
+ return false;
+ }
+
+ return archive.add(new ArchiveEntry(repositoryStats, relativeTimeSupplier.getAsLong()));
+ }
+
+ synchronized List getArchivedStats() {
+ evict();
+ return archive.stream().map(e -> e.repositoryStatsSnapshot).collect(Collectors.toList());
+ }
+
+ /**
+ * Clears the archive, returning the valid archived entries up until that point.
+ *
+ * @return the repository stats that were stored before clearing the archive.
+ */
+ synchronized List clear(long maxVersionToClear) {
+ List clearedStats = new ArrayList<>();
+ Iterator iterator = archive.iterator();
+ while (iterator.hasNext()) {
+ RepositoryStatsSnapshot statsSnapshot = iterator.next().repositoryStatsSnapshot;
+ if (statsSnapshot.getClusterVersion() <= maxVersionToClear) {
+ clearedStats.add(statsSnapshot);
+ iterator.remove();
+ }
+ }
+ logger.debug("RepositoriesStatsArchive have been cleared. Removed stats: [{}]", clearedStats);
+ return clearedStats;
+ }
+
+ private void evict() {
+ ArchiveEntry entry;
+ while ((entry = archive.peek()) != null && entry.ageInMillis(relativeTimeSupplier) >= retentionPeriod.getMillis()) {
+ ArchiveEntry removedEntry = archive.poll();
+ logger.debug("Evicting repository stats [{}]", removedEntry.repositoryStatsSnapshot);
+ }
+ }
+
+ private boolean containsRepositoryStats(RepositoryStatsSnapshot repositoryStats) {
+ return archive.stream()
+ .anyMatch(entry ->
+ entry.repositoryStatsSnapshot.getRepositoryInfo().ephemeralId.equals(repositoryStats.getRepositoryInfo().ephemeralId));
+ }
+
+ private static class ArchiveEntry {
+ private final RepositoryStatsSnapshot repositoryStatsSnapshot;
+ private final long createdAtMillis;
+
+ private ArchiveEntry(RepositoryStatsSnapshot repositoryStatsSnapshot, long createdAtMillis) {
+ this.repositoryStatsSnapshot = repositoryStatsSnapshot;
+ this.createdAtMillis = createdAtMillis;
+ }
+
+ private long ageInMillis(LongSupplier relativeTimeInMillis) {
+ return Math.max(0, relativeTimeInMillis.getAsLong() - createdAtMillis);
+ }
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryInfo.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryInfo.java
new file mode 100644
index 0000000000000..8d2612ba70b04
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryInfo.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+public final class RepositoryInfo implements Writeable, ToXContentFragment {
+ public final String ephemeralId;
+ public final String name;
+ public final String type;
+ public final Map location;
+ public final long startedAt;
+ @Nullable
+ public final Long stoppedAt;
+
+ public RepositoryInfo(String ephemeralId,
+ String name,
+ String type,
+ Map location,
+ long startedAt) {
+ this(ephemeralId, name, type, location, startedAt, null);
+ }
+
+ public RepositoryInfo(String ephemeralId,
+ String name,
+ String type,
+ Map location,
+ long startedAt,
+ @Nullable Long stoppedAt) {
+ this.ephemeralId = ephemeralId;
+ this.name = name;
+ this.type = type;
+ this.location = location;
+ this.startedAt = startedAt;
+ if (stoppedAt != null && startedAt > stoppedAt) {
+ throw new IllegalArgumentException("createdAt must be before or equal to stoppedAt");
+ }
+ this.stoppedAt = stoppedAt;
+ }
+
+ public RepositoryInfo(StreamInput in) throws IOException {
+ this.ephemeralId = in.readString();
+ this.name = in.readString();
+ this.type = in.readString();
+ this.location = in.readMap(StreamInput::readString, StreamInput::readString);
+ this.startedAt = in.readLong();
+ this.stoppedAt = in.readOptionalLong();
+ }
+
+ public RepositoryInfo stopped(long stoppedAt) {
+ assert isStopped() == false : "The repository is already stopped";
+
+ return new RepositoryInfo(ephemeralId, name, type, location, startedAt, stoppedAt);
+ }
+
+ public boolean isStopped() {
+ return stoppedAt != null;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(ephemeralId);
+ out.writeString(name);
+ out.writeString(type);
+ out.writeMap(location, StreamOutput::writeString, StreamOutput::writeString);
+ out.writeLong(startedAt);
+ out.writeOptionalLong(stoppedAt);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field("repository_name", name);
+ builder.field("repository_type", type);
+ builder.field("repository_location", location);
+ builder.field("repository_ephemeral_id", ephemeralId);
+ builder.field("repository_started_at", startedAt);
+ if (stoppedAt != null) {
+ builder.field("repository_stopped_at", stoppedAt);
+ }
+ return builder;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RepositoryInfo that = (RepositoryInfo) o;
+ return ephemeralId.equals(that.ephemeralId) &&
+ name.equals(that.name) &&
+ type.equals(that.type) &&
+ location.equals(that.location) &&
+ startedAt == that.startedAt &&
+ Objects.equals(stoppedAt, that.stoppedAt);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ephemeralId, name, type, location, startedAt, stoppedAt);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java
index 50a8b46630179..d6fc680946b38 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryStats.java
@@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
public class RepositoryStats implements Writeable {
@@ -55,4 +56,24 @@ public RepositoryStats merge(RepositoryStats otherStats) {
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(requestCounts, StreamOutput::writeString, StreamOutput::writeLong);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RepositoryStats that = (RepositoryStats) o;
+ return requestCounts.equals(that.requestCounts);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(requestCounts);
+ }
+
+ @Override
+ public String toString() {
+ return "RepositoryStats{" +
+ "requestCounts=" + requestCounts +
+ '}';
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryStatsSnapshot.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryStatsSnapshot.java
new file mode 100644
index 0000000000000..55a13e5fde0f7
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryStatsSnapshot.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
/**
 * A point-in-time view of a repository's metering stats together with the
 * {@link RepositoryInfo} describing the repository instance. A snapshot is
 * either live (not archived, unknown cluster version) or archived (tagged
 * with the cluster state version at which the repository was removed).
 */
public final class RepositoryStatsSnapshot implements Writeable, ToXContentObject {
    public static final long UNKNOWN_CLUSTER_VERSION = -1;
    private final RepositoryInfo repositoryInfo;
    private final RepositoryStats repositoryStats;
    private final long clusterVersion;
    private final boolean archived;

    public RepositoryStatsSnapshot(RepositoryInfo repositoryInfo,
                                   RepositoryStats repositoryStats,
                                   long clusterVersion,
                                   boolean archived) {
        // Invariant: archived snapshots carry a known cluster version,
        // non-archived snapshots must use UNKNOWN_CLUSTER_VERSION.
        assert archived != (clusterVersion == UNKNOWN_CLUSTER_VERSION);
        this.repositoryInfo = repositoryInfo;
        this.repositoryStats = repositoryStats;
        this.clusterVersion = clusterVersion;
        this.archived = archived;
    }

    public RepositoryStatsSnapshot(StreamInput in) throws IOException {
        this.repositoryInfo = new RepositoryInfo(in);
        this.repositoryStats = new RepositoryStats(in);
        this.clusterVersion = in.readLong();
        this.archived = in.readBoolean();
    }

    public RepositoryInfo getRepositoryInfo() {
        return repositoryInfo;
    }

    public RepositoryStats getRepositoryStats() {
        return repositoryStats;
    }

    public boolean isArchived() {
        return archived;
    }

    public long getClusterVersion() {
        return clusterVersion;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Field order must match the StreamInput constructor.
        repositoryInfo.writeTo(out);
        repositoryStats.writeTo(out);
        out.writeLong(clusterVersion);
        out.writeBoolean(archived);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        repositoryInfo.toXContent(builder, params);
        builder.field("request_counts", repositoryStats.requestCounts);
        builder.field("archived", archived);
        // cluster_version is only meaningful for archived snapshots
        if (archived) {
            builder.field("cluster_version", clusterVersion);
        }
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        RepositoryStatsSnapshot that = (RepositoryStatsSnapshot) o;
        return repositoryInfo.equals(that.repositoryInfo) &&
            repositoryStats.equals(that.repositoryStats) &&
            clusterVersion == that.clusterVersion &&
            archived == that.archived;
    }

    @Override
    public int hashCode() {
        return Objects.hash(repositoryInfo, repositoryStats, clusterVersion, archived);
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java
new file mode 100644
index 0000000000000..b460f373d86b9
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/MeteredBlobStoreRepository.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories.blobstore;
+
+import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.repositories.RepositoryInfo;
+import org.elasticsearch.repositories.RepositoryStatsSnapshot;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Map;
+
+public abstract class MeteredBlobStoreRepository extends BlobStoreRepository {
+ private final RepositoryInfo repositoryInfo;
+
+ public MeteredBlobStoreRepository(RepositoryMetadata metadata,
+ NamedXContentRegistry namedXContentRegistry,
+ ClusterService clusterService,
+ RecoverySettings recoverySettings,
+ BlobPath basePath,
+ Map location) {
+ super(metadata, namedXContentRegistry, clusterService, recoverySettings, basePath);
+ ThreadPool threadPool = clusterService.getClusterApplierService().threadPool();
+ this.repositoryInfo = new RepositoryInfo(UUIDs.randomBase64UUID(),
+ metadata.name(),
+ metadata.type(),
+ location,
+ threadPool.absoluteTimeInMillis());
+ }
+
+ public RepositoryStatsSnapshot statsSnapshot() {
+ return new RepositoryStatsSnapshot(repositoryInfo, stats(), RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, false);
+ }
+
+ public RepositoryStatsSnapshot statsSnapshotForArchival(long clusterVersion) {
+ RepositoryInfo stoppedRepoInfo = repositoryInfo.stopped(threadPool.absoluteTimeInMillis());
+ return new RepositoryStatsSnapshot(stoppedRepoInfo, stats(), clusterVersion, true);
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
index 6a7738c1c32e4..2c74ad03db626 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
@@ -23,23 +23,32 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ESTestCase;
@@ -49,11 +58,14 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
+import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class RepositoriesServiceTests extends ESTestCase {
@@ -67,8 +79,16 @@ public void setUp() throws Exception {
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
Collections.emptySet());
+ final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
+ when(clusterApplierService.threadPool()).thenReturn(threadPool);
+ final ClusterService clusterService = mock(ClusterService.class);
+ when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
+ Map typesRegistry =
+ Map.of(TestRepository.TYPE, TestRepository::new,
+ MeteredRepositoryTypeA.TYPE, metadata -> new MeteredRepositoryTypeA(metadata, clusterService),
+ MeteredRepositoryTypeB.TYPE, metadata -> new MeteredRepositoryTypeB(metadata, clusterService));
repositoriesService = new RepositoriesService(Settings.EMPTY, mock(ClusterService.class),
- transportService, Collections.emptyMap(), Collections.singletonMap(TestRepository.TYPE, TestRepository::new), threadPool);
+ transportService, typesRegistry, typesRegistry, threadPool);
repositoriesService.start();
}
@@ -114,6 +134,46 @@ public void testRegisterRejectsInvalidRepositoryNames() {
}
}
+ public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() {
+ String repoName = "name";
+ expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
+
+ ClusterState clusterStateWithRepoTypeA = createClusterStateWithRepo(repoName, MeteredRepositoryTypeA.TYPE);
+
+ repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeA, emptyState()));
+ assertThat(repositoriesService.repositoriesStats().size(), equalTo(1));
+
+ repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", emptyState(), clusterStateWithRepoTypeA));
+ assertThat(repositoriesService.repositoriesStats().size(), equalTo(1));
+
+ ClusterState clusterStateWithRepoTypeB = createClusterStateWithRepo(repoName, MeteredRepositoryTypeB.TYPE);
+ repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeB, emptyState()));
+
+ List repositoriesStats = repositoriesService.repositoriesStats();
+ assertThat(repositoriesStats.size(), equalTo(2));
+ RepositoryStatsSnapshot repositoryStatsTypeA = repositoriesStats.get(0);
+ assertThat(repositoryStatsTypeA.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeA.TYPE));
+ assertThat(repositoryStatsTypeA.getRepositoryStats(), equalTo(MeteredRepositoryTypeA.STATS));
+
+ RepositoryStatsSnapshot repositoryStatsTypeB = repositoriesStats.get(1);
+ assertThat(repositoryStatsTypeB.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeB.TYPE));
+ assertThat(repositoryStatsTypeB.getRepositoryStats(), equalTo(MeteredRepositoryTypeB.STATS));
+ }
+
    /** Builds a cluster state containing a single repository with the given name and type. */
    private ClusterState createClusterStateWithRepo(String repoName, String repoType) {
        ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
        Metadata.Builder mdBuilder = Metadata.builder();
        mdBuilder.putCustom(RepositoriesMetadata.TYPE,
            new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))));
        state.metadata(mdBuilder);

        return state.build();
    }
+
    /** A cluster state with no repositories metadata at all. */
    private ClusterState emptyState() {
        return ClusterState.builder(new ClusterName("test")).build();
    }
+
private void assertThrowsOnRegister(String repoName) {
PutRepositoryRequest request = new PutRepositoryRequest(repoName);
expectThrows(RepositoryException.class, () -> repositoriesService.registerRepository(request, null));
@@ -257,4 +317,52 @@ public void close() {
isClosed = true;
}
}
+
    /** A metered repository stub of type "type-a" that reports fixed stats. */
    private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository {
        private static final String TYPE = "type-a";
        private static final RepositoryStats STATS = new RepositoryStats(Map.of("GET", 10L));

        private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) {
            super(metadata,
                mock(NamedXContentRegistry.class),
                clusterService,
                mock(RecoverySettings.class),
                BlobPath.cleanPath(),
                Map.of("bucket", "bucket-a"));
        }

        @Override
        protected BlobStore createBlobStore() {
            return mock(BlobStore.class);
        }

        @Override
        public RepositoryStats stats() {
            // Constant stats make assertions in the tests deterministic.
            return STATS;
        }
    }
+
    /** A metered repository stub of type "type-b" that reports fixed stats. */
    private static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository {
        private static final String TYPE = "type-b";
        private static final RepositoryStats STATS = new RepositoryStats(Map.of("LIST", 20L));

        private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) {
            super(metadata,
                mock(NamedXContentRegistry.class),
                clusterService,
                mock(RecoverySettings.class),
                BlobPath.cleanPath(),
                Map.of("bucket", "bucket-b"));
        }

        @Override
        protected BlobStore createBlobStore() {
            return mock(BlobStore.class);
        }

        @Override
        public RepositoryStats stats() {
            // Constant stats make assertions in the tests deterministic.
            return STATS;
        }
    }
}
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesStatsArchiveTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesStatsArchiveTests.java
new file mode 100644
index 0000000000000..2c9294d853d81
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesStatsArchiveTests.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RepositoriesStatsArchiveTests extends ESTestCase {
+ public void testStatsAreEvictedOnceTheyAreOlderThanRetentionPeriod() {
+ int retentionTimeInMillis = randomIntBetween(100, 1000);
+
+ AtomicLong fakeRelativeClock = new AtomicLong();
+ RepositoriesStatsArchive repositoriesStatsArchive =
+ new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis),
+ 100,
+ fakeRelativeClock::get);
+
+ for (int i = 0; i < randomInt(10); i++) {
+ RepositoryStatsSnapshot repoStats = createRepositoryStats(RepositoryStats.EMPTY_STATS);
+ repositoriesStatsArchive.archive(repoStats);
+ }
+
+ fakeRelativeClock.set(retentionTimeInMillis * 2);
+ int statsToBeRetainedCount = randomInt(10);
+ for (int i = 0; i < statsToBeRetainedCount; i++) {
+ RepositoryStatsSnapshot repoStats = createRepositoryStats(new RepositoryStats(Map.of("GET", 10L)));
+ repositoriesStatsArchive.archive(repoStats);
+ }
+
+ List archivedStats = repositoriesStatsArchive.getArchivedStats();
+ assertThat(archivedStats.size(), equalTo(statsToBeRetainedCount));
+ for (RepositoryStatsSnapshot repositoryStatsSnapshot : archivedStats) {
+ assertThat(repositoryStatsSnapshot.getRepositoryStats().requestCounts, equalTo(Map.of("GET", 10L)));
+ }
+ }
+
+ public void testStatsAreRejectedIfTheArchiveIsFull() {
+ int retentionTimeInMillis = randomIntBetween(100, 1000);
+
+ AtomicLong fakeRelativeClock = new AtomicLong();
+ RepositoriesStatsArchive repositoriesStatsArchive =
+ new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis),
+ 1,
+ fakeRelativeClock::get);
+
+ assertTrue(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS)));
+
+ fakeRelativeClock.set(retentionTimeInMillis * 2);
+ // Now there's room since the previous stats should be evicted
+ assertTrue(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS)));
+ // There's no room for stats with the same creation time
+ assertFalse(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS)));
+ }
+
+ public void testClearArchive() {
+ int retentionTimeInMillis = randomIntBetween(100, 1000);
+ AtomicLong fakeRelativeClock = new AtomicLong();
+ RepositoriesStatsArchive repositoriesStatsArchive =
+ new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis),
+ 100,
+ fakeRelativeClock::get);
+
+ int archivedStatsWithVersionZero = randomIntBetween(1, 20);
+ for (int i = 0; i < archivedStatsWithVersionZero; i++) {
+ repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS, 0));
+ }
+
+ int archivedStatsWithNewerVersion = randomIntBetween(1, 20);
+ for (int i = 0; i < archivedStatsWithNewerVersion; i++) {
+ repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS, 1));
+ }
+
+ List removedStats = repositoriesStatsArchive.clear(0L);
+ assertThat(removedStats.size(), equalTo(archivedStatsWithVersionZero));
+
+ assertThat(repositoriesStatsArchive.getArchivedStats().size(), equalTo(archivedStatsWithNewerVersion));
+ }
+
+ private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repositoryStats) {
+ return createRepositoryStats(repositoryStats, 0);
+ }
+
+ private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repositoryStats, long clusterVersion) {
+ RepositoryInfo repositoryInfo = new RepositoryInfo(UUIDs.randomBase64UUID(),
+ randomAlphaOfLength(10),
+ randomAlphaOfLength(10),
+ Map.of("bucket", randomAlphaOfLength(10)),
+ System.currentTimeMillis(),
+ null);
+ return new RepositoryStatsSnapshot(repositoryInfo, repositoryStats, clusterVersion, true);
+ }
+
+}
diff --git a/test/fixtures/azure-fixture/docker-compose.yml b/test/fixtures/azure-fixture/docker-compose.yml
index 61ea9d28a560a..85e073e1803c1 100644
--- a/test/fixtures/azure-fixture/docker-compose.yml
+++ b/test/fixtures/azure-fixture/docker-compose.yml
@@ -17,3 +17,12 @@ services:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "8091"
+
+ azure-fixture-repositories-metering:
+ build:
+ context: .
+ dockerfile: Dockerfile
+ volumes:
+ - ./testfixtures_shared/shared:/fixture/shared
+ ports:
+ - "8091"
diff --git a/test/fixtures/gcs-fixture/docker-compose.yml b/test/fixtures/gcs-fixture/docker-compose.yml
index a53c4366df6dc..30a362e7caa8d 100644
--- a/test/fixtures/gcs-fixture/docker-compose.yml
+++ b/test/fixtures/gcs-fixture/docker-compose.yml
@@ -36,3 +36,15 @@ services:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
+ gcs-fixture-repositories-metering:
+ build:
+ context: .
+ args:
+ port: 80
+ bucket: "bucket"
+ token: "o/oauth2/token"
+ dockerfile: Dockerfile
+ volumes:
+ - ./testfixtures_shared/shared:/fixture/shared
+ ports:
+ - "80"
diff --git a/test/fixtures/s3-fixture/docker-compose.yml b/test/fixtures/s3-fixture/docker-compose.yml
index 1d06334eddbd3..22d101f41c318 100644
--- a/test/fixtures/s3-fixture/docker-compose.yml
+++ b/test/fixtures/s3-fixture/docker-compose.yml
@@ -30,6 +30,21 @@ services:
ports:
- "80"
+ s3-fixture-repositories-metering:
+ build:
+ context: .
+ args:
+ fixtureClass: fixture.s3.S3HttpFixture
+ port: 80
+ bucket: "bucket"
+ basePath: "base_path"
+ accessKey: "access_key"
+ dockerfile: Dockerfile
+ volumes:
+ - ./testfixtures_shared/shared:/fixture/shared
+ ports:
+ - "80"
+
s3-fixture-with-session-token:
build:
context: .
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java
index 03a69db208530..e174d94d3716a 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java
@@ -49,6 +49,7 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -135,8 +136,6 @@ public void tearDownHttpServer() {
protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate);
- protected abstract List<String> requestTypesTracked();
-
/**
* Test the snapshot and restore of an index which has large segments files.
*/
@@ -217,32 +216,25 @@ public void testRequestStats() throws Exception {
Map sdkRequestCounts = repositoryStats.requestCounts;
- for (String requestType : requestTypesTracked()) {
- assertSDKCallsMatchMockCalls(sdkRequestCounts, requestType);
- }
- }
+ final Map<String, Long> mockCalls = getMockRequestCounts();
- private void assertSDKCallsMatchMockCalls(Map sdkRequestCount, String requestTye) {
- final long sdkCalls = sdkRequestCount.getOrDefault(requestTye, 0L);
- final long mockCalls = handlers.values().stream()
- .mapToLong(h -> {
- while (h instanceof DelegatingHttpHandler) {
- if (h instanceof HttpStatsCollectorHandler) {
- return ((HttpStatsCollectorHandler) h).getCount(requestTye);
- }
- h = ((DelegatingHttpHandler) h).getDelegate();
- }
+ String assertionErrorMsg = String.format("SDK sent [%s] calls and handler measured [%s] calls",
+ sdkRequestCounts,
+ mockCalls);
- return 0L;
- }).sum();
-
- String assertionErrorMsg = String.format("SDK sent %d [%s] calls and handler measured %d [%s] calls",
- sdkCalls,
- requestTye,
- mockCalls,
- requestTye);
+ assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts);
+ }
- assertEquals(assertionErrorMsg, mockCalls, sdkCalls);
+ private Map<String, Long> getMockRequestCounts() {
+ for (HttpHandler h : handlers.values()) {
+ while (h instanceof DelegatingHttpHandler) {
+ if (h instanceof HttpStatsCollectorHandler) {
+ return ((HttpStatsCollectorHandler) h).getOperationsCount();
+ }
+ h = ((DelegatingHttpHandler) h).getDelegate();
+ }
+ }
+ return Collections.emptyMap();
}
protected static String httpServerUrl() {
@@ -352,8 +344,8 @@ public HttpHandler getDelegate() {
return delegate;
}
- synchronized long getCount(final String requestType) {
- return operationCount.getOrDefault(requestType, 0L);
+ synchronized Map<String, Long> getOperationsCount() {
+ return Map.copyOf(operationCount);
}
protected synchronized void trackRequest(final String requestType) {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
index bea1327427511..ed5d38d265e73 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
@@ -23,6 +23,8 @@
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContexts;
@@ -31,6 +33,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
+import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RequestOptions.Builder;
@@ -1183,6 +1186,10 @@ protected static Map getAlias(final String index, final String a
protected static Map getAsMap(final String endpoint) throws IOException {
Response response = client().performRequest(new Request("GET", endpoint));
+ return responseAsMap(response);
+ }
+
+ protected static Map<String, Object> responseAsMap(Response response) throws IOException {
XContentType entityContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue());
Map responseEntity = XContentHelper.convertToMap(entityContentType.xContent(),
response.getEntity().getContent(), false);
@@ -1190,6 +1197,52 @@ protected static Map getAsMap(final String endpoint) throws IOEx
return responseEntity;
}
+ protected static void registerRepository(String repository, String type, boolean verify, Settings settings) throws IOException {
+ final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository);
+ request.addParameter("verify", Boolean.toString(verify));
+ request.setJsonEntity(Strings.toString(new PutRepositoryRequest(repository).type(type).settings(settings)));
+
+ final Response response = client().performRequest(request);
+ assertAcked("Failed to create repository [" + repository + "] of type [" + type + "]: " + response, response);
+ }
+
+ protected static void createSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException {
+ final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot);
+ request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));
+
+ final Response response = client().performRequest(request);
+ assertThat(
+ "Failed to create snapshot [" + snapshot + "] in repository [" + repository + "]: " + response,
+ response.getStatusLine().getStatusCode(),
+ equalTo(RestStatus.OK.getStatus())
+ );
+ }
+
+ protected static void restoreSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException {
+ final Request request = new Request(HttpPost.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot + "/_restore");
+ request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));
+
+ final Response response = client().performRequest(request);
+ assertThat(
+ "Failed to restore snapshot [" + snapshot + "] from repository [" + repository + "]: " + response,
+ response.getStatusLine().getStatusCode(),
+ equalTo(RestStatus.OK.getStatus())
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void assertAcked(String message, Response response) throws IOException {
+ final int responseStatusCode = response.getStatusLine().getStatusCode();
+ assertThat(
+ message + ": expecting response code [200] but got [" + responseStatusCode + ']',
+ responseStatusCode,
+ equalTo(RestStatus.OK.getStatus())
+ );
+ final Map<String, Object> responseAsMap = responseAsMap(response);
+ Boolean acknowledged = (Boolean) XContentMapValues.extractValue(responseAsMap, "acknowledged");
+ assertThat(message + ": response is not acknowledged", acknowledged, equalTo(Boolean.TRUE));
+ }
+
/**
* Is this template one that is automatically created by xpack?
*/
diff --git a/x-pack/plugin/repositories-metering-api/build.gradle b/x-pack/plugin/repositories-metering-api/build.gradle
new file mode 100644
index 0000000000000..23c911c15c760
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/build.gradle
@@ -0,0 +1,46 @@
+evaluationDependsOn(xpackModule('core'))
+
+apply plugin: 'elasticsearch.esplugin'
+esplugin {
+ name 'repositories-metering-api'
+ description 'Repositories metering API'
+ classname 'org.elasticsearch.xpack.repositories.metering.RepositoriesMeteringPlugin'
+ extendedPlugins = ['x-pack-core']
+}
+archivesBaseName = 'x-pack-repositories-metering-api'
+
+dependencies {
+ compileOnly project(path: xpackModule('core'), configuration: 'default')
+ testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts')
+}
+
+// xpack modules are installed in real clusters as the meta plugin, so
+// installing them as individual plugins for integ tests doesn't make sense,
+// so we disable integ tests
+integTest.enabled = false
+
+// add all sub-projects of the qa sub-project
+gradle.projectsEvaluated {
+ project.subprojects
+ .find { it.path == project.path + ":qa" }
+ .subprojects
+ .findAll { it.path.startsWith(project.path + ":qa") }
+ .each { check.dependsOn it.check }
+}
+
+configurations {
+ testArtifacts.extendsFrom testRuntime
+ testArtifacts.extendsFrom testImplementation
+}
+
+task testJar(type: Jar) {
+ appendix 'test'
+ from sourceSets.test.output
+}
+
+artifacts {
+ testArtifacts testJar
+}
+
+test {
+}
diff --git a/x-pack/plugin/repositories-metering-api/qa/azure/build.gradle b/x-pack/plugin/repositories-metering-api/qa/azure/build.gradle
new file mode 100644
index 0000000000000..560d6fad19364
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/qa/azure/build.gradle
@@ -0,0 +1,99 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.elasticsearch.gradle.info.BuildParams
+import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
+
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+apply plugin: 'elasticsearch.rest-resources'
+
+final Project fixture = project(':test:fixtures:azure-fixture')
+final Project repositoryPlugin = project(':plugins:repository-azure')
+
+dependencies {
+ testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts')
+ testImplementation repositoryPlugin
+}
+
+restResources {
+ restApi {
+ includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common'
+ includeXpack 'repositories-metering-api'
+ }
+}
+
+boolean useFixture = false
+String azureAccount = System.getenv("azure_storage_account")
+String azureKey = System.getenv("azure_storage_key")
+String azureContainer = System.getenv("azure_storage_container")
+String azureBasePath = System.getenv("azure_storage_base_path")
+String azureSasToken = System.getenv("azure_storage_sas_token")
+
+if (!azureAccount && !azureKey && !azureContainer && !azureBasePath && !azureSasToken) {
+ azureAccount = 'azure_integration_test_account'
+ azureKey = 'YXp1cmVfaW50ZWdyYXRpb25fdGVzdF9rZXk=' // The key is "azure_integration_test_key" encoded using base64
+ azureContainer = 'container'
+ azureBasePath = ''
+ azureSasToken = ''
+ useFixture = true
+
+}
+
+if (useFixture) {
+ apply plugin: 'elasticsearch.test.fixtures'
+ testFixtures.useFixture(fixture.path, 'azure-fixture-repositories-metering')
+}
+
+integTest {
+ dependsOn repositoryPlugin.bundlePlugin
+ systemProperty 'test.azure.container', azureContainer
+ nonInputProperties.systemProperty 'test.azure.base_path', azureBasePath + "_repositories_metering_tests_" + BuildParams.testSeed
+}
+
+testClusters.integTest {
+ testDistribution = 'DEFAULT'
+ plugin repositoryPlugin.bundlePlugin.archiveFile
+
+ keystore 'azure.client.repositories_metering.account', azureAccount
+ if (azureKey != null && azureKey.isEmpty() == false) {
+ keystore 'azure.client.repositories_metering.key', azureKey
+ }
+ if (azureSasToken != null && azureSasToken.isEmpty() == false) {
+ keystore 'azure.client.repositories_metering.sas_token', azureSasToken
+ }
+
+ if (useFixture) {
+ def fixtureAddress = { fixtureName ->
+ assert useFixture: 'closure should not be used without a fixture'
+ int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.${fixtureName}.tcp.8091"
+ assert ephemeralPort > 0
+ '127.0.0.1:' + ephemeralPort
+ }
+ setting 'azure.client.repositories_metering.endpoint_suffix',
+ { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${-> fixtureAddress('azure-fixture-repositories-metering')}" }, IGNORE_VALUE
+
+ } else {
+ println "Using an external service to test " + project.name
+ }
+}
+
+task azureThirdPartyTest {
+ dependsOn integTest
+}
+
diff --git a/x-pack/plugin/repositories-metering-api/qa/azure/src/test/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/azure/src/test/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java
new file mode 100644
index 0000000000000..7be78bdabcd1c
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/qa/azure/src/test/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.repositories.metering.azure;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase;
+
+import java.util.List;
+import java.util.Map;
+
+public class AzureRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase {
+
+ @Override
+ protected String repositoryType() {
+ return "azure";
+ }
+
+ @Override
+ protected Map<String, String> repositoryLocation() {
+ return Map.of("container", getProperty("test.azure.container"), "base_path", getProperty("test.azure.base_path"));
+ }
+
+ @Override
+ protected Settings repositorySettings() {
+ final String container = getProperty("test.azure.container");
+
+ final String basePath = getProperty("test.azure.base_path");
+
+ return Settings.builder().put("client", "repositories_metering").put("container", container).put("base_path", basePath).build();
+ }
+
+ @Override
+ protected Settings updatedRepositorySettings() {
+ return Settings.builder().put(repositorySettings()).put("azure.client.repositories_metering.max_retries", 5).build();
+ }
+
+ @Override
+ protected List<String> readCounterKeys() {
+ return List.of("GetBlob", "GetBlobProperties", "ListBlobs");
+ }
+
+ @Override
+ protected List<String> writeCounterKeys() {
+ return List.of("PutBlob");
+ }
+}
diff --git a/x-pack/plugin/repositories-metering-api/qa/build.gradle b/x-pack/plugin/repositories-metering-api/qa/build.gradle
new file mode 100644
index 0000000000000..53a1915bb2a07
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/qa/build.gradle
@@ -0,0 +1,6 @@
+apply plugin: 'elasticsearch.build'
+test.enabled = false
+
+dependencies {
+ api project(':test:framework')
+}
diff --git a/x-pack/plugin/repositories-metering-api/qa/gcs/build.gradle b/x-pack/plugin/repositories-metering-api/qa/gcs/build.gradle
new file mode 100644
index 0000000000000..df34fbc0e8312
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/qa/gcs/build.gradle
@@ -0,0 +1,130 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.elasticsearch.gradle.info.BuildParams
+import org.elasticsearch.gradle.MavenFilteringHack
+
+import java.nio.file.Files
+import java.security.KeyPair
+import java.security.KeyPairGenerator
+
+import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
+
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+apply plugin: 'elasticsearch.rest-resources'
+
+final Project fixture = project(':test:fixtures:gcs-fixture')
+final Project repositoryPlugin = project(':plugins:repository-gcs')
+
+dependencies {
+ testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts')
+ testImplementation repositoryPlugin
+}
+
+restResources {
+ restApi {
+ includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common'
+ includeXpack 'repositories-metering-api'
+ }
+}
+
+boolean useFixture = false
+
+String gcsServiceAccount = System.getenv("google_storage_service_account")
+String gcsBucket = System.getenv("google_storage_bucket")
+String gcsBasePath = System.getenv("google_storage_base_path")
+
+File serviceAccountFile = null
+if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) {
+ serviceAccountFile = new File(project.buildDir, 'generated-resources/service_account_test.json')
+ gcsBucket = 'bucket'
+ gcsBasePath = 'integration_test'
+ useFixture = true
+} else if (!gcsServiceAccount || !gcsBucket || !gcsBasePath) {
+ throw new IllegalArgumentException("not all options specified to run tests against external GCS service are present")
+} else {
+ serviceAccountFile = new File(gcsServiceAccount)
+}
+
+/** A service account file that points to the Google Cloud Storage service emulated by the fixture **/
+task createServiceAccountFile() {
+ doLast {
+ KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA")
+ keyPairGenerator.initialize(1024)
+ KeyPair keyPair = keyPairGenerator.generateKeyPair()
+ String encodedKey = Base64.getEncoder().encodeToString(keyPair.private.getEncoded())
+
+ serviceAccountFile.parentFile.mkdirs()
+ serviceAccountFile.setText("{\n" +
+ ' "type": "service_account",\n' +
+ ' "project_id": "integration_test",\n' +
+ ' "private_key_id": "' + UUID.randomUUID().toString() + '",\n' +
+ ' "private_key": "-----BEGIN PRIVATE KEY-----\\n' + encodedKey + '\\n-----END PRIVATE KEY-----\\n",\n' +
+ ' "client_email": "integration_test@appspot.gserviceaccount.com",\n' +
+ ' "client_id": "123456789101112130594"\n' +
+ '}', 'UTF-8')
+ }
+}
+
+def fixtureAddress = { f ->
+ assert useFixture: 'closure should not be used without a fixture'
+ int ephemeralPort = project(':test:fixtures:gcs-fixture').postProcessFixture.ext."test.fixtures.${f}.tcp.80"
+ assert ephemeralPort > 0
+ 'http://127.0.0.1:' + ephemeralPort
+}
+
+Map expansions = [
+ 'bucket' : gcsBucket,
+ 'base_path': gcsBasePath + "_integration_tests"
+]
+
+processTestResources {
+ inputs.properties(expansions)
+ MavenFilteringHack.filter(it, expansions)
+}
+
+if (useFixture) {
+ apply plugin: 'elasticsearch.test.fixtures'
+ testFixtures.useFixture(fixture.path, 'gcs-fixture-repositories-metering')
+}
+
+integTest {
+ dependsOn repositoryPlugin.bundlePlugin
+ systemProperty 'test.gcs.bucket', gcsBucket
+ nonInputProperties.systemProperty 'test.gcs.base_path', gcsBasePath + "_repositories_metering" + BuildParams.testSeed
+}
+
+testClusters.integTest {
+ testDistribution = 'DEFAULT'
+ plugin repositoryPlugin.bundlePlugin.archiveFile
+
+ keystore 'gcs.client.repositories_metering.credentials_file', serviceAccountFile, IGNORE_VALUE
+ if (useFixture) {
+ tasks.integTest.dependsOn createServiceAccountFile
+ /* Use a closure on the string to delay evaluation until tests are executed */
+ setting 'gcs.client.repositories_metering.endpoint', { "${-> fixtureAddress('gcs-fixture-repositories-metering')}" }, IGNORE_VALUE
+ setting 'gcs.client.repositories_metering.token_uri', { "${-> fixtureAddress('gcs-fixture-repositories-metering')}/o/oauth2/token" }, IGNORE_VALUE
+ } else {
+ println "Using an external service to test " + project.name
+ }
+}
+
+task gcsThirdPartyTest {
+ dependsOn integTest
+}
diff --git a/x-pack/plugin/repositories-metering-api/qa/gcs/src/test/java/org/elasticsearch/xpack/repositories/metering/gcs/GCSRepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/gcs/src/test/java/org/elasticsearch/xpack/repositories/metering/gcs/GCSRepositoriesMeteringIT.java
new file mode 100644
index 0000000000000..f843410293b3f
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/qa/gcs/src/test/java/org/elasticsearch/xpack/repositories/metering/gcs/GCSRepositoriesMeteringIT.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.repositories.metering.gcs;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase;
+
+import java.util.List;
+import java.util.Map;
+
+public class GCSRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase {
+
+ @Override
+ protected String repositoryType() {
+ return "gcs";
+ }
+
+ @Override
+ protected Map<String, String> repositoryLocation() {
+ return Map.of("bucket", getProperty("test.gcs.bucket"), "base_path", getProperty("test.gcs.base_path"));
+ }
+
+ @Override
+ protected Settings repositorySettings() {
+ final String bucket = getProperty("test.gcs.bucket");
+ final String basePath = getProperty("test.gcs.base_path");
+
+ return Settings.builder().put("client", "repositories_metering").put("bucket", bucket).put("base_path", basePath).build();
+ }
+
+ @Override
+ protected Settings updatedRepositorySettings() {
+ return Settings.builder().put(repositorySettings()).put("gcs.client.repositories_metering.application_name", "updated").build();
+ }
+
+ @Override
+ protected List<String> readCounterKeys() {
+ return List.of("GetObject", "ListObjects");
+ }
+
+ @Override
+ protected List<String> writeCounterKeys() {
+ return List.of("InsertObject");
+ }
+}
diff --git a/x-pack/plugin/repositories-metering-api/qa/s3/build.gradle b/x-pack/plugin/repositories-metering-api/qa/s3/build.gradle
new file mode 100644
index 0000000000000..63e2d46bb0321
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/qa/s3/build.gradle
@@ -0,0 +1,75 @@
+import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
+import org.elasticsearch.gradle.info.BuildParams
+
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+apply plugin: 'elasticsearch.rest-resources'
+
+final Project fixture = project(':test:fixtures:s3-fixture')
+final Project repositoryPlugin = project(':plugins:repository-s3')
+
+dependencies {
+ testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts')
+ testImplementation repositoryPlugin
+}
+
+restResources {
+ restApi {
+ includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common'
+ includeXpack 'repositories-metering-api'
+ }
+}
+
+boolean useFixture = false
+String s3AccessKey = System.getenv("amazon_s3_access_key")
+String s3SecretKey = System.getenv("amazon_s3_secret_key")
+String s3Bucket = System.getenv("amazon_s3_bucket")
+String s3BasePath = System.getenv("amazon_s3_base_path")
+
+if (!s3AccessKey && !s3SecretKey && !s3Bucket && !s3BasePath) {
+ s3AccessKey = 'access_key'
+ s3SecretKey = 'secret_key'
+ s3Bucket = 'bucket'
+ s3BasePath = null
+ useFixture = true
+
+} else if (!s3AccessKey || !s3SecretKey || !s3Bucket || !s3BasePath) {
+ throw new IllegalArgumentException("not all options specified to run against external S3 service are present")
+}
+
+if (useFixture) {
+ apply plugin: 'elasticsearch.test.fixtures'
+ testFixtures.useFixture(fixture.path, 's3-fixture-repositories-metering')
+}
+
+integTest {
+ dependsOn repositoryPlugin.bundlePlugin
+ systemProperty 'test.s3.bucket', s3Bucket
+ nonInputProperties.systemProperty 'test.s3.base_path', s3BasePath ? s3BasePath + "_repositories_metering" + BuildParams.testSeed : 'base_path'
+}
+
+testClusters.integTest {
+ testDistribution = 'DEFAULT'
+ plugin repositoryPlugin.bundlePlugin.archiveFile
+
+ keystore 's3.client.repositories_metering.access_key', s3AccessKey
+ keystore 's3.client.repositories_metering.secret_key', s3SecretKey
+
+ if (useFixture) {
+ def fixtureAddress = { fixtureName ->
+ assert useFixture: 'closure should not be used without a fixture'
+ int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.${fixtureName}.tcp.80"
+ assert ephemeralPort > 0
+ '127.0.0.1:' + ephemeralPort
+ }
+ setting 's3.client.repositories_metering.protocol', 'http'
+ setting 's3.client.repositories_metering.endpoint', { "${-> fixtureAddress('s3-fixture-repositories-metering')}" }, IGNORE_VALUE
+
+ } else {
+ println "Using an external service to test " + project.name
+ }
+}
+
+task s3ThirdPartyTest {
+ dependsOn integTest
+}
diff --git a/x-pack/plugin/repositories-metering-api/qa/s3/src/test/java/org/elasticsearch/xpack/repositories/metering/s3/S3RepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/s3/src/test/java/org/elasticsearch/xpack/repositories/metering/s3/S3RepositoriesMeteringIT.java
new file mode 100644
index 0000000000000..777fed93286ef
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/qa/s3/src/test/java/org/elasticsearch/xpack/repositories/metering/s3/S3RepositoriesMeteringIT.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.repositories.metering.s3;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase;
+
+import java.util.List;
+import java.util.Map;
+
+public class S3RepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase {
+
+ @Override
+ protected String repositoryType() {
+ return "s3";
+ }
+
+ @Override
+ protected Map<String, String> repositoryLocation() {
+ return Map.of("bucket", getProperty("test.s3.bucket"), "base_path", getProperty("test.s3.base_path"));
+ }
+
+ @Override
+ protected Settings repositorySettings() {
+ final String bucket = getProperty("test.s3.bucket");
+ final String basePath = getProperty("test.s3.base_path");
+
+ return Settings.builder().put("client", "repositories_metering").put("bucket", bucket).put("base_path", basePath).build();
+ }
+
+ @Override
+ protected Settings updatedRepositorySettings() {
+ Settings settings = repositorySettings();
+ return Settings.builder().put(settings).put("s3.client.max_retries", 4).build();
+ }
+
+ @Override
+ protected List<String> readCounterKeys() {
+ return List.of("GetObject", "ListObjects");
+ }
+
+ @Override
+ protected List<String> writeCounterKeys() {
+ return List.of("PutObject");
+ }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/RepositoriesMeteringPlugin.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/RepositoriesMeteringPlugin.java
new file mode 100644
index 0000000000000..f7263286e3691
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/RepositoriesMeteringPlugin.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.plugins.ActionPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveAction;
+import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringAction;
+import org.elasticsearch.xpack.repositories.metering.action.TransportClearRepositoriesStatsArchiveAction;
+import org.elasticsearch.xpack.repositories.metering.action.TransportRepositoriesStatsAction;
+import org.elasticsearch.xpack.repositories.metering.rest.RestClearRepositoriesMeteringArchiveAction;
+import org.elasticsearch.xpack.repositories.metering.rest.RestGetRepositoriesMeteringAction;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+public final class RepositoriesMeteringPlugin extends Plugin implements ActionPlugin {
+
+ @Override
+ public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
+ return List.of(
+ new ActionHandler<>(RepositoriesMeteringAction.INSTANCE, TransportRepositoriesStatsAction.class),
+ new ActionHandler<>(ClearRepositoriesMeteringArchiveAction.INSTANCE, TransportClearRepositoriesStatsArchiveAction.class)
+ );
+ }
+
+ @Override
+ public List<RestHandler> getRestHandlers(
+ Settings settings,
+ RestController restController,
+ ClusterSettings clusterSettings,
+ IndexScopedSettings indexScopedSettings,
+ SettingsFilter settingsFilter,
+ IndexNameExpressionResolver indexNameExpressionResolver,
+ Supplier<DiscoveryNodes> nodesInCluster
+ ) {
+ return List.of(new RestGetRepositoriesMeteringAction(), new RestClearRepositoriesMeteringArchiveAction());
+ }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java
new file mode 100644
index 0000000000000..4ddddc198e454
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.ActionType;
+
+/**
+ * Action that clears the archived repositories metering information from the cluster nodes.
+ */
+public final class ClearRepositoriesMeteringArchiveAction extends ActionType<RepositoriesMeteringResponse> {
+    public static final ClearRepositoriesMeteringArchiveAction INSTANCE = new ClearRepositoriesMeteringArchiveAction();
+
+    static final String NAME = "cluster:monitor/xpack/repositories_metering/clear_metering_archive";
+
+    ClearRepositoriesMeteringArchiveAction() {
+        super(NAME, RepositoriesMeteringResponse::new);
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveRequest.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveRequest.java
new file mode 100644
index 0000000000000..98f0a8ec0a6c2
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveRequest.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * Nodes-level request to clear the archived repositories metering information.
+ */
+public final class ClearRepositoriesMeteringArchiveRequest extends BaseNodesRequest<ClearRepositoriesMeteringArchiveRequest> {
+    // Upper bound (inclusive) on the cluster version of the archived entries to remove.
+    private final long maxVersionToClear;
+
+    public ClearRepositoriesMeteringArchiveRequest(StreamInput in) throws IOException {
+        super(in);
+        this.maxVersionToClear = in.readLong();
+    }
+
+    /**
+     * @param maxVersionToClear maximum archive version (inclusive) to clear
+     * @param nodesIds          nodes whose archives should be cleared; empty selects all nodes
+     */
+    public ClearRepositoriesMeteringArchiveRequest(long maxVersionToClear, String... nodesIds) {
+        super(nodesIds);
+        this.maxVersionToClear = maxVersionToClear;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeLong(maxVersionToClear);
+    }
+
+    public long getMaxVersionToClear() {
+        return maxVersionToClear;
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java
new file mode 100644
index 0000000000000..9d7cd8bbe7107
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.ActionType;
+
+/**
+ * Action that retrieves the repositories metering information from the cluster nodes.
+ */
+public final class RepositoriesMeteringAction extends ActionType<RepositoriesMeteringResponse> {
+    public static final RepositoriesMeteringAction INSTANCE = new RepositoriesMeteringAction();
+
+    static final String NAME = "cluster:monitor/xpack/repositories_metering/get_metrics";
+
+    RepositoriesMeteringAction() {
+        super(NAME, RepositoriesMeteringResponse::new);
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringRequest.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringRequest.java
new file mode 100644
index 0000000000000..92119da4f0027
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringRequest.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
+/**
+ * Nodes-level request to fetch the repositories metering information.
+ */
+public final class RepositoriesMeteringRequest extends BaseNodesRequest<RepositoriesMeteringRequest> {
+    public RepositoriesMeteringRequest(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    // An empty nodesIds array selects all nodes in the cluster.
+    public RepositoriesMeteringRequest(String... nodesIds) {
+        super(nodesIds);
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java
new file mode 100644
index 0000000000000..382066c06b695
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+public final class RepositoriesMeteringResponse extends BaseNodesResponse implements ToXContentFragment {
+
+ public RepositoriesMeteringResponse(StreamInput in) throws IOException {
+ super(in);
+ }
+
+ public RepositoriesMeteringResponse(
+ ClusterName clusterName,
+ List nodes,
+ List failures
+ ) {
+ super(clusterName, nodes, failures);
+ }
+
+ @Override
+ protected List readNodesFrom(StreamInput in) throws IOException {
+ return in.readList(RepositoriesNodeMeteringResponse::new);
+ }
+
+ @Override
+ protected void writeNodesTo(StreamOutput out, List nodes) throws IOException {
+ out.writeList(nodes);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject("nodes");
+ for (RepositoriesNodeMeteringResponse nodeStats : getNodes()) {
+ nodeStats.toXContent(builder, params);
+ }
+ builder.endObject();
+ return builder;
+ }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesNodeMeteringResponse.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesNodeMeteringResponse.java
new file mode 100644
index 0000000000000..cf463e6f9b22a
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesNodeMeteringResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.repositories.RepositoryStatsSnapshot;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Per-node repositories metering information, rendered as an array keyed by the node id.
+ */
+public final class RepositoriesNodeMeteringResponse extends BaseNodeResponse implements ToXContentFragment {
+
+    final List<RepositoryStatsSnapshot> repositoryStatsSnapshots;
+
+    public RepositoriesNodeMeteringResponse(DiscoveryNode node, List<RepositoryStatsSnapshot> repositoryStatsSnapshots) {
+        super(node);
+        this.repositoryStatsSnapshots = repositoryStatsSnapshots;
+    }
+
+    public RepositoriesNodeMeteringResponse(StreamInput in) throws IOException {
+        super(in);
+        this.repositoryStatsSnapshots = in.readList(RepositoryStatsSnapshot::new);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.startArray(getNode().getId());
+        for (RepositoryStatsSnapshot repositoryStats : repositoryStatsSnapshots) {
+            repositoryStats.toXContent(builder, params);
+        }
+        builder.endArray();
+        return builder;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeList(repositoryStatsSnapshots);
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java
new file mode 100644
index 0000000000000..2f020a0d1bee9
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.RepositoryStatsSnapshot;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Transport action that clears, on every targeted node, the archived repositories metering
+ * information up to the requested version, returning the removed entries in the response.
+ */
+public final class TransportClearRepositoriesStatsArchiveAction extends TransportNodesAction<
+    ClearRepositoriesMeteringArchiveRequest,
+    RepositoriesMeteringResponse,
+    TransportClearRepositoriesStatsArchiveAction.ClearRepositoriesStatsArchiveNodeRequest,
+    RepositoriesNodeMeteringResponse> {
+
+    private final RepositoriesService repositoriesService;
+
+    @Inject
+    public TransportClearRepositoriesStatsArchiveAction(
+        ThreadPool threadPool,
+        ClusterService clusterService,
+        TransportService transportService,
+        ActionFilters actionFilters,
+        RepositoriesService repositoriesService
+    ) {
+        super(
+            ClearRepositoriesMeteringArchiveAction.NAME,
+            threadPool,
+            clusterService,
+            transportService,
+            actionFilters,
+            ClearRepositoriesMeteringArchiveRequest::new,
+            ClearRepositoriesStatsArchiveNodeRequest::new,
+            ThreadPool.Names.SAME,
+            RepositoriesNodeMeteringResponse.class
+        );
+        this.repositoriesService = repositoriesService;
+    }
+
+    @Override
+    protected RepositoriesMeteringResponse newResponse(
+        ClearRepositoriesMeteringArchiveRequest request,
+        List<RepositoriesNodeMeteringResponse> nodesResponses,
+        List<FailedNodeException> failures
+    ) {
+        return new RepositoriesMeteringResponse(clusterService.getClusterName(), nodesResponses, failures);
+    }
+
+    @Override
+    protected ClearRepositoriesStatsArchiveNodeRequest newNodeRequest(ClearRepositoriesMeteringArchiveRequest request) {
+        return new ClearRepositoriesStatsArchiveNodeRequest(request.getMaxVersionToClear());
+    }
+
+    @Override
+    protected RepositoriesNodeMeteringResponse newNodeResponse(StreamInput in) throws IOException {
+        return new RepositoriesNodeMeteringResponse(in);
+    }
+
+    @Override
+    protected RepositoriesNodeMeteringResponse nodeOperation(ClearRepositoriesStatsArchiveNodeRequest request, Task task) {
+        List<RepositoryStatsSnapshot> clearedStats = repositoriesService.clearRepositoriesStatsArchive(request.maxVersionToClear);
+        return new RepositoriesNodeMeteringResponse(clusterService.localNode(), clearedStats);
+    }
+
+    static final class ClearRepositoriesStatsArchiveNodeRequest extends TransportRequest { // per-node fan-out request
+        private final long maxVersionToClear;
+
+        ClearRepositoriesStatsArchiveNodeRequest(long maxVersionToClear) {
+            this.maxVersionToClear = maxVersionToClear;
+        }
+
+        ClearRepositoriesStatsArchiveNodeRequest(StreamInput in) throws IOException {
+            super(in);
+            this.maxVersionToClear = in.readLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeLong(maxVersionToClear);
+        }
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java
new file mode 100644
index 0000000000000..74541a70a5eeb
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.action;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.List;
+
+public final class TransportRepositoriesStatsAction extends TransportNodesAction<
+ RepositoriesMeteringRequest,
+ RepositoriesMeteringResponse,
+ TransportRepositoriesStatsAction.RepositoriesNodeStatsRequest,
+ RepositoriesNodeMeteringResponse> {
+
+ private final RepositoriesService repositoriesService;
+
+ @Inject
+ public TransportRepositoriesStatsAction(
+ ThreadPool threadPool,
+ ClusterService clusterService,
+ TransportService transportService,
+ ActionFilters actionFilters,
+ RepositoriesService repositoriesService
+ ) {
+ super(
+ RepositoriesMeteringAction.NAME,
+ threadPool,
+ clusterService,
+ transportService,
+ actionFilters,
+ RepositoriesMeteringRequest::new,
+ RepositoriesNodeStatsRequest::new,
+ ThreadPool.Names.SAME,
+ RepositoriesNodeMeteringResponse.class
+ );
+ this.repositoriesService = repositoriesService;
+ }
+
+ @Override
+ protected RepositoriesMeteringResponse newResponse(
+ RepositoriesMeteringRequest request,
+ List repositoriesNodeStatsResponses,
+ List failures
+ ) {
+ return new RepositoriesMeteringResponse(clusterService.getClusterName(), repositoriesNodeStatsResponses, failures);
+ }
+
+ @Override
+ protected RepositoriesNodeStatsRequest newNodeRequest(RepositoriesMeteringRequest request) {
+ return new RepositoriesNodeStatsRequest();
+ }
+
+ @Override
+ protected RepositoriesNodeMeteringResponse newNodeResponse(StreamInput in) throws IOException {
+ return new RepositoriesNodeMeteringResponse(in);
+ }
+
+ @Override
+ protected RepositoriesNodeMeteringResponse nodeOperation(RepositoriesNodeStatsRequest request, Task task) {
+ return new RepositoriesNodeMeteringResponse(clusterService.localNode(), repositoriesService.repositoriesStats());
+ }
+
+ static final class RepositoriesNodeStatsRequest extends TransportRequest {
+ RepositoriesNodeStatsRequest() {}
+
+ RepositoriesNodeStatsRequest(StreamInput in) throws IOException {
+ super(in);
+ }
+ }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestClearRepositoriesMeteringArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestClearRepositoriesMeteringArchiveAction.java
new file mode 100644
index 0000000000000..ddf15ae1c46a3
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestClearRepositoriesMeteringArchiveAction.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.rest;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestActions;
+import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveAction;
+import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveRequest;
+
+import java.util.List;
+
+/**
+ * REST handler for DELETE /_nodes/{nodeId}/_repositories_metering/{maxVersionToClear}.
+ */
+public class RestClearRepositoriesMeteringArchiveAction extends BaseRestHandler {
+    @Override
+    public String getName() {
+        return "clear_repositories_metrics_archive_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(RestRequest.Method.DELETE, "/_nodes/{nodeId}/_repositories_metering/{maxVersionToClear}"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
+        long maxVersionToClear = request.paramAsLong("maxVersionToClear", -1);
+        ClearRepositoriesMeteringArchiveRequest clearArchivesRequest = new ClearRepositoriesMeteringArchiveRequest(
+            maxVersionToClear,
+            nodesIds
+        );
+        return channel -> client.execute(
+            ClearRepositoriesMeteringArchiveAction.INSTANCE,
+            clearArchivesRequest,
+            new RestActions.NodesResponseRestListener<>(channel)
+        );
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestGetRepositoriesMeteringAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestGetRepositoriesMeteringAction.java
new file mode 100644
index 0000000000000..4d3bfd49c7acd
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/rest/RestGetRepositoriesMeteringAction.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering.rest;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestActions;
+import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringAction;
+import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringRequest;
+
+import java.util.List;
+
+/**
+ * REST handler for GET /_nodes/{nodeId}/_repositories_metering.
+ */
+public final class RestGetRepositoriesMeteringAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "get_repositories_metering_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(RestRequest.Method.GET, "/_nodes/{nodeId}/_repositories_metering"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
+        RepositoriesMeteringRequest repositoriesMeteringRequest = new RepositoriesMeteringRequest(nodesIds);
+        return channel -> client.execute(
+            RepositoriesMeteringAction.INSTANCE,
+            repositoriesMeteringRequest,
+            new RestActions.NodesResponseRestListener<>(channel)
+        );
+    }
+}
diff --git a/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/AbstractRepositoriesMeteringAPIRestTestCase.java b/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/AbstractRepositoriesMeteringAPIRestTestCase.java
new file mode 100644
index 0000000000000..a716d22f15953
--- /dev/null
+++ b/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/AbstractRepositoriesMeteringAPIRestTestCase.java
@@ -0,0 +1,374 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.repositories.metering;
+
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPost;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.CheckedBiConsumer;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.repositories.RepositoryInfo;
+import org.elasticsearch.repositories.RepositoryStats;
+import org.elasticsearch.repositories.RepositoryStatsSnapshot;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.blankOrNullString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+
+public abstract class AbstractRepositoriesMeteringAPIRestTestCase extends ESRestTestCase {
+ protected abstract String repositoryType();
+
+ protected abstract Map repositoryLocation();
+
+ protected abstract Settings repositorySettings();
+
+ /**
+ * New settings to force a new repository creation
+ */
+ protected abstract Settings updatedRepositorySettings();
+
+ protected abstract List readCounterKeys();
+
+ protected abstract List writeCounterKeys();
+
+ @Before
+ public void clearArchive() throws Exception {
+ clearRepositoriesStats(Long.MAX_VALUE);
+ }
+
+ public void testStatsAreTracked() throws Exception {
+ snapshotAndRestoreIndex((repository, index) -> {
+ List repoStats = getRepositoriesStats();
+ assertThat(repoStats.size(), equalTo(1));
+
+ RepositoryStatsSnapshot repositoryStats = repoStats.get(0);
+ assertRepositoryStatsBelongToRepository(repositoryStats, repository);
+ assertRequestCountersAccountedForReads(repositoryStats);
+ assertRequestCountersAccountedForWrites(repositoryStats);
+ });
+ }
+
+ public void testStatsAreUpdatedAfterRepositoryOperations() throws Exception {
+ String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+ snapshotAndRestoreIndex(snapshot, (repository, index) -> {
+ List repoStatsBeforeRestore = getRepositoriesStats();
+ assertThat(repoStatsBeforeRestore.size(), equalTo(1));
+
+ RepositoryStatsSnapshot repositoryStatsBeforeRestore = repoStatsBeforeRestore.get(0);
+ Map requestCountsBeforeRestore = repositoryStatsBeforeRestore.getRepositoryStats().requestCounts;
+ assertRepositoryStatsBelongToRepository(repositoryStatsBeforeRestore, repository);
+ assertRequestCountersAccountedForReads(repositoryStatsBeforeRestore);
+ assertRequestCountersAccountedForWrites(repositoryStatsBeforeRestore);
+
+ deleteIndex(index);
+
+ restoreSnapshot(repository, snapshot, true);
+
+ List updatedRepoStats = getRepositoriesStats();
+ assertThat(updatedRepoStats.size(), equalTo(1));
+ RepositoryStatsSnapshot repoStatsAfterRestore = updatedRepoStats.get(0);
+ Map requestCountsAfterRestore = repoStatsAfterRestore.getRepositoryStats().requestCounts;
+
+ for (String readCounterKey : readCounterKeys()) {
+ assertThat(
+ requestCountsAfterRestore.get(readCounterKey),
+ greaterThanOrEqualTo(requestCountsBeforeRestore.get(readCounterKey))
+ );
+ }
+ });
+ }
+
+ public void testClearRepositoriesStats() throws Exception {
+ snapshotAndRestoreIndex((repository, index) -> {
+ deleteRepository(repository);
+
+ List repositoriesStatsBeforeClearing = getRepositoriesStats();
+ assertThat(repositoriesStatsBeforeClearing.size(), equalTo(1));
+ RepositoryStatsSnapshot repositoryStatsSnapshot = repositoriesStatsBeforeClearing.get(0);
+
+ assertThat(clearRepositoriesStats(-1).size(), equalTo(0));
+
+ List removedRepositoriesStats = clearRepositoriesStats(repositoryStatsSnapshot.getClusterVersion());
+
+ assertThat(repositoriesStatsBeforeClearing, equalTo(removedRepositoriesStats));
+
+ assertThat(getRepositoriesStats().size(), equalTo(0));
+ });
+ }
+
+ public void testRegisterMultipleRepositoriesAndGetStats() throws Exception {
+ List repositoryNames = List.of("repo-a", "repo-b", "repo-c");
+ for (String repositoryName : repositoryNames) {
+ registerRepository(repositoryName, repositoryType(), false, repositorySettings());
+ }
+
+ List repositoriesStats = getRepositoriesStats();
+ Map> repositoryStatsByName = repositoriesStats.stream()
+ .collect(Collectors.groupingBy(r -> r.getRepositoryInfo().name));
+
+ for (String repositoryName : repositoryNames) {
+ List repositoryStats = repositoryStatsByName.get(repositoryName);
+ assertThat(repositoryStats, is(notNullValue()));
+ assertThat(repositoryStats.size(), equalTo(1));
+
+ RepositoryStatsSnapshot stats = repositoryStats.get(0);
+ assertRepositoryStatsBelongToRepository(stats, repositoryName);
+ assertAllRequestCountsAreZero(stats);
+ }
+ }
+
+ public void testStatsAreArchivedAfterRepositoryDeletion() throws Exception {
+ snapshotAndRestoreIndex((repository, index) -> {
+ List repositoriesStats = getRepositoriesStats();
+ assertThat(repositoriesStats.size(), equalTo(1));
+ RepositoryStatsSnapshot statsBeforeRepoDeletion = repositoriesStats.get(0);
+ assertRepositoryStatsBelongToRepository(statsBeforeRepoDeletion, repository);
+
+ deleteRepository(repository);
+
+ List repoStatsAfterDeletion = getRepositoriesStats();
+ assertThat(repoStatsAfterDeletion.size(), equalTo(1));
+ RepositoryStatsSnapshot statsAfterRepoDeletion = repoStatsAfterDeletion.get(0);
+ assertStatsAreEqualsIgnoringStoppedAt(statsBeforeRepoDeletion, statsAfterRepoDeletion);
+ });
+ }
+
+ public void testStatsAreStoredIntoANewCounterInstanceAfterRepoConfigUpdate() throws Exception {
+ final String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+ snapshotAndRestoreIndex(snapshot, (repository, index) -> {
+ List repositoriesStatsBeforeUpdate = getRepositoriesStats();
+ assertThat(repositoriesStatsBeforeUpdate.size(), equalTo(1));
+ assertRepositoryStatsBelongToRepository(repositoriesStatsBeforeUpdate.get(0), repository);
+ assertRequestCountersAccountedForReads(repositoriesStatsBeforeUpdate.get(0));
+ assertRequestCountersAccountedForWrites(repositoriesStatsBeforeUpdate.get(0));
+
+ // Update repository
+ registerRepository(repository, repositoryType(), false, updatedRepositorySettings());
+
+ List repositoriesStatsAfterUpdate = getRepositoriesStats();
+
+ assertThat(repositoriesStatsAfterUpdate.size(), equalTo(2));
+ assertStatsAreEqualsIgnoringStoppedAt(repositoriesStatsBeforeUpdate.get(0), repositoriesStatsAfterUpdate.get(0));
+
+ // The counters for the new repository instance are zero
+ assertAllRequestCountsAreZero(repositoriesStatsAfterUpdate.get(1));
+
+ deleteIndex(index);
+
+ restoreSnapshot(repository, snapshot, true);
+
+ List repoStatsAfterRestore = getRepositoriesStats();
+
+ assertThat(repoStatsAfterRestore.size(), equalTo(2));
+ assertStatsAreEqualsIgnoringStoppedAt(repositoriesStatsAfterUpdate.get(0), repoStatsAfterRestore.get(0));
+
+ assertRequestCountersAccountedForReads(repoStatsAfterRestore.get(1));
+ });
+ }
+
+ public void testDeleteThenAddRepositoryWithTheSameName() throws Exception {
+ snapshotAndRestoreIndex((repository, index) -> {
+ List repoStatsBeforeDeletion = getRepositoriesStats();
+ assertThat(repoStatsBeforeDeletion.size(), equalTo(1));
+
+ deleteRepository(repository);
+
+ List repoStatsAfterDeletion = getRepositoriesStats();
+ assertThat(repoStatsAfterDeletion.size(), equalTo(1));
+ assertStatsAreEqualsIgnoringStoppedAt(repoStatsBeforeDeletion.get(0), repoStatsAfterDeletion.get(0));
+
+ registerRepository(repository, repositoryType(), false, repositorySettings());
+
+ List repositoriesStatsAfterRegisteringTheSameRepo = getRepositoriesStats();
+ assertThat(repositoriesStatsAfterRegisteringTheSameRepo.size(), equalTo(2));
+ assertStatsAreEqualsIgnoringStoppedAt(repoStatsBeforeDeletion.get(0), repositoriesStatsAfterRegisteringTheSameRepo.get(0));
+ assertAllRequestCountsAreZero(repositoriesStatsAfterRegisteringTheSameRepo.get(1));
+ });
+ }
+
+ private void snapshotAndRestoreIndex(CheckedBiConsumer biConsumer) throws Exception {
+ final String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+ snapshotAndRestoreIndex(snapshot, biConsumer);
+ }
+
+    /**
+     * Runs a full snapshot/restore cycle against a freshly registered repository:
+     * creates the repository and a random index, bulk-indexes random documents,
+     * snapshots the index, deletes it, restores it from the snapshot and finally
+     * hands the repository and index names to {@code biConsumer} for assertions.
+     *
+     * @param snapshot   name to use for the snapshot
+     * @param biConsumer callback invoked with (repositoryName, indexName) after the restore
+     */
+    private void snapshotAndRestoreIndex(String snapshot, CheckedBiConsumer<String, String, Exception> biConsumer) throws Exception {
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final String repository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final int numberOfShards = randomIntBetween(1, 5);
+
+        final String repositoryType = repositoryType();
+        final Settings repositorySettings = repositorySettings();
+
+        registerRepository(repository, repositoryType, true, repositorySettings);
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .build()
+        );
+        ensureGreen(indexName);
+
+        // Build a single bulk request body: one action line plus one document line per doc.
+        final int numDocs = randomIntBetween(1, 500);
+        final StringBuilder bulkBody = new StringBuilder(numDocs * 64);
+        for (int i = 0; i < numDocs; i++) {
+            bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
+            bulkBody.append("{\"field\":").append(i).append(",\"text\":\"Document number ").append(i).append("\"}\n");
+        }
+
+        // refresh=true so the documents are immediately visible to the snapshot.
+        final Request documents = new Request(HttpPost.METHOD_NAME, '/' + indexName + "/_bulk");
+        documents.addParameter("refresh", Boolean.TRUE.toString());
+        documents.setJsonEntity(bulkBody.toString());
+        assertOK(client().performRequest(documents));
+
+        createSnapshot(repository, snapshot, true);
+
+        deleteIndex(indexName);
+
+        restoreSnapshot(repository, snapshot, true);
+
+        biConsumer.accept(repository, indexName);
+    }
+
+ private void assertRequestCountersAccountedForReads(RepositoryStatsSnapshot statsSnapshot) {
+ RepositoryStats repositoryStats = statsSnapshot.getRepositoryStats();
+ Map requestCounts = repositoryStats.requestCounts;
+ for (String readCounterKey : readCounterKeys()) {
+ assertThat(requestCounts.get(readCounterKey), is(notNullValue()));
+ assertThat(requestCounts.get(readCounterKey), is(greaterThan(0L)));
+ }
+ }
+
+ private void assertRequestCountersAccountedForWrites(RepositoryStatsSnapshot statsSnapshot) {
+ RepositoryStats repositoryStats = statsSnapshot.getRepositoryStats();
+ Map requestCounts = repositoryStats.requestCounts;
+ for (String writeCounterKey : writeCounterKeys()) {
+ assertThat(requestCounts.get(writeCounterKey), is(notNullValue()));
+ assertThat(requestCounts.get(writeCounterKey), is(greaterThan(0L)));
+ }
+ }
+
+    /**
+     * Asserts that two stats snapshots are equal, comparing the repository stats
+     * exactly and the repository info on every field except stoppedAt.
+     */
+    private void assertStatsAreEqualsIgnoringStoppedAt(RepositoryStatsSnapshot actual, RepositoryStatsSnapshot expected) {
+        assertRepositoryInfoIsEqualIgnoringStoppedAt(actual.getRepositoryInfo(), expected.getRepositoryInfo());
+        assertThat(actual.getRepositoryStats(), equalTo(expected.getRepositoryStats()));
+    }
+
+    /**
+     * Asserts field-by-field equality of two {@link RepositoryInfo} instances;
+     * stoppedAt is deliberately excluded from the comparison.
+     */
+    private void assertRepositoryInfoIsEqualIgnoringStoppedAt(RepositoryInfo actual, RepositoryInfo expected) {
+        assertThat(actual.ephemeralId, equalTo(expected.ephemeralId));
+        assertThat(actual.name, equalTo(expected.name));
+        assertThat(actual.type, equalTo(expected.type));
+        assertThat(actual.location, equalTo(expected.location));
+        assertThat(actual.startedAt, equalTo(expected.startedAt));
+    }
+
+ private void assertRepositoryStatsBelongToRepository(RepositoryStatsSnapshot stats, String repositoryName) {
+ RepositoryInfo repositoryInfo = stats.getRepositoryInfo();
+ assertThat(repositoryInfo.name, equalTo(repositoryName));
+ assertThat(repositoryInfo.type, equalTo(repositoryType()));
+ assertThat(repositoryInfo.location, equalTo(repositoryLocation()));
+ }
+
+ private void assertAllRequestCountsAreZero(RepositoryStatsSnapshot statsSnapshot) {
+ RepositoryStats stats = statsSnapshot.getRepositoryStats();
+ for (long requestCount : stats.requestCounts.values()) {
+ assertThat(requestCount, equalTo(0));
+ }
+ }
+
+ private List getRepositoriesStats() throws IOException {
+ Map response = getAsMap("/_nodes/_all/_repositories_metering");
+ return parseRepositoriesStatsResponse(response);
+ }
+
+ private List parseRepositoriesStatsResponse(Map response) throws IOException {
+ Map>> nodesRepoStats = extractValue(response, "nodes");
+ assertThat(response.size(), greaterThan(0));
+ List repositoriesStats = new ArrayList<>();
+ for (String nodeId : getNodeIds()) {
+ List