Skip to content

Commit 7996f43

Browse files
committed
Tweaks to test timings to put a dent in Travis failures.
1 parent 298bc1e commit 7996f43

File tree

4 files changed

+64
-23
lines changed

4 files changed

+64
-23
lines changed

.travis.yml

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1+
sudo: false
12
language: go
23
go:
34
- 1.4
45
notifications:
56
irc:
67
channels:
78
- "irc.mozilla.org#heka"
8-
before_install:
9-
- sudo apt-get install protobuf-compiler cmake libgeoip-dev
10-
9+
addons:
10+
apt:
11+
packages:
12+
- protobuf-compiler
13+
- cmake
14+
- libgeoip-dev
1115
install:
1216
- . build.sh
13-
1417
script:
1518
- make test

CHANGES.txt

+3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ Bug Handling
3131
* Fixed bug where a SandboxInput configured with a `ticker_interval` would
3232
get stuck in an infinite loop on shutdown (#1705).
3333

34+
* Changes in StatAccumInput and FileOutput tests to minimize intermittent
35+
Travis failures.
36+
3437
0.10.0b1 (2015-08-07)
3538
=====================
3639

pipeline/stat_accum_input_test.go

+29-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package pipeline
1616

1717
import (
18+
"log"
1819
"strconv"
1920
"strings"
2021
"sync"
@@ -94,7 +95,8 @@ func StatAccumInputSpec(c gs.Context) {
9495
}
9596

9697
ith.MockInputRunner.EXPECT().InChan().Return(ith.PackSupply)
97-
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput").AnyTimes()
98+
// Need one of these for every Inject
99+
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
98100

99101
injectCall := ith.MockInputRunner.EXPECT().Inject(ith.Pack)
100102
var injectCalled sync.WaitGroup
@@ -131,9 +133,26 @@ func StatAccumInputSpec(c gs.Context) {
131133
}
132134
}
133135

136+
drainStats := func() {
137+
ok := true
138+
for ok {
139+
if len(statAccumInput.statChan) > 0 {
140+
time.Sleep(100 * time.Millisecond)
141+
} else {
142+
ok = false
143+
}
144+
}
145+
}
146+
134147
validateValueAtKey := func(msg *message.Message, key string, value interface{}) {
135148
fieldValue, ok := msg.GetFieldValue(key)
149+
if !ok {
150+
log.Printf("%s field is missing from the message\n", key)
151+
}
136152
c.Expect(ok, gs.IsTrue)
153+
if fieldValue != value {
154+
log.Printf("%s should be %v is %v\n", key, value, fieldValue)
155+
}
137156
c.Expect(fieldValue, gs.Equals, value)
138157
}
139158

@@ -203,15 +222,17 @@ func StatAccumInputSpec(c gs.Context) {
203222

204223
c.Specify("emits proper idle stats", func() {
205224
startInput()
225+
inputStarted.Wait()
206226
sendGauge("sample.gauge", 1, 2)
207227
sendCounter("sample.cnt", 1, 2, 3, 4, 5)
208228
sendTimer("sample.timer", 10, 10, 20, 20)
209-
inputStarted.Wait()
229+
drainStats()
210230
tickChan <- time.Now()
211231

212232
injectCalled.Wait()
213233
ith.Pack.Recycle(nil)
214234
ith.PackSupply <- ith.Pack
235+
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
215236
ith.MockInputRunner.EXPECT().Inject(ith.Pack)
216237

217238
msg, err := finalizeSendingStats()
@@ -225,19 +246,22 @@ func StatAccumInputSpec(c gs.Context) {
225246
c.Specify("omits idle stats", func() {
226247
config.DeleteIdleStats = true
227248
err := statAccumInput.Init(config)
228-
c.Expect(err, gs.IsNil)
249+
c.Assume(err, gs.IsNil)
229250

230251
startInput()
252+
inputStarted.Wait() // Can't flush until the input has started.
231253
sendGauge("sample.gauge", 1, 2)
232254
sendCounter("sample.cnt", 1, 2, 3, 4, 5)
233255
sendTimer("sample.timer", 10, 10, 20, 20)
234-
inputStarted.Wait() // Can't flush until the input has started.
256+
drainStats()
235257
tickChan <- time.Now()
236258
injectCalled.Wait()
237259

238260
sendTimer("sample2.timer", 10, 20)
261+
drainStats()
239262
ith.Pack.Recycle(nil)
240263
ith.PackSupply <- ith.Pack
264+
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
241265
ith.MockInputRunner.EXPECT().Inject(ith.Pack)
242266
msg, err := finalizeSendingStats()
243267
c.Assume(err, gs.IsNil)
@@ -379,6 +403,7 @@ func StatAccumInputSpec(c gs.Context) {
379403

380404
// Prep pack and EXPECTS for the close.
381405
ith.PackSupply <- ith.Pack
406+
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
382407
ith.MockInputRunner.EXPECT().Inject(ith.Pack)
383408

384409
close(statAccumInput.statChan)

plugins/file/file_output_test.go

+25-15
Original file line numberDiff line numberDiff line change
@@ -107,34 +107,40 @@ func FileOutputSpec(c gs.Context) {
107107
})
108108
})
109109

110-
c.Specify("tests rotation of files", func() {
110+
c.Specify("rotates files correctly", func() {
111111
config.Path = "%Y-%m-%d"
112+
config.RotationInterval = 24
112113
rotateChan := make(chan time.Time)
113114
closingChan := make(chan struct{})
114115

115116
err := fileOutput.Init(config)
116-
defer fileOutput.file.Close()
117-
118117
c.Assume(err, gs.IsNil)
119118

119+
defer fileOutput.file.Close()
120+
120121
fileOutput.rotateChan = rotateChan
121122
fileOutput.closing = closingChan
122123

123124
fileOutput.startRotateNotifier()
124125

125-
go fileOutput.committer(oth.MockOutputRunner, errChan)
126+
committerChan := make(chan struct{})
127+
go func() {
128+
fileOutput.committer(oth.MockOutputRunner, errChan)
129+
close(committerChan)
130+
}()
126131

127132
c.Assume(fileOutput.path, gs.Equals, time.Now().Format("2006-01-02"))
128133

129134
futureDuration, _ := time.ParseDuration("24h")
130135
futureNow := time.Now().Add(futureDuration)
131136

132137
rotateChan <- futureNow
138+
close(inChan)
139+
close(fileOutput.batchChan)
140+
<-committerChan
133141

134142
c.Assume(fileOutput.path, gs.Equals, futureNow.Format("2006-01-02"))
135143

136-
close(inChan)
137-
close(fileOutput.batchChan)
138144
})
139145

140146
c.Specify("processes incoming messages", func() {
@@ -256,6 +262,7 @@ func FileOutputSpec(c gs.Context) {
256262
timerChan := make(chan time.Time)
257263

258264
msg2 := pipeline_ts.GetTestMessage()
265+
msg2.SetPayload("MESSAGE 2")
259266
pack2 := NewPipelinePack(pConfig.InputRecycleChan())
260267
pack2.Message = msg2
261268

@@ -299,24 +306,26 @@ func FileOutputSpec(c gs.Context) {
299306
defer cleanUp()
300307
inChan <- pack
301308

309+
after := time.After(100 * time.Millisecond)
302310
select {
303311
case <-fileOutput.batchChan:
304312
c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
305-
default:
313+
case <-after:
306314
}
307315

308316
timerChan <- time.Now()
317+
after = time.After(100 * time.Millisecond)
309318
select {
310319
case <-fileOutput.batchChan:
311320
c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
312-
default:
321+
case <-after:
313322
}
314323

324+
after = time.After(100 * time.Millisecond)
315325
inChan <- pack2
316-
runtime.Gosched()
317326
select {
318327
case <-fileOutput.batchChan:
319-
default:
328+
case <-after:
320329
c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now")
321330
}
322331
})
@@ -329,18 +338,19 @@ func FileOutputSpec(c gs.Context) {
329338
defer cleanUp()
330339
inChan <- pack
331340

341+
after := time.After(100 * time.Millisecond)
332342
select {
333343
case <-fileOutput.batchChan:
334344
c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
335-
default:
345+
case <-after:
336346
}
337347

338348
c.Specify("when interval triggers first", func() {
339349
timerChan <- time.Now()
340-
runtime.Gosched()
350+
after = time.After(100 * time.Millisecond)
341351
select {
342352
case <-fileOutput.batchChan:
343-
default:
353+
case <-after:
344354
c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now")
345355
}
346356
})
@@ -349,10 +359,10 @@ func FileOutputSpec(c gs.Context) {
349359
out, err := encoder.Encode(pack2)
350360
oth.MockOutputRunner.EXPECT().Encode(gomock.Any()).Return(out, err)
351361
inChan <- pack2
352-
runtime.Gosched()
362+
after = time.After(100 * time.Millisecond)
353363
select {
354364
case <-fileOutput.batchChan:
355-
default:
365+
case <-after:
356366
c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now")
357367
}
358368
})

0 commit comments

Comments
 (0)