Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions pkg/util/log/buffered_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {

var errC chan error

// Variables to log an oversized message warning after releasing the lock.
// We can't call Ops.Warningf() while holding bs.mu because that would
// cause a deadlock (the warning message would try to log through this same sink).
var logOversizedWarning bool
var maxSize uint64

err := func() error {
bs.mu.Lock()
defer bs.mu.Unlock()
Expand All @@ -229,6 +235,17 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {
if err != nil {
// Release the msg buffer, since our append failed.
putBuffer(msg)
// If the message is too large to fit in the buffer, we log a warning
// but don't return an error. This prevents the server from crashing
// due to exit-on-error when a single log message exceeds max-buffer-size.
// Instead, we gracefully drop the oversized message.
// NOTE: The message content will not be logged.
if errors.Is(err, errMsgTooLarge) {
// Set flag to log warning after releasing the lock (to avoid recursion/deadlock)
logOversizedWarning = true
maxSize = bs.mu.buf.maxSizeBytes
return nil // Don't propagate the error - handle gracefully
}
return err
}

Expand Down Expand Up @@ -281,6 +298,14 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error {
}
return nil
}()

// Log the oversized message warning after releasing the lock to avoid deadlock.
// We use Ops.Warningf() which routes the warning through the OPS channel,
// ensuring it appears in log files rather than only on stderr.
if logOversizedWarning {
Ops.Warningf(context.Background(), "dropped log message because it exceeds max-buffer-size (%d bytes) for sink %T", maxSize, bs.child)
}

if err != nil {
return err
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/log/buffered_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,40 @@ func (t *testWaitGroupSink) exitCode() exit.Code {
}

var _ logSink = (*testWaitGroupSink)(nil)

// TestBufferedSinkOversizedMessage tests that oversized messages (larger than
// maxBufferSize) are dropped gracefully without returning an error, rather than
// causing the process to crash when exit-on-error is enabled.
func TestBufferedSinkOversizedMessage(t *testing.T) {
defer leaktest.AfterTest(t)()
defer Scope(t).Close(t)

ctrl := gomock.NewController(t)
defer ctrl.Finish()
mock := NewMockLogSink(ctrl)

// Set up a buffered sink with a small max buffer size.
maxBufferSize := uint64(10)
sink := newBufferedSink(mock, noMaxStaleness, noSizeTrigger, maxBufferSize, false /* crashOnAsyncFlushErr */, nil)
closer := newBufferedSinkCloser()
sink.Start(closer)
defer func() { require.NoError(t, closer.Close(defaultCloserTimeout)) }()

// Create a message that exceeds the max buffer size.
oversizedMessage := bytes.Repeat([]byte("x"), int(maxBufferSize)+10)

// The oversized message should NOT be sent to the child sink because it's
// dropped before reaching the child. The bufferedSink detects the message
// is too large in appendMsg() and returns early without ever calling the
// child sink's output() method.
mock.EXPECT().
output(gomock.Any(), gomock.Any()).
Times(0) // No calls should reach the child sink

// Attempt to output the oversized message. This should succeed (return nil)
// and not cause the process to crash. The key behavior being tested is that
// errMsgTooLarge is caught and handled gracefully, returning nil instead of
// propagating the error.
err := sink.output(oversizedMessage, sinkOutputOptions{})
require.NoError(t, err, "oversized message should be handled gracefully without returning an error")
}