Skip to content

Commit cd21de0

Browse files
mergify[bot]kvch
andauthored
Suppress too many bad error messages when reading from corrupted journal in Journalbeat (#26224) (#26530)
(cherry picked from commit 2f9ae33) Co-authored-by: Noémi Ványi <[email protected]>
1 parent 9037ba3 commit cd21de0

File tree

1 file changed

+34
-0
lines changed

1 file changed

+34
-0
lines changed

journalbeat/input/input.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818
package input
1919

2020
import (
21+
"context"
2122
"fmt"
23+
"strings"
2224
"sync"
25+
"syscall"
26+
"time"
2327

2428
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
29+
"github.com/elastic/go-concert/timed"
2530

2631
"github.com/elastic/beats/v7/libbeat/common/acker"
32+
"github.com/elastic/beats/v7/libbeat/common/atomic"
2733
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
2834

2935
"github.com/elastic/beats/v7/journalbeat/checkpoint"
@@ -158,6 +164,10 @@ func (i *Input) publishAll() {
158164
go func() {
159165
defer wg.Done()
160166

167+
suppressed := atomic.NewBool(false)
168+
ctx, cancel := context.WithCancel(context.Background())
169+
defer cancel()
170+
161171
for {
162172
select {
163173
case <-i.done:
@@ -168,6 +178,10 @@ func (i *Input) publishAll() {
168178
event, err := r.Next()
169179
if event == nil {
170180
if err != nil {
181+
if i.isErrSuppressed(ctx, err, suppressed) {
182+
i.logger.Debugf("Error message suppressed: EBADMSG")
183+
continue
184+
}
171185
i.logger.Errorf("Error while reading event: %v", err)
172186
}
173187
continue
@@ -191,6 +205,26 @@ func (i *Input) publishAll() {
191205
}
192206
}
193207

208+
// isErrSuppressed checks if the error is due to a corrupt journal. If yes, only the first error message
209+
// is displayed and then it is suppressed for 5 seconds.
210+
func (i *Input) isErrSuppressed(ctx context.Context, err error, suppressed *atomic.Bool) bool {
211+
if strings.Contains(err.Error(), syscall.EBADMSG.Error()) {
212+
if suppressed.Load() {
213+
return true
214+
}
215+
216+
suppressed.Store(true)
217+
go func(ctx context.Context, suppressed *atomic.Bool) {
218+
if err := timed.Wait(ctx, 5*time.Second); err == nil {
219+
suppressed.Store(false)
220+
}
221+
222+
}(ctx, suppressed)
223+
}
224+
225+
return false
226+
}
227+
194228
// Stop stops all readers of the input.
195229
func (i *Input) Stop() {
196230
for _, r := range i.readers {

0 commit comments

Comments
 (0)