Skip to content

Commit

Permalink
Cherry-pick observability changes from master to v1.50.x and update v…
Browse files Browse the repository at this point in the history
…ersion to 1.50.1 (#5722)

* Add binary logger option for client and server (#5675)

* Add binary logger option for client and server

* gcp/observability: implement public preview config syntax, logging schema, and exposed metrics (#5704)

* Fix o11y typo (#5719)

* o11y: Fixed o11y bug (#5720)

* update version to 1.50.1
  • Loading branch information
zasweq authored Oct 14, 2022
1 parent 6576007 commit 4c776ec
Show file tree
Hide file tree
Showing 23 changed files with 2,202 additions and 2,193 deletions.
8 changes: 4 additions & 4 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {

// Set and check the DialOptions
opts := []DialOption{WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials())}
internal.AddExtraDialOptions.(func(opt ...DialOption))(opts...)
internal.AddGlobalDialOptions.(func(opt ...DialOption))(opts...)
for i, opt := range opts {
if extraDialOptions[i] != opt {
t.Fatalf("Unexpected extra dial option at index %d: %v != %v", i, extraDialOptions[i], opt)
Expand All @@ -52,7 +52,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {
cc.Close()
}

internal.ClearExtraDialOptions()
internal.ClearGlobalDialOptions()
if len(extraDialOptions) != 0 {
t.Fatalf("Unexpected len of extraDialOptions: %d != 0", len(extraDialOptions))
}
Expand All @@ -62,7 +62,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
const maxRecvSize = 998765
// Set and check the ServerOptions
opts := []ServerOption{Creds(insecure.NewCredentials()), MaxRecvMsgSize(maxRecvSize)}
internal.AddExtraServerOptions.(func(opt ...ServerOption))(opts...)
internal.AddGlobalServerOptions.(func(opt ...ServerOption))(opts...)
for i, opt := range opts {
if extraServerOptions[i] != opt {
t.Fatalf("Unexpected extra server option at index %d: %v != %v", i, extraServerOptions[i], opt)
Expand All @@ -75,7 +75,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSize, maxRecvSize)
}

internal.ClearExtraServerOptions()
internal.ClearGlobalServerOptions()
if len(extraServerOptions) != 0 {
t.Fatalf("Unexpected len of extraServerOptions: %d != 0", len(extraServerOptions))
}
Expand Down
15 changes: 13 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
)

func init() {
internal.AddExtraDialOptions = func(opt ...DialOption) {
internal.AddGlobalDialOptions = func(opt ...DialOption) {
extraDialOptions = append(extraDialOptions, opt...)
}
internal.ClearExtraDialOptions = func() {
internal.ClearGlobalDialOptions = func() {
extraDialOptions = nil
}
internal.WithBinaryLogger = withBinaryLogger
}

// dialOptions configure a Dial call. dialOptions are set by the DialOption
Expand All @@ -61,6 +63,7 @@ type dialOptions struct {
timeout time.Duration
scChan <-chan ServiceConfig
authority string
binaryLogger binarylog.Logger
copts transport.ConnectOptions
callOptions []CallOption
channelzParentID *channelz.Identifier
Expand Down Expand Up @@ -401,6 +404,14 @@ func WithStatsHandler(h stats.Handler) DialOption {
})
}

// withBinaryLogger returns a DialOption that specifies the binary logger for
// this ClientConn.
func withBinaryLogger(bl binarylog.Logger) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.binaryLogger = bl
})
}

// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
// non-temporary dial errors. If f is true, and dialer returns a non-temporary
// error, gRPC will fail the connection to the network address and won't try to
Expand Down
246 changes: 163 additions & 83 deletions gcp/observability/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,86 +21,23 @@ package observability
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"regexp"

gcplogging "cloud.google.com/go/logging"
"golang.org/x/oauth2/google"
"google.golang.org/grpc/internal/envconfig"
)

const (
envObservabilityConfig = "GRPC_CONFIG_OBSERVABILITY"
envObservabilityConfigJSON = "GRPC_CONFIG_OBSERVABILITY_JSON"
envProjectID = "GOOGLE_CLOUD_PROJECT"
logFilterPatternRegexpStr = `^([\w./]+)/((?:\w+)|[*])$`
envProjectID = "GOOGLE_CLOUD_PROJECT"
methodStringRegexpStr = `^([\w./]+)/((?:\w+)|[*])$`
)

var logFilterPatternRegexp = regexp.MustCompile(logFilterPatternRegexpStr)

// logFilter represents a method logging configuration.
type logFilter struct {
// Pattern is a string which can select a group of method names. By
// default, the Pattern is an empty string, matching no methods.
//
// Only "*" Wildcard is accepted for Pattern. A Pattern is in the form
// of <service>/<method> or just a character "*" .
//
// If the Pattern is "*", it specifies the defaults for all the
// services; If the Pattern is <service>/*, it specifies the defaults
// for all methods in the specified service <service>; If the Pattern is
// */<method>, this is not supported.
//
// Examples:
// - "Foo/Bar" selects only the method "Bar" from service "Foo"
// - "Foo/*" selects all methods from service "Foo"
// - "*" selects all methods from all services.
Pattern string `json:"pattern,omitempty"`
// HeaderBytes is the number of bytes of each header to log. If the size of
// the header is greater than the defined limit, content past the limit will
// be truncated. The default value is 0.
HeaderBytes int32 `json:"header_bytes,omitempty"`
// MessageBytes is the number of bytes of each message to log. If the size
// of the message is greater than the defined limit, content pass the limit
// will be truncated. The default value is 0.
MessageBytes int32 `json:"message_bytes,omitempty"`
}

// config is configuration for observability behaviors. By default, no
// configuration is required for tracing/metrics/logging to function. This
// config captures the most common knobs for gRPC users. It's always possible to
// override with explicit config in code.
type config struct {
// EnableCloudTrace represents whether the tracing data upload to
// CloudTrace should be enabled or not.
EnableCloudTrace bool `json:"enable_cloud_trace,omitempty"`
// EnableCloudMonitoring represents whether the metrics data upload to
// CloudMonitoring should be enabled or not.
EnableCloudMonitoring bool `json:"enable_cloud_monitoring,omitempty"`
// EnableCloudLogging represents Whether the logging data upload to
// CloudLogging should be enabled or not.
EnableCloudLogging bool `json:"enable_cloud_logging,omitempty"`
// DestinationProjectID is the destination GCP project identifier for the
// uploading log entries. If empty, the gRPC Observability plugin will
// attempt to fetch the project_id from the GCP environment variables, or
// from the default credentials.
DestinationProjectID string `json:"destination_project_id,omitempty"`
// LogFilters is a list of method config. The order matters here - the first
// Pattern which matches the current method will apply the associated config
// options in the logFilter. Any other logFilter that also matches that
// comes later will be ignored. So a logFilter of "*/*" should appear last
// in this list.
LogFilters []logFilter `json:"log_filters,omitempty"`
// GlobalTraceSamplingRate is the global setting that controls the
// probability of a RPC being traced. For example, 0.05 means there is a 5%
// chance for a RPC to be traced, 1.0 means trace every call, 0 means don’t
// start new traces.
GlobalTraceSamplingRate float64 `json:"global_trace_sampling_rate,omitempty"`
// CustomTags a list of custom tags that will be attached to every log
// entry.
CustomTags map[string]string `json:"custom_tags,omitempty"`
}
var methodStringRegexp = regexp.MustCompile(methodStringRegexpStr)

// fetchDefaultProjectID fetches the default GCP project id from environment.
func fetchDefaultProjectID(ctx context.Context) string {
Expand All @@ -123,14 +60,34 @@ func fetchDefaultProjectID(ctx context.Context) string {
return credentials.ProjectID
}

func validateFilters(config *config) error {
for _, filter := range config.LogFilters {
if filter.Pattern == "*" {
func validateLogEventMethod(methods []string, exclude bool) error {
for _, method := range methods {
if method == "*" {
if exclude {
return errors.New("cannot have exclude and a '*' wildcard")
}
continue
}
match := logFilterPatternRegexp.FindStringSubmatch(filter.Pattern)
match := methodStringRegexp.FindStringSubmatch(method)
if match == nil {
return fmt.Errorf("invalid log filter Pattern: %v", filter.Pattern)
return fmt.Errorf("invalid method string: %v", method)
}
}
return nil
}

func validateLoggingEvents(config *config) error {
if config.CloudLogging == nil {
return nil
}
for _, clientRPCEvent := range config.CloudLogging.ClientRPCEvents {
if err := validateLogEventMethod(clientRPCEvent.Methods, clientRPCEvent.Exclude); err != nil {
return fmt.Errorf("error in clientRPCEvent method: %v", err)
}
}
for _, serverRPCEvent := range config.CloudLogging.ServerRPCEvents {
if err := validateLogEventMethod(serverRPCEvent.Methods, serverRPCEvent.Exclude); err != nil {
return fmt.Errorf("error in serverRPCEvent method: %v", err)
}
}
return nil
Expand All @@ -144,38 +101,161 @@ func unmarshalAndVerifyConfig(rawJSON json.RawMessage) (*config, error) {
if err := json.Unmarshal(rawJSON, &config); err != nil {
return nil, fmt.Errorf("error parsing observability config: %v", err)
}
if err := validateFilters(&config); err != nil {
if err := validateLoggingEvents(&config); err != nil {
return nil, fmt.Errorf("error parsing observability config: %v", err)
}
if config.GlobalTraceSamplingRate > 1 || config.GlobalTraceSamplingRate < 0 {
return nil, fmt.Errorf("error parsing observability config: invalid global trace sampling rate %v", config.GlobalTraceSamplingRate)
if config.CloudTrace != nil && (config.CloudTrace.SamplingRate > 1 || config.CloudTrace.SamplingRate < 0) {
return nil, fmt.Errorf("error parsing observability config: invalid cloud trace sampling rate %v", config.CloudTrace.SamplingRate)
}
logger.Infof("Parsed ObservabilityConfig: %+v", &config)
return &config, nil
}

func parseObservabilityConfig() (*config, error) {
if fileSystemPath := os.Getenv(envObservabilityConfigJSON); fileSystemPath != "" {
content, err := ioutil.ReadFile(fileSystemPath) // TODO: Switch to os.ReadFile once dropped support for go 1.15
if f := envconfig.ObservabilityConfigFile; f != "" {
if envconfig.ObservabilityConfig != "" {
logger.Warning("Ignoring GRPC_GCP_OBSERVABILITY_CONFIG and using GRPC_GCP_OBSERVABILITY_CONFIG_FILE contents.")
}
content, err := ioutil.ReadFile(f) // TODO: Switch to os.ReadFile once dropped support for go 1.15
if err != nil {
return nil, fmt.Errorf("error reading observability configuration file %q: %v", fileSystemPath, err)
return nil, fmt.Errorf("error reading observability configuration file %q: %v", f, err)
}
return unmarshalAndVerifyConfig(content)
} else if content := os.Getenv(envObservabilityConfig); content != "" {
return unmarshalAndVerifyConfig([]byte(content))
} else if envconfig.ObservabilityConfig != "" {
return unmarshalAndVerifyConfig([]byte(envconfig.ObservabilityConfig))
}
// If the ENV var doesn't exist, do nothing
return nil, nil
}

func ensureProjectIDInObservabilityConfig(ctx context.Context, config *config) error {
if config.DestinationProjectID == "" {
if config.ProjectID == "" {
// Try to fetch the GCP project id
projectID := fetchDefaultProjectID(ctx)
if projectID == "" {
return fmt.Errorf("empty destination project ID")
}
config.DestinationProjectID = projectID
config.ProjectID = projectID
}
return nil
}

type clientRPCEvents struct {
// Methods is a list of strings which can select a group of methods. By
// default, the list is empty, matching no methods.
//
// The value of the method is in the form of <service>/<method>.
//
// "*" is accepted as a wildcard for:
// 1. The method name. If the value is <service>/*, it matches all
// methods in the specified service.
// 2. The whole value of the field which matches any <service>/<method>.
// It’s not supported when Exclude is true.
// 3. The * wildcard cannot be used on the service name independently,
// */<method> is not supported.
//
// The service name, when specified, must be the fully qualified service
// name, including the package name.
//
// Examples:
// 1."goo.Foo/Bar" selects only the method "Bar" from service "goo.Foo",
// here “goo” is the package name.
// 2."goo.Foo/*" selects all methods from service "goo.Foo"
// 3. "*" selects all methods from all services.
Methods []string `json:"methods,omitempty"`
// Exclude represents whether the methods denoted by Methods should be
// excluded from logging. The default value is false, meaning the methods
// denoted by Methods are included in the logging. If Exclude is true, the
// wildcard `*` cannot be used as value of an entry in Methods.
Exclude bool `json:"exclude,omitempty"`
// MaxMetadataBytes is the maximum number of bytes of each header to log. If
// the size of the metadata is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMetadataBytes int `json:"max_metadata_bytes"`
// MaxMessageBytes is the maximum number of bytes of each message to log. If
// the size of the message is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMessageBytes int `json:"max_message_bytes"`
}

type serverRPCEvents struct {
// Methods is a list of strings which can select a group of methods. By
// default, the list is empty, matching no methods.
//
// The value of the method is in the form of <service>/<method>.
//
// "*" is accepted as a wildcard for:
// 1. The method name. If the value is <service>/*, it matches all
// methods in the specified service.
// 2. The whole value of the field which matches any <service>/<method>.
// It’s not supported when Exclude is true.
// 3. The * wildcard cannot be used on the service name independently,
// */<method> is not supported.
//
// The service name, when specified, must be the fully qualified service
// name, including the package name.
//
// Examples:
// 1."goo.Foo/Bar" selects only the method "Bar" from service "goo.Foo",
// here “goo” is the package name.
// 2."goo.Foo/*" selects all methods from service "goo.Foo"
// 3. "*" selects all methods from all services.
Methods []string `json:"methods,omitempty"`
// Exclude represents whether the methods denoted by Methods should be
// excluded from logging. The default value is false, meaning the methods
// denoted by Methods are included in the logging. If Exclude is true, the
// wildcard `*` cannot be used as value of an entry in Methods.
Exclude bool `json:"exclude,omitempty"`
// MaxMetadataBytes is the maximum number of bytes of each header to log. If
// the size of the metadata is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMetadataBytes int `json:"max_metadata_bytes"`
// MaxMessageBytes is the maximum number of bytes of each message to log. If
// the size of the message is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMessageBytes int `json:"max_message_bytes"`
}

type cloudLogging struct {
// ClientRPCEvents represents the configuration for outgoing RPC's from the
// binary. The client_rpc_events configs are evaluated in text order, the
// first one matched is used. If an RPC doesn't match an entry, it will
// continue on to the next entry in the list.
ClientRPCEvents []clientRPCEvents `json:"client_rpc_events,omitempty"`

// ServerRPCEvents represents the configuration for incoming RPC's to the
// binary. The server_rpc_events configs are evaluated in text order, the
// first one matched is used. If an RPC doesn't match an entry, it will
// continue on to the next entry in the list.
ServerRPCEvents []serverRPCEvents `json:"server_rpc_events,omitempty"`
}

type cloudMonitoring struct{}

type cloudTrace struct {
// SamplingRate is the global setting that controls the probability of a RPC
// being traced. For example, 0.05 means there is a 5% chance for a RPC to
// be traced, 1.0 means trace every call, 0 means don’t start new traces. By
// default, the sampling_rate is 0.
SamplingRate float64 `json:"sampling_rate,omitempty"`
}

type config struct {
// ProjectID is the destination GCP project identifier for uploading log
// entries. If empty, the gRPC Observability plugin will attempt to fetch
// the project_id from the GCP environment variables, or from the default
// credentials. If not found, the observability init functions will return
// an error.
ProjectID string `json:"project_id,omitempty"`
// CloudLogging defines the logging options. If not present, logging is disabled.
CloudLogging *cloudLogging `json:"cloud_logging,omitempty"`
// CloudMonitoring determines whether or not metrics are enabled based on
// whether it is present or not. If present, monitoring will be enabled, if
// not present, monitoring is disabled.
CloudMonitoring *cloudMonitoring `json:"cloud_monitoring,omitempty"`
// CloudTrace defines the tracing options. When present, tracing is enabled
// with default configurations. When absent, the tracing is disabled.
CloudTrace *cloudTrace `json:"cloud_trace,omitempty"`
// Labels are applied to cloud logging, monitoring, and trace.
Labels map[string]string `json:"labels,omitempty"`
}
Loading

0 comments on commit 4c776ec

Please sign in to comment.