@@ -17,322 +17,13 @@ limitations under the License.
1717package main
1818
1919import (
20- "flag"
21- "fmt"
2220 "os"
2321
24- "github.com/go-logr/logr"
25- "github.com/prometheus/client_golang/prometheus"
26- uberzap "go.uber.org/zap"
27- "go.uber.org/zap/zapcore"
28- "google.golang.org/grpc"
29- healthPb "google.golang.org/grpc/health/grpc_health_v1"
30- "k8s.io/apimachinery/pkg/types"
31- ctrl "sigs.k8s.io/controller-runtime"
32- "sigs.k8s.io/controller-runtime/pkg/log"
33- "sigs.k8s.io/controller-runtime/pkg/log/zap"
34- "sigs.k8s.io/controller-runtime/pkg/manager"
35- "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
36- metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
37-
38- conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
39- "sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
40- backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
41- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
42- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
43- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
44- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
45- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
46- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
47- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
48- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
49- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
50- profilepicker "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile-picker"
51- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
52- runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
53- envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
54- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
55- )
56-
57- var (
58- grpcPort = flag .Int (
59- "grpcPort" ,
60- runserver .DefaultGrpcPort ,
61- "The gRPC port used for communicating with Envoy proxy" )
62- grpcHealthPort = flag .Int (
63- "grpcHealthPort" ,
64- 9003 ,
65- "The port used for gRPC liveness and readiness probes" )
66- metricsPort = flag .Int (
67- "metricsPort" , 9090 , "The metrics port" )
68- destinationEndpointHintKey = flag .String (
69- "destinationEndpointHintKey" ,
70- runserver .DefaultDestinationEndpointHintKey ,
71- "Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration." )
72- destinationEndpointHintMetadataNamespace = flag .String (
73- "DestinationEndpointHintMetadataNamespace" ,
74- runserver .DefaultDestinationEndpointHintMetadataNamespace ,
75- "The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the" +
76- "target endpoint. If not set, then an outer namespace struct should not be created." )
77- poolName = flag .String (
78- "poolName" ,
79- runserver .DefaultPoolName ,
80- "Name of the InferencePool this Endpoint Picker is associated with." )
81- poolNamespace = flag .String (
82- "poolNamespace" ,
83- runserver .DefaultPoolNamespace ,
84- "Namespace of the InferencePool this Endpoint Picker is associated with." )
85- refreshMetricsInterval = flag .Duration (
86- "refreshMetricsInterval" ,
87- runserver .DefaultRefreshMetricsInterval ,
88- "interval to refresh metrics" )
89- refreshPrometheusMetricsInterval = flag .Duration (
90- "refreshPrometheusMetricsInterval" ,
91- runserver .DefaultRefreshPrometheusMetricsInterval ,
92- "interval to flush prometheus metrics" )
93- logVerbosity = flag .Int ("v" , logging .DEFAULT , "number for the log level verbosity" )
94- secureServing = flag .Bool (
95- "secureServing" , runserver .DefaultSecureServing , "Enables secure serving. Defaults to true." )
96- certPath = flag .String (
97- "certPath" , "" , "The path to the certificate for secure serving. The certificate and private key files " +
98- "are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, " +
99- "then a self-signed certificate is used." )
100- // metric flags
101- totalQueuedRequestsMetric = flag .String ("totalQueuedRequestsMetric" ,
102- "vllm:num_requests_waiting" ,
103- "Prometheus metric for the number of queued requests." )
104- kvCacheUsagePercentageMetric = flag .String ("kvCacheUsagePercentageMetric" ,
105- "vllm:gpu_cache_usage_perc" ,
106- "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1)." )
107- // LoRA metrics
108- loraInfoMetric = flag .String ("loraInfoMetric" ,
109- "vllm:lora_requests_info" ,
110- "Prometheus metric for the LoRA info metrics (must be in vLLM label format)." )
111-
112- setupLog = ctrl .Log .WithName ("setup" )
113-
114- // Environment variables
115- schedulerV2 = envutil .GetEnvBool ("EXPERIMENTAL_USE_SCHEDULER_V2" , false , setupLog )
116- prefixCacheScheduling = envutil .GetEnvBool ("ENABLE_PREFIX_CACHE_SCHEDULING" , false , setupLog )
117- reqHeaderBasedSchedulerForTesting = envutil .GetEnvBool ("ENABLE_REQ_HEADER_BASED_SCHEDULER_FOR_TESTING" , false , setupLog )
22+ "sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
11823)
11924
120- func loadPrefixCacheConfig () prefix.Config {
121- baseLogger := log .Log .WithName ("env-config" )
122-
123- return prefix.Config {
124- HashBlockSize : envutil .GetEnvInt ("PREFIX_CACHE_HASH_BLOCK_SIZE" , prefix .DefaultHashBlockSize , baseLogger ),
125- MaxPrefixBlocksToMatch : envutil .GetEnvInt ("PREFIX_CACHE_MAX_PREFIX_BLOCKS" , prefix .DefaultMaxPrefixBlocks , baseLogger ),
126- LRUIndexerCapacity : envutil .GetEnvInt ("PREFIX_CACHE_LRU_CAPACITY" , prefix .DefaultLRUIndexerCapacity , baseLogger ),
127- }
128- }
129-
13025func main () {
131- if err := run (); err != nil {
26+ if err := runner . NewRunner (). Run (); err != nil {
13227 os .Exit (1 )
13328 }
13429}
135-
136- func run () error {
137- opts := zap.Options {
138- Development : true ,
139- }
140- opts .BindFlags (flag .CommandLine )
141- flag .Parse ()
142- initLogging (& opts )
143-
144- // Validate flags
145- if err := validateFlags (); err != nil {
146- setupLog .Error (err , "Failed to validate flags" )
147- return err
148- }
149-
150- // Print all flag values
151- flags := make (map [string ]any )
152- flag .VisitAll (func (f * flag.Flag ) {
153- flags [f .Name ] = f .Value
154- })
155- setupLog .Info ("Flags processed" , "flags" , flags )
156-
157- // --- Load Configurations from Environment Variables ---
158- sdConfig := saturationdetector .LoadConfigFromEnv ()
159-
160- // --- Get Kubernetes Config ---
161- cfg , err := ctrl .GetConfig ()
162- if err != nil {
163- setupLog .Error (err , "Failed to get Kubernetes rest config" )
164- return err
165- }
166-
167- // --- Setup Datastore ---
168- mapping , err := backendmetrics .NewMetricMapping (
169- * totalQueuedRequestsMetric ,
170- * kvCacheUsagePercentageMetric ,
171- * loraInfoMetric ,
172- )
173- if err != nil {
174- setupLog .Error (err , "Failed to create metric mapping from flags." )
175- return err
176- }
177- verifyMetricMapping (* mapping , setupLog )
178- pmf := backendmetrics .NewPodMetricsFactory (& backendmetrics.PodMetricsClientImpl {MetricMapping : mapping }, * refreshMetricsInterval )
179- ctx := ctrl .SetupSignalHandler ()
180- datastore := datastore .NewDatastore (ctx , pmf )
181-
182- // --- Setup Metrics Server ---
183- customCollectors := []prometheus.Collector {collectors .NewInferencePoolMetricsCollector (datastore )}
184- metrics .Register (customCollectors ... )
185- metrics .RecordInferenceExtensionInfo ()
186- // Register metrics handler.
187- // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
188- // More info:
189- // - https://pkg.go.dev/sigs.k8s.io/[email protected] /pkg/metrics/server 190- // - https://book.kubebuilder.io/reference/metrics.html
191- metricsServerOptions := metricsserver.Options {
192- BindAddress : fmt .Sprintf (":%d" , * metricsPort ),
193- FilterProvider : filters .WithAuthenticationAndAuthorization ,
194- }
195-
196- poolNamespacedName := types.NamespacedName {
197- Name : * poolName ,
198- Namespace : * poolNamespace ,
199- }
200- mgr , err := runserver .NewDefaultManager (poolNamespacedName , cfg , metricsServerOptions )
201- if err != nil {
202- setupLog .Error (err , "Failed to create controller manager" )
203- return err
204- }
205-
206- // --- Initialize Core EPP Components ---
207- scheduler := scheduling .NewScheduler (datastore )
208- if schedulerV2 {
209- queueScorerWeight := envutil .GetEnvInt ("QUEUE_SCORE_WEIGHT" , scorer .DefaultQueueScorerWeight , setupLog )
210- kvCacheScorerWeight := envutil .GetEnvInt ("KV_CACHE_SCORE_WEIGHT" , scorer .DefaultKVCacheScorerWeight , setupLog )
211-
212- schedulerProfile := framework .NewSchedulerProfile ().
213- WithScorers (framework .NewWeightedScorer (& scorer.QueueScorer {}, queueScorerWeight ),
214- framework .NewWeightedScorer (& scorer.KVCacheScorer {}, kvCacheScorerWeight )).
215- WithPicker (picker .NewMaxScorePicker ())
216-
217- if prefixCacheScheduling {
218- prefixScorerWeight := envutil .GetEnvInt ("PREFIX_CACHE_SCORE_WEIGHT" , prefix .DefaultScorerWeight , setupLog )
219- if err := schedulerProfile .AddPlugins (framework .NewWeightedScorer (prefix .New (loadPrefixCacheConfig ()), prefixScorerWeight )); err != nil {
220- setupLog .Error (err , "Failed to register scheduler plugins" )
221- return err
222- }
223- }
224-
225- schedulerConfig := scheduling .NewSchedulerConfig (profilepicker .NewAllProfilesPicker (), map [string ]* framework.SchedulerProfile {"schedulerv2" : schedulerProfile })
226- scheduler = scheduling .NewSchedulerWithConfig (datastore , schedulerConfig )
227- }
228-
229- if reqHeaderBasedSchedulerForTesting {
230- scheduler = conformance_epp .NewReqHeaderBasedScheduler (datastore )
231- }
232-
233- saturationDetector := saturationdetector .NewDetector (sdConfig , datastore , ctrl .Log )
234-
235- director := requestcontrol .NewDirector (datastore , scheduler , saturationDetector ) // can call "director.WithPostResponsePlugins" to add post response plugins
236-
237- // --- Setup ExtProc Server Runner ---
238- serverRunner := & runserver.ExtProcServerRunner {
239- GrpcPort : * grpcPort ,
240- DestinationEndpointHintMetadataNamespace : * destinationEndpointHintMetadataNamespace ,
241- DestinationEndpointHintKey : * destinationEndpointHintKey ,
242- PoolNamespacedName : poolNamespacedName ,
243- Datastore : datastore ,
244- SecureServing : * secureServing ,
245- CertPath : * certPath ,
246- RefreshPrometheusMetricsInterval : * refreshPrometheusMetricsInterval ,
247- Director : director ,
248- SaturationDetector : saturationDetector ,
249- }
250- if err := serverRunner .SetupWithManager (ctx , mgr ); err != nil {
251- setupLog .Error (err , "Failed to setup EPP controllers" )
252- return err
253- }
254-
255- // --- Add Runnables to Manager ---
256- // Register health server.
257- if err := registerHealthServer (mgr , ctrl .Log .WithName ("health" ), datastore , * grpcHealthPort ); err != nil {
258- return err
259- }
260-
261- // Register ext-proc server.
262- if err := registerExtProcServer (mgr , serverRunner , ctrl .Log .WithName ("ext-proc" )); err != nil {
263- return err
264- }
265-
266- // --- Start Manager ---
267- // This blocks until a signal is received.
268- setupLog .Info ("Controller manager starting" )
269- if err := mgr .Start (ctx ); err != nil {
270- setupLog .Error (err , "Error starting controller manager" )
271- return err
272- }
273- setupLog .Info ("Controller manager terminated" )
274- return nil
275- }
276-
277- func initLogging (opts * zap.Options ) {
278- // Unless -zap-log-level is explicitly set, use -v
279- useV := true
280- flag .Visit (func (f * flag.Flag ) {
281- if f .Name == "zap-log-level" {
282- useV = false
283- }
284- })
285- if useV {
286- // See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level
287- lvl := - 1 * (* logVerbosity )
288- opts .Level = uberzap .NewAtomicLevelAt (zapcore .Level (int8 (lvl )))
289- }
290-
291- logger := zap .New (zap .UseFlagOptions (opts ), zap .RawZapOpts (uberzap .AddCaller ()))
292- ctrl .SetLogger (logger )
293- }
294-
295- // registerExtProcServer adds the ExtProcServerRunner as a Runnable to the manager.
296- func registerExtProcServer (mgr manager.Manager , runner * runserver.ExtProcServerRunner , logger logr.Logger ) error {
297- if err := mgr .Add (runner .AsRunnable (logger )); err != nil {
298- setupLog .Error (err , "Failed to register ext-proc gRPC server runnable" )
299- return err
300- }
301- setupLog .Info ("ExtProc server runner added to manager." )
302- return nil
303- }
304-
305- // registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
306- func registerHealthServer (mgr manager.Manager , logger logr.Logger , ds datastore.Datastore , port int ) error {
307- srv := grpc .NewServer ()
308- healthPb .RegisterHealthServer (srv , & healthServer {
309- logger : logger ,
310- datastore : ds ,
311- })
312- if err := mgr .Add (
313- runnable .NoLeaderElection (runnable .GRPCServer ("health" , srv , port ))); err != nil {
314- setupLog .Error (err , "Failed to register health server" )
315- return err
316- }
317- return nil
318- }
319-
320- func validateFlags () error {
321- if * poolName == "" {
322- return fmt .Errorf ("required %q flag not set" , "poolName" )
323- }
324-
325- return nil
326- }
327-
328- func verifyMetricMapping (mapping backendmetrics.MetricMapping , logger logr.Logger ) {
329- if mapping .TotalQueuedRequests == nil {
330- logger .Info ("Not scraping metric: TotalQueuedRequests" )
331- }
332- if mapping .KVCacheUtilization == nil {
333- logger .Info ("Not scraping metric: KVCacheUtilization" )
334- }
335- if mapping .LoraRequestInfo == nil {
336- logger .Info ("Not scraping metric: LoraRequestInfo" )
337- }
338- }
0 commit comments