whitelist traffic to cluster IP and node ports in INPUT chain to bypass network policy enforcement (cloudnativelabs#914)

* whitelist traffic to cluster IP and node ports in INPUT chain to bypass network policy enforcement

Fixes cloudnativelabs#905

* fix unit test failure

* ensure netpol firewall rules are configured after service proxy firewall rules
murali-reddy authored and FabienZouaoui committed Jun 22, 2020
1 parent 9c2f3b8 commit a01b81e
Showing 5 changed files with 98 additions and 55 deletions.
2 changes: 2 additions & 0 deletions docs/user-guide.md
@@ -82,6 +82,8 @@ Usage of kube-router:
--run-firewall Enables Network Policy -- sets up iptables to provide ingress firewall for pods. (default true)
--run-router Enables Pod Networking -- Advertises and learns the routes to Pods via iBGP. (default true)
--run-service-proxy Enables Service Proxy -- sets up IPVS for Kubernetes Services. (default true)
--service-cluster-ip-range string CIDR value from which service cluster IPs are assigned. Default: 10.96.0.0/12 (default "10.96.0.0/12")
--service-node-port-range string NodePort range. Default: 30000-32767 (default "30000:32767")
-v, --v string log level for V logs (default "0")
-V, --version Print version information.
```
37 changes: 22 additions & 15 deletions pkg/cmd/kube-router.go
@@ -127,21 +127,6 @@ func (kr *KubeRouter) Run() error {
kr.Config.MetricsEnabled = false
}

if kr.Config.RunFirewall {
npc, err := netpol.NewNetworkPolicyController(kr.Client,
kr.Config, podInformer, npInformer, nsInformer)
if err != nil {
return errors.New("Failed to create network policy controller: " + err.Error())
}

podInformer.AddEventHandler(npc.PodEventHandler)
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)

wg.Add(1)
go npc.Run(healthChan, stopCh, &wg)
}

if kr.Config.BGPGracefulRestart {
if kr.Config.BGPGracefulRestartDeferralTime > time.Hour*18 {
return errors.New("BGPGracefuleRestartDeferralTime should be less than 18 hours")
@@ -177,6 +162,28 @@ func (kr *KubeRouter) Run() error {

wg.Add(1)
go nsc.Run(healthChan, stopCh, &wg)

// wait for the proxy firewall rules to be setup before network policies
if kr.Config.RunFirewall {
nsc.ProxyFirewallSetup.L.Lock()
nsc.ProxyFirewallSetup.Wait()
nsc.ProxyFirewallSetup.L.Unlock()
}
}

if kr.Config.RunFirewall {
npc, err := netpol.NewNetworkPolicyController(kr.Client,
kr.Config, podInformer, npInformer, nsInformer)
if err != nil {
return errors.New("Failed to create network policy controller: " + err.Error())
}

podInformer.AddEventHandler(npc.PodEventHandler)
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)

wg.Add(1)
go npc.Run(healthChan, stopCh, &wg)
}

// Handle SIGINT and SIGTERM
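The `ProxyFirewallSetup.Wait()` call in the kube-router.go hunk above pairs with a `Broadcast()` added to `network_services_controller.go` further down in this diff. Below is a minimal, self-contained sketch of that ordering pattern using simplified stand-in types rather than kube-router's actual controllers; the `done` guard field is a hypothetical addition for the standalone example, not part of the real `NetworkServicesController`:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// proxy is a stand-in for the service proxy controller. The FirewallSetup
// condition variable mirrors NetworkServicesController.ProxyFirewallSetup;
// the done flag is a hypothetical guard so a waiter that starts after the
// Broadcast does not block forever.
type proxy struct {
	FirewallSetup *sync.Cond
	done          bool
}

func (p *proxy) run() {
	time.Sleep(100 * time.Millisecond) // stand-in for setting up the ipvs firewall rules
	p.FirewallSetup.L.Lock()
	p.done = true
	p.FirewallSetup.L.Unlock()
	p.FirewallSetup.Broadcast()
	fmt.Println("proxy: firewall rules configured")
}

func main() {
	p := &proxy{FirewallSetup: sync.NewCond(&sync.Mutex{})}
	go p.run()

	// netpol side: block until the proxy firewall rules exist, then continue.
	p.FirewallSetup.L.Lock()
	for !p.done {
		p.FirewallSetup.Wait()
	}
	p.FirewallSetup.L.Unlock()
	fmt.Println("netpol: safe to install network policy rules")
}
```

With this ordering, the network policy controller only installs its rules after the service proxy firewall rules are in place, which is what the third bullet of the commit message describes.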
102 changes: 62 additions & 40 deletions pkg/controllers/netpol/network_policy_controller.go
@@ -54,14 +54,16 @@ const (

// NetworkPolicyController strcut to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
v1NetworkPolicy bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}
nodeIP net.IP
nodeHostName string
serviceClusterIPRange string
serviceNodePortRange string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
v1NetworkPolicy bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}

ipSetHandler *utils.IPSet

@@ -197,48 +199,65 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() {
glog.Fatalf("Failed to initialize iptables executor due to %s", err.Error())
}

chains := map[string]string{"INPUT": kubeInputChainName, "FORWARD": kubeForwardChainName, "OUTPUT": kubeOutputChainName}

for builtinChain, customChain := range chains {
err = iptablesCmdHandler.NewChain("filter", customChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
glog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error())
ensureRuleAtposition := func(chain string, ruleSpec []string, position int) {
rules, err := iptablesCmdHandler.List("filter", chain)
if err != nil {
glog.Fatalf("failed to list rules in filter table %s chain due to %s", chain, err.Error())
}
args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain}
exists, err := iptablesCmdHandler.Exists("filter", builtinChain, args...)

exists, err := iptablesCmdHandler.Exists("filter", chain, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to verify rule exists to jump to chain %s in %s chain due to %s", customChain, builtinChain, err.Error())
glog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", builtinChain, 1, args...)
err := iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", builtinChain, err.Error())
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
}
} else {
rules, err := iptablesCmdHandler.List("filter", builtinChain)
if err != nil {
glog.Fatalf("failed to list rules in filter table %s chain due to %s", builtinChain, err.Error())
return
}
var ruleNo int
for i, rule := range rules {
rule = strings.Replace(rule, "\"", "", 2) //removes quote from comment string
if strings.Contains(rule, strings.Join(ruleSpec, " ")) {
ruleNo = i
break
}

var ruleNo int
for i, rule := range rules {
if strings.Contains(rule, customChain) {
ruleNo = i
break
}
}
if ruleNo != position {
err = iptablesCmdHandler.Insert("filter", chain, position, ruleSpec...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", chain, err.Error())
}
if ruleNo != 1 {
err = iptablesCmdHandler.Insert("filter", builtinChain, 1, args...)
if err != nil {
glog.Fatalf("Failed to run iptables command to insert in %s chain %s", builtinChain, err.Error())
}
err = iptablesCmdHandler.Delete("filter", builtinChain, strconv.Itoa(ruleNo+1))
if err != nil {
glog.Fatalf("Failed to delete wrong rule to jump to chain %s in %s chain due to %s", customChain, builtinChain, err.Error())
}
err = iptablesCmdHandler.Delete("filter", chain, strconv.Itoa(ruleNo+1))
if err != nil {
glog.Fatalf("Failed to delete incorrect rule in %s chain due to %s", chain, err.Error())
}
}
}

chains := map[string]string{"INPUT": kubeInputChainName, "FORWARD": kubeForwardChainName, "OUTPUT": kubeOutputChainName}

for builtinChain, customChain := range chains {
err = iptablesCmdHandler.NewChain("filter", customChain)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
glog.Fatalf("Failed to run iptables command to create %s chain due to %s", customChain, err.Error())
}
args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain}
ensureRuleAtposition(builtinChain, args, 1)
}

whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", npc.serviceClusterIPRange, "-j", "RETURN"}
ensureRuleAtposition(kubeInputChainName, whitelistServiceVips, 1)

whitelistTCPNodeports := []string{"-p", "tcp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL",
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
ensureRuleAtposition(kubeInputChainName, whitelistTCPNodeports, 2)

whitelistUDPNodeports := []string{"-p", "udp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL",
"-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"}
ensureRuleAtposition(kubeInputChainName, whitelistUDPNodeports, 3)

}

// OnPodUpdate handles updates to pods from the Kubernetes api server
@@ -953,7 +972,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
if !exists {
err := iptablesCmdHandler.Insert("filter", chain, 1, args...)
err := iptablesCmdHandler.AppendUnique("filter", chain, args...)
if err != nil {
return nil, fmt.Errorf("Failed to run iptables command: %s", err.Error())
}
@@ -1780,6 +1799,9 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
// be up to date with all of the policy changes from any enqueued request after that
npc.fullSyncRequestChan = make(chan struct{}, 1)

npc.serviceClusterIPRange = config.ClusterIPCIDR
npc.serviceNodePortRange = config.NodePortRange

if config.MetricsEnabled {
//Register the metrics for this controller
prometheus.MustRegister(metrics.ControllerIptablesSyncTime)
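For reference, the three whitelist rule specs added in the hunk above translate roughly into the iptables inserts printed by the sketch below. This is an illustration only: it assumes `kubeInputChainName` resolves to `KUBE-ROUTER-INPUT`, uses the default CIDR and port range from `options.go`, and the printed commands omit the shell quoting the `--comment` arguments would need.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	clusterIPRange := "10.96.0.0/12" // default --service-cluster-ip-range
	nodePortRange := "30000:32767"   // default --service-node-port-range

	// The same rule specs as the diff above, in the order they are inserted.
	ruleSpecs := [][]string{
		{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", clusterIPRange, "-j", "RETURN"},
		{"-p", "tcp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports",
			"-m", "addrtype", "--dst-type", "LOCAL", "-m", "multiport", "--dports", nodePortRange, "-j", "RETURN"},
		{"-p", "udp", "-m", "comment", "--comment", "allow LOCAL traffic to node ports",
			"-m", "addrtype", "--dst-type", "LOCAL", "-m", "multiport", "--dports", nodePortRange, "-j", "RETURN"},
	}
	for pos, spec := range ruleSpecs {
		// RETURN sends matching packets back to the built-in INPUT chain,
		// so they bypass the network policy rules that follow in the chain.
		fmt.Printf("iptables -t filter -I KUBE-ROUTER-INPUT %d %s\n", pos+1, strings.Join(spec, " "))
	}
}
```

Because each rule ends in RETURN, traffic destined to service cluster IPs or to local node ports leaves the netpol INPUT chain before any policy rules are evaluated, which is the bypass the commit title refers to.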
4 changes: 4 additions & 0 deletions pkg/controllers/proxy/network_services_controller.go
@@ -223,6 +223,7 @@ type NetworkServicesController struct {
MetricsEnabled bool
ln LinuxNetworking
readyForUpdates bool
ProxyFirewallSetup *sync.Cond

// Map of ipsets that we use.
ipsetMap map[string]*utils.Set
@@ -350,6 +351,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
if err != nil {
glog.Error("Error setting up ipvs firewall: " + err.Error())
}
nsc.ProxyFirewallSetup.Broadcast()

gracefulTicker := time.NewTicker(5 * time.Second)
defer gracefulTicker.Stop()
@@ -2228,6 +2230,8 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
nsc.endpointsMap = make(endpointsInfoMap)
nsc.client = clientset

nsc.ProxyFirewallSetup = sync.NewCond(&sync.Mutex{})

nsc.masqueradeAll = false
if config.MasqueradeAll {
nsc.masqueradeAll = true
8 changes: 8 additions & 0 deletions pkg/options/options.go
@@ -23,6 +23,8 @@ type KubeRouterConfig struct {
CleanupConfig bool
ClusterAsn uint
ClusterCIDR string
ClusterIPCIDR string
NodePortRange string
DisableSrcDstCheck bool
EnableCNI bool
EnableiBGP bool
@@ -74,6 +76,8 @@ func NewKubeRouterConfig() *KubeRouterConfig {
BGPGracefulRestartDeferralTime: 360 * time.Second,
EnableOverlay: true,
OverlayType: "subnet",
ClusterIPCIDR: "10.96.0.0/12",
NodePortRange: "30000:32767",
}
}

@@ -102,6 +106,10 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) {
"CIDR range of pods in the cluster. It is used to identify traffic originating from and destinated to pods.")
fs.StringSliceVar(&s.ExcludedCidrs, "excluded-cidrs", s.ExcludedCidrs,
"Excluded CIDRs are used to exclude IPVS rules from deletion.")
fs.StringVar(&s.ClusterIPCIDR, "service-cluster-ip-range", s.ClusterIPCIDR,
"CIDR value from which service cluster IPs are assigned. Default: 10.96.0.0/12")
fs.StringVar(&s.NodePortRange, "service-node-port-range", s.NodePortRange,
"NodePort range. Default: 30000-32767")
fs.BoolVar(&s.EnablePodEgress, "enable-pod-egress", true,
"SNAT traffic from Pods to destinations outside the cluster.")
fs.DurationVar(&s.IPTablesSyncPeriod, "iptables-sync-period", s.IPTablesSyncPeriod,
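A small standalone sketch, not kube-router's actual wiring, showing how the two new flags fall back to the defaults set in `NewKubeRouterConfig`: `StringVar` takes the struct field's current value as the flag default, so `10.96.0.0/12` and `30000:32767` apply whenever the flags are omitted. The `config` type here is a hypothetical stand-in for `KubeRouterConfig`.

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// config is a minimal stand-in for KubeRouterConfig with only the two new fields.
type config struct {
	ClusterIPCIDR string
	NodePortRange string
}

func main() {
	// Defaults as set in NewKubeRouterConfig.
	c := &config{ClusterIPCIDR: "10.96.0.0/12", NodePortRange: "30000:32767"}

	fs := pflag.NewFlagSet("kube-router", pflag.ExitOnError)
	fs.StringVar(&c.ClusterIPCIDR, "service-cluster-ip-range", c.ClusterIPCIDR,
		"CIDR value from which service cluster IPs are assigned.")
	fs.StringVar(&c.NodePortRange, "service-node-port-range", c.NodePortRange,
		"NodePort range.")

	// Only one flag is passed, so the other keeps its default.
	_ = fs.Parse([]string{"--service-cluster-ip-range=10.200.0.0/16"})
	fmt.Println(c.ClusterIPCIDR, c.NodePortRange) // prints: 10.200.0.0/16 30000:32767
}
```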
