Skip to content

Commit

Permalink
Fix internal map on api updates and refactor api delete
Browse files Browse the repository at this point in the history
  • Loading branch information
AmaliMatharaarachchi committed Mar 1, 2024
1 parent 0da6b51 commit a6fa97f
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 201 deletions.
210 changes: 59 additions & 151 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
crand "crypto/rand"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"math/big"
"math/rand"
Expand All @@ -45,10 +44,8 @@ import (
logger "github.com/wso2/apk/adapter/internal/loggers"
logging "github.com/wso2/apk/adapter/internal/logging"
oasParser "github.com/wso2/apk/adapter/internal/oasparser"
"github.com/wso2/apk/adapter/internal/oasparser/constants"
"github.com/wso2/apk/adapter/internal/oasparser/envoyconf"
"github.com/wso2/apk/adapter/internal/oasparser/model"
operatorconsts "github.com/wso2/apk/adapter/internal/operator/constants"
"github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription"
wso2_cache "github.com/wso2/apk/adapter/pkg/discovery/protocol/cache/v3"
wso2_resource "github.com/wso2/apk/adapter/pkg/discovery/protocol/resource/v3"
Expand Down Expand Up @@ -99,11 +96,12 @@ var (
enforcerRevokedTokensCache wso2_cache.SnapshotCache
enforcerThrottleDataCache wso2_cache.SnapshotCache

orgAPIMap map[string]map[string]*EnvoyInternalAPI // organizationID -> Vhost:API_UUID -> EnvoyInternalAPI struct map
orgIDvHostBasepathMap map[string]map[string]string // organizationID -> Vhost:basepath -> Vhost:API_UUID
orgIDAPIvHostsMap map[string]map[string][]string // organizationID -> UUID -> prod/sand -> Envoy Vhost Array map

// todo(amali) there can be multiple vhosts for one EnvoyInternalAPI so handle this
orgAPIMap map[string]map[string]*EnvoyInternalAPI // organizationID -> Vhost:API_UUID -> EnvoyInternalAPI struct map
orgIDvHostBasepathMap map[string]map[string]string // organizationID -> Vhost:basepath -> Vhost:API_UUID
orgIDAPIvHostsMap map[string]map[string][]string // organizationID -> UUID -> prod/sand -> Envoy Vhost Array map
orgIDLatestAPIVersionMap map[string]map[string]map[string]semantic_version.SemVersion // organizationID -> Vhost:APIName -> Version Range -> Latest API Version

// Envoy Label as map key
// TODO(amali) use this without generating all again.
gatewayLabelConfigMap map[string]*EnvoyGatewayConfig // GW-Label -> EnvoyGatewayConfig struct map
Expand Down Expand Up @@ -212,105 +210,13 @@ func GetEnforcerThrottleDataCache() wso2_cache.SnapshotCache {
return enforcerThrottleDataCache
}

// DeleteAPICREvent deletes API with the given UUID from the given gw environments
func DeleteAPICREvent(labels []string, apiUUID string, organizationID string) error {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()

prodvHostIdentifier := GetvHostsIdentifier(apiUUID, operatorconsts.Production)
sandvHostIdentifier := GetvHostsIdentifier(apiUUID, operatorconsts.Sandbox)
vHosts := append(orgIDAPIvHostsMap[organizationID][prodvHostIdentifier],
orgIDAPIvHostsMap[organizationID][sandvHostIdentifier]...)

delete(orgIDAPIvHostsMap[organizationID], prodvHostIdentifier)
delete(orgIDAPIvHostsMap[organizationID], sandvHostIdentifier)
for _, vhost := range vHosts {
apiIdentifier := GenerateIdentifierForAPIWithUUID(vhost, apiUUID)
if err := deleteAPI(apiIdentifier, labels, organizationID); err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1410, logging.MAJOR, "Error undeploying API %v with UUID %v of Organization %v from environments %v, error: %v",
apiIdentifier, apiUUID, organizationID, labels, err.Error()))
return err
}
// if no error, update internal vhost maps
// error only happens when API not found in deleteAPI func
logger.LoggerXds.Infof("Successfully undeployed the API %v with UUID %v under Organization %s and environment %s",
apiIdentifier, apiUUID, organizationID, labels)
}
return nil
}

// deleteAPI deletes an API, its resources and updates the caches of given environments
func deleteAPI(apiIdentifier string, environments []string, organizationID string) error {
apiUUID, _ := ExtractUUIDFromAPIIdentifier(apiIdentifier)
var api *EnvoyInternalAPI

if _, orgExists := orgAPIMap[organizationID]; orgExists {
if oldAPI, apiExists := orgAPIMap[organizationID][apiIdentifier]; apiExists {
api = oldAPI
} else {
logger.LoggerXds.Infof("Unable to delete API: %v from Organization: %v. API Does not exist. API_UUID: %v", apiIdentifier, organizationID, apiUUID)
return errors.New(constants.NotFound)
}

} else {
logger.LoggerXds.Infof("Unable to delete API: %v from Organization: %v. Organization Does not exist. API_UUID: %v", apiIdentifier, organizationID, apiUUID)
return errors.New(constants.NotFound)
}

existingLabels := orgAPIMap[organizationID][apiIdentifier].envoyLabels
toBeDelEnvs, toBeKeptEnvs := getEnvironmentsToBeDeleted(existingLabels, environments)

if isSemanticVersioningEnabled(api.adapterInternalAPI.GetTitle(), api.adapterInternalAPI.GetVersion()) {
updateRoutingRulesOnAPIDelete(organizationID, apiIdentifier, api.adapterInternalAPI)
}

var isAllowedToDelete bool
updatedLabelsMap := make(map[string]struct{})
for _, val := range toBeDelEnvs {
updatedLabelsMap[val] = struct{}{}
if stringutils.StringInSlice(val, existingLabels) {
isAllowedToDelete = true
}
}
if isAllowedToDelete {
// do not delete from all environments, hence do not clear routes, clusters, endpoints, enforcerAPIs
orgAPIMap[organizationID][apiIdentifier].envoyLabels = toBeKeptEnvs
if len(toBeKeptEnvs) != 0 {
UpdateXdsCacheOnAPIChange(updatedLabelsMap)
return nil
}
}

//clean maps of routes, clusters, endpoints, enforcerAPIs
if len(environments) == 0 || isAllowedToDelete {
cleanMapResources(apiIdentifier, organizationID, toBeDelEnvs)
}
UpdateXdsCacheOnAPIChange(updatedLabelsMap)
// DeleteAPI deletes API with the given UUID from the given gw environments
func DeleteAPI(uuid string, gatewayNames map[string]struct{}) error {
RemoveAPIFromAllInternalMaps(uuid)
UpdateXdsCacheOnAPIChange(gatewayNames)
return nil
}

func cleanMapResources(apiIdentifier string, organizationID string, toBeDelEnvs []string) {
if _, orgExists := orgAPIMap[organizationID]; orgExists {
delete(orgAPIMap[organizationID], apiIdentifier)
}

deleteBasepathForVHost(organizationID, apiIdentifier)
//TODO: (SuKSW) clean any remaining in label wise maps, if this is the last API of that label
logger.LoggerXds.Infof("Deleted API %v of organization %v", apiIdentifier, organizationID)
}

func deleteBasepathForVHost(organizationID, apiIdentifier string) {
// Remove the basepath from map (that is used to avoid duplicate basepaths)
if _, orgExists := orgAPIMap[organizationID]; orgExists {
if oldOrgAPIAPI, ok := orgAPIMap[organizationID][apiIdentifier]; ok {
s := strings.Split(apiIdentifier, apiKeyFieldSeparator)
vHost := s[0]
oldBasepath := oldOrgAPIAPI.adapterInternalAPI.GetXWso2Basepath()
delete(orgIDvHostBasepathMap[organizationID], vHost+":"+oldBasepath)
}
}
}

// UpdateXdsCacheOnAPIChange when this method is called, openAPIEnvoy map is updated.
// Old labels refers to the previously assigned labels
// New labels refers to the the updated labels
Expand Down Expand Up @@ -343,40 +249,34 @@ func SetReady() {
func GenerateEnvoyResoucesForGateway(gatewayName string) ([]types.Resource,
[]types.Resource, []types.Resource, []types.Resource, []types.Resource) {
var clusterArray []*clusterv3.Cluster
// Warning: Route order is important. The first route that matches the request will be used.
var vhostToRouteArrayMap = make(map[string][]*routev3.Route)
var endpointArray []*corev3.Address
var apis []types.Resource

for organizationID, entityMap := range orgAPIMap {
for apiKey, envoyInternalAPI := range entityMap {
if stringutils.StringInSlice(gatewayName, envoyInternalAPI.envoyLabels) {
vhost, err := ExtractVhostFromAPIIdentifier(apiKey)
if err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1411, logging.MAJOR, "Error extracting vhost from API identifier: %v for Organization %v. Ignore deploying the API, error: %v", apiKey, organizationID, err))
continue
}
isDefaultVersion := false
var orgAPI *EnvoyInternalAPI
// If the adapterInternalAPI is not found, proceed with other APIs. (Unreachable condition at this point)
// If that happens, there is no purpose in processing clusters too.
if org, ok := orgAPIMap[organizationID]; !ok {
continue
} else if orgAPI, ok = org[apiKey]; !ok {
continue
}
isDefaultVersion = orgAPI.adapterInternalAPI.IsDefaultVersion
// If it is a default versioned API, the routes are added to the end of the existing array.
// Otherwise the routes would be added to the front.
// /fooContext/2.0.0/* resource path should be matched prior to the /fooContext/* .
if isDefaultVersion {
vhostToRouteArrayMap[vhost] = append(vhostToRouteArrayMap[vhost], orgAPI.routes...)
} else {
vhostToRouteArrayMap[vhost] = append(orgAPI.routes, vhostToRouteArrayMap[vhost]...)
}
clusterArray = append(clusterArray, orgAPI.clusters...)
endpointArray = append(endpointArray, orgAPI.endpointAddresses...)
apis = append(apis, orgAPI.enforcerAPI)
if !stringutils.StringInSlice(gatewayName, envoyInternalAPI.envoyLabels) {
// do nothing if the gateway is not found in the envoyInternalAPI
continue
}
vhost, err := ExtractVhostFromAPIIdentifier(apiKey)
if err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1411, logging.MAJOR, "Error extracting vhost from API identifier: %v for Organization %v. Ignore deploying the API, error: %v", apiKey, organizationID, err))
continue
}
isDefaultVersion := envoyInternalAPI.adapterInternalAPI.IsDefaultVersion
// If it is a default versioned API, the routes are added to the end of the existing array.
// Otherwise the routes would be added to the front.
// /fooContext/2.0.0/* resource path should be matched prior to the /fooContext/* .
if isDefaultVersion {
vhostToRouteArrayMap[vhost] = append(vhostToRouteArrayMap[vhost], envoyInternalAPI.routes...)
} else {
vhostToRouteArrayMap[vhost] = append(envoyInternalAPI.routes, vhostToRouteArrayMap[vhost]...)
}
clusterArray = append(clusterArray, envoyInternalAPI.clusters...)
endpointArray = append(endpointArray, envoyInternalAPI.endpointAddresses...)
apis = append(apis, envoyInternalAPI.enforcerAPI)
}
}

Expand Down Expand Up @@ -641,33 +541,41 @@ func ExtractUUIDFromAPIIdentifier(id string) (string, error) {
return "", err
}

// RemoveAPICacheForEnv will remove all the internal mappings for a specific environment
func RemoveAPICacheForEnv(adapterInternalAPI model.AdapterInternalAPI, envType string) {
vHostIdentifier := GetvHostsIdentifier(adapterInternalAPI.UUID, envType)
var oldvHosts []string
if _, ok := orgIDAPIvHostsMap[adapterInternalAPI.OrganizationID]; ok {
oldvHosts = orgIDAPIvHostsMap[adapterInternalAPI.GetOrganizationID()][vHostIdentifier]
for _, oldvhost := range oldvHosts {
apiIdentifier := GenerateIdentifierForAPIWithUUID(oldvhost, adapterInternalAPI.UUID)
if orgMap, orgExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists {
if _, apiExists := orgMap[apiIdentifier]; apiExists {
delete(orgAPIMap[adapterInternalAPI.GetOrganizationID()], apiIdentifier)
}
// RemoveAPIFromAllInternalMaps removes api from all maps
func RemoveAPIFromAllInternalMaps(uuid string) {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()
for orgID, orgAPI := range orgAPIMap {
for apiIdentifier := range orgAPI {
if strings.HasSuffix(apiIdentifier, ":"+uuid) {
delete(orgAPIMap[orgID], apiIdentifier)
}
}
if len(orgAPIMap[orgID]) == 0 {
delete(orgAPIMap, orgID)
delete(orgIDvHostBasepathMap, orgID)
delete(orgIDAPIvHostsMap, orgID)
delete(orgIDLatestAPIVersionMap, orgID)
}
}
}

// RemoveAPIFromOrgAPIMap removes api from orgAPI map
func RemoveAPIFromOrgAPIMap(uuid string, orgID string) {
if orgMap, ok := orgAPIMap[orgID]; ok {
for apiName := range orgMap {
if strings.Contains(apiName, uuid) {
delete(orgMap, apiName)
for orgID, basepathMap := range orgIDvHostBasepathMap {
for basepath, vhostAPIUUID := range basepathMap {
if strings.HasSuffix(vhostAPIUUID, ":"+uuid) {
delete(orgIDvHostBasepathMap[orgID], basepath)
}
}
if len(orgMap) == 0 {
delete(orgAPIMap, orgID)
}

for orgID := range orgIDAPIvHostsMap {
delete(orgIDAPIvHostsMap[orgID], uuid)
}

for orgID, orgIDLatestAPIVersion := range orgIDLatestAPIVersionMap {
for vHostAPIName := range orgIDLatestAPIVersion {
if strings.HasSuffix(vHostAPIName, ":"+uuid) {
delete(orgIDLatestAPIVersionMap[orgID], vHostAPIName)
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion adapter/internal/discovery/xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ func TestUpdateAPICache(t *testing.T) {
GenerateIdentifierForAPIWithUUID(vhsot, test.adapterInternalAPI.UUID), true)
}
case "DELETE":
DeleteAPICREvent(test.labels, test.adapterInternalAPI.UUID, test.adapterInternalAPI.OrganizationID)
gatewayNames := make(map[string]struct{})
for _, label := range test.labels {
gatewayNames[label] = struct{}{}
}
DeleteAPI(test.adapterInternalAPI.UUID, gatewayNames)
prodIdentifier := GetvHostsIdentifier(test.adapterInternalAPI.UUID, "prod")
sandIdentifier := GetvHostsIdentifier(test.adapterInternalAPI.UUID, "sand")
_, prodExists := orgIDAPIvHostsMap[test.adapterInternalAPI.OrganizationID][prodIdentifier]
Expand Down
14 changes: 7 additions & 7 deletions adapter/internal/discovery/xds/server_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ import (

// getEnvironmentsToBeDeleted returns an slice of environments APIs to be u-deployed from
// by considering existing environments list and environments that APIs are wished to be un-deployed
func getEnvironmentsToBeDeleted(existingEnvs, deleteEnvs []string) (toBeDel []string, toBeKept []string) {
toBeDel = make([]string, 0, len(deleteEnvs))
toBeKept = make([]string, 0, len(deleteEnvs))
func getEnvironmentsToBeDeleted(existingGatewayNames, deleteGatewayNames []string) (toBeDel []string, toBeKept []string) {
toBeDel = make([]string, 0, len(deleteGatewayNames))
toBeKept = make([]string, 0, len(deleteGatewayNames))

// if deleteEnvs is empty (deleteEnvs wished to be deleted), delete all environments
if len(deleteEnvs) == 0 {
return existingEnvs, []string{}
if len(deleteGatewayNames) == 0 {
return existingGatewayNames, []string{}
}
// otherwise delete env if it wished to
for _, existingEnv := range existingEnvs {
if stringutils.StringInSlice(existingEnv, deleteEnvs) {
for _, existingEnv := range existingGatewayNames {
if stringutils.StringInSlice(existingEnv, deleteGatewayNames) {
toBeDel = append(toBeDel, existingEnv)
} else {
toBeKept = append(toBeKept, existingEnv)
Expand Down
1 change: 0 additions & 1 deletion adapter/internal/operator/synchronizer/api_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type APIState struct {
InterceptorServiceMapping map[string]v1alpha1.InterceptorService
BackendJWTMapping map[string]v1alpha1.BackendJWT
APIDefinitionFile []byte
OldOrganizationID string
SubscriptionValidation bool
MutualSSL *v1alpha2.MutualSSL
}
Expand Down
1 change: 0 additions & 1 deletion adapter/internal/operator/synchronizer/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (ods *OperatorDataStore) processAPIState(apiNamespacedName types.Namespaced
cachedAPI := ods.apiStore[apiNamespacedName]

if apiState.APIDefinition.Generation > cachedAPI.APIDefinition.Generation {
cachedAPI.OldOrganizationID = cachedAPI.APIDefinition.Spec.Organization
cachedAPI.APIDefinition = apiState.APIDefinition
updated = true
events = append(events, "API Definition")
Expand Down
12 changes: 6 additions & 6 deletions adapter/internal/operator/synchronizer/gql_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ func getListenersForGQLAPI(gqlRoute *v1alpha2.GQLRoute, apiUUID string) ([]strin
if found {
// find the matching listener
matchedListener, listenerFound := common.FindElement(gateway.Spec.Listeners, func(listener gwapiv1b1.Listener) bool {
if string(listener.Name) == string(*parentRef.SectionName) {
return true
}
return false
return string(listener.Name) == string(*parentRef.SectionName)
})
if listenerFound {
sectionNames = append(sectionNames, string(matchedListener.Name))
Expand All @@ -151,9 +148,12 @@ func getListenersForGQLAPI(gqlRoute *v1alpha2.GQLRoute, apiUUID string) ([]strin

func deleteGQLAPIFromEnv(gqlRoute *v1alpha2.GQLRoute, apiState APIState) error {
labels := getLabelsForGQLAPI(gqlRoute)
org := apiState.APIDefinition.Spec.Organization
uuid := string(apiState.APIDefinition.ObjectMeta.UID)
return xds.DeleteAPICREvent(labels, uuid, org)
gatewayNames := make(map[string]struct{})
for _, label := range labels {
gatewayNames[label] = struct{}{}
}
return xds.DeleteAPI(uuid, gatewayNames)
}

// undeployGQLAPIInGateway undeploys the related API in CREATE and UPDATE events.
Expand Down
18 changes: 11 additions & 7 deletions adapter/internal/operator/synchronizer/rest_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func undeployRestAPIInGateway(apiState APIState) error {
if err != nil {
loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error2630, logging.MAJOR, "Error undeploying prod httpRoute of API : %v in Organization %v from environments %v."+
" Hence not checking on deleting the sand httpRoute of the API", string(apiState.APIDefinition.ObjectMeta.UID), apiState.APIDefinition.Spec.Organization,
getLabelsForAPI(apiState.ProdHTTPRoute.HTTPRouteCombined)))
getGatewayNameForAPI(apiState.ProdHTTPRoute.HTTPRouteCombined)))
return err
}
if apiState.SandHTTPRoute != nil {
Expand Down Expand Up @@ -97,7 +97,7 @@ func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, en
return nil, nil, err
}
vHosts := getVhostsForAPI(httpRoute.HTTPRouteCombined)
labels := getLabelsForAPI(httpRoute.HTTPRouteCombined)
labels := getGatewayNameForAPI(httpRoute.HTTPRouteCombined)
listeners, relativeSectionNames := getListenersForAPI(httpRoute.HTTPRouteCombined, adapterInternalAPI.UUID)
// We dont have a use case where a perticular API's two different http routes refer to two different gateway. Hence get the first listener name for the list for processing.
if len(listeners) == 0 || len(relativeSectionNames) == 0 {
Expand Down Expand Up @@ -133,8 +133,9 @@ func getVhostsForAPI(httpRoute *gwapiv1b1.HTTPRoute) []string {
return vHosts
}

// getLabelsForAPI returns the labels related to an API.
func getLabelsForAPI(httpRoute *gwapiv1b1.HTTPRoute) []string {
// todo(amali) make this return map[string]struct{} instead of []string
// getGatewayNameForAPI returns the labels related to an API.
func getGatewayNameForAPI(httpRoute *gwapiv1b1.HTTPRoute) []string {
var labels []string
var err error
for _, parentRef := range httpRoute.Spec.ParentRefs {
Expand Down Expand Up @@ -178,8 +179,11 @@ func getListenersForAPI(httpRoute *gwapiv1b1.HTTPRoute, apiUUID string) ([]strin
}

func deleteAPIFromEnv(httpRoute *gwapiv1b1.HTTPRoute, apiState APIState) error {
labels := getLabelsForAPI(httpRoute)
org := apiState.APIDefinition.Spec.Organization
labels := getGatewayNameForAPI(httpRoute)
gatewayNames := make(map[string]struct{})
for _, label := range labels {
gatewayNames[label] = struct{}{}
}
uuid := string(apiState.APIDefinition.ObjectMeta.UID)
return xds.DeleteAPICREvent(labels, uuid, org)
return xds.DeleteAPI(uuid, gatewayNames)
}
Loading

0 comments on commit a6fa97f

Please sign in to comment.