diff --git a/cmd/aigw/.env.otel.otel-tui b/cmd/aigw/.env.otel.otel-tui index 09f5f09848..172dcb932b 100644 --- a/cmd/aigw/.env.otel.otel-tui +++ b/cmd/aigw/.env.otel.otel-tui @@ -1,6 +1,6 @@ # otel-tui configuration - Terminal UI for OpenTelemetry -OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-tui:4318 -OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-tui:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc # Reduce trace and metrics export delay for demo purposes OTEL_BSP_SCHEDULE_DELAY=100 OTEL_METRIC_EXPORT_INTERVAL=100 diff --git a/cmd/aigw/.env.otel.phoenix b/cmd/aigw/.env.otel.phoenix index 967ee20d93..1cb212ea29 100644 --- a/cmd/aigw/.env.otel.phoenix +++ b/cmd/aigw/.env.otel.phoenix @@ -1,7 +1,7 @@ # Phoenix configuration - LLM-specific observability -# Phoenix uses port 6006 for OTLP -OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:6006 -OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +# Phoenix uses port 4317 for OTLP gRPC, port 6006 for web UI +OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc # Phoenix only supports traces, not metrics OTEL_METRICS_EXPORTER=none # Reduce trace export delay for demo purposes diff --git a/cmd/aigw/README.md b/cmd/aigw/README.md index 1d2c0f654c..39993513f4 100644 --- a/cmd/aigw/README.md +++ b/cmd/aigw/README.md @@ -134,7 +134,7 @@ traces and metrics in real-time. The `.env.otel.otel-tui` file is already provided and will be used automatically when you set `COMPOSE_PROFILES=otel-tui`. This also starts the otel-tui service. -This configures the OTLP endpoint to otel-tui on port 4318. +This configures the OTLP gRPC endpoint to otel-tui on port 4317. diff --git a/cmd/aigw/docker-compose-otel.yaml b/cmd/aigw/docker-compose-otel.yaml index 76403746a7..c1df4baf30 100644 --- a/cmd/aigw/docker-compose-otel.yaml +++ b/cmd/aigw/docker-compose-otel.yaml @@ -23,7 +23,7 @@ services: container_name: otel-tui profiles: ["otel-tui"] ports: - - "4318:4318" + - "4317:4317" stdin_open: true tty: true @@ -35,7 +35,8 @@ services: container_name: phoenix profiles: ["phoenix"] # Only start when explicitly requested ports: - - "6006:6006" + - "4317:4317" # OTLP gRPC + - "6006:6006" # Web UI environment: PHOENIX_ENABLE_AUTH: "false" diff --git a/examples/mcp/agent.py b/examples/mcp/agent.py index 6fadf6a1e4..7083933388 100644 --- a/examples/mcp/agent.py +++ b/examples/mcp/agent.py @@ -13,8 +13,8 @@ # # MCP_URL=http://localhost:1975/mcp # -# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 -# OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +# OTEL_EXPORTER_OTLP_PROTOCOL=grpc # # /// script # dependencies = [ diff --git a/internal/testing/testotel/collector.go b/internal/testing/testotel/collector.go index 44c1e8794e..9bfee1f785 100644 --- a/internal/testing/testotel/collector.go +++ b/internal/testing/testotel/collector.go @@ -8,10 +8,9 @@ package testotel import ( + "context" "fmt" - "io" - "net/http" - "net/http/httptest" + "net" "strings" "time" @@ -19,87 +18,87 @@ import ( collecttracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" tracev1 "go.opentelemetry.io/proto/otlp/trace/v1" - "google.golang.org/protobuf/proto" + "google.golang.org/grpc" ) // otlpTimeout is the timeout for spans to read back. const otlpTimeout = 1 * time.Second // OTEL_BSP_SCHEDULE_DELAY + overhead.. -// StartOTLPCollector starts a test OTLP collector server that receives trace and metrics data. -func StartOTLPCollector() *OTLPCollector { - spanCh := make(chan *tracev1.ResourceSpans, 10) - metricsCh := make(chan *metricsv1.ResourceMetrics, 10) - mux := http.NewServeMux() +// traceServer implements the OTLP trace service. +type traceServer struct { + collecttracev1.UnimplementedTraceServiceServer + spanCh chan *tracev1.ResourceSpans +} - mux.HandleFunc("/v1/traces", func(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, "Failed to read body", http.StatusBadRequest) - return +func (s *traceServer) Export(_ context.Context, req *collecttracev1.ExportTraceServiceRequest) (*collecttracev1.ExportTraceServiceResponse, error) { + for _, resourceSpans := range req.ResourceSpans { + timeout := time.After(otlpTimeout) + select { + case s.spanCh <- resourceSpans: + case <-timeout: + // Avoid blocking if the channel is full. Likely indicates a test issue or spans not being read like + // the ones emitted during test shutdown. Otherwise, server shutdown blocks the test indefinitely. + fmt.Println("Warning: Dropping spans due to timeout") } + } + return &collecttracev1.ExportTraceServiceResponse{}, nil +} - var traces collecttracev1.ExportTraceServiceRequest - if err := proto.Unmarshal(body, &traces); err != nil { - http.Error(w, "Failed to parse traces", http.StatusBadRequest) - return - } +// metricsServer implements the OTLP metrics service. +type metricsServer struct { + collectmetricsv1.UnimplementedMetricsServiceServer + metricsCh chan *metricsv1.ResourceMetrics +} - for _, resourceSpans := range traces.ResourceSpans { - timeout := time.After(otlpTimeout) - select { - case spanCh <- resourceSpans: - case <-timeout: - // Avoid blocking if the channel is full. Likely indicates a test issue or spans not being read like - // the ones emitted during test shutdown. Otherwise, testerver shutdown blocks the test indefinitely. - fmt.Println("Warning: Dropping spans due to timeout") - } +func (s *metricsServer) Export(_ context.Context, req *collectmetricsv1.ExportMetricsServiceRequest) (*collectmetricsv1.ExportMetricsServiceResponse, error) { + for _, resourceMetrics := range req.ResourceMetrics { + timeout := time.After(otlpTimeout) + select { + case s.metricsCh <- resourceMetrics: + case <-timeout: + // Avoid blocking if the channel is full. Likely indicates a test issue or metrics not being read like + // the ones emitted during test shutdown. Otherwise, server shutdown blocks the test indefinitely. + fmt.Println("Warning: Dropping metrics due to timeout") } + } + return &collectmetricsv1.ExportMetricsServiceResponse{}, nil +} - w.WriteHeader(http.StatusOK) - }) - - mux.HandleFunc("/v1/metrics", func(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, "Failed to read body", http.StatusBadRequest) - return - } +// StartOTLPCollector starts a test OTLP collector server that receives trace and metrics data via gRPC. +func StartOTLPCollector() *OTLPCollector { + spanCh := make(chan *tracev1.ResourceSpans, 10) + metricsCh := make(chan *metricsv1.ResourceMetrics, 10) - var metrics collectmetricsv1.ExportMetricsServiceRequest - if err := proto.Unmarshal(body, &metrics); err != nil { - http.Error(w, "Failed to parse metrics", http.StatusBadRequest) - return - } + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(fmt.Sprintf("failed to listen: %v", err)) + } - for _, resourceMetrics := range metrics.ResourceMetrics { - timeout := time.After(otlpTimeout) - select { - case metricsCh <- resourceMetrics: - case <-timeout: - // Avoid blocking if the channel is full. Likely indicates a test issue or metrics not being read like - // the ones emitted during test shutdown. Otherwise, testerver shutdown blocks the test indefinitely. - fmt.Println("Warning: Dropping metrics due to timeout") - } - } + server := grpc.NewServer() + collecttracev1.RegisterTraceServiceServer(server, &traceServer{spanCh: spanCh}) + collectmetricsv1.RegisterMetricsServiceServer(server, &metricsServer{metricsCh: metricsCh}) - w.WriteHeader(http.StatusOK) - }) + go func() { + // Server.Serve returns error on Stop/GracefulStop which is expected. + _ = server.Serve(listener) + }() - s := httptest.NewServer(mux) + endpoint := fmt.Sprintf("http://%s", listener.Addr().String()) env := []string{ - fmt.Sprintf("OTEL_EXPORTER_OTLP_ENDPOINT=%s", s.URL), - "OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf", + fmt.Sprintf("OTEL_EXPORTER_OTLP_ENDPOINT=%s", endpoint), + "OTEL_EXPORTER_OTLP_PROTOCOL=grpc", "OTEL_SERVICE_NAME=ai-gateway-extproc", "OTEL_BSP_SCHEDULE_DELAY=100", "OTEL_METRIC_EXPORT_INTERVAL=100", // Use delta temporality to prevent metric accumulation across subtests. "OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE=delta", } - return &OTLPCollector{s, env, spanCh, metricsCh} + return &OTLPCollector{server, listener, env, spanCh, metricsCh} } type OTLPCollector struct { - s *httptest.Server + server *grpc.Server + listener net.Listener env []string spanCh chan *tracev1.ResourceSpans metricsCh chan *metricsv1.ResourceMetrics @@ -185,5 +184,6 @@ func (o *OTLPCollector) TakeMetrics(expectedCount int) []*metricsv1.ResourceMetr // Close shuts down the collector and cleans up resources. func (o *OTLPCollector) Close() { - o.s.Close() + o.server.GracefulStop() + o.listener.Close() } diff --git a/tests/data-plane/vcr/docker-compose-otel.yaml b/tests/data-plane/vcr/docker-compose-otel.yaml index 4246081213..a879767eb8 100644 --- a/tests/data-plane/vcr/docker-compose-otel.yaml +++ b/tests/data-plane/vcr/docker-compose-otel.yaml @@ -25,7 +25,8 @@ services: image: arizephoenix/phoenix:latest container_name: phoenix ports: - - "6006:6006" + - "4317:4317" # OTLP gRPC + - "6006:6006" # Web UI environment: PHOENIX_ENABLE_AUTH: "false" @@ -41,7 +42,8 @@ services: - extproc-target:/app - ./extproc.yaml:/etc/extproc/config.yaml:ro environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:6006 + - OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:4317 + - OTEL_EXPORTER_OTLP_PROTOCOL=grpc - OTEL_BSP_SCHEDULE_DELAY=100 - OTEL_METRIC_EXPORT_INTERVAL=100 # ExtProc ports (internal to Docker network, consumed by Envoy): @@ -92,7 +94,7 @@ services: environment: - OPENAI_BASE_URL=http://envoy:1062/v1 - OPENAI_API_KEY=unused - - OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:6006 - - OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + - OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:4317 + - OTEL_EXPORTER_OTLP_PROTOCOL=grpc - OTEL_BSP_SCHEDULE_DELAY=100 - OTEL_METRIC_EXPORT_INTERVAL=100 diff --git a/tests/internal/testopeninference/Dockerfile.openai_client b/tests/internal/testopeninference/Dockerfile.openai_client index b451d04d3d..afd2975016 100644 --- a/tests/internal/testopeninference/Dockerfile.openai_client +++ b/tests/internal/testopeninference/Dockerfile.openai_client @@ -11,6 +11,7 @@ RUN python -m pip install --upgrade pip RUN pip install openai \ opentelemetry-sdk \ opentelemetry-exporter-otlp-proto-http \ + opentelemetry-exporter-otlp-proto-grpc \ opentelemetry-distro \ opentelemetry-instrumentation-httpx \ openinference-instrumentation-openai