Skip to content

Commit 476e3ad

Browse files
committed
pkg/util/log: Implement handling for oversized log messages in bufferedSink
This update introduces functionality to gracefully drop oversized log messages that exceed the max buffer size without crashing the CRDB process when exit-on-error is enabled. A warning is logged to indicate that the message was dropped, ensuring that the logging system remains stable. Part of: CRDB-53951 Epic: CRDB-56325 Release note: None
1 parent 9510b99 commit 476e3ad

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

pkg/util/log/buffered_sink.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,12 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {
221221

222222
var errC chan error
223223

224+
// Variables to log an oversized message warning after releasing the lock.
225+
// We can't call Ops.Warningf() while holding bs.mu because that would
226+
// cause a deadlock (the warning message would try to log through this same sink).
227+
var logOversizedWarning bool
228+
var maxSize uint64
229+
224230
err := func() error {
225231
bs.mu.Lock()
226232
defer bs.mu.Unlock()
@@ -229,6 +235,16 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {
229235
if err != nil {
230236
// Release the msg buffer, since our append failed.
231237
putBuffer(msg)
238+
// If the message is too large to fit in the buffer, we log a warning
239+
// but don't return an error. This prevents the server from crashing
240+
// due to exit-on-error when a single log message exceeds max-buffer-size.
241+
// Instead, we gracefully drop the oversized message.
242+
if errors.Is(err, errMsgTooLarge) {
243+
// Set flag to log warning after releasing the lock (to avoid recursion/deadlock)
244+
logOversizedWarning = true
245+
maxSize = bs.mu.buf.maxSizeBytes
246+
return nil // Don't propagate the error - handle gracefully
247+
}
232248
return err
233249
}
234250

@@ -281,6 +297,14 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {
281297
}
282298
return nil
283299
}()
300+
301+
// Log the oversized message warning after releasing the lock to avoid deadlock.
302+
// This warning will be logged through the normal logging system and will appear
303+
// in the log files.
304+
if logOversizedWarning {
305+
Ops.Warningf(context.Background(), "dropped log message because it exceeds max-buffer-size (%d bytes) for sink %T", maxSize, bs.child)
306+
}
307+
284308
if err != nil {
285309
return err
286310
}

pkg/util/log/buffered_sink_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,3 +630,46 @@ func (t *testWaitGroupSink) exitCode() exit.Code {
630630
}
631631

632632
var _ logSink = (*testWaitGroupSink)(nil)
633+
634+
// TestBufferedSinkOversizedMessage tests that oversized messages (larger than
635+
// maxBufferSize) are dropped gracefully with a warning, rather than causing
636+
// the process to crash when exit-on-error is enabled.
637+
func TestBufferedSinkOversizedMessage(t *testing.T) {
638+
defer leaktest.AfterTest(t)()
639+
defer Scope(t).Close(t)
640+
641+
ctrl := gomock.NewController(t)
642+
defer ctrl.Finish()
643+
mock := NewMockLogSink(ctrl)
644+
645+
// Set up a buffered sink with a small max buffer size.
646+
maxBufferSize := uint64(10)
647+
sink := newBufferedSink(mock, noMaxStaleness, noSizeTrigger, maxBufferSize, false /* crashOnAsyncFlushErr */, nil)
648+
closer := newBufferedSinkCloser()
649+
sink.Start(closer)
650+
defer func() { require.NoError(t, closer.Close(defaultCloserTimeout)) }()
651+
652+
// Create a message that exceeds the max buffer size.
653+
oversizedMessage := bytes.Repeat([]byte("x"), int(maxBufferSize)+10)
654+
655+
// We expect that the oversized message will NOT be sent to the child sink.
656+
// Instead, a warning message will be logged (via Ops.Warningf).
657+
// The warning message itself is small and should succeed.
658+
mock.EXPECT().
659+
output(gomock.Any(), gomock.Any()).
660+
Do(func(b []byte, opts sinkOutputOptions) {
661+
// Verify this is the warning message, not the oversized message.
662+
msg := string(b)
663+
require.Contains(t, msg, "dropped log message because it exceeds max-buffer-size")
664+
require.NotContains(t, msg, string(oversizedMessage))
665+
}).
666+
MaxTimes(1) // The warning may or may not be logged through this same sink
667+
668+
// Attempt to output the oversized message. This should succeed (return nil)
669+
// and not cause the process to crash.
670+
err := sink.output(oversizedMessage, sinkOutputOptions{})
671+
require.NoError(t, err, "oversized message should be handled gracefully without returning an error")
672+
673+
// Give a moment for any async warning to be logged.
674+
time.Sleep(100 * time.Millisecond)
675+
}

0 commit comments

Comments
 (0)