Skip to content

Commit 4074a06

Browse files
shmuelkBenjaminBraunDev
authored andcommitted
feat: Added a factory function for the DecisionTree filter (kubernetes-sigs#1053)
* Added a factory function for the DecisionTreeFilter Signed-off-by: Shmuel Kallner <[email protected]> * Added tests of the factory function of the DecisionTreeFilter Signed-off-by: Shmuel Kallner <[email protected]> * Registered the factory function of the DecisionTreeFilter Signed-off-by: Shmuel Kallner <[email protected]> * Refactored the configuration loading Signed-off-by: Shmuel Kallner <[email protected]> --------- Signed-off-by: Shmuel Kallner <[email protected]>
1 parent 2158570 commit 4074a06

File tree

4 files changed

+396
-23
lines changed

4 files changed

+396
-23
lines changed

cmd/epp/runner/register.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
// RegisterAllPlugins registers the factory functions of all known plugins
2929
func RegisterAllPlugins() {
30+
plugins.Register(filter.DecisionTreeFilterType, filter.DecisionTreeFilterFactory)
3031
plugins.Register(filter.LeastKVCacheFilterType, filter.LeastKVCacheFilterFactory)
3132
plugins.Register(filter.LeastQueueFilterType, filter.LeastQueueFilterFactory)
3233
plugins.Register(filter.LoraAffinityFilterType, filter.LoraAffinityFilterFactory)

cmd/epp/runner/runner.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -242,29 +242,10 @@ func (r *Runner) Run(ctx context.Context) error {
242242

243243
// ===================================================================
244244

245-
if len(*configText) != 0 || len(*configFile) != 0 {
246-
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
247-
if err != nil {
248-
setupLog.Error(err, "Failed to load the configuration")
249-
return err
250-
}
251-
252-
epp := newEppHandle()
253-
254-
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
255-
if err != nil {
256-
setupLog.Error(err, "Failed to instantiate the plugins")
257-
return err
258-
}
259-
260-
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
261-
if err != nil {
262-
setupLog.Error(err, "Failed to create Scheduler configuration")
263-
return err
264-
}
265-
266-
// Add requestControl plugins
267-
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
245+
err = r.parseConfiguration()
246+
if err != nil {
247+
setupLog.Error(err, "Failed to parse the configuration")
248+
return err
268249
}
269250

270251
// --- Initialize Core EPP Components ---
@@ -357,6 +338,31 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling
357338
return scheduler, nil
358339
}
359340

341+
func (r *Runner) parseConfiguration() error {
342+
if len(*configText) != 0 || len(*configFile) != 0 {
343+
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
344+
if err != nil {
345+
return fmt.Errorf("failed to load the configuration - %w", err)
346+
}
347+
348+
epp := newEppHandle()
349+
350+
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
351+
if err != nil {
352+
return fmt.Errorf("failed to instantiate the plugins - %w", err)
353+
}
354+
355+
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
356+
if err != nil {
357+
return fmt.Errorf("failed to create Scheduler configuration - %w", err)
358+
}
359+
360+
// Add requestControl plugins
361+
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
362+
}
363+
return nil
364+
}
365+
360366
func initLogging(opts *zap.Options) {
361367
// Unless -zap-log-level is explicitly set, use -v
362368
useV := true

pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,21 @@ package filter
1818

1919
import (
2020
"context"
21+
"encoding/json"
22+
"errors"
23+
"fmt"
2124

2225
"sigs.k8s.io/controller-runtime/pkg/log"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2327
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2428
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2529
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2630
)
2731

32+
const (
33+
DecisionTreeFilterType = "decision-tree"
34+
)
35+
2836
// compile-time type assertion
2937
var _ framework.Filter = &DecisionTreeFilter{}
3038

@@ -47,6 +55,82 @@ type DecisionTreeFilter struct {
4755
NextOnSuccessOrFailure framework.Filter
4856
}
4957

58+
type decisionTreeFilterParameters struct {
59+
Current *decisionTreeFilterEntry `json:"current"`
60+
NextOnSuccess *decisionTreeFilterEntry `json:"nextOnSuccess"`
61+
NextOnFailure *decisionTreeFilterEntry `json:"nextOnFailure"`
62+
NextOnSuccessOrFailure *decisionTreeFilterEntry `json:"nextOnSuccessOrFailure"`
63+
}
64+
65+
type decisionTreeFilterEntry struct {
66+
PluginRef *string `json:"pluginRef"`
67+
DecisionTree *decisionTreeFilterParameters `json:"decisionTree"`
68+
}
69+
70+
func DecisionTreeFilterFactory(name string, rawParameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
71+
parameters := decisionTreeFilterParameters{}
72+
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
73+
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", name, err)
74+
}
75+
return loadDecisionTree(&parameters, handle)
76+
}
77+
78+
func loadDecisionTree(parameters *decisionTreeFilterParameters, handle plugins.Handle) (*DecisionTreeFilter, error) {
79+
result := &DecisionTreeFilter{}
80+
var err error
81+
82+
if parameters.Current == nil {
83+
return nil, errors.New("a current filter must be specified")
84+
}
85+
result.Current, err = loadDecisionTreeEntry(parameters.Current, handle)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
if parameters.NextOnSuccess != nil {
91+
result.NextOnSuccess, err = loadDecisionTreeEntry(parameters.NextOnSuccess, handle)
92+
if err != nil {
93+
return nil, err
94+
}
95+
}
96+
97+
if parameters.NextOnFailure != nil {
98+
result.NextOnFailure, err = loadDecisionTreeEntry(parameters.NextOnFailure, handle)
99+
if err != nil {
100+
return nil, err
101+
}
102+
}
103+
104+
if parameters.NextOnSuccessOrFailure != nil {
105+
result.NextOnSuccessOrFailure, err = loadDecisionTreeEntry(parameters.NextOnSuccessOrFailure, handle)
106+
if err != nil {
107+
return nil, err
108+
}
109+
}
110+
111+
return result, nil
112+
}
113+
114+
func loadDecisionTreeEntry(entry *decisionTreeFilterEntry, handle plugins.Handle) (framework.Filter, error) {
115+
if entry.PluginRef != nil && entry.DecisionTree != nil {
116+
return nil, errors.New("both pluginRef and decisionTree may not be specified")
117+
}
118+
119+
if entry.PluginRef != nil {
120+
instance := handle.Plugins().Plugin(*entry.PluginRef)
121+
if instance == nil {
122+
return nil, errors.New(*entry.PluginRef + " is a reference to an undefined Plugin")
123+
}
124+
if theFilter, ok := instance.(framework.Filter); ok {
125+
return theFilter, nil
126+
}
127+
return nil, errors.New(*entry.PluginRef + " is not a filter")
128+
} else if entry.DecisionTree != nil {
129+
return loadDecisionTree(entry.DecisionTree, handle)
130+
}
131+
return nil, errors.New("either pluginRef or decisionTree must be specified")
132+
}
133+
50134
// Type returns the type of the filter.
51135
func (f *DecisionTreeFilter) Type() string {
52136
if f == nil {

0 commit comments

Comments
 (0)