From d83222933d861d3c827c358451c8d82bacb72248 Mon Sep 17 00:00:00 2001
From: Hongliang Liu <75655411+hongliangl@users.noreply.github.com>
Date: Fri, 30 Apr 2021 04:33:09 +0800
Subject: [PATCH] Extend the Endpoints support from 511 to 800, extra ones will
 be dropped in AntreaProxy (#2101)

For #2092

Due to the maximum OpenFlow message size and the implementation of Services
in AntreaProxy, the maximum number of Endpoints that AntreaProxy can support
is now 800. If the number of Endpoints in a given Service exceeds 800, the
extra Endpoints will be dropped and a warning will be logged.

In AntreaProxy, the OVS group is the key part of the Service implementation.
Antrea currently uses OpenFlow 1.3 to communicate with OVS. In the previous
design, every bucket of an OVS group had five actions: the two actions that
load the Endpoint IP and port into registers, as well as the resubmit action,
must be preserved, while the other two actions, which load fixed values into
registers, can be moved to flows (in this patch, they are moved to table 41).
A single message can then hold more buckets, so the maximum number of
Endpoints grows from 511 to 800. To ensure that AntreaProxy keeps working
correctly, the extra Endpoints will be dropped.
---
 docs/feature-gates.md                   |  6 +++-
 pkg/agent/openflow/client.go            |  3 +-
 pkg/agent/openflow/pipeline.go          | 18 +++++-----
 pkg/agent/proxy/endpoints.go            | 13 +++++++
 pkg/agent/proxy/proxier.go              | 48 ++++++++++++++++++++++---
 test/e2e/proxy_test.go                  |  4 +--
 test/integration/agent/openflow_test.go |  9 +++--
 7 files changed, 83 insertions(+), 18 deletions(-)

diff --git a/docs/feature-gates.md b/docs/feature-gates.md
index f9e2edef68b..40d6f25404d 100644
--- a/docs/feature-gates.md
+++ b/docs/feature-gates.md
@@ -46,7 +46,11 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the
 `AntreaProxy` implements Service load-balancing for ClusterIP Services as part
 of the OVS pipeline, as opposed to relying on kube-proxy. This only applies to
 traffic originating from Pods, and destined to ClusterIP Services. In
-particular, it does not apply to NodePort Services.
+particular, it does not apply to NodePort Services. Please note that, due to
+restrictions in the implementation of Services in Antrea, the maximum
+number of Endpoints that Antrea can support at the moment is 800. If the
+number of Endpoints for a given Service exceeds 800, the extra Endpoints
+will be dropped.
 
 Note that this feature must be enabled for Windows. The Antrea Windows YAML
 manifest provided as part of releases enables this feature by default. If you
diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go
index 51da513f161..b45a06974bc 100644
--- a/pkg/agent/openflow/client.go
+++ b/pkg/agent/openflow/client.go
@@ -384,6 +384,7 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
 func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
 	c.replayMutex.RLock()
 	defer c.replayMutex.RUnlock()
+
 	group := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
 	if err := group.Add(); err != nil {
 		return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
@@ -443,7 +444,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
 	c.replayMutex.RLock()
 	defer c.replayMutex.RUnlock()
 	var flows []binding.Flow
-	flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol))
+	flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0))
 	if affinityTimeout != 0 {
 		flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
 	}
diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go
index 285acacc8fc..7b786b7b510 100644
--- a/pkg/agent/openflow/pipeline.go
+++ b/pkg/agent/openflow/pipeline.go
@@ -1658,12 +1658,21 @@ func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svc
 
 // serviceLBFlow generates the flow which uses the specific group to do Endpoint
 // selection.
-func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol) binding.Flow {
+func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, withSessionAffinity bool) binding.Flow {
+	var lbResultMark uint32
+	if withSessionAffinity {
+		lbResultMark = marksRegServiceNeedLearn
+	} else {
+		lbResultMark = marksRegServiceSelected
+	}
+
 	return c.pipeline[serviceLBTable].BuildFlow(priorityNormal).
 		MatchProtocol(protocol).
 		MatchDstPort(svcPort, nil).
 		MatchDstIP(svcIP).
 		MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLB, serviceLearnRegRange).
+		Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
+		Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
 		Action().Group(groupID).
 		Cookie(c.cookieAllocator.Request(cookie.Service).Raw()).
 		Done()
@@ -1729,13 +1738,10 @@ func (c *client) hairpinSNATFlow(endpointIP net.IP) binding.Flow {
 func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
 	group := c.bridge.CreateGroup(groupID).ResetBuckets()
 	var resubmitTableID binding.TableIDType
-	var lbResultMark uint32
 	if withSessionAffinity {
 		resubmitTableID = serviceLBTable
-		lbResultMark = marksRegServiceNeedLearn
 	} else {
 		resubmitTableID = endpointDNATTable
-		lbResultMark = marksRegServiceSelected
 	}
 
 	for _, endpoint := range endpoints {
@@ -1748,8 +1754,6 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
 			group = group.Bucket().Weight(100).
 				LoadReg(int(endpointIPReg), ipVal).
 				LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
-				LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
-				LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
 				ResubmitToTable(resubmitTableID).
 				Done()
 		} else if ipProtocol == binding.ProtocolIPv6 {
@@ -1757,8 +1761,6 @@
 			group = group.Bucket().Weight(100).
 				LoadXXReg(int(endpointIPv6XXReg), ipVal).
 				LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
-				LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
-				LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
 				ResubmitToTable(resubmitTableID).
 				Done()
 		}
diff --git a/pkg/agent/proxy/endpoints.go b/pkg/agent/proxy/endpoints.go
index 456fdff2485..6504d7f6ae4 100644
--- a/pkg/agent/proxy/endpoints.go
+++ b/pkg/agent/proxy/endpoints.go
@@ -175,3 +175,16 @@ func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
 		}
 	}
 }
+
+// byEndpoint helps sort Endpoints by their string representations.
+type byEndpoint []k8sproxy.Endpoint
+
+func (p byEndpoint) Len() int {
+	return len(p)
+}
+func (p byEndpoint) Swap(i, j int) {
+	p[i], p[j] = p[j], p[i]
+}
+func (p byEndpoint) Less(i, j int) bool {
+	return p[i].String() < p[j].String()
+}
diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go
index 3af683a7394..714e2f19ed8 100644
--- a/pkg/agent/proxy/proxier.go
+++ b/pkg/agent/proxy/proxier.go
@@ -17,11 +17,13 @@ package proxy
 import (
 	"fmt"
 	"net"
+	"sort"
 	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog"
@@ -38,6 +40,10 @@ import (
 const (
 	resyncPeriod  = time.Minute
 	componentName = "antrea-agent-proxy"
+	// Due to the maximum message size in OpenFlow 1.3 and the implementation of Services in Antrea, the maximum number
+	// of Endpoints that Antrea can support at the moment is 800. If the number of Endpoints for a given Service exceeds
+	// 800, the extra Endpoints will be dropped.
+	maxEndpoints = 800
 )
 
 // TODO: Add metrics
@@ -67,6 +73,8 @@ type proxier struct {
 	serviceStringMap map[string]k8sproxy.ServicePortName
 	// serviceStringMapMutex protects serviceStringMap object.
 	serviceStringMapMutex sync.Mutex
+	// oversizeServiceSet records the Services that have more than maxEndpoints Endpoints.
+	oversizeServiceSet sets.String
 
 	runner   *k8sproxy.BoundedFrequencyRunner
 	stopChan <-chan struct{}
@@ -94,6 +102,9 @@ func (p *proxier) removeStaleServices() {
 		}
 		svcInfo := svcPort.(*types.ServiceInfo)
 		klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String())
+		if p.oversizeServiceSet.Has(svcPortName.String()) {
+			p.oversizeServiceSet.Delete(svcPortName.String())
+		}
 		if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
 			klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err)
 			continue
@@ -235,12 +246,40 @@ func (p *proxier) installServices() {
 		}
 
 		var endpointUpdateList []k8sproxy.Endpoint
-		for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
-			if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
-				needUpdateEndpoints = true
+		if len(endpoints) > maxEndpoints {
+			if !p.oversizeServiceSet.Has(svcPortName.String()) {
+				klog.Warningf("Since the number of Endpoints of Service %s exceeds %d, the extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
+				p.oversizeServiceSet.Insert(svcPortName.String())
+			}
+			// If the number of Endpoints exceeds maxEndpoints, the list of Endpoints must be cut. However, endpoints
+			// is a map, and the iteration order of a map in Go is random, so cutting it directly without any sorting
+			// could select a different subset of Endpoints on every sync. Copying the Endpoints to the slice
+			// endpointList and sorting it before cutting keeps the installed subset stable.
+			var endpointList []k8sproxy.Endpoint
+			for _, endpoint := range endpoints {
+				endpointList = append(endpointList, endpoint)
+			}
+			sort.Sort(byEndpoint(endpointList))
+			endpointList = endpointList[:maxEndpoints]
+
+			for _, endpoint := range endpointList { // Check if there is any installed Endpoint which is not expected anymore.
+				if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
+					needUpdateEndpoints = true
+				}
+				endpointUpdateList = append(endpointUpdateList, endpoint)
+			}
+		} else {
+			if p.oversizeServiceSet.Has(svcPortName.String()) {
+				p.oversizeServiceSet.Delete(svcPortName.String())
+			}
+			for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
+				if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
+					needUpdateEndpoints = true
+				}
+				endpointUpdateList = append(endpointUpdateList, endpoint)
 			}
-			endpointUpdateList = append(endpointUpdateList, endpoint)
 		}
+
 		if len(endpoints) < len(endpointsInstalled) { // There are Endpoints which expired.
 			klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String())
 			needUpdateEndpoints = true
@@ -468,6 +507,7 @@ func NewProxier(
 		endpointsMap:             types.EndpointsMap{},
 		endpointReferenceCounter: map[string]int{},
 		serviceStringMap:         map[string]k8sproxy.ServicePortName{},
+		oversizeServiceSet:       sets.NewString(),
 		groupCounter:             types.NewGroupCounter(),
 		ofClient:                 ofClient,
 		isIPv6:                   isIPv6,
diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go
index ea7688ae841..75cbacdf589 100644
--- a/test/e2e/proxy_test.go
+++ b/test/e2e/proxy_test.go
@@ -284,9 +284,9 @@ func testProxyServiceLifeCycle(ipFamily *corev1.IPFamily, ingressIPs []string, d
 
 	var groupKeyword string
 	if *ipFamily == corev1.IPv6Protocol {
-		groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
+		groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
 	} else {
-		groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
+		groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
 	}
 	groupOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName})
 	require.NoError(t, err)
diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go
index 274f243f90c..58c91ddcc5f 100644
--- a/test/integration/agent/openflow_test.go
+++ b/test/integration/agent/openflow_test.go
@@ -594,11 +594,16 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
 		nw_proto = 132
 		learnProtoField = "OXM_OF_SCTP_DST[]"
 	}
+
+	serviceLearnReg := 2
+	if stickyAge != 0 {
+		serviceLearnReg = 3
+	}
 	cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)
 	svcFlows := expectTableFlows{tableID: 41, flows: []*ofTestUtils.ExpectFlow{
 		{
 			MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
-			ActStr:   fmt.Sprintf("group:%d", gid),
+			ActStr:   fmt.Sprintf("load:0x%x->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],group:%d", serviceLearnReg, gid),
fmt.Sprintf("load:0x%x->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],group:%d", serviceLearnReg, gid), }, { MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port), @@ -611,7 +616,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [ for _, ep := range endpointList { epIP := ipToHexString(net.ParseIP(ep.IP())) epPort, _ := ep.Port() - bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],resubmit(,42)", epIP, epPort) + bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],resubmit(,42)", epIP, epPort) groupBuckets = append(groupBuckets, bucket) unionVal := (0b010 << 16) + uint32(epPort)