forked from casbin/pycasbin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_filter.py
125 lines (101 loc) · 4.83 KB
/
test_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import casbin
import os
from unittest import TestCase
import unittest
class Filter:
    """Filter object passed to ``Enforcer.load_filtered_policy`` by these tests.

    The lists are created per instance rather than at class level, so
    mutating one filter in place can never leak into another instance.
    """

    def __init__(self):
        # P and G are lists of strings (e.g. ["", "domain1"]).
        # NOTE(review): presumably P is matched against "p" rules and G
        # against "g" rules, with "" acting as a wildcard -- confirm against
        # the adapter implementation.
        self.P = []
        self.G = []
class TestFilteredAdapter(TestCase):
    """Exercise filtered policy loading through casbin's ``FilteredAdapter``.

    Every test builds an enforcer from the RBAC-with-domains example files.
    The ``examples/`` paths are relative, so the suite must be run from the
    directory that contains that folder.
    """

    MODEL_PATH = "examples/rbac_with_domains_model.conf"
    POLICY_PATH = "examples/rbac_with_domains_policy.csv"

    def _filtered_enforcer(self, policy_path=POLICY_PATH):
        """Return an enforcer backed by a ``FilteredAdapter`` on *policy_path*."""
        adapter = casbin.persist.adapters.FilteredAdapter(policy_path)
        return casbin.Enforcer(self.MODEL_PATH, adapter)

    @staticmethod
    def _domain_filter(domain):
        """Return a ``Filter`` whose P and G patterns select *domain*."""
        policy_filter = Filter()
        policy_filter.P = ["", domain]
        policy_filter.G = ["", "", domain]
        return policy_filter

    @staticmethod
    def _must(operation, action, *args):
        """Call ``action(*args)`` and return its result.

        Any exception is re-raised as ``RuntimeError`` naming *operation*,
        chained to the original error so the real traceback stays visible.
        """
        try:
            return action(*args)
        except Exception as err:
            raise RuntimeError(
                "unexpected error in {}".format(operation)
            ) from err

    def test_init_filtered_adapter(self):
        """No policy is present before any explicit load call."""
        e = self._filtered_enforcer()
        self.assertFalse(e.has_policy(["admin", "domain1", "data1", "read"]))

    def test_load_filtered_policy(self):
        """A domain1 filter keeps domain1 rules only and blocks saving."""
        e = self._filtered_enforcer()

        # An unfiltered load exposes the rules of both domains.
        self._must("LoadPolicy", e.load_policy)
        self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
        self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

        self._must(
            "LoadFilteredPolicy",
            e.load_filtered_policy,
            self._domain_filter("domain1"),
        )
        # is_filtered is a method: it has to be called, otherwise the bound
        # method object is always truthy and this check can never fail.
        self.assertTrue(
            e.is_filtered(),
            "adapter did not set the filtered flag correctly",
        )
        self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
        self.assertFalse(e.has_policy(["admin", "domain2", "data2", "read"]))

        # Saving is expected to be refused while the policy is filtered.
        with self.assertRaises(RuntimeError):
            e.save_policy()
        with self.assertRaises(RuntimeError):
            e.get_adapter().save_policy(e.get_model())

    def test_append_filtered_policy(self):
        """An incremental filtered load adds domain2 on top of domain1."""
        e = self._filtered_enforcer()

        self._must("LoadPolicy", e.load_policy)
        self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
        self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

        self._must(
            "LoadFilteredPolicy",
            e.load_filtered_policy,
            self._domain_filter("domain1"),
        )
        self.assertTrue(
            e.is_filtered(),
            "adapter did not set the filtered flag correctly",
        )
        self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
        self.assertFalse(e.has_policy(["admin", "domain2", "data2", "read"]))

        self._must(
            "LoadIncrementalFilteredPolicy",
            e.load_increment_filtered_policy,
            self._domain_filter("domain2"),
        )
        self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
        self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

    def test_filtered_policy_invalid_filter(self):
        """A plain list is rejected as a filter with ``RuntimeError``."""
        e = self._filtered_enforcer()
        invalid_filter = ["", "domain1"]
        with self.assertRaises(RuntimeError):
            e.load_filtered_policy(invalid_filter)

    def test_filtered_policy_empty_filter(self):
        """A ``None`` filter leaves the policy unfiltered and saveable."""
        e = self._filtered_enforcer()
        self._must("LoadFilteredPolicy", e.load_filtered_policy, None)
        self.assertFalse(
            e.is_filtered(),
            "adapter did not reset the filtered flag correctly",
        )
        self._must("SavePolicy", e.save_policy)

    def test_unsupported_filtered_policy(self):
        """An enforcer built from a plain policy path raises ``ValueError``."""
        e = casbin.Enforcer(self.MODEL_PATH, self.POLICY_PATH)
        with self.assertRaises(ValueError):
            e.load_filtered_policy(self._domain_filter("domain1"))

    def test_filtered_adapter_empty_filepath(self):
        """An empty policy path makes the filtered load raise."""
        e = self._filtered_enforcer("")
        with self.assertRaises(RuntimeError):
            e.load_filtered_policy(None)

    def test_filtered_adapter_invalid_filepath(self):
        """A non-existent policy path makes the filtered load raise."""
        e = self._filtered_enforcer("examples/does_not_exist_policy.csv")
        with self.assertRaises(RuntimeError):
            e.load_filtered_policy(None)