Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add airl DP to CP flow #2490

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions adapter/internal/controlplane/eventPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ type API struct {
APIKeyHeader string `json:"apiKeyHeader"`
Operations []Operation `json:"operations"`
APIHash string `json:"-"`
SandAIRL *AIRL `json:"sandAIRL"`
ProdAIRL *AIRL `json:"prodAIRL"`
}

// AIRL holds AI ratelimit related data
type AIRL struct {
PromptTokenCount *uint32 `json:"promptTokenCount"`
CompletionTokenCount *uint32 `json:"CompletionTokenCount"`
TotalTokenCount *uint32 `json:"totalTokenCount"`
TimeUnit string `json:"timeUnit"`
RequestCount *uint32 `json:"requestCount"`
}

// Headers contains the request and response header modifier information
Expand Down
91 changes: 71 additions & 20 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if apiReconciler.apiPropagationEnabled && isAPIPropagatable(&apiState) {
// Convert api state to api cp data
loggers.LoggerAPKOperator.Info("Sending API deletion event to agent")
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, apiState, "")
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, apiState, "", nil, nil)
apiCpData.Event = controlplane.EventTypeDelete
controlplane.AddToEventQueue(apiCpData)
}
Expand Down Expand Up @@ -413,10 +413,10 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
apiState.Authentications, namespace, err.Error())
}
}

var prodAirl *dpv1alpha3.AIRateLimitPolicy
if len(prodRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == "REST" {
apiState.ProdHTTPRoute = &synchronizer.HTTPRouteState{}
if apiState.ProdHTTPRoute, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.ProdHTTPRoute, prodRouteRefs,
if apiState.ProdHTTPRoute, prodAirl, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.ProdHTTPRoute, prodRouteRefs,
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving production httpRouteref %s in namespace :%s has not found. %s",
prodRouteRefs, namespace, err.Error())
Expand All @@ -430,10 +430,10 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
prodRouteRefs, namespace)
}
}

var sandAirl *dpv1alpha3.AIRateLimitPolicy
if len(sandRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == "REST" {
apiState.SandHTTPRoute = &synchronizer.HTTPRouteState{}
if apiState.SandHTTPRoute, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.SandHTTPRoute, sandRouteRefs,
if apiState.SandHTTPRoute, sandAirl, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.SandHTTPRoute, sandRouteRefs,
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving sandbox httpRouteref %s in namespace :%s has not found. %s",
sandRouteRefs, namespace, err.Error())
Expand Down Expand Up @@ -518,7 +518,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
if push {
loggers.LoggerAPKOperator.Infof("API hash changed sending the API to agent")
// Publish the api data to CP
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash)
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash, prodAirl, sandAirl)
apiCpData.Event = controlplane.EventTypeCreate
controlplane.AddToEventQueue(apiCpData)
}
Expand All @@ -540,7 +540,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
if push {
loggers.LoggerAPKOperator.Infof("API hash changed sending the API to agent")
// Publish the api data to CP
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash)
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash, prodAirl, sandAirl)
apiCpData.Event = controlplane.EventTypeUpdate
controlplane.AddToEventQueue(apiCpData)
}
Expand Down Expand Up @@ -586,19 +586,20 @@ func (apiReconciler *APIReconciler) resolveGQLRouteRefs(ctx context.Context, gql
// - Authentications
func (apiReconciler *APIReconciler) resolveHTTPRouteRefs(ctx context.Context, httpRouteState *synchronizer.HTTPRouteState,
httpRouteRefs []string, namespace string, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService,
api dpv1alpha3.API) (*synchronizer.HTTPRouteState, error) {
api dpv1alpha3.API) (*synchronizer.HTTPRouteState, *dpv1alpha3.AIRateLimitPolicy, error) {
var err error
httpRouteState.HTTPRouteCombined, httpRouteState.HTTPRoutePartitions, err = apiReconciler.concatHTTPRoutes(ctx, httpRouteRefs, namespace, api)
if err != nil {
return nil, err
return nil, nil, err
}
httpRouteState.BackendMapping, err = apiReconciler.getResolvedBackendsMapping(ctx, httpRouteState, interceptorServiceMapping, api)
var airl *dpv1alpha3.AIRateLimitPolicy
httpRouteState.BackendMapping, airl, err = apiReconciler.getResolvedBackendsMapping(ctx, httpRouteState, interceptorServiceMapping, api)
if err != nil {
return nil, err
return nil, nil, err
}
httpRouteState.Scopes, err = apiReconciler.getScopesForHTTPRoute(ctx, httpRouteState.HTTPRouteCombined, api)

return httpRouteState, err
return httpRouteState, airl, err
}

func (apiReconciler *APIReconciler) resolveGRPCRouteRefs(ctx context.Context, grpcRouteRefs []string,
Expand Down Expand Up @@ -983,9 +984,9 @@ func (apiReconciler *APIReconciler) resolveAuthentications(ctx context.Context,

func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Context,
httpRouteState *synchronizer.HTTPRouteState, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService,
api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, error) {
api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, *dpv1alpha3.AIRateLimitPolicy, error) {
backendMapping := make(map[string]*dpv1alpha2.ResolvedBackend)

var airl *dpv1alpha3.AIRateLimitPolicy
// Resolve backends in HTTPRoute
httpRoute := httpRouteState.HTTPRouteCombined
ruleIdxToAiRatelimitPolicyMapping := make(map[int]*dpv1alpha3.AIRateLimitPolicy)
Expand All @@ -1004,15 +1005,19 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
} else {
for _, aiRLPolicy := range aiRLPolicyList.Items {
loggers.LoggerAPKOperator.Debugf("Adding mapping for ruleid: %d to aiRLPolicy: %s", id, utils.NamespacedName(&aiRLPolicy))
if aiRLPolicy.Spec.Override == nil {
aiRLPolicy.Spec.Override = aiRLPolicy.Spec.Default
}
ruleIdxToAiRatelimitPolicyMapping[id] = &aiRLPolicy
airl = &aiRLPolicy
}
}
if _, exists := backendMapping[backendNamespacedName.String()]; !exists {
resolvedBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, backendNamespacedName, &api)
if resolvedBackend != nil {
backendMapping[backendNamespacedName.String()] = resolvedBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String())
return nil, nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String())
}
}

Expand All @@ -1031,18 +1036,18 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
if resolvedMirrorBackend != nil {
backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
return nil, nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
}
}
} else if string(*mirrorBackend.Kind) == constants.KindService {
var err error
service, err := utils.GetService(ctx, apiReconciler.client, utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), string(mirrorBackend.Name))
if err != nil {
return nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String())
return nil, nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String())
}
backendMapping[mirrorBackendNamespacedName.String()], err = utils.GetResolvedBackendFromService(service, int(*mirrorBackend.Port))
if err != nil {
return nil, fmt.Errorf("error in getting service information %s", service)
return nil, nil, fmt.Errorf("error in getting service information %s", service)
}
}
}
Expand All @@ -1057,7 +1062,7 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
}

loggers.LoggerAPKOperator.Debugf("Generated backendMapping: %v", backendMapping)
return backendMapping, nil
return backendMapping, airl, nil
}

// These proxy methods are designed as intermediaries for the getAPIsFor<CR objects> methods.
Expand Down Expand Up @@ -2829,7 +2834,7 @@ func prepareOwnerReference(apiItems []dpv1alpha3.API) []metav1.OwnerReference {
return ownerReferences
}

func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, apiState synchronizer.APIState, apiHash string) controlplane.APICPEvent {
func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, apiState synchronizer.APIState, apiHash string, prodAIRL *dpv1alpha3.AIRateLimitPolicy, sandAIRL *dpv1alpha3.AIRateLimitPolicy) controlplane.APICPEvent {
apiCPEvent := controlplane.APICPEvent{}
spec := apiState.APIDefinition.Spec
configMap := &corev1.ConfigMap{}
Expand Down Expand Up @@ -2874,6 +2879,50 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context,
sandVhost := geSandVhost(&apiState)
securityScheme, authHeader, apiKeyHeader := prepareSecuritySchemeForCP(&apiState)
operations := prepareOperations(&apiState)
var sandAIRLToAgent controlplane.AIRL
var prodAIRLToAgent controlplane.AIRL
if prodAIRL != nil {
var promptTC, completionTC, totalTC, requestC *uint32
var timeUnit string
if prodAIRL.Spec.Override.TokenCount != nil {
promptTC = &prodAIRL.Spec.Override.TokenCount.RequestTokenCount
completionTC = &prodAIRL.Spec.Override.TokenCount.ResponseTokenCount
totalTC = &prodAIRL.Spec.Override.TokenCount.TotalTokenCount
timeUnit = prodAIRL.Spec.Override.TokenCount.Unit
}
if prodAIRL.Spec.Override.RequestCount != nil {
timeUnit = prodAIRL.Spec.Override.RequestCount.Unit
requestC = &prodAIRL.Spec.Override.RequestCount.RequestsPerUnit
}
prodAIRLToAgent = controlplane.AIRL{
PromptTokenCount: promptTC,
CompletionTokenCount: completionTC,
TotalTokenCount: totalTC,
TimeUnit: timeUnit,
RequestCount: requestC,
}
}
if sandAIRL != nil {
var promptTC, completionTC, totalTC, requestC *uint32
var timeUnit string
if sandAIRL.Spec.Override.TokenCount != nil {
promptTC = &sandAIRL.Spec.Override.TokenCount.RequestTokenCount
completionTC = &sandAIRL.Spec.Override.TokenCount.ResponseTokenCount
totalTC = &sandAIRL.Spec.Override.TokenCount.TotalTokenCount
timeUnit = sandAIRL.Spec.Override.TokenCount.Unit
}
if sandAIRL.Spec.Override.RequestCount != nil {
timeUnit = sandAIRL.Spec.Override.RequestCount.Unit
requestC = &sandAIRL.Spec.Override.RequestCount.RequestsPerUnit
}
sandAIRLToAgent = controlplane.AIRL{
PromptTokenCount: promptTC,
CompletionTokenCount: completionTC,
TotalTokenCount: totalTC,
TimeUnit: timeUnit,
RequestCount: requestC,
}
}
api := controlplane.API{
APIName: spec.APIName,
APIVersion: spec.APIVersion,
Expand All @@ -2898,6 +2947,8 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context,
Operations: operations,
APIHash: apiHash,
APIKeyHeader: apiKeyHeader,
SandAIRL: &sandAIRLToAgent,
ProdAIRL: &prodAIRLToAgent,
}
apiCPEvent.API = api
apiCPEvent.CRName = apiState.APIDefinition.ObjectMeta.Name
Expand Down
Loading