forked from projectcalico/kube-controllers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpolicy_parser.py
167 lines (148 loc) · 6.78 KB
/
policy_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/python
import logging
from constants import *
from pycalico.datastore_datatypes import Rule
# Module-level logger used by everything in this file. It is deliberately
# looked up under the fixed name "__main__" instead of __name__, so it is the
# same logger object any other module obtains with getLogger("__main__") --
# presumably so this module shares the entry-point script's logging
# configuration (TODO confirm against the main script).
_log = logging.getLogger("__main__")
class PolicyError(Exception):
    """
    Error raised while parsing a policy.

    Behaves like a plain Exception carrying ``msg`` as its single argument,
    and additionally remembers the policy that was being processed on the
    ``policy`` attribute so that handlers can inspect or report it.
    """

    def __init__(self, msg=None, policy=None):
        """
        :param msg: Human-readable description of the problem (may be None).
        :param policy: The policy object being parsed when the error
            occurred (may be None).
        """
        super(PolicyError, self).__init__(msg)
        self.policy = policy
class PolicyParser(object):
    """
    Converts one NetworkPolicy API object (a plain dict) into the list of
    Calico Rule objects that implement its ingress behaviour.
    """

    def __init__(self, policy):
        """
        Remember the policy and the namespace it lives in.

        :param policy: NetworkPolicy object as a dict.  It must contain
            policy["metadata"]["namespace"].
        :raises KeyError: if "metadata" or its "namespace" key is missing.
        """
        self.namespace = policy["metadata"]["namespace"]
        self.policy = policy

    def calculate_inbound_rules(self):
        """
        Takes the NetworkPolicy object held by this parser and returns a
        list of Calico Rule objects which should be applied on ingress.

        :return: A list of Rule objects.  Empty when the policy spec has no
            (or a null / empty) "ingress" section.
        :raises PolicyError: if an ingress clause is malformed (see
            _generate_from_args).
        """
        _log.debug("Calculating inbound rules")
        rules = []

        # A missing key and an explicit null are both treated as "no rules".
        allow_incomings = self.policy["spec"].get("ingress") or []
        _log.debug("Found %s ingress rules", len(allow_incomings))
        for allow_incoming_clause in allow_incomings:
            # Convert each clause into one or more Calico Rule objects.
            if allow_incoming_clause:
                # Rule exists - parse it.
                r = self._allow_incoming_to_rules(allow_incoming_clause)
            else:
                # An empty rule means allow all traffic.
                r = [Rule(action="allow")]
            rules.extend(r)

        _log.debug("Calculated total set of rules: %s", rules)
        return rules

    def _allow_incoming_to_rules(self, allow_incoming_clause):
        """
        Takes a single ingress ("allowIncoming") clause from a NetworkPolicy
        object and returns a list of Calico Rule objects which implement it.

        One Rule is produced for every combination of a "to" argument set
        (protocol / ports) and a "from" argument set (source selector).

        :param allow_incoming_clause: dict which may contain "ports" and
            "from" keys.
        :return: A list of Rule objects, all with action "allow".
        """
        _log.debug("Processing ingress rule: %s", allow_incoming_clause)

        # Generate the "to" arguments for this Rule.
        ports = allow_incoming_clause.get("ports")
        if ports:
            _log.debug("Parsing 'ports': %s", ports)
            to_args = self._generate_to_args(ports)
        else:
            # A single empty argument set places no protocol / port
            # restriction on the generated Rule.
            _log.debug("No ports specified, allow all protocols / ports")
            to_args = [{}]

        # Generate the "from" arguments for this Rule.
        froms = allow_incoming_clause.get("from")
        if froms:
            _log.debug("Parsing 'from': %s", froms)
            from_args = self._generate_from_args(froms)
        else:
            # A single empty argument set places no source restriction.
            _log.debug("No from specified, allow from all sources")
            from_args = [{}]

        # Create a Rule per-protocol, per-from-clause.
        _log.debug("Creating rules")
        rules = []
        for to_arg in to_args:
            for from_arg in from_args:
                _log.debug("\tAllow from %s to %s", from_arg, to_arg)
                args = {"action": "allow"}
                args.update(from_arg)
                args.update(to_arg)
                rules.append(Rule(**args))
        return rules

    def _generate_from_args(self, froms):
        """
        Generate a list of argument dictionaries suitable for passing to
        the constructor of a libcalico Rule object using the given
        "from" clauses.

        :param froms: iterable of "from" clause dicts.  Each may carry a
            "pods" selector or a "namespaces" selector, but not both.
            Clauses carrying neither key contribute nothing.
        :return: A list of dicts, each of the form {"src_selector": <str>}.
        :raises PolicyError: if a clause has both "pods" and "namespaces".
        """
        from_args = []
        for from_clause in froms:
            # We need to check if the key exists, not just if there is
            # a non-null value.  The presence of the key with a null
            # value means "select all".
            _log.debug("Parsing 'from' clause: %s", from_clause)
            pods_present = "pods" in from_clause
            namespaces_present = "namespaces" in from_clause
            _log.debug("Is 'pods:' present? %s", pods_present)
            _log.debug("Is 'namespaces:' present? %s", namespaces_present)

            if pods_present and namespaces_present:
                # This is an error case according to the API.
                msg = "Policy API does not support both 'pods' and " \
                      "'namespaces' selectors."
                raise PolicyError(msg, self.policy)
            elif pods_present:
                # There is a pod selector in this "from" clause.
                pod_selector = from_clause["pods"] or {}
                _log.debug("Allow from pods: %s", pod_selector)
                # .items() rather than .iteritems() so this runs on both
                # Python 2 and Python 3.
                selectors = ["%s == '%s'" % (k, v)
                             for k, v in pod_selector.items()]

                # We can only select on pods in this namespace.
                selectors.append("%s == '%s'" % (K8S_NAMESPACE_LABEL,
                                                 self.namespace))
                selector = " && ".join(selectors)

                # Append the selector to the from args.
                _log.debug("Allowing pods which match: %s", selector)
                from_args.append({"src_selector": selector})
            elif namespaces_present:
                # There is a namespace selector.  Namespace labels are
                # applied to each pod in the namespace using
                # the per-namespace profile.  We can select on namespace
                # labels using the NS_LABEL_KEY_FMT modifier.
                namespaces = from_clause["namespaces"] or {}
                _log.debug("Allow from namespaces: %s", namespaces)
                selectors = ["%s == '%s'" % (NS_LABEL_KEY_FMT % k, v)
                             for k, v in namespaces.items()]
                selector = " && ".join(selectors)
                if selector:
                    # Allow from the selected namespaces.
                    _log.debug("Allowing from namespaces which match: %s",
                               selector)
                else:
                    # Empty selector: allow from all pods in all namespaces,
                    # i.e. anything that carries the namespace label at all.
                    _log.debug("Allowing from all pods in all namespaces")
                    selector = "has(%s)" % K8S_NAMESPACE_LABEL
                from_args.append({"src_selector": selector})
        return from_args

    def _generate_to_args(self, ports):
        """
        Generates a list of argument dictionaries suitable for passing to
        the constructor of a libcalico Rule object from the given ports.

        Entries are grouped by protocol (case-insensitively), in order of
        first appearance.  An entry without a "protocol" is treated as TCP,
        which is the Kubernetes default.  An entry without a "port" opens
        every port of its protocol, even if other entries for the same
        protocol name specific ports.

        :param ports: iterable of dicts with optional "protocol" and "port"
            keys.
        :return: A list of dicts, one per protocol, each holding a lower-case
            "protocol" and, when the protocol is restricted to specific
            ports, a "dst_ports" list.
        """
        # Ports allowed for each protocol.  A value of None means "every
        # port of this protocol".  protocol_order keeps the output
        # deterministic regardless of dict ordering.
        protocol_order = []
        ports_by_protocol = {}
        for to_port in ports:
            # A missing or null protocol defaults to TCP; normalise case so
            # "TCP" and "tcp" end up in the same group.
            protocol = (to_port.get("protocol") or "TCP").lower()
            port = to_port.get("port")

            if protocol not in ports_by_protocol:
                protocol_order.append(protocol)
                ports_by_protocol[protocol] = []

            allowed = ports_by_protocol[protocol]
            if allowed is None:
                # Already open on all ports - nothing can widen it further.
                continue
            if port:
                _log.debug("Allow to port: %s/%s", protocol, port)
                allowed.append(port)
            else:
                # No port given: the entry matches every port, so drop any
                # per-port restriction collected so far for this protocol.
                _log.debug("Allow to all ports for protocol: %s", protocol)
                ports_by_protocol[protocol] = None

        # For each protocol, create a "to_arg" which allows to
        # the ports specified for that protocol.
        to_args = []
        for protocol in protocol_order:
            arg = {"protocol": protocol}
            allowed = ports_by_protocol[protocol]
            if allowed:
                arg["dst_ports"] = allowed
            to_args.append(arg)
        return to_args