@@ -96,16 +96,25 @@ func getRequestBodyFromEvent(event *Event) []byte {
 // HTTPTransport
 // ================================
 
+// A batch groups items that are processed sequentially.
+type batch struct {
+	items   chan *http.Request
+	started chan struct{} // closed to signal items started to be worked on
+	done    chan struct{} // closed to signal completion of all items
+}
+
 // HTTPTransport is a default implementation of `Transport` interface used by `Client`.
 type HTTPTransport struct {
 	dsn       *Dsn
 	client    *http.Client
 	transport *http.Transport
 
-	buffer        chan *http.Request
+	// buffer is a channel of batches. Calling Flush terminates work on the
+	// current in-flight items and starts a new batch for subsequent events.
+	buffer chan batch
+
 	disabledUntil time.Time
 
-	wg    sync.WaitGroup
 	start sync.Once
 
 	// Size of the transport buffer. Defaults to 30.
@@ -130,9 +139,14 @@ func (t *HTTPTransport) Configure(options ClientOptions) {
 		Logger.Printf("%v\n", err)
 		return
 	}
-
 	t.dsn = dsn
-	t.buffer = make(chan *http.Request, t.BufferSize)
+
+	t.buffer = make(chan batch, 1)
+	t.buffer <- batch{
+		items:   make(chan *http.Request, t.BufferSize),
+		started: make(chan struct{}),
+		done:    make(chan struct{}),
+	}
 
 	if options.HTTPTransport != nil {
 		t.transport = options.HTTPTransport
@@ -178,10 +192,10 @@ func (t *HTTPTransport) SendEvent(event *Event) {
 		request.Header.Set(headerKey, headerValue)
 	}
 
-	t.wg.Add(1)
+	b := <-t.buffer
 
 	select {
-	case t.buffer <- request:
+	case b.items <- request:
 		Logger.Printf(
 			"Sending %s event [%s] to %s project: %d\n",
 			event.Level,
@@ -190,51 +204,87 @@ func (t *HTTPTransport) SendEvent(event *Event) {
 			t.dsn.projectID,
 		)
 	default:
-		t.wg.Done()
 		Logger.Println("Event dropped due to transport buffer being full.")
-		// worker would block, drop the packet
 	}
+
+	t.buffer <- b
 }
 
 // Flush notifies when all the buffered events have been sent by returning `true`
 // or `false` if timeout was reached.
 func (t *HTTPTransport) Flush(timeout time.Duration) bool {
-	c := make(chan struct{})
+	toolate := time.After(timeout)
+
+	var b batch
+	for {
+		// Wait until processing the current batch has started or the timeout.
+		select {
+		case b = <-t.buffer:
+			select {
+			case <-b.started:
+				goto started
+			default:
+				t.buffer <- b
+			}
+		case <-toolate:
+			goto fail
+		}
+	}
 
-	go func() {
-		t.wg.Wait()
-		close(c)
-	}()
+started:
+	// Signal that there won't be any more items in this batch, so that the
+	// worker inner loop can end.
+	close(b.items)
+	// Start a new batch for subsequent events.
+	t.buffer <- batch{
+		items:   make(chan *http.Request, t.BufferSize),
+		started: make(chan struct{}),
+		done:    make(chan struct{}),
+	}
 
+	// Wait until the current batch is done or the timeout.
 	select {
-	case <-c:
+	case <-b.done:
 		Logger.Println("Buffer flushed successfully.")
 		return true
-	case <-time.After(timeout):
-		Logger.Println("Buffer flushing reached the timeout.")
-		return false
+	case <-toolate:
+		goto fail
 	}
+
+fail:
+	Logger.Println("Buffer flushing reached the timeout.")
+	return false
 }
 
 func (t *HTTPTransport) worker() {
-	for request := range t.buffer {
-		if time.Now().Before(t.disabledUntil) {
-			t.wg.Done()
-			continue
-		}
-
-		response, err := t.client.Do(request)
-
-		if err != nil {
-			Logger.Printf("There was an issue with sending an event: %v", err)
-		}
-
-		if response != nil && response.StatusCode == http.StatusTooManyRequests {
-			t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response))
-			Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil)
+	for b := range t.buffer {
+		// Signal that processing of the current batch has started.
+		close(b.started)
+
+		// Return the batch to the buffer so that other goroutines can use it.
+		// Equivalent to releasing a lock.
+		t.buffer <- b
+
+		// Process all batch items.
+		for request := range b.items {
+			if time.Now().Before(t.disabledUntil) {
+				continue
+			}
+
+			response, err := t.client.Do(request)
+
+			if err != nil {
+				Logger.Printf("There was an issue with sending an event: %v", err)
+			}
+
+			if response != nil && response.StatusCode == http.StatusTooManyRequests {
+				t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response))
+				Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil)
+			}
 		}
 
-		t.wg.Done()
+		// Signal that processing of the batch is done.
+		close(b.done)
 	}
 }
 
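The diff above replaces the `sync.WaitGroup` accounting with a single-slot channel of batches: whoever receives the current batch from `buffer` owns it until it sends it back, the worker closes `started` and `done` to report progress, and `Flush` closes `items` to end the current batch and installs a fresh one. Below is a minimal, self-contained sketch of that hand-off under simplified assumptions: `queue`, `enqueue`, `flush`, and the string payloads are illustrative stand-ins, not part of the sentry-go API; the real transport carries `*http.Request` values and adds the rate-limit handling shown in the diff.

package main

import (
	"fmt"
	"time"
)

// batch mirrors the struct from the diff, but carries strings instead of
// *http.Request so the sketch has no network dependencies.
type batch struct {
	items   chan string
	started chan struct{} // closed once the worker picks up the batch
	done    chan struct{} // closed once every item in the batch was processed
}

func newBatch(size int) batch {
	return batch{
		items:   make(chan string, size),
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}
}

// queue stands in for HTTPTransport: buffer has capacity 1 and always holds
// the current batch, so receiving from it acquires exclusive access and
// sending returns it, much like locking and unlocking a mutex.
type queue struct {
	buffer chan batch
}

func (q *queue) worker() {
	for b := range q.buffer {
		close(b.started) // processing of this batch has begun
		q.buffer <- b    // release the "lock" so senders and flush can proceed

		for item := range b.items { // ends when flush closes b.items
			fmt.Println("processing", item)
		}
		close(b.done) // every item of this batch has been handled
	}
}

func (q *queue) enqueue(item string) {
	b := <-q.buffer
	select {
	case b.items <- item:
	default:
		// batch full: drop the item instead of blocking, as SendEvent does
	}
	q.buffer <- b
}

func (q *queue) flush(timeout time.Duration) bool {
	deadline := time.After(timeout)
	var b batch
	for {
		select {
		case b = <-q.buffer:
			select {
			case <-b.started:
				close(b.items)           // no more items for the current batch
				q.buffer <- newBatch(10) // fresh batch for later items
				select {
				case <-b.done:
					return true
				case <-deadline:
					return false
				}
			default:
				q.buffer <- b // worker has not picked this batch up yet; retry
			}
		case <-deadline:
			return false
		}
	}
}

func main() {
	q := &queue{buffer: make(chan batch, 1)}
	q.buffer <- newBatch(10)
	go q.worker()

	q.enqueue("event-1")
	q.enqueue("event-2")
	fmt.Println("flushed:", q.flush(time.Second))
}

Run as-is, the sketch should print both items and then `flushed: true`. The design point worth noting is that every goroutine that takes the batch out of `buffer` puts one back before blocking on anything else, so the single-slot channel behaves like a mutex whose protected value can be swapped out during a flush.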