-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmoSummaryMetrics.py
133 lines (123 loc) · 5.97 KB
/
moSummaryMetrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import numpy as np
import numpy.ma as ma
import warnings
from moMetrics import BaseMoMetric
__all__ = ['ValueAtHMetric',
'CompletenessMetric', 'CumulativeCompletenessMetric',
'integrateOverH']
def integrateOverH(Mvalues, Hvalues, Hindex=0.3):
    """Return the cumulative (weighted) running mean of Mvalues over H.

    Each H bin is weighted by an assumed power-law size distribution,
    10^(Hindex * (H - Hmin)), then the metric values are accumulated into a
    cumulative weighted average ("this bin and all brighter bins").

    Parameters
    ----------
    Mvalues : numpy.ndarray
        Metric values, one per H bin.
    Hvalues : numpy.ndarray
        H values corresponding to Mvalues.
    Hindex : float, optional
        Power-law index of the assumed H distribution. Default 0.3.

    Returns
    -------
    numpy.ndarray
        Cumulative weighted metric values, same length as Mvalues.
    """
    # Differential number expected in each H bin (relative weights).
    weights = np.power(10.0, Hindex * (Hvalues - Hvalues.min()))
    # Weighted running average: cumulative metric / cumulative counts.
    return np.cumsum(Mvalues * weights) / np.cumsum(weights)
class ValueAtHMetric(BaseMoMetric):
    """
    Return the value of a metric at a given H.

    Parameters
    ----------
    Hmark : float, optional
        The H value at which to report the metric value. Default 22.
    """
    def __init__(self, Hmark=22, **kwargs):
        metricName = 'Value At H=%.1f' %(Hmark)
        super(ValueAtHMetric, self).__init__(metricName=metricName, **kwargs)
        self.Hmark = Hmark

    def run(self, metricVals, Hvals):
        """Return (metric value(s) at Hmark, H actually used), or None if
        Hmark falls outside the range of Hvals.

        Parameters
        ----------
        metricVals : numpy.ndarray
            Metric values, shape (nSsos, nHvals-like).
        Hvals : numpy.ndarray
            H values associated with the metric values.
        """
        # Check if desired H value is within range of H values.
        if (self.Hmark < Hvals.min()) or (self.Hmark > Hvals.max()):
            warnings.warn('Desired H value of metric outside range of provided H values.')
            return None
        nHvals = len(Hvals)
        nHMetricVals = metricVals.shape[1]
        if nHvals == nHMetricVals:
            # Hvals matched the points where the metric values were calculated
            # (cloned H distribution): pick out the slice closest to Hmark.
            diffs = np.abs(self.Hmark - Hvals)
            Hidx = np.where(diffs == diffs.min())[0]
            result = metricVals.swapaxes(0, 1)[Hidx]
            Hmark = Hvals[Hidx]
            self.name = 'Value At H=%.1f' %(Hmark)
        else:
            # We have a range of metric values, one per Hval: interpolate.
            # BUG FIX: np.interpolate does not exist -- np.interp is the
            # correct numpy 1-d interpolation routine.
            result = np.interp([self.Hmark], Hvals, metricVals.swapaxes(0, 1))
            # BUG FIX: Hmark was unbound on this branch, so the return below
            # raised NameError; report the requested H value.
            Hmark = self.Hmark
        return result, Hmark
class CompletenessMetric(BaseMoMetric):
    """
    Take the discoveryChances metric results and turn it into
    completeness estimate (relative to the entire population).
    Require at least 'requiredChances' to count an object as "found".

    Parameters
    ----------
    requiredChances : int, optional
        Minimum number of discovery chances to count an object as found. Default 1.
    nbins : int, optional
        Number of H bins to use when Hvals is not a cloned distribution. Default 20.
    minHrange : float, optional
        Minimum H range to spread the bins over. Default 1.0.
    """
    def __init__(self, requiredChances=1, nbins=20, minHrange=1.0, **kwargs):
        super(CompletenessMetric, self).__init__(**kwargs)
        self.units = '@ H'
        self.requiredChances = requiredChances
        # If H is not a cloned distribution, then we need to specify how to bin these values.
        self.nbins = nbins
        self.minHrange = minHrange

    def run(self, discoveryChances, Hvals):
        """Return (completeness per H value, Hvals used).

        discoveryChances is expected shaped (nSsos, nH); assumes a masked
        array in the cloned-H branch -- handled via ma.filled either way.
        """
        nSsos = discoveryChances.shape[0]
        nHval = len(Hvals)
        discoveriesH = discoveryChances.swapaxes(0, 1)
        if nHval == discoveryChances.shape[1]:
            # Hvals array is probably the same as the cloned H array:
            # count the fraction of objects found at each H.
            completeness = np.zeros(len(Hvals), float)
            for i, H in enumerate(Hvals):
                # ROBUSTNESS FIX: ma.filled works for both masked and plain
                # arrays; .filled() raises AttributeError on plain ndarrays.
                found = ma.filled(discoveriesH[i], 0)
                completeness[i] = np.where(found >= self.requiredChances)[0].size
            completeness = completeness / float(nSsos)
        else:
            # The Hvals are spread more randomly among the objects (we probably used one per object).
            hrange = Hvals.max() - Hvals.min()
            minH = Hvals.min()
            if hrange < self.minHrange:
                hrange = self.minHrange
                minH = Hvals.min() - hrange/2.0
            stepsize = hrange / float(self.nbins)
            bins = np.arange(minH, minH + hrange + stepsize/2.0, stepsize)
            Hvals = bins[:-1]
            n_all, b = np.histogram(discoveriesH[0], bins)
            condition = np.where(discoveriesH[0] >= self.requiredChances)[0]
            n_found, b = np.histogram(discoveriesH[0][condition], bins)
            # BUG FIX: empty bins made n_found/n_all emit a divide-by-zero
            # RuntimeWarning and produce NaN before being masked out below;
            # suppress the warning -- final values are identical (0 for empty bins).
            with np.errstate(divide='ignore', invalid='ignore'):
                completeness = n_found.astype(float) / n_all.astype(float)
            completeness = np.where(n_all == 0, 0, completeness)
        return completeness, Hvals
class CumulativeCompletenessMetric(BaseMoMetric):
    """
    Take the discoveryChances metric results and turn it into
    completeness estimate (relative to the entire population),
    then integrate over the H distribution to get cumulative completeness (<= H).
    Require at least 'requiredChances' to count an object as "found".

    Parameters
    ----------
    requiredChances : int, optional
        Minimum number of discovery chances to count an object as found. Default 1.
    nbins : int, optional
        Number of H bins to use when Hvals is not a cloned distribution. Default 20.
    minHrange : float, optional
        Minimum H range to spread the bins over. Default 1.0.
    Hindex : float, optional
        Power-law index of the assumed H distribution, passed to
        integrateOverH. Default 0.3.
    """
    def __init__(self, requiredChances=1, nbins=20, minHrange=1.0, Hindex=0.3, **kwargs):
        super(CumulativeCompletenessMetric, self).__init__(**kwargs)
        self.units = '<= H'
        self.requiredChances = requiredChances
        # If H is not a cloned distribution, then we need to specify how to bin these values.
        self.nbins = nbins
        self.minHrange = minHrange
        self.Hindex = Hindex

    def run(self, discoveryChances, Hvals):
        """Return (cumulative completeness per H value, Hvals used).

        Differential completeness is computed exactly as in
        CompletenessMetric, then integrated over H via integrateOverH.
        """
        nSsos = discoveryChances.shape[0]
        nHval = len(Hvals)
        discoveriesH = discoveryChances.swapaxes(0, 1)
        if nHval == discoveryChances.shape[1]:
            # Hvals array is probably the same as the cloned H array:
            # count the fraction of objects found at each H.
            completeness = np.zeros(len(Hvals), float)
            for i, H in enumerate(Hvals):
                # ROBUSTNESS FIX: ma.filled works for both masked and plain
                # arrays; .filled() raises AttributeError on plain ndarrays.
                found = ma.filled(discoveriesH[i], 0)
                completeness[i] = np.where(found >= self.requiredChances)[0].size
            completeness = completeness / float(nSsos)
        else:
            # The Hvals are spread more randomly among the objects (we probably used one per object).
            hrange = Hvals.max() - Hvals.min()
            minH = Hvals.min()
            if hrange < self.minHrange:
                hrange = self.minHrange
                minH = Hvals.min() - hrange/2.0
            stepsize = hrange / float(self.nbins)
            bins = np.arange(minH, minH + hrange + stepsize/2.0, stepsize)
            Hvals = bins[:-1]
            n_all, b = np.histogram(discoveriesH[0], bins)
            condition = np.where(discoveriesH[0] >= self.requiredChances)[0]
            n_found, b = np.histogram(discoveriesH[0][condition], bins)
            # BUG FIX: empty bins made n_found/n_all emit a divide-by-zero
            # RuntimeWarning and produce NaN before being masked out below;
            # suppress the warning -- final values are identical (0 for empty bins).
            with np.errstate(divide='ignore', invalid='ignore'):
                completeness = n_found.astype(float) / n_all.astype(float)
            completeness = np.where(n_all == 0, 0, completeness)
        # Integrate the differential completeness over the H distribution.
        completenessInt = integrateOverH(completeness, Hvals, self.Hindex)
        return completenessInt, Hvals