Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/aigw/.env.otel.otel-tui
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# otel-tui configuration - Terminal UI for OpenTelemetry
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-tui:4318
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-tui:4317
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
# Reduce trace and metrics export delay for demo purposes
OTEL_BSP_SCHEDULE_DELAY=100
OTEL_METRIC_EXPORT_INTERVAL=100
Expand Down
6 changes: 3 additions & 3 deletions cmd/aigw/.env.otel.phoenix
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Phoenix configuration - LLM-specific observability
# Phoenix uses port 6006 for OTLP
OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:6006
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
# Phoenix uses port 4317 for OTLP gRPC, port 6006 for web UI
OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:4317
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
# Phoenix only supports traces, not metrics
OTEL_METRICS_EXPORTER=none
# Reduce trace export delay for demo purposes
Expand Down
2 changes: 1 addition & 1 deletion cmd/aigw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ traces and metrics in real-time.
The `.env.otel.otel-tui` file is already provided and will be used automatically
when you set `COMPOSE_PROFILES=otel-tui`. This also starts the otel-tui service.

This configures the OTLP endpoint to otel-tui on port 4318.
This configures the OTLP gRPC endpoint to otel-tui on port 4317.

</details>

Expand Down
5 changes: 3 additions & 2 deletions cmd/aigw/docker-compose-otel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
container_name: otel-tui
profiles: ["otel-tui"]
ports:
- "4318:4318"
- "4317:4317"
stdin_open: true
tty: true

Expand All @@ -35,7 +35,8 @@ services:
container_name: phoenix
profiles: ["phoenix"] # Only start when explicitly requested
ports:
- "6006:6006"
- "4317:4317" # OTLP gRPC
- "6006:6006" # Web UI
environment:
PHOENIX_ENABLE_AUTH: "false"

Expand Down
4 changes: 2 additions & 2 deletions examples/mcp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#
# MCP_URL=http://localhost:1975/mcp
#
# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
# OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
# OTEL_EXPORTER_OTLP_PROTOCOL=grpc
#
# /// script
# dependencies = [
Expand Down
120 changes: 60 additions & 60 deletions internal/testing/testotel/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,98 +8,97 @@
package testotel

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net"
"strings"
"time"

collectmetricsv1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
collecttracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/grpc"
)

// otlpTimeout is the timeout for spans to read back.
const otlpTimeout = 1 * time.Second // OTEL_BSP_SCHEDULE_DELAY + overhead.

// StartOTLPCollector starts a test OTLP collector server that receives trace and metrics data.
func StartOTLPCollector() *OTLPCollector {
spanCh := make(chan *tracev1.ResourceSpans, 10)
metricsCh := make(chan *metricsv1.ResourceMetrics, 10)
mux := http.NewServeMux()
// traceServer implements the OTLP trace service.
type traceServer struct {
collecttracev1.UnimplementedTraceServiceServer
spanCh chan *tracev1.ResourceSpans
}

mux.HandleFunc("/v1/traces", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read body", http.StatusBadRequest)
return
func (s *traceServer) Export(_ context.Context, req *collecttracev1.ExportTraceServiceRequest) (*collecttracev1.ExportTraceServiceResponse, error) {
for _, resourceSpans := range req.ResourceSpans {
timeout := time.After(otlpTimeout)
select {
case s.spanCh <- resourceSpans:
case <-timeout:
// Avoid blocking if the channel is full. Likely indicates a test issue or spans not being read like
// the ones emitted during test shutdown. Otherwise, server shutdown blocks the test indefinitely.
fmt.Println("Warning: Dropping spans due to timeout")
}
}
return &collecttracev1.ExportTraceServiceResponse{}, nil
}

var traces collecttracev1.ExportTraceServiceRequest
if err := proto.Unmarshal(body, &traces); err != nil {
http.Error(w, "Failed to parse traces", http.StatusBadRequest)
return
}
// metricsServer implements the OTLP metrics service.
type metricsServer struct {
collectmetricsv1.UnimplementedMetricsServiceServer
metricsCh chan *metricsv1.ResourceMetrics
}

for _, resourceSpans := range traces.ResourceSpans {
timeout := time.After(otlpTimeout)
select {
case spanCh <- resourceSpans:
case <-timeout:
// Avoid blocking if the channel is full. Likely indicates a test issue or spans not being read like
// the ones emitted during test shutdown. Otherwise, test server shutdown blocks the test indefinitely.
fmt.Println("Warning: Dropping spans due to timeout")
}
func (s *metricsServer) Export(_ context.Context, req *collectmetricsv1.ExportMetricsServiceRequest) (*collectmetricsv1.ExportMetricsServiceResponse, error) {
for _, resourceMetrics := range req.ResourceMetrics {
timeout := time.After(otlpTimeout)
select {
case s.metricsCh <- resourceMetrics:
case <-timeout:
// Avoid blocking if the channel is full. Likely indicates a test issue or metrics not being read like
// the ones emitted during test shutdown. Otherwise, server shutdown blocks the test indefinitely.
fmt.Println("Warning: Dropping metrics due to timeout")
}
}
return &collectmetricsv1.ExportMetricsServiceResponse{}, nil
}

w.WriteHeader(http.StatusOK)
})

mux.HandleFunc("/v1/metrics", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read body", http.StatusBadRequest)
return
}
// StartOTLPCollector starts a test OTLP collector server that receives trace and metrics data via gRPC.
func StartOTLPCollector() *OTLPCollector {
spanCh := make(chan *tracev1.ResourceSpans, 10)
metricsCh := make(chan *metricsv1.ResourceMetrics, 10)

var metrics collectmetricsv1.ExportMetricsServiceRequest
if err := proto.Unmarshal(body, &metrics); err != nil {
http.Error(w, "Failed to parse metrics", http.StatusBadRequest)
return
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(fmt.Sprintf("failed to listen: %v", err))
}

for _, resourceMetrics := range metrics.ResourceMetrics {
timeout := time.After(otlpTimeout)
select {
case metricsCh <- resourceMetrics:
case <-timeout:
// Avoid blocking if the channel is full. Likely indicates a test issue or metrics not being read like
// the ones emitted during test shutdown. Otherwise, test server shutdown blocks the test indefinitely.
fmt.Println("Warning: Dropping metrics due to timeout")
}
}
server := grpc.NewServer()
collecttracev1.RegisterTraceServiceServer(server, &traceServer{spanCh: spanCh})
collectmetricsv1.RegisterMetricsServiceServer(server, &metricsServer{metricsCh: metricsCh})

w.WriteHeader(http.StatusOK)
})
go func() {
// Server.Serve returns error on Stop/GracefulStop which is expected.
_ = server.Serve(listener)
}()

s := httptest.NewServer(mux)
endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
env := []string{
fmt.Sprintf("OTEL_EXPORTER_OTLP_ENDPOINT=%s", s.URL),
"OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf",
fmt.Sprintf("OTEL_EXPORTER_OTLP_ENDPOINT=%s", endpoint),
"OTEL_EXPORTER_OTLP_PROTOCOL=grpc",
"OTEL_SERVICE_NAME=ai-gateway-extproc",
"OTEL_BSP_SCHEDULE_DELAY=100",
"OTEL_METRIC_EXPORT_INTERVAL=100",
// Use delta temporality to prevent metric accumulation across subtests.
"OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE=delta",
}
return &OTLPCollector{s, env, spanCh, metricsCh}
return &OTLPCollector{server, listener, env, spanCh, metricsCh}
}

type OTLPCollector struct {
s *httptest.Server
server *grpc.Server
listener net.Listener
env []string
spanCh chan *tracev1.ResourceSpans
metricsCh chan *metricsv1.ResourceMetrics
Expand Down Expand Up @@ -185,5 +184,6 @@ func (o *OTLPCollector) TakeMetrics(expectedCount int) []*metricsv1.ResourceMetr

// Close shuts down the collector and cleans up resources.
func (o *OTLPCollector) Close() {
o.s.Close()
o.server.GracefulStop()
o.listener.Close()
}
10 changes: 6 additions & 4 deletions tests/data-plane/vcr/docker-compose-otel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ services:
image: arizephoenix/phoenix:latest
container_name: phoenix
ports:
- "6006:6006"
- "4317:4317" # OTLP gRPC
- "6006:6006" # Web UI
environment:
PHOENIX_ENABLE_AUTH: "false"

Expand All @@ -41,7 +42,8 @@ services:
- extproc-target:/app
- ./extproc.yaml:/etc/extproc/config.yaml:ro
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:6006
- OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:4317
- OTEL_EXPORTER_OTLP_PROTOCOL=grpc
- OTEL_BSP_SCHEDULE_DELAY=100
- OTEL_METRIC_EXPORT_INTERVAL=100
# ExtProc ports (internal to Docker network, consumed by Envoy):
Expand Down Expand Up @@ -92,7 +94,7 @@ services:
environment:
- OPENAI_BASE_URL=http://envoy:1062/v1
- OPENAI_API_KEY=unused
- OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:6006
- OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
- OTEL_EXPORTER_OTLP_ENDPOINT=http://phoenix:4317
- OTEL_EXPORTER_OTLP_PROTOCOL=grpc
- OTEL_BSP_SCHEDULE_DELAY=100
- OTEL_METRIC_EXPORT_INTERVAL=100
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ RUN python -m pip install --upgrade pip
RUN pip install openai \
opentelemetry-sdk \
opentelemetry-exporter-otlp-proto-http \
opentelemetry-exporter-otlp-proto-grpc \
opentelemetry-distro \
opentelemetry-instrumentation-httpx \
openinference-instrumentation-openai
Expand Down
Loading