Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network_policy): consolidate policy chains #906

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 125 additions & 79 deletions pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base32"
"errors"
"fmt"
"github.com/cloudnativelabs/kube-router/pkg/controllers/proxy"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"net"
Expand Down Expand Up @@ -33,6 +34,7 @@ import (

const (
networkPolicyAnnotation = "net.beta.kubernetes.io/network-policy"
kubeGlobalFirewallChain = "KUBE-ROUTER-FIREWALL"
kubePodFirewallChainPrefix = "KUBE-POD-FW-"
kubeNetworkPolicyChainPrefix = "KUBE-NWPLCY-"
kubeSourceIpSetPrefix = "KUBE-SRC-"
Expand Down Expand Up @@ -60,6 +62,7 @@ type NetworkPolicyController struct {
v1NetworkPolicy bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}
podCidr string

ipSetHandler *utils.IPSet

Expand Down Expand Up @@ -253,6 +256,13 @@ func (npc *NetworkPolicyController) fullPolicySync() {
}
}

// Create iptables definitions that are relevant to all network policies
err = npc.addGlobalPolicyChains()
if err != nil {
glog.Errorf("Failed to add global policy chains due to: %s", err.Error())
return
}

activePolicyChains, activePolicyIpSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion)
if err != nil {
glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error())
Expand Down Expand Up @@ -352,6 +362,96 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
return activePolicyChains, activePolicyIpSets, nil
}

func (npc *NetworkPolicyController) addGlobalPolicyChains() error {
iptablesCmdHandler, err := iptables.New()
if err != nil {
return err
}

// Here we setup references to our global filter chains from the standard FORWARD, OUTER, and INPUT chains. This
// allows us to conserve or iptables definitions and allows us to have more control over the positioning of our
// filter chains within the standard iptables chains.
//
// Specifically we need to ensure that KUBE-ROUTER-SERVICES comes before KUBE-ROUTER-FIREWALL so that traffic
// from pods destined to IPVS services local to the hosts continue to be allowed. This traffic later gets properly
// filtered in the OUTPUT chain after IPVS disambiguates the destination to a pod IP address
err = iptablesCmdHandler.NewChain("filter", kubeGlobalFirewallChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
return err
}

filterChains := []string{"FORWARD", "OUTPUT", "INPUT"}
filterDirections := map[string]string{"-s": "from", "-d": "to"}
for _, chain := range filterChains {
for direction, commentHelper := range filterDirections {
pos := 1
if chain == "INPUT" {
// We don't want to match on destination for INPUT chain at this time (keeps consistency with how things worked
// before global chains)
if direction == "-d" {
continue
}

// Here we ensure that the firewall jump happens after the services jump, the services jump only exists on
// the INPUT chain for now, so we only consider this table
chainRules, err := iptablesCmdHandler.List("filter", chain)
if err != nil {
return err
}

for i, rule := range chainRules {
// It's possible that the firewall jump was created as the 1st rule before the services chain existed,
// if we find the firewall chain before the services chain, delete it and allow it to be created again
// below
if strings.Contains(rule, kubeGlobalFirewallChain) {
err = iptablesCmdHandler.Delete("filter", chain, strconv.Itoa(i))
if err != nil {
return fmt.Errorf("failed to delete rule: %s from the %s chain of filter table due to %s", rule, chain, err.Error())
}
}
// Find the position of the jump to the services chain and put the position of the firewall chain after
// that one.
if strings.Contains(rule, proxy.IpvsFirewallChainName) {
pos = i + 1
break
}
}
}
comment := fmt.Sprintf("rule to send traffic %s pod network to firewall chain", commentHelper)
args := []string{"-m", "comment", "--comment", comment, direction, npc.podCidr, "-j", kubeGlobalFirewallChain}
exists, err := iptablesCmdHandler.Exists("filter", chain, args...)
if err != nil {
return err
}
if !exists {
err := iptablesCmdHandler.Insert("filter", chain, pos, args...)
if err != nil {
return err
}
}

// ensure there is rule in the forward chain of the filter table to jump to the firewall chain for
// traffic getting switched (coming for same node pods)
if chain == "FORWARD" {
comment = fmt.Sprintf("rule to send bridged traffic %s pod network to firewall chain", commentHelper)
args = []string{"-m", "physdev", "--physdev-is-bridged", "-m", "comment", "--comment", comment, direction, npc.podCidr, "-j", kubeGlobalFirewallChain}
exists, err = iptablesCmdHandler.Exists("filter", chain, args...)
if err != nil {
return err
}
if !exists {
err = iptablesCmdHandler.Insert("filter", chain, pos, args...)
if err != nil {
return err
}
}
}
}
}

return nil
}

func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo,
targetDestPodIpSetName string, activePolicyIpSets map[string]bool, version string) error {

Expand Down Expand Up @@ -768,49 +868,16 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
}
}

// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting routed (coming for other node pods)
// ensure there is rule in filter table and KUBE-ROUTER-FIREWALL chain to jump to pod specific firewall chain
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}

// ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain
// this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy
exists, err = iptablesCmdHandler.Exists("filter", "OUTPUT", args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", "OUTPUT", 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}

// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
comment = "rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-d", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
exists, err = iptablesCmdHandler.Exists("filter", kubeGlobalFirewallChain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
err := iptablesCmdHandler.Insert("filter", kubeGlobalFirewallChain, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
Expand Down Expand Up @@ -887,40 +954,16 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
}
}

egressFilterChains := []string{"FORWARD", "OUTPUT", "INPUT"}
for _, chain := range egressFilterChains {
// ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain
// this rule applies to the traffic getting forwarded/routed (traffic from the pod destinted
// to pod on a different node)
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", chain, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
}
}

// ensure there is rule in filter table and forward chain to jump to pod specific firewall chain
// this rule applies to the traffic getting switched (coming for same node pods)
// ensure there is rule in filter table and KUBE-ROUTER-FIREWALL chain to jump to pod specific firewall chain
comment = "rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace +
" to chain " + podFwChainName
args = []string{"-m", "physdev", "--physdev-is-bridged",
"-m", "comment", "--comment", comment,
"-s", pod.ip,
"-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", "FORWARD", args...)
args = []string{"-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName}
exists, err = iptablesCmdHandler.Exists("filter", kubeGlobalFirewallChain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err = iptablesCmdHandler.Insert("filter", "FORWARD", 1, args...)
err := iptablesCmdHandler.Insert("filter", kubeGlobalFirewallChain, 1, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
Expand Down Expand Up @@ -992,23 +1035,20 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
// remove stale iptables podFwChain references from the filter table chains
for _, podFwChain := range cleanupPodFwChains {

primaryChains := []string{"FORWARD", "OUTPUT", "INPUT"}
for _, egressChain := range primaryChains {
forwardChainRules, err := iptablesCmdHandler.List("filter", egressChain)
if err != nil {
return fmt.Errorf("failed to list rules in filter table, %s podFwChain due to %s", egressChain, err.Error())
}
globalChainRules, err := iptablesCmdHandler.List("filter", kubeGlobalFirewallChain)
if err != nil {
return fmt.Errorf("failed to list rules in filter table, %s podFwChain due to %s", kubeGlobalFirewallChain, err.Error())
}

// TODO delete rule by spec, than rule number to avoid extra loop
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, podFwChain) {
err = iptablesCmdHandler.Delete("filter", egressChain, strconv.Itoa(i-realRuleNo))
if err != nil {
return fmt.Errorf("failed to delete rule: %s from the %s podFwChain of filter table due to %s", rule, egressChain, err.Error())
}
realRuleNo++
// TODO delete rule by spec, than rule number to avoid extra loop
var realRuleNo int
for i, rule := range globalChainRules {
if strings.Contains(rule, podFwChain) {
err = iptablesCmdHandler.Delete("filter", kubeGlobalFirewallChain, strconv.Itoa(i-realRuleNo))
if err != nil {
return fmt.Errorf("failed to delete rule: %s from the %s podFwChain of filter table due to %s", rule, kubeGlobalFirewallChain, err.Error())
}
realRuleNo++
}
}
}
Expand Down Expand Up @@ -1780,6 +1820,12 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
}
npc.nodeIP = nodeIP

cidr, err := utils.GetPodCidrFromNodeSpec(clientset, config.HostnameOverride)
if err != nil {
return nil, fmt.Errorf("failed to get pod CIDR details from Node.spec: %s", err.Error())
}
npc.podCidr = cidr

ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, err
Expand Down
18 changes: 9 additions & 9 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ const (
svcSchedFlagsAnnotation = "kube-router.io/service.schedflags"

LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
IpvsFirewallChainName = "KUBE-ROUTER-SERVICES"
localIPsIPSetName = "kube-router-local-ips"
ipvsServicesIPSetName = "kube-router-ipvs-services"
serviceIPsIPSetName = "kube-router-service-ips"
ipvsFirewallChainName = "KUBE-ROUTER-SERVICES"
synctypeAll = iota
synctypeIpvs
)
Expand Down Expand Up @@ -466,7 +466,7 @@ func getIpvsFirewallInputChainRule() []string {
return []string{
"-m", "comment", "--comment", "handle traffic to IPVS service IPs in custom chain",
"-m", "set", "--match-set", serviceIPsIPSetName, "dst",
"-j", ipvsFirewallChainName}
"-j", IpvsFirewallChainName}
}

func (nsc *NetworkServicesController) setupIpvsFirewall() error {
Expand Down Expand Up @@ -514,7 +514,7 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error {
}

// ClearChain either clears an existing chain or creates a new one.
err = iptablesCmdHandler.ClearChain("filter", ipvsFirewallChainName)
err = iptablesCmdHandler.ClearChain("filter", IpvsFirewallChainName)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
Expand All @@ -532,12 +532,12 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error {
args = []string{"-m", "comment", "--comment", comment,
"-m", "set", "--match-set", ipvsServicesIPSetName, "dst,dst",
"-j", "ACCEPT"}
exists, err = iptablesCmdHandler.Exists("filter", ipvsFirewallChainName, args...)
exists, err = iptablesCmdHandler.Exists("filter", IpvsFirewallChainName, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", ipvsFirewallChainName, 1, args...)
err := iptablesCmdHandler.Insert("filter", IpvsFirewallChainName, 1, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
Expand All @@ -547,7 +547,7 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error {
args = []string{"-m", "comment", "--comment", comment,
"-p", "icmp", "--icmp-type", "echo-request",
"-j", "ACCEPT"}
err = iptablesCmdHandler.AppendUnique("filter", ipvsFirewallChainName, args...)
err = iptablesCmdHandler.AppendUnique("filter", IpvsFirewallChainName, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
Expand All @@ -558,7 +558,7 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error {
args = []string{"-m", "comment", "--comment", comment,
"-m", "set", "!", "--match-set", localIPsIPSetName, "dst",
"-j", "REJECT", "--reject-with", "icmp-port-unreachable"}
err = iptablesCmdHandler.AppendUnique("filter", ipvsFirewallChainName, args...)
err = iptablesCmdHandler.AppendUnique("filter", IpvsFirewallChainName, args...)
if err != nil {
return fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
Expand Down Expand Up @@ -597,12 +597,12 @@ func (nsc *NetworkServicesController) cleanupIpvsFirewall() {
glog.Errorf("Failed to run iptables command: %s", err.Error())
}

err = iptablesCmdHandler.ClearChain("filter", ipvsFirewallChainName)
err = iptablesCmdHandler.ClearChain("filter", IpvsFirewallChainName)
if err != nil {
glog.Errorf("Failed to run iptables command: %s", err.Error())
}

err = iptablesCmdHandler.DeleteChain("filter", ipvsFirewallChainName)
err = iptablesCmdHandler.DeleteChain("filter", IpvsFirewallChainName)
if err != nil {
glog.Errorf("Failed to run iptables command: %s", err.Error())
}
Expand Down