12 changes: 12 additions & 0 deletions mem/buffer_slice.go
@@ -257,6 +257,18 @@ func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
_, err := wt.WriteTo(w)
return result, err
}

if lr, ok := r.(*io.LimitedReader); ok {
if wt, ok := lr.R.(io.WriterTo); ok {
// This is more optimal since wt knows the size of the chunks it wants to
// write and, hence, we can allocate buffers of an optimal size to fit
// them. E.g. it might be a single big chunk that we wouldn't have to chop
// into pieces.
w := NewWriter(&result, pool)
_, err := wt.WriteTo(w)
Contributor
I think this would circumvent the limit of the LimitedReader, wouldn't it? By directly accessing the underlying reader we are bypassing the limiter.
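For illustration, a minimal sketch of the concern (not the PR's actual code path; the names are made up): calling WriteTo on the underlying reader drains it fully and ignores the LimitedReader's N.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	underlying := strings.NewReader("0123456789")            // implements io.WriterTo
	lr := io.LimitReader(underlying, 4).(*io.LimitedReader)  // limit: 4 bytes

	var buf bytes.Buffer
	if wt, ok := lr.R.(io.WriterTo); ok {
		wt.WriteTo(&buf) // bypasses lr: writes all 10 bytes
	}
	fmt.Println(buf.Len()) // 10, not 4
}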

Author
Yeah, but I'm not sure how to implement a "LimitedWriter", so to speak. Should we add something like https://github.com/nanmu42/limitio/blob/master/limitio.go to grpc-go (as a library dependency or our own implementation)?

Contributor
Hi @GiedriusS, one solution I can think of is to create your own wrapper struct that wraps an io.LimitedReader and also implements io.WriterTo. This would allow your reader to control the size of the temporary buffer being used. Here's an example implementation that could work.

type LimitWriterTo struct {
    Reader io.Reader // The underlying *io.LimitedReader
}

func (l *LimitWriterTo) WriteTo(w io.Writer) (n int64, err error) {
    // Define your custom buffer size here (e.g., 64K, 128K)
    buffer := make([]byte, 1024) // You could get this from a buffer pool.
    
    // Use io.CopyBuffer internally with your custom buffer
    // or implement the read/write loop manually for ultimate control.
    return io.CopyBuffer(w, l.Reader, buffer) 
}

Author
@GiedriusS Nov 13, 2025
The core issue is that grpc-go itself wraps the io.Reader inside an io.LimitedReader and then does the io.WriterTo assertion on that wrapper; io.LimitedReader is not an io.WriterTo, so I think this path would never be hit.
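A small sketch of that point (illustrative only; same imports as the sketch above): once the reader is wrapped by io.LimitReader, the io.WriterTo assertion no longer matches even though the underlying reader implements it.

var r io.Reader = io.LimitReader(strings.NewReader("payload"), 4)
_, ok := r.(io.WriterTo) // false: *io.LimitedReader only implements Read,
                         // so ReadAll's existing fast path is skipped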

Contributor
@arjan-bal Nov 13, 2025
Oh, I overlooked the line in the PR description: "This happens if some max message size is set."

gRPC controls the reader type, not external code. Let me think about it a little more.

Contributor

Here is the simplest solution I came up with:

  1. Introduce a LimitWriter: Create a wrapper around io.Writer that restricts the number of bytes written. If a write exceeds the limit, it returns a specific sentinel error (e.g., ErrLimitExhausted).
  2. Update ReadAll: In ReadAll, we check if the reader is an *io.LimitedReader and if the underlying io.Reader implements io.WriterTo. If so, we create a new writer using NewWriter(&result, pool), wrap it in our LimitWriter, and call WriteTo on the underlying reader. This effectively transfers the limit constraint from the reader to the writer.

To keep this optimization transparent to callers, ReadAll must trap the error returned by LimitWriter. It should translate that error into a successful return (nil error) and update the N field on the *io.LimitedReader to reflect the bytes actually consumed.


Here are some snippets to explain this:

The Helper Type

var ErrLimitExhausted = errors.New("limit exhausted")

type limitWriter struct {
	w io.Writer
	n int64
}

func (l *limitWriter) Write(p []byte) (n int, err error) {
	if l.n <= 0 {
		return 0, ErrLimitExhausted
	}
	// If p is larger than remaining limit, truncate it
	if int64(len(p)) > l.n {
		p = p[:l.n]
		err = ErrLimitExhausted // We will return this after the underlying write
	}
	n, wErr := l.w.Write(p)
	l.n -= int64(n)
	
	if wErr != nil {
		return n, wErr
	}
	return n, err
}

The Updated ReadAll Function

Here is how the logic fits into your existing context.

func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
	var result BufferSlice

	// 1. Check for the specific optimization case: *io.LimitedReader wrapping an io.WriterTo
	if lr, ok := r.(*io.LimitedReader); ok {
		if wt, ok := lr.R.(io.WriterTo); ok {
			w := NewWriter(&result, pool)
			
			// Wrap the writer to enforce the reader's limit
			lw := &limitWriter{w: w, n: lr.N}
			
			// Delegate the heavy lifting to the underlying implementation
			n, err := wt.WriteTo(lw)
			
			// Update the LimitedReader state
			lr.N -= n
			
			// If we hit the limit, it's not a real error in this context; 
			// it just means we read everything we were allowed to.
			if err == ErrLimitExhausted {
				return result, nil
			}
			return result, err
		}
	}

	// 2. Standard optimization for direct io.WriterTo
	if wt, ok := r.(io.WriterTo); ok {
		w := NewWriter(&result, pool)
		_, err := wt.WriteTo(w)
		return result, err
	}

	// ... rest of fallback implementation ...
}

What do you think?

Author

👍 nice. Only one thought that comes to mind: it would be cool to refactor this function and how the wrapping is done a little bit so that another type assertion wouldn't be needed. IIRC, each type assertion translates to a new allocation on the heap, so it would be nice to avoid this since this is a hot path. In our case, this function is called millions of times, so it would mean a lot of extra allocations.

Contributor

I don't think heap allocs are caused by type assertions; instead, they're caused when a new limitWriter object is created and passed to another function. The trick to avoid this extra heap allocation is to store the LimitReader and the limitWriter as a single object without any pointer fields.

To do this, we will need to move this optimization to rpc_util.go, where the io.LimitedReader is created. The following snippet shows a struct that does this.

var errLimitExhausted = errors.New("limit exhausted")

// limitWriter is the helper stored inside LimitReader.
type limitWriter struct {
	w io.Writer
	n int64
}

func (l *limitWriter) Write(p []byte) (n int, err error) {
	if l.n <= 0 {
		return 0, errLimitExhausted
	}
	if int64(len(p)) > l.n {
		p = p[:l.n]
		err = errLimitExhausted
	}
	n, wErr := l.w.Write(p)
	l.n -= int64(n)

	if wErr != nil {
		return n, wErr
	}
	return n, err
}

type ReaderWriterTo interface {
	io.Reader
	io.WriterTo
}

type LimitReader struct {
	io.LimitedReader
	lw limitWriter // Stored by value to prevent heap allocation
}

func NewLimitReader(r ReaderWriterTo, n int64) *LimitReader {
	return &LimitReader{
		LimitedReader: io.LimitedReader{R: r, N: n},
	}
}

func (l *LimitReader) WriteTo(w io.Writer) (n int64, err error) {
	// 1. Setup the embedded limitWriter
	l.lw.w = w
	l.lw.n = l.N

	// 2. Delegate to the underlying WriterTo
	wt := l.R.(io.WriterTo)
	n, err = wt.WriteTo(&l.lw)

	// 3. Sync state back
	l.N = l.lw.n
	l.lw.w = nil // Avoid memory leak

	if err == errLimitExhausted {
		return n, nil
	}
	return n, err
}

At the call site, we can do a type assertion to check if the reader implements io.WriterTo; if it does, we wrap it in the new LimitReader, otherwise we continue to wrap it with io.LimitReader.

The following playground demonstrates this: https://go.dev/play/p/vd7V_fXm_in
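A minimal sketch of that call-site check (illustrative; r and limit are placeholder names, not grpc-go's actual identifiers):

var body io.Reader
if rwt, ok := r.(ReaderWriterTo); ok {
	body = NewLimitReader(rwt, limit) // fast path: WriteTo with the limit enforced by limitWriter
} else {
	body = io.LimitReader(r, limit) // fallback: plain limited reads
}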

Contributor

Using the WriterTo interface inside mem.ReadAll would cause an extra copy: one copy to a temporary buffer plus one copy in the call to Write().
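A hedged sketch of where those two copies come from when the underlying WriteTo is just a buffered copy loop (hypothetical helper name; assumes the writer returned by mem.NewWriter copies p into pool-backed buffers):

// copyViaWriterTo mimics a WriteTo implemented with io.CopyBuffer. When src has
// no WriteTo and dst has no ReadFrom, data is staged in tmp:
// copy #1: src.Read fills tmp; copy #2: dst.Write(tmp[:n]) copies the data
// again into the destination's own (pool-backed) buffers.
func copyViaWriterTo(dst io.Writer, src io.Reader) (int64, error) {
	tmp := make([]byte, 32*1024)
	return io.CopyBuffer(dst, src, tmp)
}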

Contributor
The 32KB allocation issue was also mentioned here: cockroachdb/cockroach#136278 (comment)

Since the WriterTo interface incurs an extra copy, it doesn't look like a perfect solution. Maybe we can have compressors implement an optional interface that specifies the buffer size to use for copies. I'll discuss this with other maintainers.
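Purely speculative, but such an optional interface might look roughly like this (the name and method are hypothetical; nothing like this exists in grpc-go today):

// copyBufferSizer is a hypothetical optional interface a compressor's reader
// could implement to hint the temporary buffer size ReadAll should use.
type copyBufferSizer interface {
	CopyBufferSize() int
}

// Inside ReadAll (sketch): prefer the hinted size over readAllBufSize.
size := readAllBufSize
if bs, ok := r.(copyBufferSizer); ok {
	size = bs.CopyBufferSize()
}
buf := pool.Get(size)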

return result, err
}
}
nextBuffer:
for {
buf := pool.Get(readAllBufSize)