-
Notifications
You must be signed in to change notification settings - Fork 476
/
layers.py
223 lines (181 loc) · 8.85 KB
/
layers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
from copy import deepcopy
from functools import lru_cache
from itertools import combinations
from collections import defaultdict, MutableSet
from aimacode.planning import Action
from aimacode.utils import expr, Expr
##############################################################################
# YOU DO NOT NEED TO MODIFY CODE IN THIS FILE #
##############################################################################
@lru_cache()
def make_node(action, no_op=False):
""" Convert Action objects to planning graph nodes by creating distinct
symbols for positive and negative fluents and then combining positive &
negative preconditions and effects into sets. This allows efficient membership
testing and perserves logical negation semantics on the symbolic actions.
"""
preconditions = set(action.precond_pos) | set([~p for p in action.precond_neg])
effects = set(action.effect_add) | set([~e for e in action.effect_rem])
return ActionNode(str(action), frozenset(preconditions), frozenset(effects), no_op)
@lru_cache()
def makeNoOp(literal):
""" Create so-called 'no-op' actions, which only exist in a planning graph
(they are not real actions in the problem domain) to persist a literal
from one layer of the planning graph to the next.
no-op actions are created such that logical negation is correctly evaluated.
i.e., the no-op action of the negative literal ~At(place) is the logical
negation of the no-op action of positive literal At(place); in other words
NoOp::~At(place) == ~(NoOp::At(place) -- NOTE: NoOp::~At(place) is not a valid
action, but the correct semantics are handled and enforced automatically.
"""
action = Expr("NoOp::" + literal.op, literal.args)
return (Action(action, [set([literal]), []], [set([literal]), []]),
Action(~action, [set([~literal]), []], [set([~literal]), []]))
class ActionNode(object):
""" Efficient representation of Actions for planning graph
Attributes
----------
expr : Expr
An instance of aimacode.utils.Expr (a string-based symbolic expression)
preconditions : set()
A set of mixed positive and negative literal aimacode.utils.Expr
expressions (symbolic representations like X, ~Y, etc.) that are
preconditions of this action
effects : set()
A set of mixed positive and negative literal aimacode.utils.Expr
expressions (symbolic representations like X, ~Y, etc.) that are
results of applying this action
no_op : bool
A boolean flag indicating whether the instance is a no-op action
(used to serialize planning graphs)
"""
__slots__ = ['expr', 'preconditions', 'effects', 'no_op', '__hash']
def __init__(self, symbol, preconditions, effects, no_op):
self.expr = symbol
self.preconditions = preconditions
self.effects = effects
self.no_op = no_op
self.__hash = hash(symbol)
def __hash__(self): return self.__hash
def __str__(self): return str(self.expr)
def __repr__(self): return self.__str__()
def __eq__(self, other):
return (isinstance(other, ActionNode)
and self.expr == other.expr)
class BaseLayer(MutableSet):
""" Base class for ActionLayer and LiteralLayer classes for planning graphs
that stores actions or literals as a mutable set (which enables terse,
efficient membership testing and expansion)
Attributes
----------
parents : dict
Mapping from each item (action or literal) in the current layer to the
symbolic node(s) in parent layer of the planning graph. E.g.,
parents[actionA] is a set containing the symbolic literals (positive AND
negative) that are preconditions of the action.
children : dict
Mapping from each item (action or literal) in the current layer to the
symbolic node(s) in the child layer of the planning graph. E.g.,
children[actionA] is a set containing the symbolic literals (positive AND
negative) that are set by performing actionA.
parent_layer : BaseLayer (or subclass)
Contains a reference to the layer preceding this one in the planning graph;
the root literal layer of a planning graph contains an empty ActionLayer as
parent. (This ensures that parent_layer.is_mutex() is always defined for
real layers in the planning graph) Action layers always have a literal layer
as parent, and literal layers always have an action layer as parent.
_mutexes : dict
Mapping from each item (action or literal) to a set containing all items
that are mutex to the key. E.g., _mutexes[literaA] is a set of literals
that are mutex to literalA in this level of the planning graph
_ignore_mutexes : bool
If _ignore_mutexes is True then _dynamic_ mutexes will be ignored (static
mutexes are *always* enforced). For example, a literal X is always mutex
with ~X, but "competing needs" or "inconsistent support" can be skipped
"""
def __init__(self, items=[], parent_layer=None, ignore_mutexes=False):
"""
Parameters
----------
items : iterable
Collection of items to store in the layer (literals or actions)
parent_layer : BaseLayer (or subclass)
See parent_layer attribute
ignore_mutexes : bool
See _ignore_mutexes attribute
"""
super().__init__()
self.__store = set(iter(items))
self.parents = defaultdict(set)
self.children = defaultdict(set)
self._mutexes = defaultdict(set)
self.parent_layer = parent_layer
self._ignore_mutexes = ignore_mutexes
def __contains__(self, item):
return item in self.__store
def __iter__(self):
return iter(self.__store)
def __len__(self):
return len(self.__store)
def __eq__(self, other):
return (len(self) == len(other) and
len(self._mutexes) == len(other._mutexes) and
0 == len(self ^ other) and self._mutexes == other._mutexes)
def add(self, item):
self.__store.add(item)
def discard(self, item):
try:
self.__store.discard(item)
except ValueError:
pass
def set_mutex(self, itemA, itemB):
self._mutexes[itemA].add(itemB)
self._mutexes[itemB].add(itemA)
def is_mutex(self, itemA, itemB):
return itemA in self._mutexes.get(itemB, [])
class BaseActionLayer(BaseLayer):
def __init__(self, actions=[], parent_layer=None, serialize=True, ignore_mutexes=False):
super().__init__(actions, parent_layer, ignore_mutexes)
self._serialize=serialize
if isinstance(actions, BaseActionLayer):
self.parents.update({k: set(v) for k, v in actions.parents.items()})
self.children.update({k: set(v) for k, v in actions.children.items()})
def update_mutexes(self):
for actionA, actionB in combinations(iter(self), 2):
if self._serialize and actionA.no_op == actionB.no_op == False:
self.set_mutex(actionA, actionB)
elif (self._inconsistent_effects(actionA, actionB)
or self._interference(actionA, actionB)):
self.set_mutex(actionA, actionB)
elif self._ignore_mutexes:
continue
elif self._competing_needs(actionA, actionB):
self.set_mutex(actionA, actionB)
def add_inbound_edges(self, action, literals):
# inbound action edges are many-to-one
self.parents[action] |= set(literals)
def add_outbound_edges(self, action, literals):
# outbound action edges are one-to-many
self.children[action] |= set(literals)
class BaseLiteralLayer(BaseLayer):
def __init__(self, literals=[], parent_layer=None, ignore_mutexes=False):
super().__init__(literals, parent_layer, ignore_mutexes)
if isinstance(literals, BaseLiteralLayer):
self.parents.update({k: set(v) for k, v in literals.parents.items()})
self.children.update({k: set(v) for k, v in literals.children.items()})
def update_mutexes(self):
for literalA, literalB in combinations(iter(self), 2):
if self._negation(literalA, literalB):
self.set_mutex(literalA, literalB)
elif self._ignore_mutexes:
continue
elif len(self.parent_layer) and self._inconsistent_support(literalA, literalB):
self.set_mutex(literalA, literalB)
def add_inbound_edges(self, action, literals):
# inbound literal edges are many-to-many
for literal in literals:
self.parents[literal].add(action)
def add_outbound_edges(self, action, literals):
# outbound literal edges are many-to-many
for literal in literals:
self.children[literal].add(action)