Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 6 additions & 30 deletions felix/dataplane/linux/qos/qos.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,20 +242,12 @@ func TeardownIfb(deviceName string) error {
return nil
}

func CreateIngressQdisc(tbs *TokenBucketState, workloadDeviceName string) error {
func CreateOrUpdateIngressQdisc(tbs *TokenBucketState, workloadDeviceName string) error {
workloadDevice, err := netlink.LinkByName(workloadDeviceName)
if err != nil {
return fmt.Errorf("get host device %s: %w", workloadDeviceName, err)
}
return createTBF(tbs, workloadDevice)
}

func UpdateIngressQdisc(tbs *TokenBucketState, workloadDeviceName string) error {
workloadDevice, err := netlink.LinkByName(workloadDeviceName)
if err != nil {
return fmt.Errorf("get host device %s: %w", workloadDeviceName, err)
}
return updateTBF(tbs, workloadDevice)
return createOrUpdateTBF(tbs, workloadDevice)
}

func CreateEgressQdisc(tbs *TokenBucketState, workloadDeviceName string, ifbDeviceName string) error {
Expand Down Expand Up @@ -293,7 +285,7 @@ func CreateEgressQdisc(tbs *TokenBucketState, workloadDeviceName string, ifbDevi
},
}

err = netlink.QdiscAdd(qdisc)
err = netlink.QdiscReplace(qdisc)
if err != nil {
return fmt.Errorf("create ingress qdisc %+v on dev %s: %w", qdisc, workloadDeviceName, err)
}
Expand Down Expand Up @@ -363,7 +355,7 @@ func CreateEgressQdisc(tbs *TokenBucketState, workloadDeviceName string, ifbDevi
}

// throttle traffic on ifb device
err = createTBF(tbs, ifbDevice)
err = createOrUpdateTBF(tbs, ifbDevice)
if err != nil {
return fmt.Errorf("create ifb qdisc on dev %s: %w", ifbDeviceName, err)
}
Expand All @@ -375,7 +367,7 @@ func UpdateEgressQdisc(tbs *TokenBucketState, ifbDeviceName string) error {
if err != nil {
return fmt.Errorf("get ifb device %s: %w", ifbDeviceName, err)
}
return updateTBF(tbs, ifbDevice)
return createOrUpdateTBF(tbs, ifbDevice)
}

func GetTBFValues(rateBitsPerSec, burstBits, peakrateBitsPerSec uint64, minburstBytes uint32) *TokenBucketState {
Expand Down Expand Up @@ -454,23 +446,7 @@ func makeTBF(tbs *TokenBucketState, workloadDevice netlink.Link) (*netlink.Tbf,
return qdisc, nil
}

func createTBF(tbs *TokenBucketState, workloadDevice netlink.Link) error {
// Equivalent to
// tc qdisc add dev link root tbf

qdisc, err := makeTBF(tbs, workloadDevice)
if err != nil {
return fmt.Errorf("make TBF qdisc %+v from tbs %+v: %w", qdisc, tbs, err)
}

err = netlink.QdiscAdd(qdisc)
if err != nil {
return fmt.Errorf("add TBF qdisc %+v, limit: %v, rate %v, buffer %v, peakrate %v, minburst %v: %w", qdisc, qdisc.Limit, qdisc.Rate, qdisc.Buffer, qdisc.Peakrate, qdisc.Minburst, err)
}
return nil
}

func updateTBF(tbs *TokenBucketState, workloadDevice netlink.Link) error {
func createOrUpdateTBF(tbs *TokenBucketState, workloadDevice netlink.Link) error {
// Equivalent to
// tc qdisc replace dev link root tbf

Expand Down
4 changes: 2 additions & 2 deletions felix/dataplane/linux/qos_controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *endpointManager) maybeUpdateQoSBandwidth(old, new *proto.WorkloadEndpoi

if currentIngress == nil && desiredIngress != nil {
// Add.
err := qos.CreateIngressQdisc(desiredIngress, newName)
err := qos.CreateOrUpdateIngressQdisc(desiredIngress, newName)
if err != nil {
errs = append(errs, fmt.Errorf("error adding ingress qdisc to workload %s: %w", newName, err))
}
Expand All @@ -104,7 +104,7 @@ func (m *endpointManager) maybeUpdateQoSBandwidth(old, new *proto.WorkloadEndpoi
}
} else if !currentIngress.Equals(desiredIngress) {
// Update.
err := qos.UpdateIngressQdisc(desiredIngress, newName)
err := qos.CreateOrUpdateIngressQdisc(desiredIngress, newName)
if err != nil {
errs = append(errs, fmt.Errorf("error changing ingress qdisc on workload %s: %w", newName, err))
}
Expand Down
35 changes: 31 additions & 4 deletions felix/fv/qos_controls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ var _ = infrastructure.DatastoreDescribe(
infra = getInfra()
topt = infrastructure.DefaultTopologyOptions()

if bpfLogLevel != "Debug" && !BPFMode() {
Skip("Skipping QoS control tests with non-debug bpfLogLevel on iptables/nftables mode (for deduplication).")
}

switch encap {
case "none":
if !BPFMode() {
Expand All @@ -237,6 +233,7 @@ var _ = infrastructure.DatastoreDescribe(
topt.UseIPPools = true
topt.DelayFelixStart = true
topt.TriggerDelayedFelixStart = true
topt.FelixLogSeverity = "Debug"
if BPFMode() {
topt.ExtraEnvVars["FELIX_BPFLogLevel"] = bpfLogLevel
}
Expand Down Expand Up @@ -348,7 +345,15 @@ var _ = infrastructure.DatastoreDescribe(
if BPFMode() && BPFAttachType() == "tc" {
Skip("Skipping QoS control bandwidth tests on BPF TC attach mode.")
}

By("Removing all limits from workloads")
for i := range len(w) {
w[i].WorkloadEndpoint.Spec.QoSControls = nil
w[i].UpdateInInfra(infra)
Eventually(tc.Felixes[i].ExecOutputFn("ip", "r", "get", fmt.Sprintf("10.65.%d.2", i)), "10s").Should(ContainSubstring(w[i].InterfaceName))
}
Comment thread
coutinhop marked this conversation as resolved.
})

getQdisc := func() string {
out, err := tc.Felixes[1].ExecOutput("tc", "qdisc")
logrus.Infof("tc qdisc output:\n%v", out)
Expand Down Expand Up @@ -435,7 +440,29 @@ var _ = infrastructure.DatastoreDescribe(
Expect(err).NotTo(HaveOccurred())
err = serverCmd.Process.Release()
Expect(err).NotTo(HaveOccurred())
})

It("should correctly apply bandwidth limits when a non-default qdisc exists (handle != 0)", func() {
By("Replacing the default noqueue qdisc with a non-default qdisc (handle 8001)")
out, err := tc.Felixes[1].ExecOutput("tc", "qdisc", "replace", "dev", w[1].InterfaceName, "root", "handle", "8001:", "noqueue")
logrus.Infof("tc qdisc replace output:\n%v", out)
Expect(err).NotTo(HaveOccurred())

By("Waiting for the config to appear in 'tc qdisc'")
Eventually(getQdisc, "10s", "1s").Should(MatchRegexp(`qdisc noqueue 8001: dev ` + regexp.QuoteMeta(w[1].InterfaceName) + ` root refcnt \d+`))

By("Setting 10Mbps limit and 100Mbps peakrate for ingress on workload 1")
w[1].WorkloadEndpoint.Spec.QoSControls = &api.QoSControls{
IngressBandwidth: 10000000,
IngressBurst: 300000000,
IngressPeakrate: 100000000,
}
Comment thread
coutinhop marked this conversation as resolved.
w[1].UpdateInInfra(infra)
Eventually(tc.Felixes[1].ExecOutputFn("ip", "r", "get", "10.65.1.2"), "10s").Should(ContainSubstring(w[1].InterfaceName))

By("Waiting for the config to appear in 'tc qdisc'")
// ingress config should be present
Eventually(getQdisc, "10s", "1s").Should(MatchRegexp(`qdisc tbf \d+: dev ` + regexp.QuoteMeta(w[1].InterfaceName) + ` root refcnt \d+ rate ` + regexp.QuoteMeta("10Mbit") + `.* peakrate ` + regexp.QuoteMeta("100Mbit")))
})
})

Expand Down