diff --git a/storage/experimental/experimental.go b/storage/experimental/experimental.go index c83d1fa0935a..b35de64d39d2 100644 --- a/storage/experimental/experimental.go +++ b/storage/experimental/experimental.go @@ -25,10 +25,26 @@ import ( "time" "cloud.google.com/go/storage/internal" + "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" ) -// WithReadStallTimeout provides a [ClientOption] that may be passed to [storage.NewClient]. +// WithMetricInterval provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient]. +// It sets how often to emit metrics [metric.WithInterval] when using +// [metric.NewPeriodicReader] +// When using Cloud Monitoring interval must be at minimum 1 [time.Minute]. +func WithMetricInterval(metricInterval time.Duration) option.ClientOption { + return internal.WithMetricInterval.(func(time.Duration) option.ClientOption)(metricInterval) +} + +// WithMetricExporter provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient]. +// Set an alternate client-side metric Exporter to emit metrics through. +// Must implement [metric.Exporter] +func WithMetricExporter(ex *metric.Exporter) option.ClientOption { + return internal.WithMetricExporter.(func(*metric.Exporter) option.ClientOption)(ex) +} + +// WithReadStallTimeout provides a [option.ClientOption] that may be passed to [storage.NewClient]. // It enables the client to retry stalled requests when starting a download from // Cloud Storage. If the timeout elapses with no response from the server, the request // is automatically retried. diff --git a/storage/grpc_client.go b/storage/grpc_client.go index eb327a3eeb48..40789270540e 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -129,7 +129,7 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageCl if !config.disableClientMetrics { // Do not fail client creation if enabling metrics fails. - if metricsContext, err := enableClientMetrics(ctx, s); err == nil { + if metricsContext, err := enableClientMetrics(ctx, s, config); err == nil { s.metricsContext = metricsContext s.clientOption = append(s.clientOption, metricsContext.clientOpts...) } else { diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 460a9d0a2b8f..149b37807ed4 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -134,8 +134,6 @@ func newPreparedResource(ctx context.Context, project string, resourceOptions [] } type metricsContext struct { - // project used by exporter - project string // client options passed to gRPC channels clientOpts []option.ClientOption // instance of metric reader used by gRPC client-side metrics @@ -154,29 +152,36 @@ func createHistogramView(name string, boundaries []float64) metric.View { }) } -func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, error) { - preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())}) - if err != nil { - return nil, err - } - // Implementation requires a project, if one is not determined possibly user - // credentials. Then we will fail stating gRPC Metrics require a project-id. - if project == "" && preparedResource.projectToUse != "" { - return nil, fmt.Errorf("google cloud project is required to start client-side metrics") - } - // If projectTouse isn't the same as project provided to Storage client, then - // emit a log stating which project is being used to emit metrics to. - if project != preparedResource.projectToUse { - log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project) - } - meOpts := []mexporter.Option{ - mexporter.WithProjectID(preparedResource.projectToUse), - mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), - mexporter.WithCreateServiceTimeSeries(), - mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})} - exporter, err := mexporter.New(meOpts...) - if err != nil { - return nil, err +func newGRPCMetricContext(ctx context.Context, project string, config storageConfig) (*metricsContext, error) { + var exporter metric.Exporter + meterOpts := []metric.Option{} + if config.metricExporter != nil { + exporter = *config.metricExporter + } else { + preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())}) + if err != nil { + return nil, err + } + meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource)) + // Implementation requires a project, if one is not determined possibly user + // credentials. Then we will fail stating gRPC Metrics require a project-id. + if project == "" && preparedResource.projectToUse == "" { + return nil, fmt.Errorf("google cloud project is required to start client-side metrics") + } + // If projectTouse isn't the same as project provided to Storage client, then + // emit a log stating which project is being used to emit metrics to. + if project != preparedResource.projectToUse { + log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project) + } + meOpts := []mexporter.Option{ + mexporter.WithProjectID(preparedResource.projectToUse), + mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), + mexporter.WithCreateServiceTimeSeries(), + mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})} + exporter, err = mexporter.New(meOpts...) + if err != nil { + return nil, err + } } // Metric views update histogram boundaries to be relevant to GCS // otherwise default OTel histogram boundaries are used. @@ -185,11 +190,13 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()), } - provider := metric.NewMeterProvider( - metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(time.Minute))), - metric.WithResource(preparedResource.resource), - metric.WithView(metricViews...), - ) + interval := time.Minute + if config.metricInterval > 0 { + interval = config.metricInterval + } + meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), + metric.WithView(metricViews...)) + provider := metric.NewMeterProvider(meterOpts...) mo := opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics().Add( @@ -209,7 +216,6 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), } context := &metricsContext{ - project: preparedResource.projectToUse, clientOpts: opts, provider: provider, close: createShutdown(ctx, provider), @@ -217,14 +223,14 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, return context, nil } -func enableClientMetrics(ctx context.Context, s *settings) (*metricsContext, error) { +func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { var project string c, err := transport.Creds(ctx, s.clientOption...) if err == nil { project = c.ProjectID } // Enable client-side metrics for gRPC - metricsContext, err := newGRPCMetricContext(ctx, project) + metricsContext, err := newGRPCMetricContext(ctx, project, config) if err != nil { return nil, fmt.Errorf("gRPC Metrics: %w", err) } diff --git a/storage/internal/experimental.go b/storage/internal/experimental.go index f22d67b36ed1..7db3ff52743c 100644 --- a/storage/internal/experimental.go +++ b/storage/internal/experimental.go @@ -17,6 +17,15 @@ package internal var ( + // WithMetricInterval is a function which is implemented by storage package. + // It sets how often to emit metrics when using NewPeriodicReader and must be + // greater than 1 minute. + WithMetricInterval any // func (*time.Duration) option.ClientOption + + // WithMetricExporter is a function which is implemented by storage package. + // Set an alternate client-side metric Exporter to emit metrics through. + WithMetricExporter any // func (*metric.Exporter) option.ClientOption + // WithReadStallTimeout is a function which is implemented by storage package. // It takes ReadStallTimeoutConfig as inputs and returns a option.ClientOption. WithReadStallTimeout any // func (*ReadStallTimeoutConfig) option.ClientOption diff --git a/storage/option.go b/storage/option.go index 7fca012c7557..3b0cf9e71831 100644 --- a/storage/option.go +++ b/storage/option.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/storage/experimental" storageinternal "cloud.google.com/go/storage/internal" + "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" ) @@ -35,7 +36,9 @@ const ( ) func init() { - // initialize experimental option. + // initialize experimental options + storageinternal.WithMetricExporter = withMetricExporter + storageinternal.WithMetricInterval = withMetricInterval storageinternal.WithReadStallTimeout = withReadStallTimeout } @@ -69,12 +72,13 @@ func getDynamicReadReqInitialTimeoutSecFromEnv(defaultVal time.Duration) time.Du return val } -// storageConfig contains the Storage client option configuration that can be // set through storageClientOptions. type storageConfig struct { useJSONforReads bool readAPIWasSet bool disableClientMetrics bool + metricExporter *metric.Exporter + metricInterval time.Duration readStallTimeoutConfig *experimental.ReadStallTimeoutConfig } @@ -160,6 +164,34 @@ func (w *withDisabledClientMetrics) ApplyStorageOpt(c *storageConfig) { c.disableClientMetrics = w.disabledClientMetrics } +type withMeterOptions struct { + internaloption.EmbeddableAdapter + // set sampling interval + interval time.Duration +} + +func withMetricInterval(interval time.Duration) option.ClientOption { + return &withMeterOptions{interval: interval} +} + +func (w *withMeterOptions) ApplyStorageOpt(c *storageConfig) { + c.metricInterval = w.interval +} + +type withMetricExporterConfig struct { + internaloption.EmbeddableAdapter + // exporter override + metricExporter *metric.Exporter +} + +func withMetricExporter(ex *metric.Exporter) option.ClientOption { + return &withMetricExporterConfig{metricExporter: ex} +} + +func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) { + c.metricExporter = w.metricExporter +} + // WithReadStallTimeout is an option that may be passed to [NewClient]. // It enables the client to retry the stalled read request, happens as part of // storage.Reader creation. As the name suggest, timeout is adjusted dynamically diff --git a/storage/option_test.go b/storage/option_test.go index 7c4f7acfdde2..2299215fcf86 100644 --- a/storage/option_test.go +++ b/storage/option_test.go @@ -20,6 +20,7 @@ import ( "time" "cloud.google.com/go/storage/experimental" + mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric" "github.com/google/go-cmp/cmp" "google.golang.org/api/option" ) @@ -37,6 +38,8 @@ func TestApplyStorageOpt(t *testing.T) { useJSONforReads: true, readAPIWasSet: true, disableClientMetrics: false, + metricInterval: 0, + metricExporter: nil, }, }, { @@ -46,6 +49,8 @@ func TestApplyStorageOpt(t *testing.T) { useJSONforReads: false, readAPIWasSet: true, disableClientMetrics: false, + metricInterval: 0, + metricExporter: nil, }, }, { @@ -55,6 +60,8 @@ func TestApplyStorageOpt(t *testing.T) { useJSONforReads: false, readAPIWasSet: true, disableClientMetrics: false, + metricInterval: 0, + metricExporter: nil, }, }, { @@ -64,6 +71,8 @@ func TestApplyStorageOpt(t *testing.T) { useJSONforReads: false, readAPIWasSet: false, disableClientMetrics: false, + metricInterval: 0, + metricExporter: nil, }, }, { @@ -73,6 +82,8 @@ func TestApplyStorageOpt(t *testing.T) { useJSONforReads: false, readAPIWasSet: false, disableClientMetrics: false, + metricInterval: 0, + metricExporter: nil, }, }, { @@ -82,6 +93,19 @@ func TestApplyStorageOpt(t *testing.T) { useJSONforReads: false, readAPIWasSet: false, disableClientMetrics: true, + metricInterval: 0, + metricExporter: nil, + }, + }, + { + desc: "set metrics interval", + opts: []option.ClientOption{experimental.WithMetricInterval(time.Minute * 5)}, + want: storageConfig{ + useJSONforReads: false, + readAPIWasSet: false, + disableClientMetrics: false, + metricInterval: time.Minute * 5, + metricExporter: nil, }, }, { @@ -128,6 +152,24 @@ func TestApplyStorageOpt(t *testing.T) { } } +func TestSetCustomExporter(t *testing.T) { + exporter, err := mexporter.New() + if err != nil { + t.Errorf("TestSetCustomExporter: %v", err) + } + want := storageConfig{ + metricExporter: &exporter, + } + var got storageConfig + opt := experimental.WithMetricExporter(&exporter) + if storageOpt, ok := opt.(storageClientOption); ok { + storageOpt.ApplyStorageOpt(&got) + } + if got.metricExporter != want.metricExporter { + t.Errorf("TestSetCustomExpoerter: metricExporter want=%v, got=%v", want.metricExporter, got.metricExporter) + } +} + func TestGetDynamicReadReqInitialTimeoutSecFromEnv(t *testing.T) { defaultValue := 10 * time.Second