This repository has been archived by the owner on Aug 12, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path logger.go
364 lines (312 loc) · 12.2 KB
/
logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
package sawmill
import (
"fmt"
"os"
"sync"
"sync/atomic"
"github.com/phemmer/sawmill/event"
"github.com/phemmer/sawmill/handler/filter"
"github.com/phemmer/sawmill/handler/syslog"
"github.com/phemmer/sawmill/handler/writer"
)
// exit is an indirection over os.Exit so tests can intercept process
// termination instead of actually exiting.
var exit = os.Exit
// Fields is a convenience type for passing ancillary data when generating
// events, e.g. logger.Info("connected", Fields{"addr": addr}).
type Fields map[string]interface{}
// Handler represents a destination for sawmill to send events to.
// It responds to a single method, `Event`, which accepts the event to
// process. It must not return until the event has been fully processed.
type Handler interface {
	// Event processes a single event, blocking until it is fully handled.
	Event(event *event.Event) error
}
// eventHandlerSpec bundles a Handler with the bookkeeping needed to feed it
// events asynchronously (via handlerDriver) and to support Sync().
type eventHandlerSpec struct {
	name    string  // unique identifier, as given to AddHandler
	handler Handler // the destination handler itself

	eventChannel  chan *event.Event // queue drained by handlerDriver; a nil event is the shutdown sentinel
	finishChannel chan bool         // signaled by handlerDriver once the queue is fully flushed

	lastSentEventId      uint64 // id of the last event successfully queued to eventChannel (accessed atomically)
	lastProcessedEventId uint64 // id of the last event the handler finished processing (guarded by lastProcessedEventIdCond.L)

	lastProcessedEventIdCond *sync.Cond // broadcast by handlerDriver whenever lastProcessedEventId advances
}
// Logger is the core type in sawmill.
// The logger tracks a list of destinations, and when given an event, will
// asynchronously send that event to all registered destination handlers.
type Logger struct {
	eventHandlerMap map[string]*eventHandlerSpec // registered handlers keyed by name; guarded by mutex

	stackMinLevel int32 // we store this as int32 instead of event.Level so that we can use atomic

	mutex     sync.RWMutex   // guards eventHandlerMap
	waitgroup sync.WaitGroup // tracks running handlerDriver goroutines (see Stop)

	lastEventId uint64 // monotonically increasing event id counter (accessed atomically)
	syncEnabled uint32 // 0/1 flag for synchronous mode (accessed atomically; see SetSync/GetSync)
}
// NewLogger constructs a Logger with no registered handlers.
//
// By default events will not include a stack trace. If any destination
// handler makes use of a stack trace, call SetStackMinLevel on the logger.
func NewLogger() *Logger {
	logger := &Logger{
		eventHandlerMap: map[string]*eventHandlerSpec{},
		// One past the highest level, so no level triggers a stack trace.
		stackMinLevel: int32(event.Emergency) + 1,
	}
	return logger
}
// SetStackMinLevel sets the minimum level at which to include a stack trace
// in events.
func (logger *Logger) SetStackMinLevel(level event.Level) {
	// Stored as int32 so Event() can read it with an atomic load.
	lvl := int32(level)
	atomic.StoreInt32(&logger.stackMinLevel, lvl)
}
// GetStackMinLevel gets the minimum level at which to include a stack trace
// in events.
func (logger *Logger) GetStackMinLevel() event.Level {
	raw := atomic.LoadInt32(&logger.stackMinLevel)
	return event.Level(raw)
}
// AddHandler registers a new destination handler with the logger.
//
// The name parameter is a unique identifier so that the handler can be targeted with RemoveHandler().
//
// If a handler with the same name already exists, it will be replaced by the new one.
// During replacement, the function will block waiting for any pending events to be flushed to the old handler.
func (logger *Logger) AddHandler(name string, handler Handler) {
	spec := &eventHandlerSpec{
		name:    name,
		handler: handler,
		// Buffered so event generation doesn't block; SendEvent drops
		// events when this buffer is full.
		eventChannel:             make(chan *event.Event, 100),
		finishChannel:            make(chan bool, 1),
		lastProcessedEventIdCond: sync.NewCond(&sync.Mutex{}),
	}

	// Start the goroutine that drains eventChannel into the handler.
	logger.waitgroup.Add(1)
	go handlerDriver(spec, handler, &logger.waitgroup)

	// Swap the new spec in under the write lock; the old one (if any) is
	// drained below, after the lock is released.
	logger.mutex.Lock()
	oldSpec := logger.eventHandlerMap[name]
	logger.eventHandlerMap[name] = spec
	logger.mutex.Unlock()

	//TODO we need a way to leave the handler in the map while letting it drain.
	// With the current code, we remove the handler and wait. But if someone
	// calls Sync(), it won't know about this handler any more.
	//
	// We could add the handler into the map under an alternate name, and remove
	// it once drained.
	// Or we could have 2 maps. Add another one for "pending removal" handlers.
	if oldSpec != nil {
		// A nil event is the shutdown sentinel for handlerDriver;
		// finishChannel signals that all queued events were processed.
		oldSpec.eventChannel <- nil
		<-oldSpec.finishChannel
	}
}
// handlerDriver drains spec.eventChannel into the handler, one event at a
// time, until it receives a nil event (the shutdown sentinel sent by
// AddHandler/RemoveHandler). After each event it publishes the processed
// event id and wakes any Sync() waiters. On exit it signals finishChannel so
// the sender of the sentinel can block until the queue is fully flushed.
func handlerDriver(spec *eventHandlerSpec, handler Handler, waitgroup *sync.WaitGroup) {
	defer waitgroup.Done()

	eventChannel := spec.eventChannel
	finishChannel := spec.finishChannel
	for logEvent := range eventChannel {
		if logEvent == nil {
			// shutdown sentinel
			break
		}

		handler.Event(logEvent) //TODO error handler

		// Record the id just processed and broadcast so Sync() callers
		// waiting on this handler can re-check their condition.
		spec.lastProcessedEventIdCond.L.Lock()
		spec.lastProcessedEventId = logEvent.Id
		spec.lastProcessedEventIdCond.Broadcast()
		spec.lastProcessedEventIdCond.L.Unlock()
	}
	finishChannel <- true
}
// RemoveHandler removes the named handler from the logger, preventing any further events from being sent to it.
// The wait parameter will result in the function blocking until all events queued for the handler have finished processing.
func (logger *Logger) RemoveHandler(name string, wait bool) {
	logger.mutex.Lock()
	spec, ok := logger.eventHandlerMap[name]
	if !ok {
		// no such handler; nothing to do
		logger.mutex.Unlock()
		return
	}
	delete(logger.eventHandlerMap, name)
	logger.mutex.Unlock()

	// A nil event is the shutdown sentinel for the handler's goroutine.
	spec.eventChannel <- nil
	if wait {
		// Block until handlerDriver reports the queue fully flushed.
		<-spec.finishChannel
	}
}
// GetHandler retrieves the handler with the given name.
// Returns nil if no such handler exists.
func (logger *Logger) GetHandler(name string) Handler {
	logger.mutex.RLock()
	defer logger.mutex.RUnlock()

	if spec := logger.eventHandlerMap[name]; spec != nil {
		return spec.handler
	}
	return nil
}
// FilterHandler is a convenience wrapper for filter.New().
//
// Example usage:
//  stdStreamsHandler := logger.GetHandler("stdStreams")
//  stdStreamsHandler = logger.FilterHandler(stdStreamsHandler).LevelMin(sawmill.ErrorLevel)
//  logger.AddHandler("stdStreams", stdStreamsHandler)
func (logger *Logger) FilterHandler(handler Handler, filterFuncs ...filter.FilterFunc) *filter.FilterHandler {
	wrapped := filter.New(handler, filterFuncs...)
	return wrapped
}
// Stop removes all destination handlers on the logger, and waits for any
// pending events to flush out.
//
// Stop also checks for an in-flight panic (it is intended to be usable via
// defer), logging it before re-raising; recover() only has effect here when
// Stop itself was deferred.
func (logger *Logger) Stop() {
	logger.checkPanic(recover())

	// Snapshot the handler names so the lock isn't held while removing
	// (RemoveHandler takes the write lock).
	//
	// BUG FIX: the slice was previously created with make([]string, len(...))
	// and then appended to, which produced len(map) empty-string names in
	// front of the real ones and triggered pointless RemoveHandler("") calls.
	logger.mutex.RLock()
	handlerNames := make([]string, 0, len(logger.eventHandlerMap))
	for handlerName := range logger.eventHandlerMap {
		handlerNames = append(handlerNames, handlerName)
	}
	logger.mutex.RUnlock()

	for _, handlerName := range handlerNames {
		logger.RemoveHandler(handlerName, false)
	}

	// Wait for every handlerDriver goroutine to finish draining.
	logger.waitgroup.Wait() //TODO timeout?
}
// CheckPanic is used to check for panics and log them when encountered.
// The function must be executed via defer.
// CheckPanic will not halt the panic. After logging, the panic will be passed
// through.
func (logger *Logger) CheckPanic() {
	// recover only works when called in the function that was deferred, not
	// recurisvely. But the code is shared by several other functions.
	err := recover()
	logger.checkPanic(err)
}
// checkPanic logs the given panic value (if any) at critical level, waits for
// it to flush to all handlers, and then re-raises the panic.
func (logger *Logger) checkPanic(err interface{}) {
	if err != nil {
		// Flush the panic event to every handler before propagating.
		logger.Sync(logger.Critical("panic", Fields{"error": err}))
		panic(err)
	}
}
// InitStdStreams is a convenience function to register a STDOUT/STDERR
// handler with the logger.
//
// The handler is added with the name 'stdStreams'.
func (logger *Logger) InitStdStreams() {
	stdHandler := writer.NewStandardStreamsHandler()
	logger.AddHandler("stdStreams", stdHandler)
}
// InitStdSyslog is a convenience function to register a syslog handler with
// the logger.
//
// The handler is added with the name 'syslog'.
func (logger *Logger) InitStdSyslog() error {
	handler, err := syslog.New("", "", 0, "")
	if err != nil {
		return err
	}

	logger.AddHandler("syslog", handler)
	return nil
}
// Event queues a message at the given level.
// Additional fields may be provided, which will be recursively copied at the
// time of the function call, and provided to the destination output handler.
// It returns an event Id that can be used with Sync().
func (logger *Logger) Event(level event.Level, message string, fields ...interface{}) uint64 {
	var eventFields interface{}
	switch len(fields) {
	case 0:
		// no ancillary data; leave eventFields nil
	case 1:
		eventFields = fields[0]
	default:
		eventFields = fields
	}

	// Only capture a stack trace when the level is at or above the
	// configured threshold (see SetStackMinLevel).
	withStack := int32(level) >= atomic.LoadInt32(&logger.stackMinLevel)

	//TODO do we want to just remove the id param from event.New()?
	logEvent := event.New(0, level, message, eventFields, withStack)
	return logger.SendEvent(logEvent)
}
// SendEvent queues the given event.
// The event's `Id` field will be updated with a value that can be used by
// Sync(). This value is also provided as the return value for convenience.
//
// If a handler's queue is full the event is dropped for that handler (a
// diagnostic is printed to stderr) rather than blocking the caller.
func (logger *Logger) SendEvent(logEvent *event.Event) uint64 {
	logEvent.Id = atomic.AddUint64(&logger.lastEventId, 1)

	logger.mutex.RLock()
	for _, eventHandlerSpec := range logger.eventHandlerMap {
		//TODO make dropping configurable per-handler. A non-dropping handler
		// would use an unconditional (blocking) channel send instead of the
		// select below. (The previous `if true { ... } else { ... }` scaffold
		// with an unreachable blocking branch has been removed.)
		select {
		case eventHandlerSpec.eventChannel <- logEvent:
			// Record that this event made it into the handler's queue, so
			// Sync() knows it is worth waiting for.
			atomic.StoreUint64(&eventHandlerSpec.lastSentEventId, logEvent.Id)
		default:
			// Queue full: drop instead of blocking the event generator.
			fmt.Fprintf(os.Stderr, "Unable to send event to handler. Buffer full. handler=%s\n", eventHandlerSpec.name)
			//TODO generate an event for this, but put in a time-last-dropped so we don't send the message to the handler which is dropping
			// basically if we are dropping, and we last dropped < X seconds ago, don't generate another "event dropped" message
		}
	}
	logger.mutex.RUnlock()

	// In synchronous mode, block until every handler has processed the event.
	if logger.GetSync() {
		logger.Sync(logEvent.Id)
	}

	return logEvent.Id
}
// Emergency queues a message at the emergency level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Emergency(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Emergency, message, fields...)
	return id
}
// Alert queues a message at the alert level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Alert(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Alert, message, fields...)
	return id
}
// Critical queues a message at the critical level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Critical(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Critical, message, fields...)
	return id
}
// Error queues a message at the error level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Error(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Error, message, fields...)
	return id
}
// Warning queues a message at the warning level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Warning(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Warning, message, fields...)
	return id
}
// Notice queues a message at the notice level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Notice(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Notice, message, fields...)
	return id
}
// Info queues a message at the info level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Info(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Info, message, fields...)
	return id
}
// Debug queues a message at the debug level and returns its event Id,
// which can be passed to Sync().
func (logger *Logger) Debug(message string, fields ...interface{}) uint64 {
	id := logger.Event(event.Debug, message, fields...)
	return id
}
// Fatal generates an event at the critical level, and then exits the program
// with status 1. All pending events are flushed (via Stop) before exiting so
// the message is not lost.
func (logger *Logger) Fatal(message string, fields ...interface{}) {
	logger.Critical(message, fields...)
	// Remove all handlers and wait for queued events to drain.
	logger.Stop()
	// exit is a package var (normally os.Exit) so tests can intercept it.
	exit(1)
}
// Sync blocks until the given event Id has been flushed out to all
// destinations. Events that were dropped for a handler (because its queue
// was full) are not waited on.
func (logger *Logger) Sync(eventId uint64) {
	logger.mutex.RLock()
	for _, eventHandlerSpec := range logger.eventHandlerMap {
		if atomic.LoadUint64(&eventHandlerSpec.lastSentEventId) < eventId {
			// lastSentEventId wasn't incremented, meaning it was dropped. no point waiting for it
			continue
		}

		// wait for the lastProcessedEventId to become >= eventId.
		// handlerDriver broadcasts on this cond after each processed event.
		eventHandlerSpec.lastProcessedEventIdCond.L.Lock()
		for eventHandlerSpec.lastProcessedEventId < eventId {
			eventHandlerSpec.lastProcessedEventIdCond.Wait()
		}
		eventHandlerSpec.lastProcessedEventIdCond.L.Unlock()
	}
	logger.mutex.RUnlock()
}
// SetSync controls synchronous event mode. When set to true, a function call
// to generate an event does not return until the event has been processed.
func (logger *Logger) SetSync(enabled bool) {
	// The flag is stored as a uint32 so GetSync can read it atomically.
	var flag uint32
	if enabled {
		flag = 1
	}
	atomic.StoreUint32(&logger.syncEnabled, flag)
}
// GetSync indicates whether synchronous mode is enabled.
func (logger *Logger) GetSync() bool {
	flag := atomic.LoadUint32(&logger.syncEnabled)
	return flag == 1
}