-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_qabal.py
executable file
·137 lines (113 loc) · 3.73 KB
/
test_qabal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import pytest
from qabal import Item, Session, inject
from types import ModuleType
def foo(item):
    """Store ``'foo'`` under the ``'foo'`` key of *item*, in place.

    Nothing is returned; the caller observes the change through *item*.
    """
    item['foo'] = 'foo'
    return None
def bar(item):
    """Store ``'bar'`` under the ``'bar'`` key of *item*, in place.

    Nothing is returned; the caller observes the change through *item*.
    """
    item['bar'] = 'bar'
    return None
def baz(item):
    """Store ``'baz'`` under the ``'baz'`` key of *item* and return *item*.

    The mapping is mutated in place; the returned object is the same one
    that was passed in.
    """
    item['baz'] = 'baz'
    updated = item
    return updated
def bar_baz(item):
    """Set ``'bar'`` to ``'bar!'`` and ``'baz'`` to ``'baz!'`` on *item*.

    The mapping is mutated in place (``'bar'`` first, then ``'baz'``) and
    the same object is returned.
    """
    for key, value in (('bar', 'bar!'), ('baz', 'baz!')):
        item[key] = value
    return item
def sig_based_analytic(bar, baz):
    """Return a one-key dict mapping ``'bar_baz'`` to ``bar + baz``.

    NOTE(review): this file registers the function wrapped in ``inject()``,
    so the parameter names are presumably matched against item keys --
    keep them as ``bar`` and ``baz``; confirm against qabal.
    """
    combined = bar + baz
    return {'bar_baz': combined}
def fubar(item):
    """Flag *item* in place by setting its ``'fubared'`` key to ``True``.

    Nothing is returned; the caller observes the change through *item*.
    """
    item['fubared'] = True
    return None
def analytic_with_metadata(bar):
    """Return *bar* with a ``'?'`` appended."""
    suffixed = bar + '?'
    return suffixed
# Routing metadata is carried on the function object itself, which lets the
# tests in this file register it with ``Session.add`` without passing an
# explicit trigger.
# NOTE(review): the exact meaning of each attribute is defined by qabal, not
# by this file -- confirm against the library before relying on it.
analytic_with_metadata.__trigger__ = Item['bar'] == 'bar'
analytic_with_metadata.__creates__ = 'bar'
analytic_with_metadata.__inject__ = True
# Build a throwaway module object at runtime and expose the metadata-carrying
# analytic as one of its attributes, so a whole module can be handed to
# ``Session.add``.
test_module = ModuleType('test_module')
test_module.analytic_with_metadata = analytic_with_metadata
def test_qabal_empty_route():
    """With no analytics registered, the fed ``'hello'`` value is still readable."""
    session = Session()
    payload = {'hello': 'world'}
    result = session.feed(payload)
    assert result['hello'] == 'world'
def test_qabal_empty_message():
    """Feeding an empty dict to a session with one route records no provenance."""
    session = Session()
    session.add(foo, Item['type'] == 'foo')
    result = session.feed({})
    # The empty message has no 'type' key, so nothing should have been logged.
    assert len(result.provenance) == 0
def test_qabal_simple_route():
    """A single matching route runs ``foo`` and logs one extended provenance entry."""
    session = Session(provenance='extended')
    session.add(foo, Item['type'] == 'foo')
    result = session.feed({'type': 'foo'})

    # The analytic's output is present alongside the original field.
    assert result['foo'] == 'foo'
    assert result['type'] == 'foo'

    # With provenance='extended' each entry unpacks into three parts.
    assert len(result.provenance) == 1
    analytic, _timestamp, changes = result.provenance[0]
    assert analytic.__name__ == 'foo'
    assert len(changes) == 1
def test_qabal_complex_route():
    """Grow one session route by route, checking the fed result after each step."""
    session = Session()

    # foo fires on the input; bar fires on foo's output.
    session.add(foo, Item['type'] == 'foo')
    session.add(bar, Item['foo'] == 'foo')
    result = session.feed({'type': 'foo'})
    assert result['bar'] == 'bar'

    # bar_baz uses the same trigger as bar; its 'bar!' is the value observed.
    session.add(bar_baz, Item['foo'] == 'foo')
    result = session.feed({'type': 'foo'})
    assert result['bar'] == 'bar!'

    # fubar fires on the 'baz!' written by bar_baz.
    session.add(fubar, Item['baz'] == 'baz!')
    result = session.feed({'type': 'foo'})
    assert result['fubared']

    # An input that matches no trigger must not be flagged.
    result = session.feed({'type': 'bar'})
    assert not result.get('fubared')

    # baz is registered with a bare key trigger (no comparison).
    session.add(baz, Item['fubared'])
    result = session.feed({'fubared': True})
    assert result['baz'] == 'baz'
    result = session.feed({'type': 'foo'})
    assert result['baz'] == 'baz'
    assert result['bar'] == 'bar!'

    # Signature-based analytic wrapped with inject(), triggered on 'baz!'.
    session.add(inject(sig_based_analytic), Item['baz'] == 'baz!')
    result = session.feed({'type': 'foo'})
    assert result['bar_baz'] == 'bar!baz!'

    # In the default provenance mode the entries expose __name__ directly.
    assert len(result.provenance) == 6
    assert result.provenance[0].__name__ == 'foo'
def test_qabal_remove():
    """Removing a route by the handle ``Session.add`` returned stops it from running."""
    sess = Session()
    sess.add(foo, Item['type'] == 'foo')
    # Keep the value returned by add(); it is what remove() is given below.
    bar_trigger = sess.add(bar, Item['foo'] == 'foo')
    res = sess.feed({'type': 'foo'})
    assert res['bar'] == 'bar'

    sess.remove(bar_trigger)
    res = sess.feed({'type': 'foo'})
    # The leftover debug print of the result was dropped; pytest's assertion
    # introspection already reports the operands when this check fails.
    assert res.get('bar') is None
def test_qabal_query_ast():
    """Check the key path and predicate that ``Item[...]`` comparisons produce.

    A comparison on ``Item[...]`` is unpacked here as a ``(trigger, test)``
    pair: ``trigger`` is the dotted key path and ``test`` is a callable
    applied to a candidate value.
    """
    assert Item['foo'].keys == ['foo']
    # NOTE(review): reading .keys does not go through a comparison, so the
    # accumulated keys are presumably still stored on Item -- hence the
    # manual reset. Confirm against qabal's Item implementation.
    Item.keys = []
    trigger, _ = Item['foo'] == 'foo'
    assert trigger == 'foo'

    # Compare the accumulated key list, not the query object itself: ``==``
    # on a query yields a (trigger, test) tuple, which is always truthy, so
    # asserting on it directly could never fail.
    assert Item['foo']['bar'].keys == ['foo', 'bar']
    Item.keys = []

    trigger, test = Item['foo']['bar']['baz'] == 'foo'
    assert trigger == 'foo.bar.baz'
    assert test('foo')

    trigger, test = Item['foo']['bar']['baz'] > 5
    assert test(6)
    assert not test(5)

    trigger, test = Item['foo']['bar']['baz'] != 'foo'
    assert test('foob')
    assert not test('foo')
    assert trigger == 'foo.bar.baz'
def test_analytic_metadata():
    """Analytics carrying their own metadata can be added without a route.

    Covers adding the function directly, removing it again via the returned
    handle, adding a module that contains it, and the error raised when a
    function with no metadata is added without a route.
    """
    sess = Session()
    sess.add(bar, Item['foo'] == 'foo')
    res = sess.feed({'foo': 'foo'})
    assert res['bar'] == 'bar'

    # Registered with no explicit trigger: the function's own attributes
    # supply the routing information.
    handle = sess.add(analytic_with_metadata)
    res = sess.feed({'foo': 'foo'})
    assert res['bar'] == 'bar?'

    # After removal the result goes back to plain 'bar'.
    sess.remove(handle)
    res = sess.feed({'foo': 'foo'})
    assert res['bar'] == 'bar'

    # Adding the module that holds the analytic has the same effect.
    sess.add(test_module)
    res = sess.feed({'foo': 'foo'})
    assert res['bar'] == 'bar?'

    # Passing an analytic without metadata and without a route must fail.
    # The ``message=`` keyword of pytest.raises was removed in pytest 5.0
    # (and was never a match on the exception text), so only the exception
    # type is checked here.
    with pytest.raises(ValueError):
        sess.add(foo)