Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Add ODS file #3482

Merged
merged 13 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.2
github.com/klauspost/reedsolomon v1.12.1
github.com/libp2p/go-libp2p v0.33.2
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/libp2p/go-libp2p-pubsub v0.10.1
Expand Down Expand Up @@ -228,7 +229,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
47 changes: 47 additions & 0 deletions share/new_eds/axis_half.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package eds

import (
"fmt"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/shwap"
)
Expand All @@ -20,3 +22,48 @@ func (a AxisHalf) ToRow() shwap.Row {
}
return shwap.NewRow(a.Shares, side)
}

// Extended returns full axis shares from half axis shares.
func (a AxisHalf) Extended() ([]share.Share, error) {
if a.IsParity {
return reconstructShares(a.Shares)
}
return extendShares(a.Shares)
}

// extendShares constructs full axis shares from original half axis shares.
func extendShares(original []share.Share) ([]share.Share, error) {
if len(original) == 0 {
return nil, fmt.Errorf("original shares are empty")
}

codec := share.DefaultRSMT2DCodec()
parity, err := codec.Encode(original)
if err != nil {
return nil, fmt.Errorf("encoding: %w", err)
}
shares := make([]share.Share, len(original)*2)
copy(shares, original)
copy(shares[len(original):], parity)
return shares, nil
Comment on lines +45 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like there is an optimization opportunity to avoid copying that we can return to later

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to prealloc shares anyway, so copying is necessary. Can do implicitly by append tho

}

// reconstructShares constructs full axis shares from parity half axis shares.
func reconstructShares(parity []share.Share) ([]share.Share, error) {
if len(parity) == 0 {
return nil, fmt.Errorf("parity shares are empty")
}

sqLen := len(parity) * 2
shares := make([]share.Share, sqLen)
for i := sqLen / 2; i < sqLen; i++ {
shares[i] = parity[i-sqLen/2]
}

codec := share.DefaultRSMT2DCodec()
shares, err := codec.Decode(shares)
if err != nil {
return nil, fmt.Errorf("reconstructing: %w", err)
}
return shares, nil
}
32 changes: 32 additions & 0 deletions share/new_eds/axis_half_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package eds

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestExtendAxisHalf(t *testing.T) {
shares := sharetest.RandShares(t, 16)

original := AxisHalf{
Shares: shares,
IsParity: false,
}

extended, err := original.Extended()
require.NoError(t, err)
require.Len(t, extended, len(shares)*2)

parity := AxisHalf{
Shares: extended[len(shares):],
IsParity: true,
}

parityExtended, err := parity.Extended()
require.NoError(t, err)

require.Equal(t, extended, parityExtended)
}
2 changes: 1 addition & 1 deletion share/new_eds/rsmt2d_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func TestMemFile(t *testing.T) {
odsSize := 8
newAccessor := func(eds *rsmt2d.ExtendedDataSquare) Accessor {
newAccessor := func(tb testing.TB, eds *rsmt2d.ExtendedDataSquare) Accessor {
return &Rsmt2D{ExtendedDataSquare: eds}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
12 changes: 6 additions & 6 deletions share/new_eds/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/celestiaorg/celestia-node/share/shwap"
)

type createAccessor func(eds *rsmt2d.ExtendedDataSquare) Accessor
type createAccessor func(testing.TB, *rsmt2d.ExtendedDataSquare) Accessor

// TestSuiteAccessor runs a suite of tests for the given Accessor implementation.
func TestSuiteAccessor(
Expand Down Expand Up @@ -51,7 +51,7 @@ func testAccessorSample(
odsSize int,
) {
eds := edstest.RandEDS(t, odsSize)
fl := createAccessor(eds)
fl := createAccessor(t, eds)

dah, err := share.NewRoot(eds)
require.NoError(t, err)
Expand Down Expand Up @@ -108,7 +108,7 @@ func testAccessorRowNamespaceData(
for amount := 1; amount < sharesAmount; amount++ {
// select random amount of shares, but not less than 1
eds, dah := edstest.RandEDSWithNamespace(t, namespace, amount, odsSize)
f := createAccessor(eds)
f := createAccessor(t, eds)

var actualSharesAmount int
// loop over all rows and check that the amount of shares in the namespace is equal to the expected
Expand Down Expand Up @@ -149,7 +149,7 @@ func testAccessorRowNamespaceData(
absentNs, err := share.Namespace(maxNs).AddInt(-1)
require.NoError(t, err)

f := createAccessor(eds)
f := createAccessor(t, eds)
rowData, err := f.RowNamespaceData(ctx, absentNs, i)
require.NoError(t, err)

Expand All @@ -170,7 +170,7 @@ func testAccessorAxisHalf(
odsSize int,
) {
eds := edstest.RandEDS(t, odsSize)
fl := createAccessor(eds)
fl := createAccessor(t, eds)

t.Run("single thread", func(t *testing.T) {
for _, axisType := range []rsmt2d.Axis{rsmt2d.Col, rsmt2d.Row} {
Expand Down Expand Up @@ -224,7 +224,7 @@ func testAccessorShares(
odsSize int,
) {
eds := edstest.RandEDS(t, odsSize)
fl := createAccessor(eds)
fl := createAccessor(t, eds)

shares, err := fl.Shares(ctx)
require.NoError(t, err)
Expand Down
10 changes: 7 additions & 3 deletions share/shwap/row_namespace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,14 @@ func (rnd RowNamespaceData) Validate(dah *share.Root, namespace share.Namespace,
// verifyInclusion checks the inclusion of the row's shares in the provided root using NMT.
func (rnd RowNamespaceData) verifyInclusion(rowRoot []byte, namespace share.Namespace) bool {
leaves := make([][]byte, 0, len(rnd.Shares))
for _, shr := range rnd.Shares {
namespaceBytes := share.GetNamespace(shr)
leaves = append(leaves, append(namespaceBytes, shr...))
for _, sh := range rnd.Shares {
namespaceBytes := share.GetNamespace(sh)
leave := make([]byte, len(sh)+len(namespaceBytes))
copy(leave, namespaceBytes)
copy(leave[len(namespaceBytes):], sh)
leaves = append(leaves, leave)
Comment on lines +168 to +172
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a refactoring from appends to copies?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that you noticed. There was a very nasty mutations bug that took me few hours to fix.

}

return rnd.Proof.VerifyNamespace(
share.NewSHA256Hasher(),
namespace.ToNMT(),
Expand Down
24 changes: 24 additions & 0 deletions share/shwap/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"

"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/nmt"
nmt_pb "github.com/celestiaorg/nmt/pb"
"github.com/celestiaorg/rsmt2d"
Expand All @@ -24,6 +25,29 @@ type Sample struct {
ProofType rsmt2d.Axis // ProofType indicates whether the proof is against a row or a column.
}

// SampleFromShares creates a Sample from a list of shares, using the specified proof type and
// the share index to be included in the sample.
func SampleFromShares(shares []share.Share, proofType rsmt2d.Axis, axisIdx, shrIdx int) (Sample, error) {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(len(shares)/2), uint(axisIdx))
for _, shr := range shares {
err := tree.Push(shr)
if err != nil {
return Sample{}, err
}
}

proof, err := tree.ProveRange(shrIdx, shrIdx+1)
if err != nil {
return Sample{}, err
}

return Sample{
Share: shares[shrIdx],
Proof: &proof,
ProofType: proofType,
}, nil
}

// SampleFromProto converts a protobuf Sample back into its domain model equivalent.
func SampleFromProto(s *pb.Sample) Sample {
proof := nmt.NewInclusionProof(
Expand Down
38 changes: 38 additions & 0 deletions store/file/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package file

import (
"sync"

"github.com/klauspost/reedsolomon"
)

var codec Codec
Wondertan marked this conversation as resolved.
Show resolved Hide resolved

func init() {
codec = NewCodec()
}

type Codec interface {
Encoder(len int) (reedsolomon.Encoder, error)
}

type codecCache struct {
cache sync.Map
}

func NewCodec() Codec {
return &codecCache{}
}

func (l *codecCache) Encoder(len int) (reedsolomon.Encoder, error) {
enc, ok := l.cache.Load(len)
if !ok {
var err error
enc, err = reedsolomon.New(len/2, len/2, reedsolomon.WithLeopardGF(true))
if err != nil {
return nil, err
}
l.cache.Store(len, enc)
}
return enc.(reedsolomon.Encoder), nil
}
83 changes: 83 additions & 0 deletions store/file/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package file

import (
"fmt"
"testing"

"github.com/klauspost/reedsolomon"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

func BenchmarkCodec(b *testing.B) {
minSize, maxSize := 32, 128

for size := minSize; size <= maxSize; size *= 2 {
walldiss marked this conversation as resolved.
Show resolved Hide resolved
// BenchmarkCodec/Leopard/size:32-10 409194 2793 ns/op
// BenchmarkCodec/Leopard/size:64-10 190969 6170 ns/op
// BenchmarkCodec/Leopard/size:128-10 82821 14287 ns/op
b.Run(fmt.Sprintf("Leopard/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(true))
require.NoError(b, err)

shards := newShards(b, size, true)

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.Encode(shards)
require.NoError(b, err)
}
})

// BenchmarkCodec/default/size:32-10 222153 5364 ns/op
// BenchmarkCodec/default/size:64-10 58831 20349 ns/op
// BenchmarkCodec/default/size:128-10 14940 80471 ns/op
b.Run(fmt.Sprintf("default/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false))
require.NoError(b, err)

shards := newShards(b, size, true)

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.Encode(shards)
require.NoError(b, err)
}
})

// BenchmarkCodec/default-reconstructSome/size:32-10 1263585 954.4 ns/op
// BenchmarkCodec/default-reconstructSome/size:64-10 762273 1554 ns/op
// BenchmarkCodec/default-reconstructSome/size:128-10 429268 2974 ns/op
b.Run(fmt.Sprintf("default-reconstructSome/size:%v", size), func(b *testing.B) {
enc, err := reedsolomon.New(size/2, size/2, reedsolomon.WithLeopardGF(false))
require.NoError(b, err)

shards := newShards(b, size, false)
targets := make([]bool, size)
target := size - 2
targets[target] = true

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = enc.ReconstructSome(shards, targets)
require.NoError(b, err)
shards[target] = nil
}
})
}
}

func newShards(b require.TestingT, size int, fillParity bool) [][]byte {
shards := make([][]byte, size)
original := sharetest.RandShares(b, size/2)
copy(shards, original)

if fillParity {
// fill with parity empty Shares
for j := len(original); j < len(shards); j++ {
shards[j] = make([]byte, len(original[0]))
}
}
return shards
}
Loading
Loading