Skip to content

Commit

Permalink
feat(storage): add grpc metrics experimental options (#10984)
Browse files Browse the repository at this point in the history
* feat(storage): add experimental options

* use internal for experimental option introduction

* reduce to minimal changes

* remove comments from unexported options

* fix issues committed on merging main..

* address review feedback

* revert changes to go.work.sum

* vet.sh and update comments

* address feedback

* allow any non-zero interval

* note monitoring interval

---------

Co-authored-by: Chris Cotter <[email protected]>
  • Loading branch information
frankyn and tritone authored Oct 18, 2024
1 parent 5e363a3 commit 5b7397b
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 37 deletions.
18 changes: 17 additions & 1 deletion storage/experimental/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,26 @@ import (
"time"

"cloud.google.com/go/storage/internal"
"go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/option"
)

// WithReadStallTimeout provides a [ClientOption] that may be passed to [storage.NewClient].
// WithMetricInterval provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient].
// It sets how often to emit metrics [metric.WithInterval] when using
// [metric.NewPeriodicReader]
// When using Cloud Monitoring interval must be at minimum 1 [time.Minute].
func WithMetricInterval(metricInterval time.Duration) option.ClientOption {
return internal.WithMetricInterval.(func(time.Duration) option.ClientOption)(metricInterval)
}

// WithMetricExporter provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient].
// Set an alternate client-side metric Exporter to emit metrics through.
// Must implement [metric.Exporter]
func WithMetricExporter(ex *metric.Exporter) option.ClientOption {
return internal.WithMetricExporter.(func(*metric.Exporter) option.ClientOption)(ex)
}

// WithReadStallTimeout provides a [option.ClientOption] that may be passed to [storage.NewClient].
// It enables the client to retry stalled requests when starting a download from
// Cloud Storage. If the timeout elapses with no response from the server, the request
// is automatically retried.
Expand Down
2 changes: 1 addition & 1 deletion storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageCl

if !config.disableClientMetrics {
// Do not fail client creation if enabling metrics fails.
if metricsContext, err := enableClientMetrics(ctx, s); err == nil {
if metricsContext, err := enableClientMetrics(ctx, s, config); err == nil {
s.metricsContext = metricsContext
s.clientOption = append(s.clientOption, metricsContext.clientOpts...)
} else {
Expand Down
72 changes: 39 additions & 33 deletions storage/grpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ func newPreparedResource(ctx context.Context, project string, resourceOptions []
}

type metricsContext struct {
// project used by exporter
project string
// client options passed to gRPC channels
clientOpts []option.ClientOption
// instance of metric reader used by gRPC client-side metrics
Expand All @@ -154,29 +152,36 @@ func createHistogramView(name string, boundaries []float64) metric.View {
})
}

func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, error) {
preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
if err != nil {
return nil, err
}
// Implementation requires a project, if one is not determined possibly user
// credentials. Then we will fail stating gRPC Metrics require a project-id.
if project == "" && preparedResource.projectToUse != "" {
return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
}
// If projectTouse isn't the same as project provided to Storage client, then
// emit a log stating which project is being used to emit metrics to.
if project != preparedResource.projectToUse {
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project)
}
meOpts := []mexporter.Option{
mexporter.WithProjectID(preparedResource.projectToUse),
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
mexporter.WithCreateServiceTimeSeries(),
mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})}
exporter, err := mexporter.New(meOpts...)
if err != nil {
return nil, err
func newGRPCMetricContext(ctx context.Context, project string, config storageConfig) (*metricsContext, error) {
var exporter metric.Exporter
meterOpts := []metric.Option{}
if config.metricExporter != nil {
exporter = *config.metricExporter
} else {
preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
if err != nil {
return nil, err
}
meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource))
// Implementation requires a project, if one is not determined possibly user
// credentials. Then we will fail stating gRPC Metrics require a project-id.
if project == "" && preparedResource.projectToUse == "" {
return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
}
// If projectTouse isn't the same as project provided to Storage client, then
// emit a log stating which project is being used to emit metrics to.
if project != preparedResource.projectToUse {
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project)
}
meOpts := []mexporter.Option{
mexporter.WithProjectID(preparedResource.projectToUse),
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
mexporter.WithCreateServiceTimeSeries(),
mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})}
exporter, err = mexporter.New(meOpts...)
if err != nil {
return nil, err
}
}
// Metric views update histogram boundaries to be relevant to GCS
// otherwise default OTel histogram boundaries are used.
Expand All @@ -185,11 +190,13 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext,
createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()),
createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()),
}
provider := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(time.Minute))),
metric.WithResource(preparedResource.resource),
metric.WithView(metricViews...),
)
interval := time.Minute
if config.metricInterval > 0 {
interval = config.metricInterval
}
meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))),
metric.WithView(metricViews...))
provider := metric.NewMeterProvider(meterOpts...)
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics().Add(
Expand All @@ -209,22 +216,21 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext,
option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})),
}
context := &metricsContext{
project: preparedResource.projectToUse,
clientOpts: opts,
provider: provider,
close: createShutdown(ctx, provider),
}
return context, nil
}

func enableClientMetrics(ctx context.Context, s *settings) (*metricsContext, error) {
func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) {
var project string
c, err := transport.Creds(ctx, s.clientOption...)
if err == nil {
project = c.ProjectID
}
// Enable client-side metrics for gRPC
metricsContext, err := newGRPCMetricContext(ctx, project)
metricsContext, err := newGRPCMetricContext(ctx, project, config)
if err != nil {
return nil, fmt.Errorf("gRPC Metrics: %w", err)
}
Expand Down
9 changes: 9 additions & 0 deletions storage/internal/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@
package internal

var (
// WithMetricInterval is a function which is implemented by storage package.
// It sets how often to emit metrics when using NewPeriodicReader and must be
// greater than 1 minute.
WithMetricInterval any // func (*time.Duration) option.ClientOption

// WithMetricExporter is a function which is implemented by storage package.
// Set an alternate client-side metric Exporter to emit metrics through.
WithMetricExporter any // func (*metric.Exporter) option.ClientOption

// WithReadStallTimeout is a function which is implemented by storage package.
// It takes ReadStallTimeoutConfig as inputs and returns a option.ClientOption.
WithReadStallTimeout any // func (*ReadStallTimeoutConfig) option.ClientOption
Expand Down
36 changes: 34 additions & 2 deletions storage/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/storage/experimental"
storageinternal "cloud.google.com/go/storage/internal"
"go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
)
Expand All @@ -35,7 +36,9 @@ const (
)

func init() {
// initialize experimental option.
// initialize experimental options
storageinternal.WithMetricExporter = withMetricExporter
storageinternal.WithMetricInterval = withMetricInterval
storageinternal.WithReadStallTimeout = withReadStallTimeout
}

Expand Down Expand Up @@ -69,12 +72,13 @@ func getDynamicReadReqInitialTimeoutSecFromEnv(defaultVal time.Duration) time.Du
return val
}

// storageConfig contains the Storage client option configuration that can be
// set through storageClientOptions.
type storageConfig struct {
useJSONforReads bool
readAPIWasSet bool
disableClientMetrics bool
metricExporter *metric.Exporter
metricInterval time.Duration
readStallTimeoutConfig *experimental.ReadStallTimeoutConfig
}

Expand Down Expand Up @@ -160,6 +164,34 @@ func (w *withDisabledClientMetrics) ApplyStorageOpt(c *storageConfig) {
c.disableClientMetrics = w.disabledClientMetrics
}

type withMeterOptions struct {
internaloption.EmbeddableAdapter
// set sampling interval
interval time.Duration
}

func withMetricInterval(interval time.Duration) option.ClientOption {
return &withMeterOptions{interval: interval}
}

func (w *withMeterOptions) ApplyStorageOpt(c *storageConfig) {
c.metricInterval = w.interval
}

type withMetricExporterConfig struct {
internaloption.EmbeddableAdapter
// exporter override
metricExporter *metric.Exporter
}

func withMetricExporter(ex *metric.Exporter) option.ClientOption {
return &withMetricExporterConfig{metricExporter: ex}
}

func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) {
c.metricExporter = w.metricExporter
}

// WithReadStallTimeout is an option that may be passed to [NewClient].
// It enables the client to retry the stalled read request, happens as part of
// storage.Reader creation. As the name suggest, timeout is adjusted dynamically
Expand Down
42 changes: 42 additions & 0 deletions storage/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"cloud.google.com/go/storage/experimental"
mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
)
Expand All @@ -37,6 +38,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: true,
readAPIWasSet: true,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -46,6 +49,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: true,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -55,6 +60,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: true,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -64,6 +71,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -73,6 +82,8 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
metricInterval: 0,
metricExporter: nil,
},
},
{
Expand All @@ -82,6 +93,19 @@ func TestApplyStorageOpt(t *testing.T) {
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: true,
metricInterval: 0,
metricExporter: nil,
},
},
{
desc: "set metrics interval",
opts: []option.ClientOption{experimental.WithMetricInterval(time.Minute * 5)},
want: storageConfig{
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
metricInterval: time.Minute * 5,
metricExporter: nil,
},
},
{
Expand Down Expand Up @@ -128,6 +152,24 @@ func TestApplyStorageOpt(t *testing.T) {
}
}

func TestSetCustomExporter(t *testing.T) {
exporter, err := mexporter.New()
if err != nil {
t.Errorf("TestSetCustomExporter: %v", err)
}
want := storageConfig{
metricExporter: &exporter,
}
var got storageConfig
opt := experimental.WithMetricExporter(&exporter)
if storageOpt, ok := opt.(storageClientOption); ok {
storageOpt.ApplyStorageOpt(&got)
}
if got.metricExporter != want.metricExporter {
t.Errorf("TestSetCustomExpoerter: metricExporter want=%v, got=%v", want.metricExporter, got.metricExporter)
}
}

func TestGetDynamicReadReqInitialTimeoutSecFromEnv(t *testing.T) {
defaultValue := 10 * time.Second

Expand Down

0 comments on commit 5b7397b

Please sign in to comment.