Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Add bufferisation for writing ods/Q1Q4 files #3592

Merged
merged 9 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions store/file/ods.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package file

import (
"bufio"
"context"
"errors"
"fmt"
Expand All @@ -17,6 +18,10 @@ import (

var _ eds.AccessorStreamer = (*ODSFile)(nil)

// writeBufferSize defines buffer size for optimized batched writes into the file system.
// TODO(@Wondertan): Consider making it configurable
const writeBufferSize = 64 << 10

// ErrEmptyFile signals that the ODS file is empty.
// This helps avoid storing empty block EDSes.
var ErrEmptyFile = errors.New("file is empty")
Expand Down Expand Up @@ -72,11 +77,19 @@ func CreateODSFile(
return nil, fmt.Errorf("file create: %w", err)
}

h, err := writeODSFile(f, eds, roots)
// buffering gives us ~4x speed up
buf := bufio.NewWriterSize(f, writeBufferSize)

h, err := writeODSFile(buf, eds, roots)
if err != nil {
return nil, fmt.Errorf("writing ODS file: %w", err)
}

err = buf.Flush()
if err != nil {
return nil, fmt.Errorf("flushing ODS file: %w", err)
}

err = f.Sync()
if err != nil {
return nil, fmt.Errorf("syncing file: %w", err)
Expand Down Expand Up @@ -108,25 +121,47 @@ func writeODSFile(w io.Writer, eds *rsmt2d.ExtendedDataSquare, axisRoots *share.
return nil, fmt.Errorf("writing axis roots: %w", err)
}

for _, shr := range eds.FlattenedODS() {
if _, err := w.Write(shr); err != nil {
return nil, fmt.Errorf("writing shares: %w", err)
}
// write quadrants
err = writeQ1(w, eds)
if err != nil {
return nil, fmt.Errorf("writing Q1: %w", err)
}

return h, nil
}

func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error {
for _, roots := range [][][]byte{roots.RowRoots, roots.ColumnRoots} {
for _, root := range roots {
if _, err := w.Write(root); err != nil {
return fmt.Errorf("writing axis root: %w", err)
// writeQ1 writes the first quadrant of the square to the writer. It writes the quadrant in row-major
// order
func writeQ1(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error {
for i := range eds.Width() / 2 {
for j := range eds.Width() / 2 {
shr := eds.GetCell(i, j) // TODO: Avoid copying inside GetCell
_, err := w.Write(shr)
if err != nil {
return fmt.Errorf("writing share: %w", err)
}
}
}
return nil
}

// writeAxisRoots writes RowRoots followed by ColumnRoots.
func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error {
for _, root := range roots.RowRoots {
if _, err := w.Write(root); err != nil {
return fmt.Errorf("writing row roots: %w", err)
}
}

for _, root := range roots.ColumnRoots {
if _, err := w.Write(root); err != nil {
return fmt.Errorf("writing columm roots: %w", err)
}
}

return nil
}

// Size returns square size of the Accessor.
func (f *ODSFile) Size(context.Context) int {
return f.size()
Expand Down
45 changes: 31 additions & 14 deletions store/file/q1q4_file.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package file

import (
"bufio"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -39,16 +40,45 @@ func CreateQ1Q4File(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDat
return nil, err
}

err = writeQ4(ods.fl, eds)
// buffering gives us ~4x speed up
buf := bufio.NewWriterSize(ods.fl, writeBufferSize)

err = writeQ4(buf, eds)
if err != nil {
return nil, fmt.Errorf("writing Q4: %w", err)
}

err = buf.Flush()
if err != nil {
return nil, fmt.Errorf("flushing Q4: %w", err)
}

err = ods.fl.Sync()
if err != nil {
return nil, fmt.Errorf("syncing file: %w", err)
}

return &Q1Q4File{
ods: ods,
}, nil
}

// writeQ4 writes the frth quadrant of the square to the writer. iIt writes the quadrant in row-major
// order
func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error {
half := eds.Width() / 2
for i := range half {
for j := range half {
shr := eds.GetCell(i+half, j+half) // TODO: Avoid copying inside GetCell
_, err := w.Write(shr)
if err != nil {
return fmt.Errorf("writing share: %w", err)
}
}
}
return nil
}

func (f *Q1Q4File) Size(ctx context.Context) int {
return f.ods.Size(ctx)
}
Expand Down Expand Up @@ -113,19 +143,6 @@ func (f *Q1Q4File) Close() error {
return f.ods.Close()
}

func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error {
odsLn := int(eds.Width()) / 2
for x := odsLn; x < int(eds.Width()); x++ {
for y := odsLn; y < int(eds.Width()); y++ {
_, err := w.Write(eds.GetCell(uint(x), uint(y)))
if err != nil {
return err
}
}
}
return nil
}

func (f *Q1Q4File) readAxisHalfFromQ4(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) {
q4idx := axisIdx - f.ods.size()/2
if q4idx < 0 {
Expand Down
2 changes: 1 addition & 1 deletion store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func BenchmarkStore(b *testing.B) {
eds := edstest.RandEDS(b, 128)
require.NoError(b, err)

// BenchmarkStore/bench_put_128-10 27 79025268 ns/op (~79ms)
// BenchmarkStore/bench_put_128-10 27 19209780 ns/op (~19ms)
b.Run("put 128", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down