forked from timbray/quamina
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpruner.go
342 lines (291 loc) · 8.75 KB
/
pruner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
package quamina
import (
"sync"
"time"
)
// prunerStats reports basic counts to aid in deciding when to
// rebuild the underlying matcher. getStats returns a copy of it.
type prunerStats struct {
// Some of these values are ints instead of uints because Go
// likes to print uints in hex, and I'd like to see some
// temporary logging output in decimal. ToDo: back to uints?
// Live is the count of total live patterns. It is incremented by
// addPattern, decremented by deletePatterns, and set to the number
// of re-added patterns by a successful rebuild.
Live int
// Added is the count of patterns added since the last successful
// rebuild.
Added int
// Deleted is the count of patterns removed since the last
// successful rebuild.
Deleted int
// Emitted is the count of patterns returned by matchesForFields,
// i.e. matches that were still in the live set. It is intended to
// cover the period since the last rebuild.
Emitted int64
// Filtered is the count of patterns that have been removed
// from match results (since the last successful rebuild) because
// they were no longer in the live set.
Filtered int64
// LastRebuilt is the time the last successful rebuild started.
LastRebuilt time.Time
// RebuildDuration is the duration of the last successful rebuild.
RebuildDuration time.Duration
// RebuildPurged is the value Deleted had when the last successful
// rebuild completed, i.e. the count of patterns that rebuild
// dropped from the matcher.
RebuildPurged int
}
// prunerMatcher provides pattern deletion (deletePatterns) on top of
// coreMatcher.
//
// prunerMatcher maintains the set of live patterns, and it may rebuild
// the underlying matcher synchronously during standard operations
// (addPattern, deletePatterns, matchesForFields).
//
// Roughly speaking, the default rebuild policy (defaultRebuildTrigger)
// automatically rebuilds the index when the ratio of filtered patterns
// to emitted patterns exceeds 0.2, provided there has been some
// traffic (at least 1000 emitted plus filtered patterns).
//
// An application can call rebuild to force a rebuild at any time.
// See getStats to obtain some useful statistics about the matcher.
//
// Eventually automatically-invoked rebuild policies might be
// pluggable.
type prunerMatcher struct {
// Matcher is the underlying matcher that does the hard work.
Matcher *coreMatcher
// ToDo: maybe Matcher should not be embedded or public.
// live is the live set of patterns; it is the source of truth used
// to filter match results and to repopulate Matcher on rebuild.
live LivePatternsState
// stats holds the counters consulted by rebuildTrigger and
// reported by getStats.
stats prunerStats
// rebuildTrigger, if not nil, determines when a mutation
// triggers a rebuild.
//
// If nil, no automatic rebuild is ever triggered.
rebuildTrigger rebuildTrigger
// lock protects the pointer to the underlying Matcher as well as
// stats and rebuildTrigger.
//
// The Matcher pointer is updated after a successful rebuild.
// Stats are updated by addPattern, deletePatterns,
// matchesForFields, and rebuild.
//
// NOTE(review): IsNameUsed, addPattern and matchesForFields read
// Matcher without holding this lock; confirm whether concurrent
// use with rebuild is expected to be safe.
lock sync.RWMutex
}
// IsNameUsed reports whether the underlying core matcher considers the
// given label to be in use. It simply delegates to Matcher.IsNameUsed.
func (m *prunerMatcher) IsNameUsed(label []byte) bool {
	core := m.Matcher
	return core.IsNameUsed(label)
}
// defaultRebuildTrigger is the trigger policy that newPrunerMatcher
// copies into every new prunerMatcher: a filtered-to-emitted ratio
// above 0.2 once at least 1000 patterns have been emitted or filtered.
var defaultRebuildTrigger = newTooMuchFiltering(0.2, 1000)

// tooMuchFiltering is the standard rebuildTrigger. It fires when both
// of the following hold:
//
//   - the sum of the emitted and filtered pattern counts has reached
//     MinAction, and
//
//   - the ratio of filtered to emitted patterns is greater than
//     FilteredToEmitted.
type tooMuchFiltering struct {
	// FilteredToEmitted is the filtered/emitted ratio that must be
	// exceeded for the trigger to fire.
	FilteredToEmitted float64
	// MinAction is the minimum combined number of emitted and
	// filtered patterns required before the ratio is considered.
	MinAction int64
}

// newTooMuchFiltering returns a new tooMuchFiltering trigger with the
// given ratio threshold and minimum activity count.
func newTooMuchFiltering(ratio float64, min int64) *tooMuchFiltering {
	t := new(tooMuchFiltering)
	t.FilteredToEmitted = ratio
	t.MinAction = min
	return t
}
// TODO: Figure out how to expose this through the Quamina type

// rebuild implements rebuildTrigger. It reports true only when the
// call did not come from an add, enough patterns have been emitted or
// filtered (at least MinAction in total), at least one pattern has
// been emitted, and the filtered/emitted ratio is strictly greater
// than FilteredToEmitted.
func (t *tooMuchFiltering) rebuild(added bool, s *prunerStats) bool {
	switch {
	case added:
		// Adding a pattern cannot increase the number of filtered
		// patterns, so there is nothing to decide.
		return false
	case s.Emitted+s.Filtered < t.MinAction:
		// Not enough traffic through the core matcher yet.
		return false
	case s.Emitted == 0:
		// Arguable in isolation, but required here: the ratio below
		// would otherwise divide by zero.
		return false
	}
	return float64(s.Filtered)/float64(s.Emitted) > t.FilteredToEmitted
}
// disableRebuild turns off automatic rebuilds by clearing the
// rebuildTrigger while holding the write lock.
func (m *prunerMatcher) disableRebuild() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.rebuildTrigger = nil
}
// rebuildTrigger provides a way to control when rebuilds are
// automatically triggered during standard operations.
//
// Currently an addPattern, deletePatterns, or matchesForFields can
// trigger a rebuild. When a rebuild is triggered, it's executed
// synchronously: the add/delete/match method doesn't return until
// the rebuild is complete.
type rebuildTrigger interface {
// rebuild should return true to trigger a rebuild.
//
// This method is called (via maybeRebuild) by addPattern,
// deletePatterns, and matchesForFields. added is true when called
// by addPattern; false otherwise. s points at the matcher's current
// stats. These methods currently do not return until the rebuild is
// complete, so beware.
rebuild(added bool, s *prunerStats) bool
}
// newPrunerMatcher builds a prunerMatcher around a fresh core matcher.
//
// s is the store of live patterns; when it is nil a new memState is
// used. The matcher gets its own copy of defaultRebuildTrigger as its
// rebuild policy.
func newPrunerMatcher(s LivePatternsState) *prunerMatcher {
	state := s
	if state == nil {
		state = newMemState()
	}

	// Copy the default policy so this matcher does not share the
	// package-level trigger value.
	trigger := new(tooMuchFiltering)
	*trigger = *defaultRebuildTrigger

	pm := &prunerMatcher{
		Matcher:        newCoreMatcher(),
		live:           state,
		rebuildTrigger: trigger,
	}
	return pm
}
// maybeRebuild consults rebuildTrigger and, if the trigger fires,
// rebuilds the underlying matcher via rebuildWhileLocked. If
// rebuildTrigger is nil, no rebuild is executed.
//
// added is forwarded to the trigger only; it is true when the caller
// is addPattern and false otherwise. It is deliberately NOT forwarded
// as rebuildWhileLocked's "fearlessly" flag: a fearless rebuild sets
// m.Matcher to nil while the new matcher is being built, other methods
// read m.Matcher without holding the lock, and callers of maybeRebuild
// discard the returned error. Automatic rebuilds therefore always keep
// the old matcher in place until the new one is ready.
//
// It returns the error from rebuildWhileLocked, or nil when no rebuild
// was attempted.
//
// This method assumes the caller has a write lock.
func (m *prunerMatcher) maybeRebuild(added bool) error {
	if m.rebuildTrigger == nil {
		return nil
	}
	if !m.rebuildTrigger.rebuild(added, &m.stats) {
		return nil
	}
	return m.rebuildWhileLocked(false)
}
// addPattern adds the pattern pat, identified by x, to the underlying
// core matcher and to the live set, then updates the stats and maybe
// rebuilds the index.
//
// The pattern is recorded in the live set before the stats are touched
// and before the rebuild trigger is consulted. A rebuild repopulates
// the matcher from the live set, so a rebuild triggered by this add
// must already see the new pattern; and Added/Live must not count a
// pattern that the live set refused.
//
// It returns the error from the core matcher's addPattern or from
// live.Add, whichever fails first. An error from an automatic rebuild
// is not reported, since the add itself has succeeded by then.
func (m *prunerMatcher) addPattern(x X, pat string) error {
	// Do we m.live.Add first or do we m.Matcher.addPattern first?
	if err := m.Matcher.addPattern(x, pat); err != nil {
		return err
	}

	if err := m.live.Add(x, pat); err != nil {
		// ToDo: Contemplate what to do about an error here. The
		// pattern is now in the core matcher but not in the live
		// set, so matchesForFields should filter it out of results
		// until a later rebuild drops it.
		return err
	}

	m.lock.Lock()
	defer m.lock.Unlock()
	m.stats.Added++
	m.stats.Live++
	// Best effort: a failed automatic rebuild leaves the current
	// matcher in place and does not invalidate this add.
	_ = m.maybeRebuild(true)
	return nil
}
// MatchesForJSONEvent flattens the JSON event with a new JSON
// flattener and hands the resulting fields to matchesForFields.
//
// m is passed to Flatten as its second argument (presumably so the
// flattener can consult IsNameUsed; confirm against the flattener).
// A flattening error is returned with a nil result.
func (m *prunerMatcher) MatchesForJSONEvent(event []byte) ([]X, error) {
	flattener := newJSONFlattener()
	fields, err := flattener.Flatten(event, m)
	if err != nil {
		return nil, err
	}
	return m.matchesForFields(fields)
}
// matchesForFields runs the underlying core matcher's matchesForFields,
// drops every match that is no longer in the live set, records how
// many matches were kept and dropped, and then maybe rebuilds the
// index.
//
// An error from the core matcher or from live.Contains is returned
// with a nil result; in that case the stats are left untouched.
func (m *prunerMatcher) matchesForFields(fields []Field) ([]X, error) {
	candidates, err := m.Matcher.matchesForFields(fields)
	if err != nil {
		return nil, err
	}

	// Keep only the candidates that are still live.
	kept := make([]X, 0, len(candidates))
	for _, x := range candidates {
		isLive, err := m.live.Contains(x)
		if err != nil {
			return nil, err
		}
		if isLive {
			kept = append(kept, x)
		}
	}

	// Every candidate was either kept (emitted) or dropped (filtered).
	emitted := int64(len(kept))
	filtered := int64(len(candidates)) - emitted

	m.lock.Lock()
	defer m.lock.Unlock()
	m.stats.Filtered += filtered
	m.stats.Emitted += emitted
	_ = m.maybeRebuild(false)

	return kept, nil
}
// deletePatterns removes the patterns identified by x from the live
// set and, when at least one was removed, updates the stats and maybe
// rebuilds the index.
//
// The underlying core matcher is not modified here; removed patterns
// are filtered from match results until a rebuild drops them. It
// returns the error from live.Delete, if any.
func (m *prunerMatcher) deletePatterns(x X) error {
	removed, err := m.live.Delete(x)
	if err != nil || removed <= 0 {
		return err
	}

	m.lock.Lock()
	defer m.lock.Unlock()
	m.stats.Deleted += removed
	m.stats.Live -= removed
	_ = m.maybeRebuild(false)
	return nil
}
// rebuild takes the write lock and rebuilds the matcher state from
// the live patterns only.
//
// When fearlessly is true, the old matcher is released before the new
// one is built.
//
// A successful rebuild resets the prunerStats. The error from the
// rebuild, if any, is returned.
func (m *prunerMatcher) rebuild(fearlessly bool) error {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.rebuildWhileLocked(fearlessly)
}
// rebuildWhileLocked is rebuild but assumes the caller already holds
// the write lock.
//
// It builds a fresh core matcher by iterating over the live set and
// adding every live pattern to it. When fearlessly is true, the
// reference to the old matcher is dropped before the new one is built
// so the garbage collector may reclaim it during the rebuild.
//
// On success the new matcher replaces the old one and the stats are
// reset: RebuildPurged takes the old Deleted count, Live becomes the
// number of patterns re-added, the Added, Deleted, Emitted and
// Filtered counters restart from zero, and LastRebuilt and
// RebuildDuration describe this rebuild. Emitted is reset together
// with Filtered so that the filtered/emitted ratio used by the rebuild
// trigger describes only the traffic seen since this rebuild.
//
// On failure the error from the iteration is returned and the stats
// are left untouched. The previous matcher stays in place unless it
// was released by a fearless rebuild; in that case the partially built
// matcher (which holds only live patterns) is installed so that
// m.Matcher is never left nil.
func (m *prunerMatcher) rebuildWhileLocked(fearlessly bool) error {
	// We assume we have the lock.
	var (
		then    = time.Now()
		rebuilt = newCoreMatcher()
	)

	if fearlessly {
		// Let the GC reduce heap requirements?
		m.Matcher = nil
	}

	count := 0
	err := m.live.Iterate(func(x X, p string) error {
		if err := rebuilt.addPattern(x, p); err != nil {
			return err
		}
		count++
		return nil
	})
	if err != nil {
		if m.Matcher == nil {
			// The old matcher is already gone; an incomplete matcher
			// is safer than a nil pointer for subsequent calls.
			m.Matcher = rebuilt
		}
		return err
	}

	m.Matcher = rebuilt
	m.stats.RebuildPurged = m.stats.Deleted
	m.stats.Live = count
	m.stats.Added = 0
	m.stats.Deleted = 0
	m.stats.Emitted = 0
	m.stats.Filtered = 0
	m.stats.LastRebuilt = then
	m.stats.RebuildDuration = time.Since(then)
	return nil
}
// getStats returns a copy of the current prunerStats, taken under the
// read lock. The statistics might be helpful to rebuild policies.
func (m *prunerMatcher) getStats() prunerStats {
	m.lock.RLock()
	defer m.lock.RUnlock()
	return m.stats // Copies
}