Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #4295: Add unit test for pkg/agent/route #4419: Set NO_FLOOD to IPsec tunnel ports #4470: Fix that Service routes may get lost when starting on Windows #4654: Restore NO_FLOOD to OVS ports after reconnecting the OVS #4711: Fix route deletion for Service ClusterIP and LoadBalancerIP #4767

5 changes: 3 additions & 2 deletions ci/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ function deliver_antrea_windows {
sleep 5
# Some tests need us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13 image but it is not for windows/amd64 10.0.17763
# Use e2eteam/agnhost:2.13 instead
harbor_images=("sigwindowstools-kube-proxy:v1.18.0" "agnhost:2.13" "agnhost:2.13" "agnhost:2.29" "e2eteam-jessie-dnsutils:1.0" "e2eteam-pause:3.2")
antrea_images=("sigwindowstools/kube-proxy:v1.18.0" "e2eteam/agnhost:2.13" "us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13" "k8s.gcr.io/e2e-test-images/agnhost:2.29" "e2eteam/jessie-dnsutils:1.0" "e2eteam/pause:3.2")
harbor_images=("sigwindowstools-kube-proxy:v1.18.0" "agnhost:2.13" "agnhost:2.13" "agnhost:2.13" "agnhost:2.29" "e2eteam-jessie-dnsutils:1.0" "e2eteam-jessie-dnsutils:1.0" "e2eteam-pause:3.2" "e2eteam-pause:3.2" "e2eteam-busybox:1.29-windows-amd64-1809")
antrea_images=("sigwindowstools/kube-proxy:v1.18.0" "e2eteam/agnhost:2.13" "us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13" "k8sprow.azurecr.io/kubernetes-e2e-test-images/agnhost:2.13" "k8s.gcr.io/e2e-test-images/agnhost:2.29" "e2eteam/jessie-dnsutils:1.0" "gcr.io/kubernetes-e2e-test-images/jessie-dnsutils:1.0" "e2eteam/pause:3.2" "k8s.gcr.io/pause:3.2" "docker.io/library/busybox:1.29")
common_images=("mcr.microsoft.com/windows/servercore/iis:latest")
# Pull necessary images in advance to avoid transient error
for i in "${!harbor_images[@]}"; do
ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "docker pull -q ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} && docker tag ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} ${antrea_images[i]}" || true
Expand Down
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func run(o *Options) error {
k8sClient,
crdClient,
ovsBridgeClient,
ovsctl.NewClient(o.config.OVSBridge),
ofClient,
routeClient,
ifaceStore,
Expand Down Expand Up @@ -267,6 +268,7 @@ func run(o *Options) error {
k8sClient,
informerFactory,
ofClient,
ovsctl.NewClient(o.config.OVSBridge),
ovsBridgeClient,
routeClient,
ifaceStore,
Expand Down
39 changes: 25 additions & 14 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ function generate_mocks {
"pkg/agent/querier AgentQuerier testing"
"pkg/agent/route Interface testing"
"pkg/agent/ipassigner IPAssigner testing"
"pkg/agent/util/ipset Interface testing"
"pkg/agent/util/iptables Interface testing mock_iptables_linux.go" # Must specify linux.go suffix, otherwise compilation would fail on windows platform as source file has linux build tag.
"pkg/agent/util/netlink Interface testing mock_netlink_linux.go"
"pkg/antctl AntctlClient ."
"pkg/controller/networkpolicy EndpointQuerier testing"
"pkg/controller/querier ControllerQuerier testing"
Expand All @@ -67,21 +70,29 @@ function generate_mocks {
current_year=$(date +"%Y")
sed -i "s/YEAR/${current_year}/g" hack/boilerplate/license_header.raw.txt
for target in "${MOCKGEN_TARGETS[@]}"; do
read -r package interfaces mock_package <<<"${target}"
package_name=$(basename "${package}")
if [[ "${mock_package}" == "." ]]; then # generate mocks in same package as src
$GOPATH/bin/mockgen \
-copyright_file hack/boilerplate/license_header.raw.txt \
-destination "${package}/mock_${package_name}_test.go" \
-package="${package_name}" \
"${ANTREA_PKG}/${package}" "${interfaces}"
else # generate mocks in subpackage
$GOPATH/bin/mockgen \
-copyright_file hack/boilerplate/license_header.raw.txt \
-destination "${package}/${mock_package}/mock_${package_name}.go" \
-package="${mock_package}" \
"${ANTREA_PKG}/${package}" "${interfaces}"
read -r src_package interfaces dst_package_name dst_file_name <<<"${target}"
src_package_name=$(basename "${src_package}")
# Generate mocks in the same package as src if dst_file_name is ".", otherwise create a sub package.
if [[ "${dst_package_name}" == "." ]]; then
package="${src_package_name}"
if [ -n "${dst_file_name}" ]; then
destination="${src_package}/${dst_file_name}"
else
destination="${src_package}/mock_${src_package_name}_test.go"
fi
else
package="${dst_package_name}"
if [ -n "${dst_file_name}" ]; then
destination="${src_package}/${dst_package_name}/${dst_file_name}"
else
destination="${src_package}/${dst_package_name}/mock_${src_package_name}.go"
fi
fi
$GOPATH/bin/mockgen \
-copyright_file hack/boilerplate/license_header.raw.txt \
-destination "${destination}" \
-package "${package}" \
"${ANTREA_PKG}/${src_package}" "${interfaces}"
done
git checkout HEAD -- hack/boilerplate/license_header.raw.txt
}
Expand Down
65 changes: 51 additions & 14 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Initializer struct {
client clientset.Interface
crdClient versioned.Interface
ovsBridgeClient ovsconfig.OVSBridgeClient
ovsCtlClient ovsctl.OVSCtlClient
ofClient openflow.Client
routeClient route.Interface
wireGuardClient wireguard.Interface
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewInitializer(
k8sClient clientset.Interface,
crdClient versioned.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ovsCtlClient ovsctl.OVSCtlClient,
ofClient openflow.Client,
routeClient route.Interface,
ifaceStore interfacestore.InterfaceStore,
Expand All @@ -142,6 +144,7 @@ func NewInitializer(
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
Expand Down Expand Up @@ -279,7 +282,6 @@ func (i *Initializer) initInterfaceStore() error {
return intf
}
ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
ovsCtlClient := ovsctl.NewClient(i.ovsBridge)
for index := range ovsPorts {
port := &ovsPorts[index]
ovsPort := &interfacestore.OVSPortConfig{
Expand All @@ -297,6 +299,8 @@ func (i *Initializer) initInterfaceStore() error {
case interfacestore.AntreaUplink:
intf = parseUplinkInterfaceFunc(port, ovsPort)
case interfacestore.AntreaTunnel:
fallthrough
case interfacestore.AntreaIPsecTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
case interfacestore.AntreaHost:
if port.Name == i.ovsBridge {
Expand All @@ -314,9 +318,6 @@ func (i *Initializer) initInterfaceStore() error {
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true)
case interfacestore.AntreaTrafficControl:
intf = trafficcontrol.ParseTrafficControlInterfaceConfig(port, ovsPort)
if err := ovsCtlClient.SetPortNoFlood(int(ovsPort.OFPort)); err != nil {
klog.ErrorS(err, "Failed to set port with no-flood config", "PortName", port.Name)
}
default:
klog.InfoS("Unknown Antrea interface type", "type", interfaceType)
}
Expand All @@ -340,7 +341,11 @@ func (i *Initializer) initInterfaceStore() error {
fallthrough
case port.IFType == ovsconfig.STTTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
antreaIFType = interfacestore.AntreaTunnel
if intf.Type == interfacestore.IPSecTunnelInterface {
antreaIFType = interfacestore.AntreaIPsecTunnel
} else {
antreaIFType = interfacestore.AntreaTunnel
}
case port.Name == i.ovsBridge:
intf = nil
antreaIFType = interfacestore.AntreaHost
Expand Down Expand Up @@ -368,6 +373,26 @@ func (i *Initializer) initInterfaceStore() error {
return nil
}

func (i *Initializer) restorePortConfigs() error {
interfaces := i.ifaceStore.ListInterfaces()
for _, intf := range interfaces {
switch intf.Type {
case interfacestore.IPSecTunnelInterface:
fallthrough
case interfacestore.TrafficControlInterface:
if intf.OFPort < 0 {
klog.InfoS("Skipped setting no-flood for port due to invalid ofPort", "port", intf.InterfaceName, "ofport", intf.OFPort)
continue
}
if err := i.ovsCtlClient.SetPortNoFlood(int(intf.OFPort)); err != nil {
return fmt.Errorf("failed to set no-flood for port %s: %w", intf.InterfaceName, err)
}
klog.InfoS("Set no-flood for port", "port", intf.InterfaceName)
}
}
return nil
}

// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
Expand All @@ -386,6 +411,10 @@ func (i *Initializer) Initialize() error {
return err
}

if err := i.restorePortConfigs(); err != nil {
return err
}

// initializeWireGuard must be executed after setupOVSBridge as it requires gateway addresses on the OVS bridge.
if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeWireGuard {
if err := i.initializeWireGuard(); err != nil {
Expand Down Expand Up @@ -487,14 +516,15 @@ func persistRoundNum(num uint64, bridgeClient ovsconfig.OVSBridgeClient, interva

// initOpenFlowPipeline sets up necessary Openflow entries, including pipeline, classifiers, conn_track, and gateway flows
// Every time the agent is (re)started, we go through the following sequence:
// 1. agent determines the new round number (this is done by incrementing the round number
// persisted in OVSDB, or if it's not available by picking round 1).
// 2. any existing flow for which the round number matches the round number obtained from step 1
// is deleted.
// 3. all required flows are installed, using the round number obtained from step 1.
// 4. after convergence, all existing flows for which the round number matches the previous round
// number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
// 5. the new round number obtained from step 1 is persisted to OVSDB.
// 1. agent determines the new round number (this is done by incrementing the round number
// persisted in OVSDB, or if it's not available by picking round 1).
// 2. any existing flow for which the round number matches the round number obtained from step 1
// is deleted.
// 3. all required flows are installed, using the round number obtained from step 1.
// 4. after convergence, all existing flows for which the round number matches the previous round
// number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
// 5. the new round number obtained from step 1 is persisted to OVSDB.
//
// The rationale for not persisting the new round number until after all previous flows have been
// deleted is to avoid a situation in which some stale flows are never deleted because of successive
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
Expand Down Expand Up @@ -552,6 +582,13 @@ func (i *Initializer) initOpenFlowPipeline() error {
i.ofClient.ReplayFlows()
klog.Info("Flow replay completed")

klog.InfoS("Restoring OF port configs to OVS bridge")
if err := i.restorePortConfigs(); err != nil {
klog.ErrorS(err, "Failed to restore OF port configs")
} else {
klog.InfoS("Port configs restoration completed")
}

if i.ovsBridgeClient.GetOVSDatapathType() == ovsconfig.OVSDatapathNetdev {
// we don't set flow-restore-wait when using the OVS netdev datapath
return
Expand All @@ -561,7 +598,7 @@ func (i *Initializer) initOpenFlowPipeline() error {
// happen that ovsBridgeClient's connection is not ready when ofClient completes flow replay. We retry it
// with a timeout that is longer time than ovsBridgeClient's maximum connecting retry interval (8 seconds)
// to ensure the flag can be removed successfully.
err := wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
err = wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if err := i.FlowRestoreComplete(); err != nil {
return false, nil
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing"
"antrea.io/antrea/pkg/util/env"
"antrea.io/antrea/pkg/util/ip"
)
Expand Down Expand Up @@ -518,3 +519,78 @@ func mockConfigureLinkAddress(returnedErr error) func() {
configureLinkAddresses = originalConfigureLinkAddresses
}
}

func TestRestorePortConfigs(t *testing.T) {
ipsecTunnelInterface := interfacestore.NewIPSecTunnelInterface("antrea-ipsec1",
ovsconfig.GeneveTunnel,
"node1",
net.ParseIP("1.1.1.1"),
"abcdefg",
"node1")
ipsecTunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: 11, PortUUID: "uuid1"}
tunnelInterface := interfacestore.NewTunnelInterface(defaultTunInterfaceName,
ovsconfig.GeneveTunnel,
0,
net.ParseIP("1.1.1.10"),
true)
tunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: 12}
trafficControlInterface1 := interfacestore.NewTrafficControlInterface("antrea-tap1")
trafficControlInterface1.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: 13, PortUUID: "uuid3"}
trafficControlInterface2 := interfacestore.NewTrafficControlInterface("antrea-tap2")
trafficControlInterface2.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: -1, PortUUID: "uuid3"}

tests := []struct {
name string
existingInterfaces []*interfacestore.InterfaceConfig
expectedOVSCtlCalls func(client *ovsctltest.MockOVSCtlClientMockRecorder)
expectedErr string
}{
{
name: "success",
existingInterfaces: []*interfacestore.InterfaceConfig{
ipsecTunnelInterface,
tunnelInterface,
trafficControlInterface1,
trafficControlInterface2,
},
expectedOVSCtlCalls: func(client *ovsctltest.MockOVSCtlClientMockRecorder) {
client.SetPortNoFlood(11).Return(nil)
client.SetPortNoFlood(13).Return(nil)
},
},
{
name: "fail",
existingInterfaces: []*interfacestore.InterfaceConfig{
{
InterfaceName: "antrea-tap1",
Type: interfacestore.TrafficControlInterface,
OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 10, PortUUID: "uuid3"},
},
},
expectedOVSCtlCalls: func(client *ovsctltest.MockOVSCtlClientMockRecorder) {
client.SetPortNoFlood(10).Return(fmt.Errorf("server unavailable"))
},
expectedErr: "failed to set no-flood for port antrea-tap1: server unavailable",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
controller := mock.NewController(t)
defer controller.Finish()
mockOVSCtlClient := ovsctltest.NewMockOVSCtlClient(controller)
ifaceStore := interfacestore.NewInterfaceStore()
initializer := &Initializer{
ifaceStore: ifaceStore,
ovsCtlClient: mockOVSCtlClient,
}
ifaceStore.Initialize(tt.existingInterfaces)
tt.expectedOVSCtlCalls(mockOVSCtlClient.EXPECT())
err := initializer.restorePortConfigs()
if tt.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.ErrorContains(t, err, tt.expectedErr)
}
})
}
}
Loading