diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 98b2e38d..4128356d 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -364,6 +364,12 @@ validate_additional_rrs: [ preferred_ip_protocol: ] [ ip_protocol_fallback: | default = true ] +# gRPC client metadata +metadata: + [ : + [ - ], ... + ], ... + # Whether to connect to the endpoint with TLS. [ tls: ] diff --git a/config/config.go b/config/config.go index b714ed54..e65dd7b2 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "google.golang.org/grpc/metadata" + "github.com/google/cel-go/cel" "go.yaml.in/yaml/v3" @@ -327,6 +329,7 @@ type GRPCProbe struct { TLSConfig config.TLSConfig `yaml:"tls_config,omitempty"` IPProtocolFallback bool `yaml:"ip_protocol_fallback,omitempty"` PreferredIPProtocol string `yaml:"preferred_ip_protocol,omitempty"` + Metadata metadata.MD `yaml:"metadata,omitempty"` } type HeaderMatch struct { diff --git a/config/testdata/blackbox-good.yml b/config/testdata/blackbox-good.yml index 6260b835..7cbbe64f 100644 --- a/config/testdata/blackbox-good.yml +++ b/config/testdata/blackbox-good.yml @@ -78,3 +78,13 @@ modules: - header: Access-Control-Allow-Origin regexp: '(\*|example\.com)' allow_missing: false + grpc_health: + prober: grpc_probe + grpc: + service: example + metadata: + key: + - value1 + - value2 + authorization: + - "Bearer token" diff --git a/prober/grpc.go b/prober/grpc.go index fe8d1eca..c82420e2 100644 --- a/prober/grpc.go +++ b/prober/grpc.go @@ -29,12 +29,13 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/status" ) type GRPCHealthCheck interface { - Check(c context.Context, service string) (bool, codes.Code, *peer.Peer, string, error) + Check(c context.Context, service string, md metadata.MD) (bool, codes.Code, *peer.Peer, string, error) } type gRPCHealthCheckClient struct { @@ -53,15 +54,17 @@ func (c *gRPCHealthCheckClient) Close() error { return c.conn.Close() } -func (c *gRPCHealthCheckClient) Check(ctx context.Context, service string) (bool, codes.Code, *peer.Peer, string, error) { +func (c *gRPCHealthCheckClient) Check(ctx context.Context, service string, md metadata.MD) (bool, codes.Code, *peer.Peer, string, error) { var res *grpc_health_v1.HealthCheckResponse var err error req := grpc_health_v1.HealthCheckRequest{ Service: service, } + metadataCtx := metadata.NewOutgoingContext(ctx, md) + serverPeer := new(peer.Peer) - res, err = c.client.Check(ctx, &req, grpc.Peer(serverPeer)) + res, err = c.client.Check(metadataCtx, &req, grpc.Peer(serverPeer)) if err == nil { if res.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING { return true, codes.OK, serverPeer, res.Status.String(), nil @@ -138,6 +141,8 @@ func ProbeGRPC(ctx context.Context, target string, module config.Module, registr targetHost = targetURL.Host } + md := module.GRPC.Metadata + tlsConfig, err := pconfig.NewTLSConfig(&module.GRPC.TLSConfig) if err != nil { logger.Error("Error creating TLS configuration", "err", err) @@ -187,7 +192,7 @@ func ProbeGRPC(ctx context.Context, target string, module config.Module, registr client := NewGrpcHealthCheckClient(conn) defer conn.Close() - ok, statusCode, serverPeer, servingStatus, err := client.Check(context.Background(), module.GRPC.Service) + ok, statusCode, serverPeer, servingStatus, err := client.Check(context.Background(), module.GRPC.Service, md) durationGaugeVec.WithLabelValues("check").Add(time.Since(checkStart).Seconds()) for servingStatusName := range grpc_health_v1.HealthCheckResponse_ServingStatus_value { diff --git a/prober/grpc_test.go b/prober/grpc_test.go index 2a2c4aeb..fc5ea0ec 100644 --- a/prober/grpc_test.go +++ b/prober/grpc_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "google.golang.org/grpc/metadata" + "github.com/prometheus/blackbox_exporter/config" "github.com/prometheus/client_golang/prometheus" pconfig "github.com/prometheus/common/config" @@ -102,6 +104,113 @@ func TestGRPCConnection(t *testing.T) { checkRegistryResults(expectedResults, mfs, t) } +func TestGRPCConnectionWithMetadata(t *testing.T) { + if os.Getenv("CI") == "true" { + t.Skip("skipping; CI is failing on ipv6 dns requests") + } + + binaryMetadataValue := []byte{'t', 'e', 's', 't'} + + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error listening on socket: %s", err) + } + defer ln.Close() + + _, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + t.Fatalf("Error retrieving port for socket: %s", err) + } + + metadataUnaryInterceptor := func(ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler) (interface{}, error) { + + h, err := handler(ctx, req) + md, _ := metadata.FromIncomingContext(ctx) + + expectedMetadata := map[string][]string{ + "key1": {"value1", "value2"}, + "key2-bin": {string(binaryMetadataValue)}, + "authorization": {"Bearer token"}, + } + + for key, expectedValues := range expectedMetadata { + actualValues := md.Get(key) + if len(actualValues) != len(expectedValues) { + t.Fatalf("Metadata key '%s' length mismatch. Expected %d, got %d", key, len(expectedValues), len(actualValues)) + } + for i, expectedValue := range expectedValues { + if actualValues[i] != expectedValue { + t.Fatalf("Metadata key '%s' value mismatch at index %d. Expected '%s', got '%s'", key, i, expectedValue, actualValues[i]) + } + } + } + + return h, err + } + + serverInterceptor := grpc.UnaryInterceptor(metadataUnaryInterceptor) + + s := grpc.NewServer(serverInterceptor) + healthServer := health.NewServer() + healthServer.SetServingStatus("service", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(s, healthServer) + + go func() { + if err := s.Serve(ln); err != nil { + t.Errorf("failed to serve: %v", err) + return + } + }() + defer s.GracefulStop() + + testCTX, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + registry := prometheus.NewRegistry() + + result := ProbeGRPC(testCTX, "localhost:"+port, + config.Module{Timeout: time.Second, GRPC: config.GRPCProbe{ + IPProtocolFallback: false, + Metadata: metadata.Pairs("key1", "value1", + "key1", "value2", + "key2-bin", string(binaryMetadataValue), + "Authorization", "Bearer token", + ), + }, + }, registry, promslog.NewNopLogger()) + + if !result { + t.Fatalf("GRPC probe failed") + } + + mfs, err := registry.Gather() + if err != nil { + t.Fatal(err) + } + + expectedMetrics := map[string]map[string]map[string]struct{}{ + "probe_grpc_healthcheck_response": { + "serving_status": { + "UNKNOWN": {}, + "SERVING": {}, + "NOT_SERVING": {}, + "SERVICE_UNKNOWN": {}, + }, + }, + } + + checkMetrics(expectedMetrics, mfs, t) + + expectedResults := map[string]float64{ + "probe_grpc_ssl": 0, + "probe_grpc_status_code": 0, + } + + checkRegistryResults(expectedResults, mfs, t) +} + func TestMultipleGRPCservices(t *testing.T) { if os.Getenv("CI") == "true" { t.Skip("skipping; CI is failing on ipv6 dns requests")