This repository has been archived by the owner on May 18, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathnode.py
68 lines (54 loc) · 2.31 KB
/
node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import numpy as np
from functools import reduce
from utils import dot_T
class Node:
    """A discrete variable in a Bayesian network with message passing.

    Nodes are linked with :meth:`add_parent`. Beliefs are computed
    recursively by exchanging messages with parents and children, so the
    graph must not contain (undirected) loops or the recursion will not
    terminate.

    Attributes are plain fields that the caller fills in after construction:

    * ``cardinality`` - number of states of the variable.
    * ``likelihood``  - fixed evidence vector; when ``None`` it is derived
      from the children's messages.
    * ``priors``      - fixed prior vector; when ``None`` it is derived from
      ``m`` and the parents' messages.
    * ``belief``      - fixed belief vector; when ``None`` it is computed.
    * ``m``           - conditional table as a numpy array with one axis per
      parent (in ``self.parents`` order) followed by one axis for this
      node's own states.
    """

    def __init__(self, name):
        self.name = name
        self.cardinality = None
        self.likelihood = None
        self.priors = None
        self.belief = None
        self.parents = []
        self.children = []
        self.m = None

    def add_parent(self, node):
        """Register ``node`` as a parent and this node as its child."""
        self.parents.append(node)
        node.children.append(self)

    def __str__(self):
        return self.name

    def message_to_parent(self, parent):
        """Return the message sent to ``parent`` (one entry per parent state).

        For every state ``r`` of ``parent`` the conditional table is sliced
        at ``r``, the remaining parent axes are summed out weighted by the
        messages coming from the other parents, and the resulting vector
        over this node's states is combined with this node's likelihood.
        """
        likelihood = self.get_likelihood()
        other_messages = [p.message_to_child(self)
                          for p in self.parents if p != parent]
        parent_i = self.parents.index(parent)

        rows = []
        for r in range(parent.cardinality):
            # After the transpose the last axis is the first remaining
            # parent, so each np.dot sums out one parent, in parents order.
            table = self.m.take(r, axis=parent_i).transpose()
            rows.append(reduce(np.dot, other_messages, table))
        return np.dot(np.vstack(rows), likelihood)

    def message_to_child(self, child):
        """Return the message sent to ``child``.

        The message is this node's priors multiplied by the messages from
        every *other* child, normalized to sum to one. When there is no
        other child the priors are returned as they are.
        """
        children_messages = [c.message_to_parent(self)
                             for c in self.children if c != child]
        if not children_messages:
            return self.get_priors()
        # The prior enters the product exactly once, regardless of how many
        # sibling messages are combined.
        combined = np.array(children_messages).prod(axis=0)
        unnormalized = combined * self.get_priors()
        return unnormalized / unnormalized.sum()

    def get_likelihood(self):
        """Return the fixed likelihood or the product of children messages.

        A node without children and without fixed evidence carries no
        information: a vector of ones is returned when ``cardinality`` is
        known (otherwise the neutral scalar ``1.0``).
        """
        if self.likelihood is not None:
            return self.likelihood
        if not self.children and self.cardinality is not None:
            return np.ones(self.cardinality)
        incoming_children_messages = np.array(
            [c.message_to_parent(self) for c in self.children])
        return incoming_children_messages.prod(axis=0)

    def get_priors(self):
        """Return the fixed priors or derive them from the parents.

        The transposed conditional table is contracted with the message of
        each parent in turn, leaving a vector over this node's states.
        """
        if self.priors is not None:
            return self.priors
        parents_messages = [p.message_to_child(self) for p in self.parents]
        return reduce(np.dot, parents_messages, self.m.transpose())

    def get_belief(self):
        """Return the fixed belief or normalized ``likelihood * priors``."""
        if self.belief is not None:
            return self.belief
        unnormalized = self.get_likelihood() * self.get_priors()
        return unnormalized / unnormalized.sum()