Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions examples/otel-headers/resources/gateway.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: Backend
metadata:
name: otel-collector
spec:
endpoints:
- fqdn:
hostname: localhost
port: 4317
# Use below for cloud OTLP endpoints
# endpoints:
# - fqdn:
# hostname: otel.example.com
# port: 443
# tls:
# wellKnownCACertificates: System
---
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyProxy
metadata:
name: otel-example
Expand All @@ -10,8 +27,10 @@ spec:
sinks:
- type: OpenTelemetry
openTelemetry:
host: localhost
port: 4317
backendRefs:
- group: gateway.envoyproxy.io
kind: Backend
name: otel-collector
# Delta temporality required for backends like otel-tui and Elastic
reportCountersAsDeltas: true
reportHistogramsAsDeltas: true
Expand All @@ -23,8 +42,10 @@ spec:
samplingRate: 100
provider:
type: OpenTelemetry
host: localhost
port: 4317
backendRefs:
- group: gateway.envoyproxy.io
kind: Backend
name: otel-collector
# Authorization header sent as gRPC initial metadata
openTelemetry:
headers:
Expand All @@ -46,8 +67,10 @@ spec:
sinks:
- type: OpenTelemetry
openTelemetry:
host: localhost
port: 4317
backendRefs:
- group: gateway.envoyproxy.io
kind: Backend
name: otel-collector
resources:
service.name: envoy-gateway-example
# Authorization header sent as gRPC initial metadata
Expand Down Expand Up @@ -105,5 +128,4 @@ spec:
hostname: httpbingo.org
port: 443
tls:
sni: httpbingo.org
wellKnownCACertificates: System
4 changes: 2 additions & 2 deletions internal/cmd/egctl/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func TestTranslate(t *testing.T) {
require.NoError(t, err)
}
if test.OverrideTestData() {
require.NoError(t, file.Write(string(out), filepath.Join("testdata", "translate", "out", fn)))
require.NoError(t, file.Write(test.NormalizeCertPath(string(out)), filepath.Join("testdata", "translate", "out", fn)))
}
want := &TranslationResult{}
mustUnmarshal(t, requireTestDataOutFile(t, fn), want)
Expand All @@ -388,7 +388,7 @@ func requireTestDataOutFile(t *testing.T, name ...string) []byte {
elems := append([]string{"testdata", "translate", "out"}, name...)
content, err := os.ReadFile(filepath.Join(elems...))
require.NoError(t, err)
return content
return []byte(test.DenormalizeCertPath(string(content)))
}

func mustUnmarshal(t *testing.T, val []byte, out interface{}) {
Expand Down
123 changes: 107 additions & 16 deletions internal/gatewayapi/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"fmt"
"math"
"net"
"net/netip"
"strconv"
"strings"

"github.com/google/cel-go/cel"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"

Expand Down Expand Up @@ -55,7 +58,7 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR resource
infraIR[irKey].Proxy.Config = gateway.envoyProxy
}
t.processProxyReadyListener(xdsIR[irKey], gateway.envoyProxy)
t.processProxyObservability(gateway, xdsIR[irKey], infraIR[irKey].Proxy.Config, resources)
t.processProxyObservability(gateway, xdsIR[irKey], infraIR[irKey].Proxy, resources)

for _, listener := range gateway.listeners {
// Process protocol & supported kinds
Expand Down Expand Up @@ -468,8 +471,9 @@ func (t *Translator) processProxyReadyListener(xdsIR *ir.Xds, envoyProxy *egv1a1
}
}

func (t *Translator) processProxyObservability(gwCtx *GatewayContext, xdsIR *ir.Xds, envoyProxy *egv1a1.EnvoyProxy, resources *resource.Resources) {
func (t *Translator) processProxyObservability(gwCtx *GatewayContext, xdsIR *ir.Xds, proxyInfra *ir.ProxyInfra, resources *resource.Resources) {
var err error
envoyProxy := proxyInfra.Config

xdsIR.AccessLog, err = t.processAccessLog(envoyProxy, resources)
if err != nil {
Expand All @@ -485,12 +489,14 @@ func (t *Translator) processProxyObservability(gwCtx *GatewayContext, xdsIR *ir.
return
}

xdsIR.Metrics, err = t.processMetrics(envoyProxy, resources)
var resolvedSinks []ir.ResolvedMetricSink
xdsIR.Metrics, resolvedSinks, err = t.processMetrics(envoyProxy, resources)
if err != nil {
status.UpdateGatewayStatusNotAccepted(gwCtx.Gateway, gwapiv1.GatewayReasonInvalidParameters,
fmt.Sprintf("Invalid metrics backendRefs in the referenced EnvoyProxy: %v", err))
return
}
proxyInfra.ResolvedMetricSinks = resolvedSinks
}

func (t *Translator) processInfraIRListener(listener *ListenerContext, infraIR resource.InfraIRMap, irKey string, servicePort *protocolPort, containerPort int32) {
Expand Down Expand Up @@ -677,13 +683,11 @@ func (t *Translator) processAccessLog(envoyproxy *egv1a1.EnvoyProxy, resources *
// TODO: rename this, so that we can share backend with tracing?
destName := fmt.Sprintf("accesslog_otel_%d_%d", i, j)
settingName := irDestinationSettingName(destName, -1)
// TODO: how to get authority from the backendRefs?
ds, traffic, err := t.processBackendRefs(settingName, sink.OpenTelemetry.BackendCluster, envoyproxy.Namespace, resources, envoyproxy)
if err != nil {
return nil, err
}

// EG currently support GRPC OTel only, change protocol to GRPC.
// TODO: update when OTLP/HTTP is completely supported (logs, traces, metrics)
for _, d := range ds {
d.Protocol = ir.GRPC
}
Expand All @@ -692,6 +696,7 @@ func (t *Translator) processAccessLog(envoyproxy *egv1a1.EnvoyProxy, resources *
CELMatches: validExprs,
Resources: sink.OpenTelemetry.Resources,
Headers: sink.OpenTelemetry.Headers,
Authority: getAuthorityFromDestination(ds),
Destination: ir.RouteDestination{
Name: destName,
Settings: ds,
Expand Down Expand Up @@ -741,20 +746,18 @@ func (t *Translator) processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.Envo
// TODO: rename this, so that we can share backend with accesslog?
destName := "tracing"
settingName := irDestinationSettingName(destName, -1)
// TODO: how to get authority from the backendRefs?
ds, traffic, err := t.processBackendRefs(settingName, tracing.Provider.BackendCluster, envoyproxy.Namespace, resources, envoyproxy)
if err != nil {
return nil, err
}

// EG currently support OTel tracing only, change protocol to GRPC.
if tracing.Provider.Type == egv1a1.TracingProviderTypeOpenTelemetry {
// TODO: update when OTLP/HTTP is completely supported (logs, traces, metrics)
for _, d := range ds {
d.Protocol = ir.GRPC
}
}

var authority string
authority := getAuthorityFromDestination(ds)

// fallback to host and port
// TODO: remove support for Host/Port in v1.2
Expand Down Expand Up @@ -814,36 +817,111 @@ func proxySamplingRate(tracing *egv1a1.ProxyTracing) float64 {
return rate
}

// getAuthorityFromDestination extracts the gRPC authority from a destination setting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhaohuabing @zirain what about a BTLSP attaching to a Service

// Priority: SNI > hostname > Service/Backend metadata.
func getAuthorityFromDestination(ds []*ir.DestinationSetting) string {
if len(ds) == 0 {
return ""
}
dest := ds[0]

// Priority 1: SNI from TLS config
if dest.TLS != nil && dest.TLS.SNI != nil {
return *dest.TLS.SNI
}

// Priority 2: Endpoint host if it's a hostname (not IP)
if len(dest.Endpoints) > 0 {
host := dest.Endpoints[0].Host
if _, err := netip.ParseAddr(host); err != nil {
// Not an IP - use as authority
return host
}

// Priority 3: Derive from metadata when endpoint is an IP
if dest.Metadata != nil && dest.Metadata.Name != "" {
if dest.Metadata.Namespace != "" {
if dest.Metadata.Kind == resource.KindService {
return fmt.Sprintf("%s.%s.svc", dest.Metadata.Name, dest.Metadata.Namespace)
}
return fmt.Sprintf("%s.%s", dest.Metadata.Name, dest.Metadata.Namespace)
}
return dest.Metadata.Name
}
}
// Don't set authority to an IP - let Envoy use defaults
return ""
}

func getOpenTelemetryTracingHeaders(provider *egv1a1.TracingProvider) []gwapiv1.HTTPHeader {
if provider != nil && provider.OpenTelemetry != nil {
return provider.OpenTelemetry.Headers
}
return nil
}

func (t *Translator) processMetrics(envoyproxy *egv1a1.EnvoyProxy, resources *resource.Resources) (*ir.Metrics, error) {
func (t *Translator) processMetrics(envoyproxy *egv1a1.EnvoyProxy, resources *resource.Resources) (*ir.Metrics, []ir.ResolvedMetricSink, error) {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.Metrics == nil {
return nil, nil
return nil, nil, nil
}

for _, sink := range envoyproxy.Spec.Telemetry.Metrics.Sinks {
var resolvedSinks []ir.ResolvedMetricSink
seen := sets.NewString()

for i, sink := range envoyproxy.Spec.Telemetry.Metrics.Sinks {
if sink.OpenTelemetry == nil {
continue
}

_, _, err := t.processBackendRefs("", sink.OpenTelemetry.BackendCluster, envoyproxy.Namespace, resources, envoyproxy)
destName := fmt.Sprintf("metrics_otel_%d", i)
settingName := irDestinationSettingName(destName, -1)
ds, _, err := t.processBackendRefs(settingName, sink.OpenTelemetry.BackendCluster, envoyproxy.Namespace, resources, envoyproxy)
if err != nil {
return nil, err
return nil, nil, err
}
// TODO: update when OTLP/HTTP is completely supported (logs, traces, metrics)
for _, d := range ds {
d.Protocol = ir.GRPC
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, non-blocking: it might be helpful to add a comment to the API to clarify that OTLP is only supported over gRPC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I adjusted the code comment. d.Protocol, I might be wrong but is used besides for telemetry, so not sure how to phrase this. we can do that in another PR or you can feel free to push a commit if you can think of a way!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also once this is released to Envoy we should be able to implement http transport here. the example yaml there uses logs, metrics and traced, just like the gRPC here does envoyproxy/envoy#43001

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I adjusted the code comment. d.Protocol, I might be wrong but is used besides for telemetry, so not sure how to phrase this. we can do that in another PR or you can feel free to push a commit if you can think of a way!

I mean in the API, not the internal code. Just nit picking, not blocking.

}

authority := getAuthorityFromDestination(ds)

// Fallback to deprecated host/port
if len(ds) == 0 && sink.OpenTelemetry.Host != nil {
ds = destinationSettingFromHostAndPort(settingName, *sink.OpenTelemetry.Host, uint32(sink.OpenTelemetry.Port))
authority = *sink.OpenTelemetry.Host
}

if len(ds) > 0 && len(ds[0].Endpoints) > 0 {
// Skip duplicate sinks (same address:port)
ep := ds[0].Endpoints[0]
addr := net.JoinHostPort(ep.Host, strconv.Itoa(int(ep.Port)))
if seen.Has(addr) {
continue
}
seen.Insert(addr)

resolvedSinks = append(resolvedSinks, ir.ResolvedMetricSink{
Destination: ir.RouteDestination{
Name: destName,
Settings: ds,
Metadata: buildResourceMetadata(envoyproxy, nil),
},
Authority: authority,
Headers: sink.OpenTelemetry.Headers,
ReportCountersAsDeltas: ptr.Deref(sink.OpenTelemetry.ReportCountersAsDeltas, false),
ReportHistogramsAsDeltas: ptr.Deref(sink.OpenTelemetry.ReportHistogramsAsDeltas, false),
})
}
}

return &ir.Metrics{
EnableVirtualHostStats: ptr.Deref(envoyproxy.Spec.Telemetry.Metrics.EnableVirtualHostStats, false),
EnablePerEndpointStats: ptr.Deref(envoyproxy.Spec.Telemetry.Metrics.EnablePerEndpointStats, false),
EnableRequestResponseSizesStats: ptr.Deref(envoyproxy.Spec.Telemetry.Metrics.EnableRequestResponseSizesStats, false),
}, nil
}, resolvedSinks, nil
}

func (t *Translator) processBackendRefs(name string, backendCluster egv1a1.BackendCluster, namespace string,
Expand Down Expand Up @@ -877,6 +955,19 @@ func (t *Translator) processBackendRefs(name string, backendCluster egv1a1.Backe
if ds.IsDynamicResolver {
return nil, nil, errors.New("dynamic resolver destinations are not supported")
}
// Apply TLS config for backend (telemetry) clusters
backend := t.GetBackend(ns, string(ref.Name))
if backend.Spec.TLS != nil {
tlsConfig, err := t.processServerValidationTLSSettings(backend)
if err != nil {
return nil, nil, err
}
ds.TLS = tlsConfig
// Infer SNI from FQDN for telemetry backends (no Host header available)
if ds.TLS.SNI == nil && len(backend.Spec.Endpoints) == 1 && backend.Spec.Endpoints[0].FQDN != nil {
ds.TLS.SNI = &backend.Spec.Endpoints[0].FQDN.Hostname
}
}
result = append(result, ds)
default:
return nil, nil, fmt.Errorf("unsupported kind for backendRefs: %s", kind)
Expand Down
Loading