Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add http, https and multiple port support #1881

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions adapter/internal/dataholder/dataholder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package dataholder

import (
k8types "k8s.io/apimachinery/pkg/types"
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// The following variables will be used to store the state of the apk.
// This data should not be utilized by operator thread as its not designed for parallel access.
var (
// This variable in the structure of gateway's namespaced name -> gateway
gatewayMap map[string]gwapiv1b1.Gateway
)

func init() {
gatewayMap = make(map[string]gwapiv1b1.Gateway)
}

// GetGatewayMap returns list of cached gateways
func GetGatewayMap() map[string]gwapiv1b1.Gateway {
return gatewayMap
}

// UpdateGateway caches the gateway
func UpdateGateway(gateway gwapiv1b1.Gateway) {
gatewayMap[k8types.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}.String()] = gateway
}

// RemoveGateway removes the gateway from the cache
func RemoveGateway(gateway gwapiv1b1.Gateway) {
delete(gatewayMap, k8types.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}.String())
}

// GetAllGatewayListeners return the list of all the listeners that stored in the gateway cache
func GetAllGatewayListeners() []gwapiv1b1.Listener {
listeners := make([]gwapiv1b1.Listener, 0)
for _, gateway := range gatewayMap {
for _, listener := range gateway.Spec.Listeners {
listeners = append(listeners, listener)
}
}
return listeners
}


// GetListenersAsPortalPortMap returns a map that have a structure protocol -> port -> list of listeners for that port and protocol combination
// Data is derived based on the current status of the gatwayMap cache
func GetListenersAsPortalPortMap() map[string]map[uint32][]gwapiv1b1.Listener{
listenersAsPortalPortMap := make(map[string]map[uint32][]gwapiv1b1.Listener)
for _, gateway := range gatewayMap {
for _, listener := range gateway.Spec.Listeners {
protocol := string(listener.Protocol)
port := uint32(listener.Port)
if portMap, portFound := listenersAsPortalPortMap[protocol]; portFound {
if listenersList, listenerListFound := portMap[port]; listenerListFound {
if (listenersList == nil) {
listenersList = []gwapiv1b1.Listener{listener}
} else {
listenersList = append(listenersList, listener)
}
listenersAsPortalPortMap[protocol][port] = listenersList
} else {
listenerList := []gwapiv1b1.Listener{listener}
listenersAsPortalPortMap[protocol][port] = listenerList
}
} else {
listenersAsPortalPortMap[protocol] = make(map[uint32][]gwapiv1b1.Listener)
listenerList := []gwapiv1b1.Listener{listener}
listenersAsPortalPortMap[protocol][port] = listenerList
}
}
}
return listenersAsPortalPortMap
}
41 changes: 40 additions & 1 deletion adapter/internal/discovery/xds/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package common

import (
"sync"

"fmt"
"regexp"
"strings"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)

Expand Down Expand Up @@ -92,3 +94,40 @@ func GetNodeIdentifier(request *discovery.DiscoveryRequest) string {
}
return nodeIdentifier
}

// GetEnvoyListenerName prepares the envoy listener name based on the protocol and port
func GetEnvoyListenerName(protocol string, port uint32) string {
return fmt.Sprintf("%s_%d_listener", protocol, port)
}

// GetEnvoyRouteConfigName prepares Envoy route config name based on Gateway spec's listener name and section name
func GetEnvoyRouteConfigName(listenerName string, sectionName string) string {
return fmt.Sprintf("%s_%s", listenerName, sectionName)
}

// FindElement searches for an element in a slice based on a given predicate.
// It returns the element and true if the element was found.
func FindElement[T any](collection []T, predicate func(item T) bool) (T, bool) {
for _, item := range collection {
if predicate(item) {
return item, true
}
}
var dummy T
return dummy, false
}

// MatchesHostname check whether the domain matches the hostname pattern
func MatchesHostname(domain, pattern string) bool {
// Escape special characters in the pattern and replace wildcard with regex pattern
pattern = strings.ReplaceAll(regexp.QuoteMeta(pattern), `\*`, `.*`)
// Append start and end of line anchors
pattern = "^" + pattern + "$"

matched, err := regexp.MatchString(pattern, domain)
if err != nil {
return false
}

return matched
}
89 changes: 69 additions & 20 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/wso2/apk/adapter/config"
"github.com/wso2/apk/adapter/internal/dataholder"
"github.com/wso2/apk/adapter/internal/discovery/xds/common"
logger "github.com/wso2/apk/adapter/internal/loggers"
logging "github.com/wso2/apk/adapter/internal/logging"
oasParser "github.com/wso2/apk/adapter/internal/oasparser"
Expand Down Expand Up @@ -67,8 +69,8 @@ type EnvoyInternalAPI struct {

// EnvoyGatewayConfig struct use to hold envoy gateway resources
type EnvoyGatewayConfig struct {
listener *listenerv3.Listener
routeConfig *routev3.RouteConfiguration
listeners []*listenerv3.Listener
routeConfigs []*routev3.RouteConfiguration
clusters []*clusterv3.Cluster
endpoints []*corev3.Address
customRateLimitPolicies []*model.CustomRateLimitPolicy
Expand Down Expand Up @@ -125,6 +127,10 @@ const (
apiController string = "APIController"
)

type envoyRoutesWithSectionName struct {
routes []*routev3.Route
}

func maxRandomBigInt() *big.Int {
return big.NewInt(int64(maxRandomInt))
}
Expand Down Expand Up @@ -399,23 +405,61 @@ func GenerateEnvoyResoucesForGateway(gatewayName string) ([]types.Resource,
}

envoyGatewayConfig, gwFound := gatewayLabelConfigMap[gatewayName]
listener := envoyGatewayConfig.listener
if !gwFound || listener == nil {
listeners := envoyGatewayConfig.listeners
if !gwFound || listeners == nil || len(listeners) == 0 {
return nil, nil, nil, nil, nil
}
routesFromListener := listenerToRouteArrayMap[listener.Name]
var vhostToRouteArrayFilteredMap = make(map[string][]*routev3.Route)
for vhost, routes := range vhostToRouteArrayMap {
if vhost == systemHost || checkRoutes(routes, routesFromListener) {
vhostToRouteArrayFilteredMap[vhost] = routes
routeConfigs := make([]*routev3.RouteConfiguration, 0)
for _, listener := range listeners {
for vhost, routes := range vhostToRouteArrayMap {
matchedListener, found := common.FindElement(dataholder.GetAllGatewayListeners(), func(listenerLocal gwapiv1b1.Listener) bool {
if listenerLocal.Hostname != nil && common.MatchesHostname(vhost, string(*listenerLocal.Hostname)) {
if listener.Name == common.GetEnvoyListenerName(string(listenerLocal.Protocol), uint32(listenerLocal.Port)) {
return true
}
}
return false
})
if found {
// Prepare the route config name based on the gateway listener section name.
routeConfigName := common.GetEnvoyRouteConfigName(listener.Name, string(matchedListener.Name))
routesConfig := oasParser.GetRouteConfigs(map[string][]*routev3.Route{vhost: routes}, routeConfigName, envoyGatewayConfig.customRateLimitPolicies)

routeConfigMatched, alreadyExistsInRouteConfigList := common.FindElement(routeConfigs, func(routeConf *routev3.RouteConfiguration) bool {
if routeConf.Name == routesConfig.Name {
return true
}
return false
})
if alreadyExistsInRouteConfigList {
logger.LoggerAPKOperator.Debugf("Route already exists. %+v", routesConfig.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this happen? I didn't get how there could be duplicate routes at this level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this check I faced only a part of routes added to the speific route config. Thats why i added this check

routeConfigMatched.VirtualHosts = append(routeConfigMatched.VirtualHosts, routesConfig.VirtualHosts...)
} else {
routeConfigs = append(routeConfigs, routesConfig)
}
} else {
logger.LoggerAPKOperator.Errorf("Failed to find a matching gateway listener for this vhost: %s", vhost)
}
}
}
routesConfig := oasParser.GetRouteConfigs(vhostToRouteArrayFilteredMap, listener.Name, envoyGatewayConfig.customRateLimitPolicies)
envoyGatewayConfig.routeConfig = routesConfig

// Find gateway listeners that has $systemHost as its hostname and add the system routeConfig referencing those listeners
gatewayListeners := dataholder.GetAllGatewayListeners()
for _, listener := range gatewayListeners {
if systemHost == string(*listener.Hostname) {
var vhostToRouteArrayFilteredMapForSystemEndpoints = make(map[string][]*routev3.Route)
vhostToRouteArrayFilteredMapForSystemEndpoints[systemHost] = vhostToRouteArrayMap[systemHost]
routeConfigName := common.GetEnvoyRouteConfigName(common.GetEnvoyListenerName(string(listener.Protocol), uint32(listener.Port)), string(listener.Name))
systemRoutesConfig := oasParser.GetRouteConfigs(vhostToRouteArrayFilteredMapForSystemEndpoints, routeConfigName, envoyGatewayConfig.customRateLimitPolicies)
routeConfigs = append(routeConfigs, systemRoutesConfig)
}
}

envoyGatewayConfig.routeConfigs = routeConfigs
clusterArray = append(clusterArray, envoyGatewayConfig.clusters...)
endpointArray = append(endpointArray, envoyGatewayConfig.endpoints...)
endpoints, clusters, listeners, routeConfigs := oasParser.GetCacheResources(endpointArray, clusterArray, listener, routesConfig)
return endpoints, clusters, listeners, routeConfigs, apis
generatedListeners, clusters, generatedRouteConfigs, endpoints := oasParser.GetCacheResources(endpointArray, clusterArray, listeners, routeConfigs)
return generatedListeners, clusters, generatedRouteConfigs, endpoints, apis
}

// function to check routes []*routev3.Route equlas routes []*routev3.Route
Expand Down Expand Up @@ -488,7 +532,6 @@ func updateXdsCache(label string, endpoints []types.Resource, clusters []types.R
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error()))
return false
}
logger.LoggerXds.Infof("New Router cache updated for the label: " + label + " version: " + fmt.Sprint(version))
return true
}

Expand Down Expand Up @@ -641,7 +684,7 @@ func RemoveAPIFromOrgAPIMap(uuid string, orgID string) {
}

// UpdateAPICache updates the xDS cache related to the API Lifecycle event.
func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes []string, adapterInternalAPI model.AdapterInternalAPI) error {
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, adapterInternalAPI model.AdapterInternalAPI) error {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()

Expand Down Expand Up @@ -671,6 +714,7 @@ func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes [

// Create internal mappigs for new vHosts
for _, vHost := range vHosts {
logger.LoggerAPKOperator.Debugf("Creating internal mapping for vhost: %s", vHost)
apiUUID := adapterInternalAPI.UUID
apiIdentifier := GenerateIdentifierForAPIWithUUID(vHost, apiUUID)
var oldLabels []string
Expand Down Expand Up @@ -700,10 +744,15 @@ func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes [
endpointAddresses: endpoints,
enforcerAPI: oasParser.GetEnforcerAPI(adapterInternalAPI, vHost),
}
if _, ok := listenerToRouteArrayMap[newlistenersForRoutes[0]]; ok {
listenerToRouteArrayMap[newlistenersForRoutes[0]] = append(listenerToRouteArrayMap[newlistenersForRoutes[0]], routes...)
if _, ok := listenerToRouteArrayMap[listener]; ok {
routesList := listenerToRouteArrayMap[listener]
if routesList == nil {
routesList = make([]*routev3.Route, 0)
}
routesList = append(routesList, routes...)
listenerToRouteArrayMap[listener] = routesList
} else {
listenerToRouteArrayMap[newlistenersForRoutes[0]] = routes
listenerToRouteArrayMap[listener] = routes
}

revisionStatus := updateXdsCacheOnAPIChange(oldLabels, newLabels)
Expand All @@ -715,8 +764,8 @@ func UpdateAPICache(vHosts []string, newLabels []string, newlistenersForRoutes [
// UpdateGatewayCache updates the xDS cache related to the Gateway Lifecycle event.
func UpdateGatewayCache(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map[string]map[string][]byte,
gwLuaScript string, customRateLimitPolicies []*model.CustomRateLimitPolicy) error {
listener := oasParser.GetProductionListener(gateway, resolvedListenerCerts, gwLuaScript)
gatewayLabelConfigMap[gateway.Name].listener = listener
listeners := oasParser.GetProductionListener(gateway, resolvedListenerCerts, gwLuaScript)
gatewayLabelConfigMap[gateway.Name].listeners = listeners
conf := config.ReadConfigs()
if conf.Envoy.RateLimit.Enabled {
gatewayLabelConfigMap[gateway.Name].customRateLimitPolicies = customRateLimitPolicies
Expand Down
2 changes: 1 addition & 1 deletion adapter/internal/discovery/xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestUpdateAPICache(t *testing.T) {
for _, label := range test.labels {
SanitizeGateway(label, true)
}
UpdateAPICache(test.vHosts, test.labels, test.listeners, test.adapterInternalAPI)
UpdateAPICache(test.vHosts, test.labels, test.listeners[0], "httpslistener", test.adapterInternalAPI)
identifier := GetvHostsIdentifier(test.adapterInternalAPI.UUID, "prod")
actualvHosts, ok := orgIDAPIvHostsMap[test.adapterInternalAPI.OrganizationID][identifier]
if !ok {
Expand Down
18 changes: 12 additions & 6 deletions adapter/internal/oasparser/config_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func GetGlobalClusters() ([]*clusterv3.Cluster, []*corev3.Address) {
// The provided set of envoy routes will be assigned under the virtual host
//
// The RouteConfiguration is named as "default"
func GetProductionListener(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map[string]map[string][]byte, gwLuaScript string) *listenerv3.Listener {
func GetProductionListener(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map[string]map[string][]byte, gwLuaScript string) []*listenerv3.Listener {
listeners := envoy.CreateListenerByGateway(gateway, resolvedListenerCerts, gwLuaScript)
return listeners
}
Expand All @@ -98,10 +98,10 @@ func GetProductionListener(gateway *gwapiv1b1.Gateway, resolvedListenerCerts map
// The provided set of envoy routes will be assigned under the virtual host
//
// The RouteConfiguration is named as "default"
func GetRouteConfigs(vhostToRouteArrayMap map[string][]*routev3.Route, httpListener string,
func GetRouteConfigs(vhostToRouteArrayMap map[string][]*routev3.Route, routeConfigName string,
customRateLimitPolicies []*model.CustomRateLimitPolicy) *routev3.RouteConfiguration {
vHosts := envoy.CreateVirtualHosts(vhostToRouteArrayMap, customRateLimitPolicies)
routeConfig := envoy.CreateRoutesConfigForRds(vHosts, httpListener)
routeConfig := envoy.CreateRoutesConfigForRds(vHosts, routeConfigName)
return routeConfig
}

Expand All @@ -110,7 +110,7 @@ func GetRouteConfigs(vhostToRouteArrayMap map[string][]*routev3.Route, httpListe
//
// The returned resources are listeners, clusters, routeConfigurations, endpoints
func GetCacheResources(endpoints []*corev3.Address, clusters []*clusterv3.Cluster,
listeners *listenerv3.Listener, routeConfig *routev3.RouteConfiguration) (
listeners []*listenerv3.Listener, routeConfigs []*routev3.RouteConfiguration) (
listenerRes []types.Resource, clusterRes []types.Resource, routeConfigRes []types.Resource,
endpointRes []types.Resource) {

Expand All @@ -122,8 +122,14 @@ func GetCacheResources(endpoints []*corev3.Address, clusters []*clusterv3.Cluste
for _, endpoint := range endpoints {
endpointRes = append(endpointRes, endpoint)
}
listenerRes = []types.Resource{listeners}
routeConfigRes = []types.Resource{routeConfig}
listenerRes = []types.Resource{}
for _, listener := range listeners {
listenerRes = append(listenerRes, listener)
}
routeConfigRes = []types.Resource{}
for _, routeConfig := range routeConfigs {
routeConfigRes = append(routeConfigRes, routeConfig)
}
return listenerRes, clusterRes, routeConfigRes, endpointRes
}

Expand Down
4 changes: 0 additions & 4 deletions adapter/internal/oasparser/envoyconf/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ const (
compressorFilterName string = "envoy.filters.http.compressor"
)

const (
defaultHTTPSListenerName string = "httpslistener"
)

// cluster prefixes
const (
requestInterceptClustersNamePrefix string = "reqInterceptor"
Expand Down
Loading
Loading