Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ RUN go mod download
COPY cmd/pd-sidecar/main.go cmd/cmd.go
COPY pkg/sidecar pkg/sidecar
COPY pkg/common pkg/common
COPY pkg/telemetry pkg/telemetry

# Build
# GOARCH has no default value, so that the binary is built for the architecture of the host where the command
Expand Down
29 changes: 27 additions & 2 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,40 @@ import (

"github.com/llm-d/llm-d-inference-scheduler/pkg/metrics"
"github.com/llm-d/llm-d-inference-scheduler/pkg/plugins"
"github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry"
)

// main hands control to run and terminates the process with the integer
// status that run returns.
func main() {
	status := run()
	os.Exit(status)
}

// run initializes tracing, registers the scheduler plugins and starts the
// runner. It returns the process exit status: 1 when the runner reports an
// error, 0 otherwise.
func run() int {
// A single signal-handler context is created here and shared by tracing
// setup, tracing shutdown and the runner.
ctx := ctrl.SetupSignalHandler()

// Initialize tracing before creating any spans
shutdownTracing, err := telemetry.InitTracing(ctx)
if err != nil {
// Log error but don't fail - tracing is optional
ctrl.Log.Error(err, "Failed to initialize tracing")
}
// The shutdown hook is only deferred when InitTracing returned a non-nil function.
if shutdownTracing != nil {
defer func() {
// NOTE(review): ctx is the same context passed to Run below. If Run
// returns because ctx was cancelled, shutdownTracing receives an
// already-cancelled context - confirm that pending spans are still
// flushed, or pass a fresh context with a timeout here.
if err := shutdownTracing(ctx); err != nil {
ctrl.Log.Error(err, "Failed to shutdown tracing")
}
}()
}

// Register llm-d-inference-scheduler plugins
plugins.RegisterAllPlugins()

// Note: GIE built-in plugins are automatically registered by the runner
// when it processes configuration in runner.parsePluginsConfiguration()

// NOTE(review): two Run(...) lines appear below - one calling
// ctrl.SetupSignalHandler() followed by os.Exit(1), one using ctx followed by
// return 1. This looks like the removed and added lines of a diff shown
// together; confirm that only the ctx variant is intended.
if err := runner.NewRunner().
WithCustomCollectors(metrics.GetCollectors()...).
Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
Run(ctx); err != nil {
return 1
}
return 0
}
15 changes: 15 additions & 0 deletions cmd/pd-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/proxy"
"github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/version"
"github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry"
)

var (
Expand Down Expand Up @@ -70,6 +71,20 @@ func main() {
ctx := ctrl.SetupSignalHandler()
log.IntoContext(ctx, logger)

// Initialize tracing before creating any spans
shutdownTracing, err := telemetry.InitTracing(ctx)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would we ever want to be able to disable this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than introducing a config option, InitTracing is safe to call unconditionally: the check happens inside the OTel SDK, so if no collector is configured, no spans are generated (tracer.Start is a no-op) and there is no cost. If the default collector is unreachable or does not exist, there is no error and the overhead is negligible.

if err != nil {
// Log error but don't fail - tracing is optional
logger.Error(err, "Failed to initialize tracing")
}
if shutdownTracing != nil {
defer func() {
if err := shutdownTracing(ctx); err != nil {
logger.Error(err, "Failed to shutdown tracing")
}
}()
}

logger.Info("Proxy starting", "Built on", version.BuildRef, "From Git SHA", version.CommitSHA)

// Validate connector
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ require (
github.com/openai/openai-go v1.12.0
github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
golang.org/x/sync v0.19.0
google.golang.org/grpc v1.79.1
k8s.io/api v0.34.4
Expand Down Expand Up @@ -103,14 +108,9 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 // indirect
go.opentelemetry.io/otel v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,22 @@
//revive:disable:var-naming
package common

import "net/url"

const (
// PrefillPodHeader is the name of the request header that carries the
// Prefill worker address in <ip:port> form.
PrefillPodHeader = "x-prefiller-host-port"

// DataParallelPodHeader is the name of the request header that carries the
// Data Parallel worker address in <ip:port> form.
DataParallelPodHeader = "x-data-parallel-host-port"
)

// StripScheme reduces an endpoint URL to its host:port part by dropping the
// scheme (and anything after the authority). It is intended for gRPC clients,
// which expect a bare host:port target.
//
// The input is returned unchanged when it does not parse as a URL or when the
// parsed URL carries no host component (for example "localhost:4317" or "").
func StripScheme(endpoint string) string {
	parsed, parseErr := url.Parse(endpoint)
	switch {
	case parseErr != nil:
		// Not a valid URL: hand the caller back exactly what it gave us.
		return endpoint
	case parsed.Host == "":
		// Parsed, but without an authority section, so there is nothing to strip.
		return endpoint
	default:
		return parsed.Host
	}
}
90 changes: 90 additions & 0 deletions pkg/common/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Package common contains items common to both the
// EPP/Inference-Scheduler and the Routing Sidecar
//
//revive:disable:var-naming
package common

import "testing"

func TestStripScheme(t *testing.T) {
tests := []struct {
name string
input string
Comment thread
elevran marked this conversation as resolved.
expected string
}{
{
name: "http scheme",
input: "http://localhost:4317",
expected: "localhost:4317",
},
{
name: "https scheme",
input: "https://localhost:4317",
expected: "localhost:4317",
},
{
name: "no scheme",
input: "localhost:4317",
expected: "localhost:4317",
},
{
name: "host only",
input: "localhost",
expected: "localhost",
},
{
name: "http with domain",
input: "http://otel-collector.monitoring.svc.cluster.local:4317",
expected: "otel-collector.monitoring.svc.cluster.local:4317",
},
{
name: "https with domain",
input: "https://otel-collector.monitoring.svc.cluster.local:4317",
expected: "otel-collector.monitoring.svc.cluster.local:4317",
},
{
name: "empty string",
input: "",
expected: "",
},
{
name: "ip address with http",
input: "http://10.0.0.1:4317",
expected: "10.0.0.1:4317",
},
{
name: "ip address with https",
input: "https://10.0.0.1:4317",
expected: "10.0.0.1:4317",
},
{
name: "ip address without scheme",
input: "10.0.0.1:4317",
expected: "10.0.0.1:4317",
},
{
name: "schemeless with double slash",
input: "//192.168.1.1:80",
expected: "192.168.1.1:80",
},
{
name: "uppercase scheme",
input: "HTTP://localhost:4317",
expected: "localhost:4317",
},
{
name: "port only",
input: ":9090",
expected: ":9090",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := StripScheme(tt.input)
if result != tt.expected {
t.Errorf("StripScheme(%q) = %q, want %q", tt.input, result, tt.expected)
}
})
}
}
27 changes: 26 additions & 1 deletion pkg/plugins/pre-request/pd_prerequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"fmt"
"net"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"

"github.com/llm-d/llm-d-inference-scheduler/pkg/common"
"github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry"
)

const (
Expand Down Expand Up @@ -67,17 +70,39 @@ func (p *PrefillHeaderHandler) WithName(name string) *PrefillHeaderHandler {
}

// PreRequest wires prefill SchedulerProfile result into a header to indicate prefill worker
func (p *PrefillHeaderHandler) PreRequest(_ context.Context, request *scheduling.LLMRequest, schedulingResult *scheduling.SchedulingResult) {
func (p *PrefillHeaderHandler) PreRequest(ctx context.Context, request *scheduling.LLMRequest, schedulingResult *scheduling.SchedulingResult) {
tracer := telemetry.Tracer()
_, span := tracer.Start(ctx, "llm_d.epp.prerequest.pd_disaggregation",
Comment thread
sallyom marked this conversation as resolved.
trace.WithSpanKind(trace.SpanKindInternal),
)
defer span.End()

if request != nil && request.TargetModel != "" {
span.SetAttributes(attribute.String("gen_ai.request.model", request.TargetModel))
}
if request != nil && request.RequestId != "" {
span.SetAttributes(attribute.String("gen_ai.request.id", request.RequestId))
}
if _, found := request.Headers[common.PrefillPodHeader]; found {
request.Headers[common.PrefillPodHeader] = "" // clear header, if already set
}

prefillProfileRunResult, exists := schedulingResult.ProfileResults[p.prefillProfile]
if !exists {
span.SetAttributes(
attribute.Bool("llm_d.epp.pd.disaggregation_used", false),
attribute.String("llm_d.epp.pd.reason", "no_prefill_profile_result"),
)
return // prefill profile failed to run or we chose not to run it, no-op in this case
}

targetPod := prefillProfileRunResult.TargetEndpoints[0].GetMetadata()
prefillHostPort := net.JoinHostPort(targetPod.Address, targetPod.Port)
request.Headers[common.PrefillPodHeader] = prefillHostPort // in the form of <ip:port>

span.SetAttributes(
attribute.Bool("llm_d.epp.pd.disaggregation_used", true),
attribute.String("llm_d.epp.pd.prefill_pod_address", targetPod.Address),
attribute.String("llm_d.epp.pd.prefill_pod_port", targetPod.Port),
)
}
45 changes: 45 additions & 0 deletions pkg/plugins/profile/pd_profile_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"net"
"strconv"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"sigs.k8s.io/controller-runtime/pkg/log"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
Expand All @@ -17,6 +20,7 @@ import (

"github.com/llm-d/llm-d-inference-scheduler/pkg/common"
"github.com/llm-d/llm-d-inference-scheduler/pkg/metrics"
"github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry"
)

const (
Expand Down Expand Up @@ -143,8 +147,35 @@ func (h *PdProfileHandler) WithName(name string) *PdProfileHandler {
// previously executed cycles along with their results.
func (h *PdProfileHandler) Pick(ctx context.Context, _ *scheduling.CycleState, request *scheduling.LLMRequest, profiles map[string]scheduling.SchedulerProfile,
profileResults map[string]*scheduling.ProfileRunResult) map[string]scheduling.SchedulerProfile {
// Start tracing span for profile picking operation
tracer := telemetry.Tracer()
ctx, span := tracer.Start(ctx, "llm_d.epp.pd.profile_handler.pick",
trace.WithSpanKind(trace.SpanKindInternal),
)
defer span.End()

// Set initial attributes
span.SetAttributes(
attribute.Int("llm_d.profile_handler.total_profiles", len(profiles)),
attribute.Int("llm_d.profile_handler.executed_profiles", len(profileResults)),
)

// Set optional request attributes if request is not nil
if request != nil {
if request.TargetModel != "" {
span.SetAttributes(attribute.String("gen_ai.request.model", request.TargetModel))
}
if request.RequestId != "" {
span.SetAttributes(attribute.String("gen_ai.request.id", request.RequestId))
}
}

if _, executed := profileResults[h.decodeProfile]; !executed {
// if decode profile was not executed yet, first let the scheduler run the decode profile
span.SetAttributes(
attribute.String("llm_d.profile_handler.decision", "run_decode"),
attribute.String("llm_d.profile_handler.selected_profile", h.decodeProfile),
)
return map[string]scheduling.SchedulerProfile{
h.decodeProfile: profiles[h.decodeProfile],
}
Expand All @@ -154,24 +185,38 @@ func (h *PdProfileHandler) Pick(ctx context.Context, _ *scheduling.CycleState, r
// when a profile run fails its result value is nil. we need to check decode result before continuing to prefill
// check if all configured profiles have been executed, or if decode failed, no need to run more profiles.
if len(profiles) == len(profileResults) || profileResults[h.decodeProfile] == nil {
span.SetAttributes(
attribute.String("llm_d.profile_handler.decision", "complete"),
attribute.Bool("llm_d.profile_handler.decode_failed", profileResults[h.decodeProfile] == nil),
)
return map[string]scheduling.SchedulerProfile{}
}

inputTokens, err := getUserInputLenInTokens(request)
if err != nil {
log.FromContext(ctx).V(logutil.DEBUG).Error(err, "Failed to get user input")
span.SetStatus(codes.Error, err.Error())
return nil
}

span.SetAttributes(attribute.Int("llm_d.profile_handler.input_tokens", inputTokens))

if h.decider != nil && h.decider.disaggregate(ctx, inputTokens, profileResults[h.decodeProfile].TargetEndpoints[0]) {
metrics.RecordPDDecision(request.TargetModel, metrics.DecisionTypePrefillDecode)
// run the prefill profile
span.SetAttributes(
attribute.String("llm_d.profile_handler.decision", "prefill_decode"),
attribute.String("llm_d.profile_handler.selected_profile", h.prefillProfile),
)
return map[string]scheduling.SchedulerProfile{
h.prefillProfile: profiles[h.prefillProfile],
}
}

metrics.RecordPDDecision(request.TargetModel, metrics.DecisionTypeDecodeOnly)
span.SetAttributes(
attribute.String("llm_d.profile_handler.decision", "decode_only"),
)
return map[string]scheduling.SchedulerProfile{} // do not run prefill
}

Expand Down
Loading