@@ -199,21 +199,21 @@ type InputRunner interface {
199
199
200
200
type iRunner struct {
201
201
pRunnerBase
202
- input Input
203
- config CommonInputConfig
204
- pConfig * PipelineConfig
205
- inChan chan * PipelinePack
206
- ticker <- chan time.Time
207
- transient bool
208
- syncDecode bool
209
- sendDecodeFailures bool
210
- logDecodeFailures bool
211
- deliver DeliverFunc
212
- delivererOnce sync.Once
213
- delivererLock sync.Mutex
214
- canExit bool
215
- shutdownWanters []WantsDecoderRunnerShutdown
216
- shutdownLock sync.Mutex
202
+ input Input
203
+ config CommonInputConfig
204
+ pConfig * PipelineConfig
205
+ inChan chan * PipelinePack
206
+ ticker <- chan time.Time
207
+ transient bool
208
+ syncDecode bool
209
+ sendDecodeFailures bool
210
+ logDecodeFailures bool
211
+ deliver DeliverFunc
212
+ delivererOnce sync.Once
213
+ delivererLock sync.Mutex
214
+ canExit bool
215
+ shutdownWanters []WantsDecoderRunnerShutdown
216
+ shutdownLock sync.Mutex
217
217
}
218
218
219
219
func (ir * iRunner ) Ticker () (ticker <- chan time.Time ) {
@@ -314,7 +314,7 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
314
314
if err != nil {
315
315
ir .LogError (err )
316
316
if ! ir .IsStoppable () {
317
- globals .ShutDown ()
317
+ globals .ShutDown (1 )
318
318
}
319
319
return
320
320
}
@@ -344,7 +344,7 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
344
344
break
345
345
}
346
346
347
- // Otherwise we'll execute the Retry config
347
+ // Otherwise we'll execute the Retry config.
348
348
recon .CleanupForRestart ()
349
349
if ir .maker == nil {
350
350
ir .pConfig .makersLock .RLock ()
@@ -364,10 +364,16 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
364
364
ir .LogMessage (fmt .Sprintf ("Restarting (attempt %d/%d)\n " ,
365
365
rh .times , rh .retries ))
366
366
367
- // If we've not been created elsewhere, call the plugin's Init()
367
+ // If we've not been created elsewhere, call the plugin's Init().
368
368
if ! ir .transient {
369
- if err = ir .plugin .Init (ir .maker .Config ()); err != nil {
370
- // We couldn't reInit the plugin, do a mini-retry loop
369
+ var config interface {}
370
+ if config , err = ir .maker .PrepConfig (); err != nil {
371
+ // We couldn't reInit the plugin, do a mini-retry loop.
372
+ ir .LogError (err )
373
+ goto initLoop
374
+ }
375
+ if err = ir .plugin .Init (config ); err != nil {
376
+ // We couldn't reInit the plugin, do a mini-retry loop.
371
377
ir .LogError (err )
372
378
goto initLoop
373
379
}
@@ -380,7 +386,7 @@ func (ir *iRunner) Starter(h PluginHelper, wg *sync.WaitGroup) {
380
386
381
387
// If we're not a stoppable input, trigger Heka shutdown.
382
388
if ! ir .IsStoppable () {
383
- globals .ShutDown ()
389
+ globals .ShutDown (1 )
384
390
}
385
391
}
386
392
@@ -957,10 +963,43 @@ func (foRunner *foRunner) BackPressured() bool {
957
963
return len (foRunner .inChan ) >= foRunner .capacity ||
958
964
foRunner .matcher .InChanLen () >= foRunner .capacity
959
965
}
960
-
961
966
return foRunner .capacity > 0 && foRunner .bufReader .queueSize .Get () >= uint64 (foRunner .capacity )
962
967
}
963
968
969
+ func (foRunner * foRunner ) waitForBackPressure () error {
970
+ globals := foRunner .pConfig .Globals
971
+ retryOptions := getDefaultRetryOptions ()
972
+ retryOptions .MaxDelay = "1s"
973
+ retryOptions .MaxRetries = int (globals .FullBufferMaxRetries )
974
+ // NewRetryHelper will only return an error if the duration strings don't
975
+ // parse. Ours are hard-coded, so this error shouldn't happen.
976
+ retry , err := NewRetryHelper (retryOptions )
977
+ if err != nil {
978
+ return fmt .Errorf ("can't create retry helper: %s" , err .Error ())
979
+ }
980
+ for ! globals .IsShuttingDown () {
981
+ bp := foRunner .BackPressured ()
982
+ fmt .Println ("back-pressured?: " , bp )
983
+ if ! bp {
984
+ return nil
985
+ }
986
+ err = retry .Wait ()
987
+ if err != nil {
988
+ // We've exhausted our max allowed retries, so we honor the
989
+ // buffer's 'full_action' setting and trigger a shutdown if
990
+ // necessary.
991
+ if foRunner .bufReader .config .FullAction == "shutdown" {
992
+ globals .ShutDown (1 )
993
+ foRunner .LogError (errors .New ("back-pressure not resolving: triggering shutdown" ))
994
+ }
995
+ // But we always return `nil` so that regular start up sequence can
996
+ // continue.
997
+ return nil
998
+ }
999
+ }
1000
+ return nil
1001
+ }
1002
+
964
1003
func (foRunner * foRunner ) Start (h PluginHelper , wg * sync.WaitGroup ) (err error ) {
965
1004
foRunner .h = h
966
1005
foRunner .pConfig = h .PipelineConfig ()
@@ -1034,6 +1073,14 @@ func (foRunner *foRunner) Start(h PluginHelper, wg *sync.WaitGroup) (err error)
1034
1073
} else {
1035
1074
go foRunner .OldStarter (h , wg )
1036
1075
}
1076
+
1077
+ if foRunner .useBuffering && foRunner .BackPressured () {
1078
+ foRunner .LogMessage ("Delaying start while trying to relieve back-pressure..." )
1079
+ if err = foRunner .waitForBackPressure (); err != nil {
1080
+ return err
1081
+ }
1082
+ }
1083
+
1037
1084
return
1038
1085
}
1039
1086
@@ -1145,7 +1192,7 @@ func (foRunner *foRunner) Starter(plugin MessageProcessor, h PluginHelper,
1145
1192
if err != nil {
1146
1193
foRunner .LogError (err )
1147
1194
if ! foRunner .IsStoppable () {
1148
- globals .ShutDown ()
1195
+ globals .ShutDown (1 )
1149
1196
}
1150
1197
return
1151
1198
}
@@ -1183,7 +1230,7 @@ func (foRunner *foRunner) Starter(plugin MessageProcessor, h PluginHelper,
1183
1230
// No more retries.
1184
1231
foRunner .lastErr = err
1185
1232
if ! foRunner .IsStoppable () {
1186
- globals .ShutDown ()
1233
+ globals .ShutDown (1 )
1187
1234
}
1188
1235
return
1189
1236
}
@@ -1251,7 +1298,12 @@ func (foRunner *foRunner) Starter(plugin MessageProcessor, h PluginHelper,
1251
1298
break
1252
1299
}
1253
1300
foRunner .LogMessage ("now restarting" )
1254
- if err = foRunner .plugin .Init (foRunner .maker .Config ()); err != nil {
1301
+ var config interface {}
1302
+ if config , err = foRunner .maker .PrepConfig (); err != nil {
1303
+ foRunner .LogError (err )
1304
+ goto initLoop
1305
+ }
1306
+ if err = foRunner .plugin .Init (config ); err != nil {
1255
1307
foRunner .LogError (err )
1256
1308
goto initLoop
1257
1309
}
@@ -1308,7 +1360,7 @@ func (foRunner *foRunner) exit() {
1308
1360
// Also, if this isn't a "stoppable" plugin we shut everything down.
1309
1361
if ! foRunner .IsStoppable () {
1310
1362
foRunner .LogMessage ("has stopped, shutting down." )
1311
- foRunner .pConfig .Globals .ShutDown ()
1363
+ foRunner .pConfig .Globals .ShutDown (1 )
1312
1364
return
1313
1365
}
1314
1366
@@ -1424,7 +1476,7 @@ func (foRunner *foRunner) OldStarter(helper PluginHelper, wg *sync.WaitGroup) {
1424
1476
if err != nil {
1425
1477
foRunner .LogError (err )
1426
1478
if ! foRunner .IsStoppable () {
1427
- globals .ShutDown ()
1479
+ globals .ShutDown (1 )
1428
1480
}
1429
1481
return
1430
1482
}
@@ -1497,7 +1549,12 @@ func (foRunner *foRunner) OldStarter(helper PluginHelper, wg *sync.WaitGroup) {
1497
1549
break
1498
1550
}
1499
1551
foRunner .LogMessage ("now restarting" )
1500
- if err = foRunner .plugin .Init (foRunner .maker .Config ()); err != nil {
1552
+ var config interface {}
1553
+ if config , err = foRunner .maker .PrepConfig (); err != nil {
1554
+ foRunner .LogError (err )
1555
+ goto initLoop
1556
+ }
1557
+ if err = foRunner .plugin .Init (config ); err != nil {
1501
1558
foRunner .LogError (err )
1502
1559
goto initLoop
1503
1560
}
0 commit comments