Merge pull request #39 from aojea/local_pods
Only process traffic impacted by network policies
aojea authored Jun 24, 2024
2 parents a58319a + f0eb5c6 commit 7c4dfc0
Showing 5 changed files with 282 additions and 13 deletions.
6 changes: 6 additions & 0 deletions cmd/main.go
@@ -64,11 +64,17 @@ func main() {
klog.Fatalf("error parsing metrics bind address %s : %v", metricsBindAddress, err)
}

nodeName := os.Getenv("MY_NODE_NAME")
if nodeName == "" {
klog.Fatalf("node name not set, please set the environment variable using the Downward API")
}

cfg := networkpolicy.Config{
AdminNetworkPolicy: adminNetworkPolicy,
BaselineAdminNetworkPolicy: baselineAdminNetworkPolicy,
FailOpen: failOpen,
QueueID: queueID,
NodeName: nodeName,
}
// creates the in-cluster config
config, err := rest.InClusterConfig()
5 changes: 5 additions & 0 deletions install-anp.yaml
@@ -93,6 +93,11 @@ spec:
privileged: true
capabilities:
add: ["NET_ADMIN"]
env:
- name: MY_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumes:
- name: lib-modules
hostPath:
5 changes: 5 additions & 0 deletions install.yaml
@@ -86,6 +86,11 @@ spec:
privileged: true
capabilities:
add: ["NET_ADMIN"]
env:
- name: MY_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumes:
- name: lib-modules
hostPath:
256 changes: 243 additions & 13 deletions pkg/networkpolicy/controller.go
@@ -11,7 +11,9 @@ import (
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1"
@@ -22,7 +24,10 @@ import (
networkinglisters "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"

"sigs.k8s.io/knftables"
npav1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1"
@@ -50,13 +55,17 @@ import (
const (
controllerName = "kube-network-policies"
podIPIndex = "podIPKeyIndex"
syncKey = "dummy-key" // use the same key to sync to aggregate the events
podV4IPsSet = "podips-v4"
podV6IPsSet = "podips-v6"
)

type Config struct {
FailOpen bool // allow traffic if the controller is not available
AdminNetworkPolicy bool
BaselineAdminNetworkPolicy bool
QueueID int
NodeName string
}

// NewController returns a new *Controller.
@@ -82,6 +91,7 @@ func NewController(client clientset.Interface,
client: client,
config: config,
nft: nft,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

err := podInformer.Informer().AddIndexers(cache.Indexers{
@@ -143,6 +153,85 @@ func NewController(client clientset.Interface,
utilruntime.HandleError(err)
}

// process only local Pods that are affected by network policies
_, _ = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
if pod.Spec.NodeName != c.config.NodeName {
return
}
if len(c.getNetworkPoliciesForPod(pod)) > 0 {
c.queue.Add(syncKey)
}
},
UpdateFunc: func(old, cur interface{}) {
pod := cur.(*v1.Pod)
if pod.Spec.NodeName != c.config.NodeName {
return
}
if len(c.getNetworkPoliciesForPod(pod)) > 0 {
c.queue.Add(syncKey)
}
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Pod: %#v", obj))
return
}
}
if pod.Spec.NodeName != c.config.NodeName {
return
}
if len(c.getNetworkPoliciesForPod(pod)) > 0 {
c.queue.Add(syncKey)
}
},
})
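The handler above relies on getNetworkPoliciesForPod, which is referenced but not shown in this diff. A minimal sketch of what such a helper could look like, assuming it simply lists the NetworkPolicies in the pod's namespace and keeps the ones whose podSelector matches the pod's labels; the body and the metav1 import are assumptions, not taken from this commit:

// Hypothetical helper, not part of this commit: return the NetworkPolicies in the
// Pod's namespace whose spec.podSelector matches the Pod's labels.
// Assumes metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" is imported.
func (c *Controller) getNetworkPoliciesForPod(pod *v1.Pod) []*networkingv1.NetworkPolicy {
	if pod == nil {
		return nil
	}
	networkPolicies, err := c.networkpolicyLister.NetworkPolicies(pod.Namespace).List(labels.Everything())
	if err != nil {
		return nil
	}
	result := []*networkingv1.NetworkPolicy{}
	for _, policy := range networkPolicies {
		podSelector, err := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
		if err != nil {
			continue
		}
		// An empty podSelector selects every pod in the namespace.
		if podSelector.Matches(labels.Set(pod.Labels)) {
			result = append(result, policy)
		}
	}
	return result
}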

// only process network policies that impact Pods on this node
_, _ = networkpolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
networkPolicy := obj.(*networkingv1.NetworkPolicy)
if len(c.getLocalPodsForNetworkPolicy(networkPolicy)) > 0 {
c.queue.Add(syncKey)
}
},
UpdateFunc: func(old, cur interface{}) {
networkPolicy := cur.(*networkingv1.NetworkPolicy)
if len(c.getLocalPodsForNetworkPolicy(networkPolicy)) > 0 {
c.queue.Add(syncKey)
}
},
DeleteFunc: func(obj interface{}) {
networkPolicy, ok := obj.(*networkingv1.NetworkPolicy)
if !ok {
// If we reached here it means the policy was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
networkPolicy, ok = tombstone.Obj.(*networkingv1.NetworkPolicy)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a NetworkPolicy: %#v", obj))
return
}
}
if len(c.getLocalPodsForNetworkPolicy(networkPolicy)) > 0 {
c.queue.Add(syncKey)
}
},
})
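Similarly, getLocalPodsForNetworkPolicy is referenced here but not defined in this diff. A rough sketch under the same assumptions: list the pods selected by the policy's podSelector in the policy's namespace and keep only those scheduled on this node (the helper name is real, the body below is illustrative):

// Hypothetical sketch, not part of this commit: return the Pods running on this
// node that are selected by the NetworkPolicy's spec.podSelector.
// Assumes metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" is imported.
func (c *Controller) getLocalPodsForNetworkPolicy(networkPolicy *networkingv1.NetworkPolicy) []*v1.Pod {
	if networkPolicy == nil {
		return nil
	}
	podSelector, err := metav1.LabelSelectorAsSelector(&networkPolicy.Spec.PodSelector)
	if err != nil {
		return nil
	}
	pods, err := c.podLister.Pods(networkPolicy.Namespace).List(podSelector)
	if err != nil {
		return nil
	}
	local := []*v1.Pod{}
	for _, pod := range pods {
		// Only pods scheduled on this node matter for the local nftables sets.
		if pod.Spec.NodeName == c.config.NodeName {
			local = append(local, pod)
		}
	}
	return local
}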

c.podLister = podInformer.Lister()
c.podsSynced = podInformer.Informer().HasSynced
c.namespaceLister = namespaceInformer.Lister()
@@ -187,6 +276,8 @@ type Controller struct {
podLister corelisters.PodLister
podsSynced cache.InformerSynced

queue workqueue.RateLimitingInterface

npaClient npaclient.Interface

adminNetworkPolicyLister policylisters.AdminNetworkPolicyLister
@@ -208,6 +299,7 @@ type Controller struct {
// endpoints will be handled in parallel.
func (c *Controller) Run(ctx context.Context) error {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting controller %s", controllerName)
defer klog.Infof("Shutting down controller %s", controllerName)
@@ -250,12 +342,9 @@ func (c *Controller) Run(ctx context.Context) error {

// Start the workers after the repair loop to avoid races
klog.Info("Syncing nftables rules")
c.syncNFTablesRules(ctx)
_ = c.syncNFTablesRules(ctx)
defer c.cleanNFTablesRules()
// FIXME: there should be no need to ever resync our rules, but if we're going to
// do that, then knftables should provide us with an API to tell us when we need
// to resync (using `nft monitor` or direct netlink), rather than us polling.
go wait.Until(func() { c.syncNFTablesRules(ctx) }, 60*time.Second, ctx.Done())
go wait.Until(c.runWorker, time.Second, ctx.Done())

var flags uint32
// https://netfilter.org/projects/libnetfilter_queue/doxygen/html/group__Queue.html
@@ -438,10 +527,59 @@ func (c *Controller) evaluatePacket(p packet) bool {
return true
}

func (c *Controller) runWorker() {
for c.processNextItem() {
}
}

func (c *Controller) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers.
// This allows safe parallel processing because two items with the same key are never processed in
// parallel.
defer c.queue.Done(key)

// Invoke the method containing the business logic
err := c.syncNFTablesRules(context.Background())
// Handle the error if something went wrong during the execution of the business logic
c.handleErr(err, key.(string))
return true
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *Controller) handleErr(err error, key string) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
c.queue.Forget(key)
return
}

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing %v: %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
return
}

c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
utilruntime.HandleError(err)
klog.Infof("Dropping %q out of the queue: %v", key, err)
}
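Every event handler enqueues the same syncKey, so a burst of pod or policy changes collapses into a single full resync of the nftables rules. A standalone snippet illustrating that deduplication behaviour of the client-go workqueue (illustrative only, not part of this commit):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	// A burst of pod/policy events all enqueue the same aggregation key...
	q.Add("dummy-key")
	q.Add("dummy-key")
	q.Add("dummy-key")
	// ...but identical queued items are deduplicated, so only one
	// syncNFTablesRules run is triggered for the whole burst.
	fmt.Println(q.Len()) // prints 1
	key, _ := q.Get()
	q.Done(key)
}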

// syncNFTablesRules adds the necessary rules to process the first connection packets in userspace
// and check if network policies must apply.
// TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset.
func (c *Controller) syncNFTablesRules(ctx context.Context) {
func (c *Controller) syncNFTablesRules(ctx context.Context) error {
table := &knftables.Table{
Comment: knftables.PtrTo("rules for kubernetes NetworkPolicy"),
}
@@ -454,6 +592,59 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) {
}
tx.Add(table)

// only if no admin network policies are used
if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy {
// add set with Local Pod IPs impacted by network policies
tx.Add(&knftables.Set{
Name: podV4IPsSet,
Type: "ipv4_addr",
Comment: ptr.To("Local V4 Pod IPs with Network Policies"),
})
tx.Flush(&knftables.Set{
Name: podV4IPsSet,
})
tx.Add(&knftables.Set{
Name: podV6IPsSet,
Type: "ipv6_addr",
Comment: ptr.To("Local V6 Pod IPs with Network Policies"),
})
tx.Flush(&knftables.Set{
Name: podV6IPsSet,
})

networkPolicies, err := c.networkpolicyLister.List(labels.Everything())
if err != nil {
return err
}
podV4IPs := sets.New[string]()
podV6IPs := sets.New[string]()
for _, networkPolicy := range networkPolicies {
pods := c.getLocalPodsForNetworkPolicy(networkPolicy)
for _, pod := range pods {
for _, ip := range pod.Status.PodIPs {
if netutils.IsIPv4String(ip.IP) {
podV4IPs.Insert(ip.IP)
} else {
podV6IPs.Insert(ip.IP)
}
}
}
}

for _, ip := range podV4IPs.UnsortedList() {
tx.Add(&knftables.Element{
Set: podV4IPsSet,
Key: []string{ip},
})
}
for _, ip := range podV6IPs.UnsortedList() {
tx.Add(&knftables.Element{
Set: podV6IPsSet,
Key: []string{ip},
})
}
}

for _, hook := range []knftables.BaseChainHook{knftables.ForwardHook} {
chainName := string(hook)
tx.Add(&knftables.Chain{
@@ -471,21 +662,60 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) {
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ct", "state", "!=", "new", "return"),
"ct", "state", "established,related", "accept"),
})
rule := fmt.Sprintf("queue num %d", c.config.QueueID)

action := fmt.Sprintf("queue num %d", c.config.QueueID)
if c.config.FailOpen {
rule += " bypass"
action += " bypass"
}

// only if no admin network policies are used
if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy {
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip", "saddr", "@", podV4IPsSet, action,
),
Comment: ptr.To("process IPv4 traffic with network policy enforcement"),
})

tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip", "daddr", "@", podV4IPsSet, action,
),
Comment: ptr.To("process IPv4 traffic with network policy enforcement"),
})

tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip6", "saddr", "@", podV6IPsSet, action,
),
Comment: ptr.To("process IPv6 traffic with network policy enforcement"),
})

tx.Add(&knftables.Rule{
Chain: chainName,
Rule: knftables.Concat(
"ip6", "daddr", "@", podV6IPsSet, action,
),
Comment: ptr.To("process IPv6 traffic with network policy enforcement"),
})
} else {
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: action,
})
}
tx.Add(&knftables.Rule{
Chain: chainName,
Rule: rule,
})
}

if err := c.nft.Run(ctx, tx); err != nil {
klog.Infof("error syncing nftables rules %v", err)
return err
}
return nil
}

func (c *Controller) cleanNFTablesRules() {