Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Choreo] Upstream connection handling support considering TCP and HTTP #3625

Open
wants to merge 3 commits into
base: choreo
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions adapter/cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func startMicroGateway() {
logger.Fatal("Error starting the adapter", err)
}
conf, errReadConfig := config.ReadConfigs()
config.GetTCPKeepaliveEnabledOrgs()
if errReadConfig != nil {
logger.Fatal("Error loading configuration. ", errReadConfig)
}
Expand Down
8 changes: 8 additions & 0 deletions adapter/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ var defaultConfig = &Config{
MaxRetries: 50,
},
},
TCPConfigurations: upstreamTCPConfigs{
KeepaliveTimeInMillis: 120000,
KeepaliveProbes: 9,
KeepaliveIntervalInMillis: 75000,
},
HTTPConfigurations: upstreamHTTPConfigs{
IdleTimeoutInMillis: 120000,
},
},
Connection: connection{
Timeouts: connectionTimeouts{
Expand Down
21 changes: 16 additions & 5 deletions adapter/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import (
)

var (
onceConfigRead sync.Once
onceGetDefaultVhost sync.Once
adapterConfig *Config
defaultVhost map[string]string
e error
onceConfigRead sync.Once
onceGetDefaultVhost sync.Once
adapterConfig *Config
defaultVhost map[string]string
e error
// UpstreamConnectionConfEnabledOrgList is the list of orgs that need to handle connection timeouts
UpstreamConnectionConfEnabledOrgList []string
)

// DefaultGatewayName represents the name of the default gateway
Expand Down Expand Up @@ -255,3 +257,12 @@ func (config *Config) validateConfig() error {
func printDeprecatedWarningLog(deprecatedTerm, currentTerm string) {
logger.Warnf("%s is deprecated. Use %s instead", deprecatedTerm, currentTerm)
}

// GetTCPKeepaliveEnabledOrgs returns the list of orgs that need to handle connection timeouts
func GetTCPKeepaliveEnabledOrgs() {
orgs := os.Getenv("TCP_KEEPALIVE_ENABLED_ORGS")
UpstreamConnectionConfEnabledOrgList = strings.Split(orgs, ",")
if len(UpstreamConnectionConfEnabledOrgList) == 0 {
UpstreamConnectionConfEnabledOrgList[0] = ""
}
}
23 changes: 17 additions & 6 deletions adapter/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,14 @@ type globalCors struct {
// Envoy Upstream Related Configurations
type envoyUpstream struct {
// UpstreamTLS related Configuration
TLS upstreamTLS
Timeouts upstreamTimeout
Health upstreamHealth
DNS upstreamDNS
Retry upstreamRetry
CircuitBreakers []upstreamCircuitBreaker
TLS upstreamTLS
Timeouts upstreamTimeout
Health upstreamHealth
DNS upstreamDNS
Retry upstreamRetry
CircuitBreakers []upstreamCircuitBreaker
TCPConfigurations upstreamTCPConfigs
HTTPConfigurations upstreamHTTPConfigs
}

type upstreamTLS struct {
Expand Down Expand Up @@ -276,6 +278,15 @@ type dnsResolverConfig struct {
CAres cAres
}

type upstreamTCPConfigs struct {
KeepaliveTimeInMillis uint32
KeepaliveProbes uint32
KeepaliveIntervalInMillis uint32
}
type upstreamHTTPConfigs struct {
IdleTimeoutInMillis uint32
}

type dnsResolverType string

const (
Expand Down
6 changes: 5 additions & 1 deletion adapter/internal/api/apis_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ func ProcessMountedAPIProjects() (err error) {

func validateAndUpdateXds(apiProject mgw.ProjectAPI, override *bool) (err error) {
apiYaml := apiProject.APIYaml.Data
apiProject.OrganizationID = config.GetControlPlaneConnectedTenantDomain()
if (apiProject.APIYaml.Data.OrganizationID != "") {
apiProject.OrganizationID = apiProject.APIYaml.Data.OrganizationID
} else {
apiProject.OrganizationID = config.GetControlPlaneConnectedTenantDomain()
}

// handle panic
defer func() {
Expand Down
32 changes: 32 additions & 0 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func CreateRoutesWithClusters(mgwSwagger model.MgwSwagger, upstreamCerts map[str
if !strings.Contains(apiLevelEndpointProd.EndpointPrefix, xWso2EPClustersConfigNamePrefix) {
cluster, address, err := processEndpoints(apiLevelClusterNameProd, apiLevelEndpointProd,
upstreamCerts, timeout, apiLevelbasePath)
// assigns specified values for TCP keep-alive and HTTP timeout considering specific organizations
ok := slices.Contains(config.UpstreamConnectionConfEnabledOrgList, organizationID) || slices.Contains(config.UpstreamConnectionConfEnabledOrgList, "*")
if ok {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • support

getKeepAliveConfigs(cluster, conf)
}
if err != nil {
apiLevelClusterNameProd = ""
logger.LoggerOasparser.Errorf("Error while adding api level production endpoints for %s. %v , skipping api...", apiTitle, err.Error())
Expand Down Expand Up @@ -343,6 +348,33 @@ func getClusterName(epPrefix string, organizationID string, vHost string, swagge
swaggerVersion)
}

func getKeepAliveConfigs(cluster *clusterv3.Cluster, conf *config.Config) {
cluster.UpstreamConnectionOptions = &clusterv3.UpstreamConnectionOptions{
TcpKeepalive: &corev3.TcpKeepalive{
KeepaliveProbes: wrapperspb.UInt32(conf.Envoy.Upstream.TCPConfigurations.KeepaliveProbes),
KeepaliveInterval: wrapperspb.UInt32(conf.Envoy.Upstream.TCPConfigurations.KeepaliveIntervalInMillis / 1000),
KeepaliveTime: wrapperspb.UInt32(conf.Envoy.Upstream.TCPConfigurations.KeepaliveTimeInMillis / 1000),
},
}

config := &upstreams.HttpProtocolOptions{
CommonHttpProtocolOptions: &corev3.HttpProtocolOptions{
IdleTimeout: durationpb.New(time.Duration(conf.Envoy.Upstream.HTTPConfigurations.IdleTimeoutInMillis) * time.Millisecond),
},
UpstreamProtocolOptions: &upstreams.HttpProtocolOptions_UseDownstreamProtocolConfig{},
}
MarshalledHTTPProtocolOptions, err := proto.Marshal(config)
if err != nil {
logger.LoggerOasparser.Error("Error while marshalling the upstream TCP keep alive config")
}
cluster.TypedExtensionProtocolOptions = map[string]*anypb.Any{
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": {
TypeUrl: httpProtocolOptionsName,
Value: MarshalledHTTPProtocolOptions,
},
}
}

// CreateLuaCluster creates lua cluster configuration.
func CreateLuaCluster(interceptorCerts map[string][]byte, endpoint model.InterceptEndpoint) (*clusterv3.Cluster, []*corev3.Address, error) {
logger.LoggerOasparser.Debug("creating a lua cluster ", endpoint.ClusterName)
Expand Down
14 changes: 14 additions & 0 deletions resources/conf/config.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ retainKeys = ["self_validate_jwt", "issuer", "claim_mappings", "consumer_key_cla
# Max interval for the Envoy's exponential retry back off algorithm
maxInterval = "500ms"

# TCP configurations applicable with the upstream clusters
[router.upstream.tCPConfigurations]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[router.upstream.tCPConfigurations]
[router.upstream.tcpConfigurations]

# The number of milliseconds a connection needs to be idle before keep-alive probes start being sent
keepaliveTimeInMillis = 120000
# Maximum number of keep-alive probes to send without response before deciding the connection is dead
keepaliveProbes = 9

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too high again?

Suggested change
keepaliveProbes = 9
keepaliveProbes = 3

# The number of milliseconds between keep-alive probes
keepaliveIntervalInMillis = 75000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this too high? Wouldn't something like 30s be better?

Suggested change
keepaliveIntervalInMillis = 75000
keepaliveIntervalInMillis = 30000


# HTTP configurations applicable with the upstream clusters
[router.upstream.hTTPConfigurations]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[router.upstream.hTTPConfigurations]
[router.upstream.httpConfigurations]

# Idle timeout in milliseconds for connections
idleTimeoutInMillis = 120000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too low for default. We have to set a value which is more than max response timeout (300s)

Suggested change
idleTimeoutInMillis = 120000
idleTimeoutInMillis = 360000


# Timeouts managed by the connection manager
[router.connectionTimeout]
# The amount of time that Envoy will wait for the entire request to be received. Time from client to upstream.
Expand Down
Loading