diff --git a/felix/dataplane/linux/qos/qos.go b/felix/dataplane/linux/qos/qos.go index 423edef68a3..5a2ed006691 100644 --- a/felix/dataplane/linux/qos/qos.go +++ b/felix/dataplane/linux/qos/qos.go @@ -242,20 +242,12 @@ func TeardownIfb(deviceName string) error { return nil } -func CreateIngressQdisc(tbs *TokenBucketState, workloadDeviceName string) error { +func CreateOrUpdateIngressQdisc(tbs *TokenBucketState, workloadDeviceName string) error { workloadDevice, err := netlink.LinkByName(workloadDeviceName) if err != nil { return fmt.Errorf("get host device %s: %w", workloadDeviceName, err) } - return createTBF(tbs, workloadDevice) -} - -func UpdateIngressQdisc(tbs *TokenBucketState, workloadDeviceName string) error { - workloadDevice, err := netlink.LinkByName(workloadDeviceName) - if err != nil { - return fmt.Errorf("get host device %s: %w", workloadDeviceName, err) - } - return updateTBF(tbs, workloadDevice) + return createOrUpdateTBF(tbs, workloadDevice) } func CreateEgressQdisc(tbs *TokenBucketState, workloadDeviceName string, ifbDeviceName string) error { @@ -293,7 +285,7 @@ func CreateEgressQdisc(tbs *TokenBucketState, workloadDeviceName string, ifbDevi }, } - err = netlink.QdiscAdd(qdisc) + err = netlink.QdiscReplace(qdisc) if err != nil { return fmt.Errorf("create ingress qdisc %+v on dev %s: %w", qdisc, workloadDeviceName, err) } @@ -363,7 +355,7 @@ func CreateEgressQdisc(tbs *TokenBucketState, workloadDeviceName string, ifbDevi } // throttle traffic on ifb device - err = createTBF(tbs, ifbDevice) + err = createOrUpdateTBF(tbs, ifbDevice) if err != nil { return fmt.Errorf("create ifb qdisc on dev %s: %w", ifbDeviceName, err) } @@ -375,7 +367,7 @@ func UpdateEgressQdisc(tbs *TokenBucketState, ifbDeviceName string) error { if err != nil { return fmt.Errorf("get ifb device %s: %w", ifbDeviceName, err) } - return updateTBF(tbs, ifbDevice) + return createOrUpdateTBF(tbs, ifbDevice) } func GetTBFValues(rateBitsPerSec, burstBits, peakrateBitsPerSec uint64, minburstBytes uint32) *TokenBucketState { @@ -454,23 +446,7 @@ func makeTBF(tbs *TokenBucketState, workloadDevice netlink.Link) (*netlink.Tbf, return qdisc, nil } -func createTBF(tbs *TokenBucketState, workloadDevice netlink.Link) error { - // Equivalent to - // tc qdisc add dev link root tbf - - qdisc, err := makeTBF(tbs, workloadDevice) - if err != nil { - return fmt.Errorf("make TBF qdisc %+v from tbs %+v: %w", qdisc, tbs, err) - } - - err = netlink.QdiscAdd(qdisc) - if err != nil { - return fmt.Errorf("add TBF qdisc %+v, limit: %v, rate %v, buffer %v, peakrate %v, minburst %v: %w", qdisc, qdisc.Limit, qdisc.Rate, qdisc.Buffer, qdisc.Peakrate, qdisc.Minburst, err) - } - return nil -} - -func updateTBF(tbs *TokenBucketState, workloadDevice netlink.Link) error { +func createOrUpdateTBF(tbs *TokenBucketState, workloadDevice netlink.Link) error { // Equivalent to // tc qdisc replace dev link root tbf diff --git a/felix/dataplane/linux/qos_controls.go b/felix/dataplane/linux/qos_controls.go index 75a05b08975..1c8e4eed111 100644 --- a/felix/dataplane/linux/qos_controls.go +++ b/felix/dataplane/linux/qos_controls.go @@ -84,7 +84,7 @@ func (m *endpointManager) maybeUpdateQoSBandwidth(old, new *proto.WorkloadEndpoi if currentIngress == nil && desiredIngress != nil { // Add. - err := qos.CreateIngressQdisc(desiredIngress, newName) + err := qos.CreateOrUpdateIngressQdisc(desiredIngress, newName) if err != nil { errs = append(errs, fmt.Errorf("error adding ingress qdisc to workload %s: %w", newName, err)) } @@ -96,7 +96,7 @@ func (m *endpointManager) maybeUpdateQoSBandwidth(old, new *proto.WorkloadEndpoi } } else if !currentIngress.Equals(desiredIngress) { // Update. - err := qos.UpdateIngressQdisc(desiredIngress, newName) + err := qos.CreateOrUpdateIngressQdisc(desiredIngress, newName) if err != nil { errs = append(errs, fmt.Errorf("error changing ingress qdisc on workload %s: %w", newName, err)) } diff --git a/felix/fv/qos_controls_test.go b/felix/fv/qos_controls_test.go index 2a12d1749d7..6a82778ef7f 100644 --- a/felix/fv/qos_controls_test.go +++ b/felix/fv/qos_controls_test.go @@ -180,6 +180,7 @@ var _ = infrastructure.DatastoreDescribe( Skip("Skipping QoS control tests on BPF mode.") } topt = infrastructure.DefaultTopologyOptions() + topt.FelixLogSeverity = "Debug" tc, _ = infrastructure.StartNNodeTopology(2, topt, infra) infra.AddDefaultAllow() @@ -221,6 +222,14 @@ var _ = infrastructure.DatastoreDescribe( } Context("With bandwidth limits", func() { + BeforeEach(func() { + By("Removing all limits from workloads") + for i := range len(w) { + w[i].WorkloadEndpoint.Spec.QoSControls = nil + w[i].UpdateInInfra(infra) + } + }) + getQdisc := func() string { out, err := tc.Felixes[1].ExecOutput("tc", "qdisc") logrus.Infof("tc qdisc output:\n%v", out) @@ -309,6 +318,28 @@ var _ = infrastructure.DatastoreDescribe( Expect(err).NotTo(HaveOccurred()) }) + + It("should correctly apply bandwidth limits when a non-default qdisc exists (handle != 0)", func() { + By("Replacing the default noqueue qdisc with a non-default qdisc (handle 8001)") + out, err := tc.Felixes[1].ExecOutput("tc", "qdisc", "replace", "dev", w[1].InterfaceName, "root", "handle", "8001:", "noqueue") + logrus.Infof("tc qdisc replace output:\n%v", out) + Expect(err).NotTo(HaveOccurred()) + + By("Waiting for the config to appear in 'tc qdisc'") + Eventually(getQdisc, "10s", "1s").Should(MatchRegexp(`qdisc noqueue 8001: dev ` + regexp.QuoteMeta(w[1].InterfaceName) + ` root refcnt \d+`)) + + By("Setting 10Mbps limit and 100Mbps peakrate for ingress on workload 1") + w[1].WorkloadEndpoint.Spec.QoSControls = &api.QoSControls{ + IngressBandwidth: 10000000, + IngressBurst: 300000000, + IngressPeakrate: 100000000, + } + w[1].UpdateInInfra(infra) + + By("Waiting for the config to appear in 'tc qdisc'") + // ingress config should be present + Eventually(getQdisc, "10s", "1s").Should(MatchRegexp(`qdisc tbf \d+: dev ` + regexp.QuoteMeta(w[1].InterfaceName) + ` root refcnt \d+ rate ` + regexp.QuoteMeta("10Mbit") + `.* peakrate ` + regexp.QuoteMeta("100Mbit"))) + }) }) Context("With packet rate limits", func() {