diff --git a/store/file/ods.go b/store/file/ods.go index 0ea4b71890..1e6af4af22 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -1,6 +1,7 @@ package file import ( + "bufio" "context" "errors" "fmt" @@ -17,6 +18,10 @@ import ( var _ eds.AccessorStreamer = (*ODSFile)(nil) +// writeBufferSize defines buffer size for optimized batched writes into the file system. +// TODO(@Wondertan): Consider making it configurable +const writeBufferSize = 64 << 10 + // ErrEmptyFile signals that the ODS file is empty. // This helps avoid storing empty block EDSes. var ErrEmptyFile = errors.New("file is empty") @@ -72,11 +77,19 @@ func CreateODSFile( return nil, fmt.Errorf("file create: %w", err) } - h, err := writeODSFile(f, eds, roots) + // buffering gives us ~4x speed up + buf := bufio.NewWriterSize(f, writeBufferSize) + + h, err := writeODSFile(buf, eds, roots) if err != nil { return nil, fmt.Errorf("writing ODS file: %w", err) } + err = buf.Flush() + if err != nil { + return nil, fmt.Errorf("flushing ODS file: %w", err) + } + err = f.Sync() if err != nil { return nil, fmt.Errorf("syncing file: %w", err) @@ -108,25 +121,47 @@ func writeODSFile(w io.Writer, eds *rsmt2d.ExtendedDataSquare, axisRoots *share. return nil, fmt.Errorf("writing axis roots: %w", err) } - for _, shr := range eds.FlattenedODS() { - if _, err := w.Write(shr); err != nil { - return nil, fmt.Errorf("writing shares: %w", err) - } + // write quadrants + err = writeQ1(w, eds) + if err != nil { + return nil, fmt.Errorf("writing Q1: %w", err) } + return h, nil } -func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error { - for _, roots := range [][][]byte{roots.RowRoots, roots.ColumnRoots} { - for _, root := range roots { - if _, err := w.Write(root); err != nil { - return fmt.Errorf("writing axis root: %w", err) +// writeQ1 writes the first quadrant of the square to the writer. It writes the quadrant in row-major +// order +func writeQ1(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error { + for i := range eds.Width() / 2 { + for j := range eds.Width() / 2 { + shr := eds.GetCell(i, j) // TODO: Avoid copying inside GetCell + _, err := w.Write(shr) + if err != nil { + return fmt.Errorf("writing share: %w", err) } } } return nil } +// writeAxisRoots writes RowRoots followed by ColumnRoots. +func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error { + for _, root := range roots.RowRoots { + if _, err := w.Write(root); err != nil { + return fmt.Errorf("writing row roots: %w", err) + } + } + + for _, root := range roots.ColumnRoots { + if _, err := w.Write(root); err != nil { + return fmt.Errorf("writing columm roots: %w", err) + } + } + + return nil +} + // Size returns square size of the Accessor. func (f *ODSFile) Size(context.Context) int { return f.size() diff --git a/store/file/q1q4_file.go b/store/file/q1q4_file.go index ad861a382c..d44d7b2ef2 100644 --- a/store/file/q1q4_file.go +++ b/store/file/q1q4_file.go @@ -1,6 +1,7 @@ package file import ( + "bufio" "context" "fmt" "io" @@ -39,16 +40,45 @@ func CreateQ1Q4File(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDat return nil, err } - err = writeQ4(ods.fl, eds) + // buffering gives us ~4x speed up + buf := bufio.NewWriterSize(ods.fl, writeBufferSize) + + err = writeQ4(buf, eds) if err != nil { return nil, fmt.Errorf("writing Q4: %w", err) } + err = buf.Flush() + if err != nil { + return nil, fmt.Errorf("flushing Q4: %w", err) + } + + err = ods.fl.Sync() + if err != nil { + return nil, fmt.Errorf("syncing file: %w", err) + } + return &Q1Q4File{ ods: ods, }, nil } +// writeQ4 writes the frth quadrant of the square to the writer. iIt writes the quadrant in row-major +// order +func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error { + half := eds.Width() / 2 + for i := range half { + for j := range half { + shr := eds.GetCell(i+half, j+half) // TODO: Avoid copying inside GetCell + _, err := w.Write(shr) + if err != nil { + return fmt.Errorf("writing share: %w", err) + } + } + } + return nil +} + func (f *Q1Q4File) Size(ctx context.Context) int { return f.ods.Size(ctx) } @@ -113,19 +143,6 @@ func (f *Q1Q4File) Close() error { return f.ods.Close() } -func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error { - odsLn := int(eds.Width()) / 2 - for x := odsLn; x < int(eds.Width()); x++ { - for y := odsLn; y < int(eds.Width()); y++ { - _, err := w.Write(eds.GetCell(uint(x), uint(y))) - if err != nil { - return err - } - } - } - return nil -} - func (f *Q1Q4File) readAxisHalfFromQ4(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { q4idx := axisIdx - f.ods.size()/2 if q4idx < 0 { diff --git a/store/store_test.go b/store/store_test.go index b58562e1a8..8868b376b1 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -254,7 +254,7 @@ func BenchmarkStore(b *testing.B) { eds := edstest.RandEDS(b, 128) require.NoError(b, err) - // BenchmarkStore/bench_put_128-10 27 79025268 ns/op (~79ms) + // BenchmarkStore/bench_put_128-10 27 19209780 ns/op (~19ms) b.Run("put 128", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ {