Skip to content

Commit

Permalink
s2: Add AsyncFlush method: Complete the block without flushing (#927)
Browse files Browse the repository at this point in the history
* s2: Add AsyncFlush method: Complete the block without flushing

My use case is to transfer a large compressed S2 stream with a few
changes very often. To get a small diff I want to end blocks at
application decided points rather than at byte offsets. This allows me
to remove the first byte without every single block changing.

Flush() works for this, but it limits concurrency because it waits for
the last block to be compressed rather than allowing that
asynchronously.

So I'd like to propose AsyncFlush, which flushes the buffer to a block,
but doesn't flush the block to the io.Writer.

There were actually a few places in the s2 code that also wanted to end
the block, but didn't necessary want to flush to the writer.

* Update s2/writer.go

Co-authored-by: Klaus Post <[email protected]>

---------

Co-authored-by: Klaus Post <[email protected]>
  • Loading branch information
Jille and klauspost authored Feb 12, 2024
1 parent 4c49017 commit 5895eb4
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions s2/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
return 0, err
}
if len(w.ibuf) > 0 {
err := w.Flush()
err := w.AsyncFlush()
if err != nil {
return 0, err
}
Expand All @@ -225,7 +225,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
if err := w.EncodeBuffer(buf); err != nil {
return 0, err
}
return int64(len(buf)), w.Flush()
return int64(len(buf)), w.AsyncFlush()
}
for {
inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
Expand Down Expand Up @@ -354,7 +354,7 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
}
// Flush queued data first.
if len(w.ibuf) > 0 {
err := w.Flush()
err := w.AsyncFlush()
if err != nil {
return err
}
Expand Down Expand Up @@ -716,9 +716,9 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
return nRet, nil
}

// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
// AsyncFlush writes any buffered bytes to a block and starts compressing it.
// It does not wait for the output has been written as Flush() does.
func (w *Writer) AsyncFlush() error {
if err := w.err(nil); err != nil {
return err
}
Expand All @@ -738,6 +738,15 @@ func (w *Writer) Flush() error {
}
}
}
return w.err(nil)
}

// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
if err := w.AsyncFlush(); err != nil {
return err
}
if w.output == nil {
return w.err(nil)
}
Expand Down

0 comments on commit 5895eb4

Please sign in to comment.