Skip to content

Commit

Permalink
[SecondaryNetwork] Attach OVS uplink after removing flow-restore-wait…
Browse files Browse the repository at this point in the history
… flag

Physcial network interface is attached on the OVS bridge with SecondaryNetwork
feature enabled. Antrea uses a global configuration flow-restore-wait='true'
to ensure that OVS OpenFlow entries can start working after the dependencies are
ready. A connectivity issue exists if a setup uses the Node NIC as secondary
network interface and connects the NIC to OVS bridge before removing the
flow-restore-wait option.

This change ensures agent attaches the physical network interface to the secondary
OVS bridge after the global flow-restore-wait option is removed.

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Jul 9, 2024
1 parent 75699cf commit 6bd1a4c
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 71 deletions.
22 changes: 16 additions & 6 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,14 +731,15 @@ func run(o *Options) error {
go ipamController.Run(stopCh)
}

var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
defer secondarynetwork.RestoreHostInterfaceConfiguration(&o.config.SecondaryNetwork)
if err := secondarynetwork.Initialize(
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(), nodeConfig.Name,
podUpdateChannel, stopCh,
&o.config.SecondaryNetwork, ovsdbConnection); err != nil {
return fmt.Errorf("failed to initialize secondary network: %v", err)
k8sClient, localPodInformer.Get(),
podUpdateChannel,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

Expand Down Expand Up @@ -864,6 +865,15 @@ func run(o *Options) error {
return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err)
}
}
// secondaryNetworkController Initialize must be run after FlowRestoreComplete for the case that Node
// IPs are moved to the secondary OVS bridge
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
defer secondaryNetworkController.Restore()
if err = secondaryNetworkController.Initialize(); err != nil {
return fmt.Errorf("failed to initialize secondary network: %v", err)
}
go secondaryNetworkController.Run(stopCh)
}

// statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for
// NetworkPolicy stats and Multicast stats.
Expand Down
20 changes: 20 additions & 0 deletions pkg/agent/secondarynetwork/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package secondarynetwork

// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Run starts the Pod controller for secondary networks.
func (c *Controller) Run(stopCh <-chan struct{}) {
c.podController.Run(stopCh)
}
75 changes: 42 additions & 33 deletions pkg/agent/secondarynetwork/init_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,56 @@ var (
newOVSBridgeFn = ovsconfig.NewOVSBridge
)

// Initialize sets up OVS bridges and starts the Pod controller for secondary networks.
func Initialize(
type Controller struct {
ovsBridgeClient ovsconfig.OVSBridgeClient
secNetConfig *agentconfig.SecondaryNetworkConfig
podController *podwatch.PodController
}

func NewController(
clientConnectionConfig componentbaseconfig.ClientConnectionConfiguration,
kubeAPIServerOverride string,
k8sClient clientset.Interface,
podInformer cache.SharedIndexInformer,
nodeName string,
podUpdateSubscriber channel.Subscriber,
stopCh <-chan struct{},
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error {

secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
) (*Controller, error) {
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
if err != nil {
return err
return nil, err
}

// Create the NetworkAttachmentDefinition client, which handles access to secondary network object
// definition from the API Server.
netAttachDefClient, err := createNetworkAttachDefClient(clientConnectionConfig, kubeAPIServerOverride)
if err != nil {
return nil, fmt.Errorf("NetworkAttachmentDefinition client creation failed: %v", err)
}

// Create podController to handle secondary network configuration for Pods with
// k8s.v1.cni.cncf.io/networks Annotation defined.
podWatchController, err := podwatch.NewPodController(
k8sClient, netAttachDefClient, podInformer,
podUpdateSubscriber, ovsBridgeClient)
if err != nil {
return nil, err
}
return &Controller{
ovsBridgeClient: ovsBridgeClient,
secNetConfig: secNetConfig,
podController: podWatchController}, nil
}

// Initialize sets up OVS bridges.
func (c *Controller) Initialize() error {
// We only support moving and restoring of interface configuration to OVS Bridge for the single physical interface case.
if len(secNetConfig.OVSBridges) != 0 {
phyInterfaces := make([]string, len(secNetConfig.OVSBridges[0].PhysicalInterfaces))
copy(phyInterfaces, secNetConfig.OVSBridges[0].PhysicalInterfaces)
if len(c.secNetConfig.OVSBridges) != 0 {
phyInterfaces := make([]string, len(c.secNetConfig.OVSBridges[0].PhysicalInterfaces))
copy(phyInterfaces, c.secNetConfig.OVSBridges[0].PhysicalInterfaces)
if len(phyInterfaces) == 1 {

bridgedName, _, err := util.PrepareHostInterfaceConnection(
ovsBridgeClient,
c.ovsBridgeClient,
phyInterfaces[0],
0,
map[string]interface{}{
Expand All @@ -78,34 +104,17 @@ func Initialize(
}
phyInterfaces[0] = bridgedName
}
if err = connectPhyInterfacesToOVSBridge(ovsBridgeClient, phyInterfaces); err != nil {
if err := connectPhyInterfacesToOVSBridge(c.ovsBridgeClient, phyInterfaces); err != nil {
return err
}
}

// Create the NetworkAttachmentDefinition client, which handles access to secondary network object
// definition from the API Server.
netAttachDefClient, err := createNetworkAttachDefClient(clientConnectionConfig, kubeAPIServerOverride)
if err != nil {
return fmt.Errorf("NetworkAttachmentDefinition client creation failed: %v", err)
}

// Create podController to handle secondary network configuration for Pods with
// k8s.v1.cni.cncf.io/networks Annotation defined.
if podWatchController, err := podwatch.NewPodController(
k8sClient, netAttachDefClient, podInformer,
podUpdateSubscriber, ovsBridgeClient); err != nil {
return err
} else {
go podWatchController.Run(stopCh)
}
return nil
}

// RestoreHostInterfaceConfiguration restores interface configuration from secondary-bridge back to host-interface.
func RestoreHostInterfaceConfiguration(secNetConfig *agentconfig.SecondaryNetworkConfig) {
if len(secNetConfig.OVSBridges) != 0 && len(secNetConfig.OVSBridges[0].PhysicalInterfaces) == 1 {
util.RestoreHostInterfaceConfiguration(secNetConfig.OVSBridges[0].BridgeName, secNetConfig.OVSBridges[0].PhysicalInterfaces[0])
// Restore restores interface configuration from secondary-bridge back to host-interface.
func (c *Controller) Restore() {
if len(c.secNetConfig.OVSBridges) != 0 && len(c.secNetConfig.OVSBridges[0].PhysicalInterfaces) == 1 {
util.RestoreHostInterfaceConfiguration(c.secNetConfig.OVSBridges[0].BridgeName, c.secNetConfig.OVSBridges[0].PhysicalInterfaces[0])
}
}

Expand Down
20 changes: 14 additions & 6 deletions pkg/agent/secondarynetwork/init_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ import (
"k8s.io/client-go/tools/cache"
componentbaseconfig "k8s.io/component-base/config"

"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
agentconfig "antrea.io/antrea/pkg/config/agent"
"antrea.io/antrea/pkg/util/channel"
)

func Initialize(
type Controller struct {
podController *podwatch.PodController
}

func NewController(
clientConnectionConfig componentbaseconfig.ClientConnectionConfiguration,
kubeAPIServerOverride string,
k8sClient clientset.Interface,
podInformer cache.SharedIndexInformer,
nodeName string,
podUpdateSubscriber channel.Subscriber,
stopCh <-chan struct{},
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error {
return errors.New("not supported on Windows")
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
) (*Controller, error) {
return nil, errors.New("not supported on Windows")
}

func (c *Controller) Initialize() error {
return nil
}

func RestoreHostInterfaceConfiguration(secNetConfig *agentconfig.SecondaryNetworkConfig) {
func (c *Controller) Restore() {
// Not supported on Windows.
}
26 changes: 13 additions & 13 deletions pkg/agent/secondarynetwork/podwatch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type podCNIInfo struct {
netNS string
}

type podController struct {
type PodController struct {
kubeClient clientset.Interface
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface
queue workqueue.RateLimitingInterface
Expand All @@ -103,13 +103,13 @@ func NewPodController(
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
ovsBridgeClient ovsconfig.OVSBridgeClient,
) (*podController, error) {
) (*PodController, error) {
ifaceStore := interfacestore.NewInterfaceStore()
interfaceConfigurator, err := cniserver.NewSecondaryInterfaceConfigurator(ovsBridgeClient, ifaceStore)
if err != nil {
return nil, fmt.Errorf("failed to create SecondaryInterfaceConfigurator: %v", err)
}
pc := podController{
pc := PodController{
kubeClient: kubeClient,
netAttachDefClient: netAttachDefClient,
queue: workqueue.NewNamedRateLimitingQueue(
Expand Down Expand Up @@ -153,7 +153,7 @@ func allocatePodSecondaryIfaceName(usedIFNames sets.Set[string]) (string, error)
return "", fmt.Errorf("no more interface names")
}

func (pc *podController) enqueuePod(obj interface{}) {
func (pc *PodController) enqueuePod(obj interface{}) {
pod, isPod := obj.(*corev1.Pod)
if !isPod {
podDeletedState, ok := obj.(cache.DeletedFinalStateUnknown)
Expand All @@ -172,7 +172,7 @@ func (pc *podController) enqueuePod(obj interface{}) {
}

// processCNIUpdate will be called when CNIServer publishes a Pod update event.
func (pc *podController) processCNIUpdate(e interface{}) {
func (pc *PodController) processCNIUpdate(e interface{}) {
event := e.(types.PodUpdate)
podKey := podKeyGet(event.PodName, event.PodNamespace)
if event.IsAdd {
Expand All @@ -184,7 +184,7 @@ func (pc *podController) processCNIUpdate(e interface{}) {
}

// handleAddUpdatePod handles Pod Add, Update events and updates annotation if required.
func (pc *podController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNIInfo,
func (pc *PodController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNIInfo,
storedInterfaces []*interfacestore.InterfaceConfig) error {
if len(storedInterfaces) > 0 {
// We do not support secondary network update at the moment. Return as long as one
Expand Down Expand Up @@ -219,7 +219,7 @@ func (pc *podController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNII
return pc.configurePodSecondaryNetwork(pod, networklist, podCNIInfo)
}

func (pc *podController) removeInterfaces(interfaces []*interfacestore.InterfaceConfig) error {
func (pc *PodController) removeInterfaces(interfaces []*interfacestore.InterfaceConfig) error {
var savedErr error
for _, interfaceConfig := range interfaces {
podName := interfaceConfig.PodName
Expand Down Expand Up @@ -256,7 +256,7 @@ func (pc *podController) removeInterfaces(interfaces []*interfacestore.Interface
return savedErr
}

func (pc *podController) syncPod(key string) error {
func (pc *PodController) syncPod(key string) error {
var pod *corev1.Pod
var cniInfo *podCNIInfo
podExists := false
Expand Down Expand Up @@ -297,12 +297,12 @@ func (pc *podController) syncPod(key string) error {
return pc.handleAddUpdatePod(pod, cniInfo, storedInterfaces)
}

func (pc *podController) Worker() {
func (pc *PodController) Worker() {
for pc.processNextWorkItem() {
}
}

func (pc *podController) processNextWorkItem() bool {
func (pc *PodController) processNextWorkItem() bool {
obj, quit := pc.queue.Get()
if quit {
return false
Expand All @@ -319,7 +319,7 @@ func (pc *podController) processNextWorkItem() bool {
}

// Configure Secondary Network Interface.
func (pc *podController) configureSecondaryInterface(
func (pc *PodController) configureSecondaryInterface(
pod *corev1.Pod,
network *netdefv1.NetworkSelectionElement,
podCNIInfo *podCNIInfo,
Expand Down Expand Up @@ -370,7 +370,7 @@ func (pc *podController) configureSecondaryInterface(
return ifConfigErr
}

func (pc *podController) configurePodSecondaryNetwork(pod *corev1.Pod, networkList []*netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo) error {
func (pc *PodController) configurePodSecondaryNetwork(pod *corev1.Pod, networkList []*netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo) error {
usedIFNames := sets.New[string]()
for _, network := range networkList {
if network.InterfaceRequest != "" {
Expand Down Expand Up @@ -477,7 +477,7 @@ func validateNetworkConfig(cniConfig []byte) (*SecondaryNetworkConfig, error) {
return &networkConfig, nil
}

func (pc *podController) Run(stopCh <-chan struct{}) {
func (pc *PodController) Run(stopCh <-chan struct{}) {
defer func() {
klog.InfoS("Shutting down", "controller", controllerName)
pc.queue.ShutDown()
Expand Down
14 changes: 7 additions & 7 deletions pkg/agent/secondarynetwork/podwatch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func TestPodControllerAddPod(t *testing.T) {
podKey := podKeyGet(podName, testNamespace)

// Create Pod and wait for Informer cache updated.
createPodFn := func(pc *podController, pod *corev1.Pod) {
createPodFn := func(pc *PodController, pod *corev1.Pod) {
_, err := pc.kubeClient.CoreV1().Pods(testNamespace).Create(context.Background(),
pod, metav1.CreateOptions{})
require.NoError(t, err, "error when creating test Pod")
Expand All @@ -596,7 +596,7 @@ func TestPodControllerAddPod(t *testing.T) {
return ok == true && err == nil
}, 1*time.Second, 10*time.Millisecond)
}
deletePodFn := func(pc *podController, podName string) {
deletePodFn := func(pc *PodController, podName string) {
require.NoError(t, pc.kubeClient.CoreV1().Pods(testNamespace).Delete(context.Background(),
podName, metav1.DeleteOptions{}), "error when deleting test Pod")
assert.Eventually(t, func() bool {
Expand Down Expand Up @@ -899,16 +899,16 @@ func TestPodControllerAddPod(t *testing.T) {
}

func testPodController(ctrl *gomock.Controller) (
*podController, *podwatchtesting.MockIPAMAllocator,
*PodController, *podwatchtesting.MockIPAMAllocator,
*podwatchtesting.MockInterfaceConfigurator) {
client := fake.NewSimpleClientset()
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)

// podController without event handlers.
return &podController{
// PodController without event handlers.
return &PodController{
kubeClient: client,
netAttachDefClient: netdefclient,
queue: workqueue.NewNamedRateLimitingQueue(
Expand All @@ -921,9 +921,9 @@ func testPodController(ctrl *gomock.Controller) (
}, mockIPAM, interfaceConfigurator
}

// Create a test podController and start informerFactory.
// Create a test PodController and start informerFactory.
func testPodControllerStart(ctrl *gomock.Controller) (
*podController, *podwatchtesting.MockIPAMAllocator,
*PodController, *podwatchtesting.MockIPAMAllocator,
*podwatchtesting.MockInterfaceConfigurator) {
podController, mockIPAM, interfaceConfigurator := testPodController(ctrl)
informerFactory := informers.NewSharedInformerFactory(podController.kubeClient, resyncPeriod)
Expand Down
Loading

0 comments on commit 6bd1a4c

Please sign in to comment.