Skip to content

Commit

Permalink
Fix #2092
Browse files Browse the repository at this point in the history
  • Loading branch information
hongliangl committed Apr 20, 2021
1 parent b675264 commit 275ef43
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 45 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/confluentinc/bincover v0.1.0
github.com/containernetworking/cni v0.8.0
github.com/containernetworking/plugins v0.8.7
github.com/contiv/libOpenflow v0.0.0-20210312221048-1d504242120d
github.com/contiv/libOpenflow v0.0.0-20210416161057-27e68f0e5fce
github.com/contiv/ofnet v0.0.0-00010101000000-000000000000
github.com/coreos/go-iptables v0.4.5
github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ github.com/containernetworking/cni v0.8.0/go.mod h1:LGwApLUm2FpoOfxTDEeq8T9ipbpZ
github.com/containernetworking/plugins v0.8.7 h1:bU7QieuAp+sACI2vCzESJ3FoT860urYP+lThyZkb/2M=
github.com/containernetworking/plugins v0.8.7/go.mod h1:R7lXeZaBzpfqapcAbHRW8/CYwm0dHzbz0XEjofx0uB0=
github.com/contiv/libOpenflow v0.0.0-20201014051314-c1702744526c/go.mod h1:DtsPlJOByJZ+MO9YITEGUlbJ/jfh/ef0qeNyBYaeNR4=
github.com/contiv/libOpenflow v0.0.0-20210312221048-1d504242120d h1:qHFOB6hTrpBzbYdhOMyQwHdX7XfLGGoqN/NfgDrSEXg=
github.com/contiv/libOpenflow v0.0.0-20210312221048-1d504242120d/go.mod h1:DtsPlJOByJZ+MO9YITEGUlbJ/jfh/ef0qeNyBYaeNR4=
github.com/contiv/libOpenflow v0.0.0-20210416161057-27e68f0e5fce h1:pcwRbFTaAKuEUWkLM6Ywm/s75GcWmp4bPUXlXQS1V8s=
github.com/contiv/libOpenflow v0.0.0-20210416161057-27e68f0e5fce/go.mod h1:DtsPlJOByJZ+MO9YITEGUlbJ/jfh/ef0qeNyBYaeNR4=
github.com/contiv/libovsdb v0.0.0-20170227191248-d0061a53e358 h1:AiA9SKyNXulsU7aAnyka3UFHYOIH00A9HvdIRnDXlg0=
github.com/contiv/libovsdb v0.0.0-20170227191248-d0061a53e358/go.mod h1:+qKEHaNVPj+wrn5st7TEFH9wcUWCJq5ZBvVKPQwzAeg=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down
38 changes: 35 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,18 @@ type Client interface {

// InstallServiceGroup installs a group for Service LB. Each endpoint
// is a bucket of the group. For now, each bucket has the same weight.
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error
InstallServiceGroup(groupID binding.GroupIDType, endpoints []proxy.Endpoint) error
// UninstallServiceGroup removes the group and its buckets that are
// installed by InstallServiceGroup.
UninstallServiceGroup(groupID binding.GroupIDType) error

// InstallEndpointRegSetFlows installs flows that set registers to make service group action shorter, thus more items
// can be included in a add-group message.
InstallEndpointRegSetFlows(withSessionAffinity bool, clusterIP net.IP, port int, endpoints []proxy.Endpoint) error

// UninstallEndpointRegSetFlows removes the flows installed by InstallEndpointRegSetFlows.
UninstallEndpointRegSetFlows(clusterIP net.IP, port int) error

// InstallEndpointFlows installs flows for accessing Endpoints.
// If an Endpoint is on the current Node, then flows for hairpin and endpoint
// L2 forwarding should also be installed.
Expand Down Expand Up @@ -431,13 +438,14 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
return c.getFlowKeysFromCache(c.podFlowCache, interfaceName)
}

func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
group := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
group := c.serviceEndpointGroup(groupID, endpoints...)
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
}

c.groupCache.Store(groupID, group)
return nil
}
Expand All @@ -452,6 +460,29 @@ func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error {
return nil
}

func (c *client) InstallEndpointRegSetFlows(withSessionAffinity bool, clusterIP net.IP, port int, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
for _, endpoint := range endpoints {
flows = append(flows, c.endpointRegSetFlow(clusterIP, port, withSessionAffinity, endpoint))
}
cacheKey := fmt.Sprintf("%s-%d", clusterIP, port)
if err := c.addFlows(c.epRegSetFlowCache, cacheKey, flows); err != nil {
return err
}

return nil
}

func (c *client) UninstallEndpointRegSetFlows(clusterIP net.IP, port int) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

cacheKey := fmt.Sprintf("%s-%d", clusterIP, port)
return c.deleteFlows(c.epRegSetFlowCache, cacheKey)
}

func generateEndpointFlowCacheKey(endpointIP string, endpointPort int, protocol binding.Protocol) string {
return fmt.Sprintf("E%s%s%x", endpointIP, protocol, endpointPort)
}
Expand Down Expand Up @@ -762,6 +793,7 @@ func (c *client) ReplayFlows() {
c.nodeFlowCache.Range(installCachedFlows)
c.podFlowCache.Range(installCachedFlows)
c.serviceFlowCache.Range(installCachedFlows)
c.epRegSetFlowCache.Range(installCachedFlows)

c.replayPolicyFlows()
}
Expand Down
58 changes: 36 additions & 22 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ const (
sessionAffinityTable binding.TableIDType = 40
dnatTable binding.TableIDType = 40
serviceLBTable binding.TableIDType = 41
endpointDNATTable binding.TableIDType = 42
endpointRegSetTable binding.TableIDType = 42
endpointDNATTable binding.TableIDType = 43
AntreaPolicyEgressRuleTable binding.TableIDType = 45
DefaultTierEgressRuleTable binding.TableIDType = 49
EgressRuleTable binding.TableIDType = 50
Expand Down Expand Up @@ -116,6 +117,7 @@ var (
{dnatTable, "DNAT(SessionAffinity)"},
{sessionAffinityTable, "SessionAffinity"},
{serviceLBTable, "ServiceLB"},
{endpointRegSetTable, "endpointRegSetTable"},
{endpointDNATTable, "EndpointDNAT"},
{AntreaPolicyEgressRuleTable, "AntreaPolicyEgressRule"},
{EgressRuleTable, "EgressRule"},
Expand Down Expand Up @@ -362,7 +364,7 @@ type client struct {
ingressEntryTable binding.TableIDType
pipeline map[binding.TableIDType]binding.Table
// Flow caches for corresponding deletions.
nodeFlowCache, podFlowCache, serviceFlowCache, snatFlowCache, tfFlowCache *flowCategoryCache
nodeFlowCache, podFlowCache, serviceFlowCache, snatFlowCache, tfFlowCache, epRegSetFlowCache *flowCategoryCache
// "fixed" flows installed by the agent after initialization and which do not change during
// the lifetime of the client.
gatewayFlows, defaultServiceFlows, defaultTunnelFlows, hostNetworkingFlows []binding.Flow
Expand Down Expand Up @@ -1971,46 +1973,57 @@ func (c *client) hairpinSNATFlow(endpointIP net.IP) binding.Flow {
// serviceLBTable to trigger the learn flow, the learn flow will then send packets
// to endpointDNATTable. Otherwise, buckets will resubmit packets to
// endpointDNATTable directly.
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, endpoints ...proxy.Endpoint) binding.Group {
group := c.bridge.CreateGroup(groupID).ResetBuckets()
var resubmitTableID binding.TableIDType
var lbResultMark uint32
if withSessionAffinity {
resubmitTableID = serviceLBTable
lbResultMark = marksRegServiceNeedLearn
} else {
resubmitTableID = endpointDNATTable
lbResultMark = marksRegServiceSelected
}

for _, endpoint := range endpoints {
endpointPort, _ := endpoint.Port()
endpointIP := net.ParseIP(endpoint.IP())
portVal := portToUint16(endpointPort)
ipProtocol := getIPProtocol(endpointIP)
if ipProtocol == binding.ProtocolIP {
ipVal := binary.BigEndian.Uint32(endpointIP.To4())
group = group.Bucket().Weight(100).
LoadReg(int(endpointIPReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
ResubmitToTable(endpointRegSetTable).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
LoadXXReg(int(endpointIPv6XXReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
ResubmitToTable(endpointRegSetTable).
Done()
}
}
return group
}

// endpointRegSetFlow generates the flow which sets endpoint port, service learn status and marks registers.
func (c *client) endpointRegSetFlow(clusterIP net.IP, port int, withSessionAffinity bool, endpoint proxy.Endpoint) binding.Flow {
var resubmitTableID binding.TableIDType
var lbResultMark uint32
if withSessionAffinity {
resubmitTableID = serviceLBTable
lbResultMark = marksRegServiceNeedLearn
} else {
resubmitTableID = endpointDNATTable
lbResultMark = marksRegServiceSelected
}

endpointPort, _ := endpoint.Port()
endpointIP := net.ParseIP(endpoint.IP())
portVal := portToUint16(endpointPort)
ipProtocol := getIPProtocol(endpointIP)
return c.pipeline[endpointRegSetTable].BuildFlow(priorityNormal).
Cookie(c.cookieAllocator.Request(cookie.Service).Raw()).
MatchProtocol(ipProtocol).
MatchDstIP(clusterIP).
MatchDstPort(portToUint16(port), nil).
Action().LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
Action().GotoTable(resubmitTableID).
Done()
}

// decTTLFlows decrements TTL by one for the packets forwarded across Nodes.
// The TTL decrement should be skipped for the packets which enter OVS pipeline
// from the gateway interface, as the host IP stack should have decremented the
Expand Down Expand Up @@ -2114,6 +2127,7 @@ func NewClient(bridgeName, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapat
podFlowCache: newFlowCategoryCache(),
serviceFlowCache: newFlowCategoryCache(),
tfFlowCache: newFlowCategoryCache(),
epRegSetFlowCache: newFlowCategoryCache(),
policyCache: policyCache,
groupCache: sync.Map{},
globalConjMatchFlowCache: map[string]*conjMatchFlowContext{},
Expand Down
36 changes: 32 additions & 4 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
const (
resyncPeriod = time.Minute
componentName = "antrea-agent-proxy"

maxEnpoints = 1169
)

// Proxier wraps proxy.Provider and adds extra methods. It is introduced for
Expand Down Expand Up @@ -134,6 +136,10 @@ func (p *proxier) removeStaleServices() {
klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err)
continue
}
if err := p.ofClient.UninstallEndpointRegSetFlows(svcInfo.ClusterIP(), svcInfo.Port()); err != nil {
klog.Errorf("Failed to remove flows of Endpoint register setting %v: %v", svcPortName, err)
continue
}
delete(p.serviceInstalledMap, svcPortName)
p.deleteServiceByIP(svcInfo.String())
p.groupCounter.Recycle(svcPortName)
Expand Down Expand Up @@ -293,16 +299,25 @@ func (p *proxier) installServices() {
}

if needUpdateEndpoints {
if len(endpointUpdateList) > maxEnpoints {
endpointUpdateList = endpointUpdateList[:maxEnpoints]
}
err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, endpointUpdateList)
if err != nil {
klog.Errorf("Error when installing Endpoints flows: %v", err)
continue
}
err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpointUpdateList)
err = p.ofClient.InstallServiceGroup(groupID, endpointUpdateList)
if err != nil {
klog.Errorf("Error when installing Endpoints groups: %v", err)
continue
}
err = p.ofClient.InstallEndpointRegSetFlows(svcInfo.StickyMaxAgeSeconds() != 0, svcInfo.ClusterIP(), svcInfo.Port(), endpointUpdateList)
if err != nil {
klog.Errorf("Error when installing Endpoint register setting flows: %v", err)
continue
}

for _, e := range endpointUpdateList {
// If the Endpoint is newly installed, add a reference.
if _, ok := endpointsInstalled[e.String()]; !ok {
Expand Down
Loading

0 comments on commit 275ef43

Please sign in to comment.