diff --git a/transport.go b/transport.go index d0f39d99a..3ac30d332 100644 --- a/transport.go +++ b/transport.go @@ -96,16 +96,25 @@ func getRequestBodyFromEvent(event *Event) []byte { // HTTPTransport // ================================ +// A batch groups items that are processed sequentially. +type batch struct { + items chan *http.Request + started chan struct{} // closed to signal items started to be worked on + done chan struct{} // closed to signal completion of all items +} + // HTTPTransport is a default implementation of `Transport` interface used by `Client`. type HTTPTransport struct { dsn *Dsn client *http.Client transport *http.Transport - buffer chan *http.Request + // buffer is a channel of batches. Calling Flush terminates work on the + // current in-flight items and starts a new batch for subsequent events. + buffer chan batch + disabledUntil time.Time - wg sync.WaitGroup start sync.Once // Size of the transport buffer. Defaults to 30. @@ -130,9 +139,14 @@ func (t *HTTPTransport) Configure(options ClientOptions) { Logger.Printf("%v\n", err) return } - t.dsn = dsn - t.buffer = make(chan *http.Request, t.BufferSize) + + t.buffer = make(chan batch, 1) + t.buffer <- batch{ + items: make(chan *http.Request, t.BufferSize), + started: make(chan struct{}), + done: make(chan struct{}), + } if options.HTTPTransport != nil { t.transport = options.HTTPTransport @@ -178,10 +192,10 @@ func (t *HTTPTransport) SendEvent(event *Event) { request.Header.Set(headerKey, headerValue) } - t.wg.Add(1) + b := <-t.buffer select { - case t.buffer <- request: + case b.items <- request: Logger.Printf( "Sending %s event [%s] to %s project: %d\n", event.Level, @@ -190,51 +204,87 @@ func (t *HTTPTransport) SendEvent(event *Event) { t.dsn.projectID, ) default: - t.wg.Done() Logger.Println("Event dropped due to transport buffer being full.") - // worker would block, drop the packet } + + t.buffer <- b } // Flush notifies when all the buffered events have been sent by returning `true` // or `false` if timeout was reached. func (t *HTTPTransport) Flush(timeout time.Duration) bool { - c := make(chan struct{}) + toolate := time.After(timeout) + + var b batch + for { + // Wait until processing the current batch has started or the timeout. + select { + case b = <-t.buffer: + select { + case <-b.started: + goto started + default: + t.buffer <- b + } + case <-toolate: + goto fail + } + } - go func() { - t.wg.Wait() - close(c) - }() +started: + // Signal that there won't be any more items in this batch, so that the + // worker inner loop can end. + close(b.items) + // Start a new batch for subsequent events. + t.buffer <- batch{ + items: make(chan *http.Request, t.BufferSize), + started: make(chan struct{}), + done: make(chan struct{}), + } + // Wait until the current batch is done or the timeout. select { - case <-c: + case <-b.done: Logger.Println("Buffer flushed successfully.") return true - case <-time.After(timeout): - Logger.Println("Buffer flushing reached the timeout.") - return false + case <-toolate: + goto fail } + +fail: + Logger.Println("Buffer flushing reached the timeout.") + return false } func (t *HTTPTransport) worker() { - for request := range t.buffer { - if time.Now().Before(t.disabledUntil) { - t.wg.Done() - continue - } - - response, err := t.client.Do(request) - - if err != nil { - Logger.Printf("There was an issue with sending an event: %v", err) - } - - if response != nil && response.StatusCode == http.StatusTooManyRequests { - t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response)) - Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil) + for b := range t.buffer { + // Signal that processing of the current batch has started. + close(b.started) + + // Return the batch to the buffer so that other goroutines can use it. + // Equivalent to releasing a lock. + t.buffer <- b + + // Process all batch items. + for request := range b.items { + if time.Now().Before(t.disabledUntil) { + continue + } + + response, err := t.client.Do(request) + + if err != nil { + Logger.Printf("There was an issue with sending an event: %v", err) + } + + if response != nil && response.StatusCode == http.StatusTooManyRequests { + t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response)) + Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil) + } } - t.wg.Done() + // Signal that processing of the batch is done. + close(b.done) } }