diff --git a/go.mod b/go.mod index a8c7a68817..45040f8cb7 100644 --- a/go.mod +++ b/go.mod @@ -13,10 +13,12 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/klauspost/compress v1.17.11 + github.com/klauspost/reedsolomon v1.12.4 github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.5.0 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.2 + github.com/mxschmitt/golang-combinations v1.2.0 github.com/nspcc-dev/hrw/v2 v2.0.3 github.com/nspcc-dev/locode-db v0.6.0 github.com/nspcc-dev/neo-go v0.110.0 @@ -63,7 +65,7 @@ require ( github.com/holiman/uint256 v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect - github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/minio/sha256-simd v1.0.1 // indirect diff --git a/go.sum b/go.sum index 1a331b1fad..17e9bd5d00 100644 --- a/go.sum +++ b/go.sum @@ -119,8 +119,10 @@ github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= -github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= -github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/reedsolomon v1.12.4 h1:5aDr3ZGoJbgu/8+j45KtUJxzYm8k08JGtB9Wx1VQ4OA= +github.com/klauspost/reedsolomon v1.12.4/go.mod 
h1:d3CzOMOt0JXGIFZm1StgkyF14EYr3xneR2rNWo7NcMU=
 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
 github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -183,6 +185,8 @@ github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/n
 github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/mxschmitt/golang-combinations v1.2.0 h1:V5E7MncIK8Yr1SL/SpdqMuSquFsfoIs5auI7Y3n8z14=
+github.com/mxschmitt/golang-combinations v1.2.0/go.mod h1:RCm5eR03B+JrBOMRDLsKZWShluXdrHu+qwhPEJ0miBM=
 github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22 h1:M5Nmg1iCnbZngzIBDIlMr9vW+okFfcSMBvBlXG8r+14=
 github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk=
 github.com/nspcc-dev/dbft v0.3.3-0.20250321140139-7462b47e4d2d h1:Mm0bp0YRAuGfoUDPbleQ9zByJc6HTCu3B4/UBoen9cQ=
diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/put/ec.go
new file mode 100644
index 0000000000..3b7113e743
--- /dev/null
+++ b/pkg/services/object/put/ec.go
@@ -0,0 +1,180 @@
+package putsvc
+
+import (
+	"bytes"
+	"encoding/base64"
+	"encoding/hex"
+	"fmt"
+	"strconv"
+
+	"github.com/klauspost/reedsolomon"
+	"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
+	"github.com/nspcc-dev/neofs-sdk-go/checksum"
+	neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
+	"github.com/nspcc-dev/neofs-sdk-go/object"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"github.com/nspcc-dev/tzhash/tz"
+)
+
+// reedSolomonEncoder is an [internal.Target] that buffers the header and
+// payload of the object written to it and, on Close, writes one object per
+// Reed-Solomon shard of that payload to dst.
+type reedSolomonEncoder struct {
+	dst internal.Target
+
+	dataShards   int
+	parityShards int
+	signer       neofscrypto.Signer
+
+	srcHdr object.Object
+	pldBuf bytes.Buffer
+}
+
+// newReedSolomonEncoder wraps dst with a Reed-Solomon encoder producing
+// dataShards+parityShards shard objects, each signed by signer.
+func newReedSolomonEncoder(dst internal.Target, dataShards, parityShards int, signer neofscrypto.Signer) internal.Target {
+	return &reedSolomonEncoder{
+		dst:          dst,
+		dataShards:   dataShards,
+		parityShards: parityShards,
+		signer:       signer,
+	}
+}
+
+// Attributes carrying source object info in shard objects.
+// TODO: Place in SDK.
+const (
+	reedSolomonAttrIdx              = "__NEOFS__EC_RS_IDX"
+	reedSolomonAttrSrcPayloadLen    = "__NEOFS__EC_RS_SRC_PAYLOAD_LEN"
+	reedSolomonAttrSrcPayloadSHA256 = "__NEOFS__EC_RS_SRC_PAYLOAD_HASH_SHA256"
+	reedSolomonAttrSrcPayloadTZ     = "__NEOFS__EC_RS_SRC_PAYLOAD_HASH_TZ"
+	reedSolomonAttrSrcID            = "__NEOFS__EC_RS_SRC_ID"
+	reedSolomonAttrSrcSignature     = "__NEOFS__EC_RS_SRC_SIGNATURE"
+)
+
+// WriteHeader remembers hdr as the source object header and resets the payload
+// buffer. It fails if hdr carries a payload checksum other than SHA256 or a
+// homomorphic hash other than Tillich-Zemor.
+func (x *reedSolomonEncoder) WriteHeader(hdr *object.Object) error {
+	// Validate the incoming header, not the previously stored one: putShards
+	// publishes these values under SHA256/TZ-specific attributes.
+	if cs, ok := hdr.PayloadChecksum(); ok && cs.Type() != checksum.SHA256 {
+		return fmt.Errorf("unexpected payload checksum type %v instead of %v", cs.Type(), checksum.SHA256)
+	}
+	if cs, ok := hdr.PayloadHomomorphicHash(); ok && cs.Type() != checksum.TillichZemor {
+		return fmt.Errorf("unexpected payload checksum type %v instead of %v", cs.Type(), checksum.TillichZemor)
+	}
+
+	x.srcHdr = *hdr
+	x.pldBuf.Reset()
+	return nil
+}
+
+// Write appends p to the buffered payload of the current source object.
+func (x *reedSolomonEncoder) Write(p []byte) (int, error) {
+	return x.pldBuf.Write(p)
+}
+
+// Close encodes the buffered payload into Reed-Solomon shards and writes them
+// to the destination target. It returns ID of the source object.
+func (x *reedSolomonEncoder) Close() (oid.ID, error) {
+	if x.srcHdr.PayloadSize() == 0 {
+		// TODO: avoid make
+		return x.putShards(make([][]byte, x.dataShards+x.parityShards))
+	}
+
+	// TODO: Explore possibility to reset and reuse encoder for next object.
+	// TODO: Explore reedsolomon.Option for performance improvement.
+	// TODO: Compare with reedsolomon.StreamEncoder.
+	enc, err := reedsolomon.New(x.dataShards, x.parityShards)
+	if err != nil { // should never happen
+		return oid.ID{}, fmt.Errorf("init Reed-Solomon(%d,%d) encoder: %w", x.dataShards, x.parityShards, err)
+	}
+
+	shards, err := enc.Split(x.pldBuf.Bytes())
+	if err != nil {
+		return oid.ID{}, fmt.Errorf("split data into Reed-Solomon(%d,%d) shards: %w", x.dataShards, x.parityShards, err)
+	}
+
+	if err := enc.Encode(shards); err != nil {
+		return oid.ID{}, fmt.Errorf("calculate parity Reed-Solomon(%d,%d) shards: %w", x.dataShards, x.parityShards, err)
+	}
+
+	return x.putShards(shards)
+}
+
+// putShards writes one object per shard to the destination target. Each shard
+// object copies the source header, gets attributes describing the source object
+// and the shard index, and is re-signed with the encoder's signer. It returns
+// ID of the source object.
+func (x *reedSolomonEncoder) putShards(shards [][]byte) (oid.ID, error) {
+	srcSig := x.srcHdr.Signature()
+	srcPldSHA256, withSrcPldSHA256 := x.srcHdr.PayloadChecksum()
+	srcPldTZ, withSrcPldTZ := x.srcHdr.PayloadHomomorphicHash()
+
+	srcIdxAttr := object.NewAttribute(reedSolomonAttrIdx, "")
+	srcIDAttr := object.NewAttribute(reedSolomonAttrSrcID, x.srcHdr.GetID().String())
+	srcPldLenAttr := object.NewAttribute(reedSolomonAttrSrcPayloadLen, strconv.FormatUint(x.srcHdr.PayloadSize(), 10))
+	var srcPldSHA256Attr, srcPldTZAttr, srcSigAttr object.Attribute
+	if withSrcPldSHA256 {
+		srcPldSHA256Attr = object.NewAttribute(reedSolomonAttrSrcPayloadSHA256, hex.EncodeToString(srcPldSHA256.Value()))
+	}
+	if withSrcPldTZ {
+		srcPldTZAttr = object.NewAttribute(reedSolomonAttrSrcPayloadTZ, hex.EncodeToString(srcPldTZ.Value()))
+	}
+	if srcSig != nil {
+		srcSigAttr = object.NewAttribute(reedSolomonAttrSrcSignature, base64.StdEncoding.EncodeToString(srcSig.Marshal()))
+	}
+
+	for i := range shards {
+		shardHdr := x.srcHdr
+
+		srcIdxAttr.SetValue(strconv.Itoa(i))
+
+		// Build attributes in a fresh slice: appending to the slice returned by
+		// Attributes() could share its backing array between shard headers.
+		srcAttrs := shardHdr.Attributes()
+		attrs := make([]object.Attribute, 0, len(srcAttrs)+6)
+		attrs = append(attrs, srcAttrs...)
+		attrs = append(attrs,
+			srcIDAttr,
+			srcPldLenAttr,
+			srcIdxAttr,
+		)
+		if withSrcPldSHA256 {
+			attrs = append(attrs, srcPldSHA256Attr)
+		}
+		if withSrcPldTZ {
+			attrs = append(attrs, srcPldTZAttr)
+		}
+		if srcSig != nil {
+			attrs = append(attrs, srcSigAttr)
+		}
+		shardHdr.SetAttributes(attrs...)
+
+		shardHdr.SetPayloadSize(uint64(len(shards[i])))
+		shardHdr.SetPayloadChecksum(object.CalculatePayloadChecksum(shards[i]))
+		if withSrcPldTZ {
+			shardHdr.SetPayloadHomomorphicHash(checksum.NewTillichZemor(tz.Sum(shards[i])))
+		}
+
+		if err := shardHdr.SetIDWithSignature(x.signer); err != nil {
+			return oid.ID{}, fmt.Errorf("finalize object header for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
+		}
+
+		if err := x.dst.WriteHeader(&shardHdr); err != nil {
+			return oid.ID{}, fmt.Errorf("write object header for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
+		}
+		// TODO: Provide way to notice that payload buffer can be retained (but not modified) to avoid excessive copy.
+		// io.Writer prohibits retention. Instead, consider attaching payload to instance passed to WriteHeader().
+		// The caller may check that its len is PayloadSize() and not copy.
+		if _, err := x.dst.Write(shards[i]); err != nil {
+			return oid.ID{}, fmt.Errorf("write object payload for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
+		}
+		if _, err := x.dst.Close(); err != nil {
+			return oid.ID{}, fmt.Errorf("save object for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
+		}
+	}
+
+	return x.srcHdr.GetID(), nil
+}
diff --git a/pkg/services/object/put/slice.go b/pkg/services/object/put/slice.go
index fd4eaa1f2a..409d5a5af1 100644
--- a/pkg/services/object/put/slice.go
+++ b/pkg/services/object/put/slice.go
@@ -27,6 +27,11 @@ type slicingTarget struct {
 	payloadWriter *slicer.PayloadWriter
 }
 
+type reedSolomonPolicy struct {
+	dataShards   int
+	parityShards int
+}
+
 // returns [internal.Target] for raw root object streamed by the client
 // with payload slicing and child objects' formatting. Each ready child object
 // is written into destination target constructed via the given [internal.Target].
@@ -38,7 +43,12 @@ func newSlicingTarget(
 	sessionToken *session.Object,
 	curEpoch uint64,
 	initNextTarget internal.Target,
+	rsp *reedSolomonPolicy,
 ) internal.Target {
+	if rsp != nil {
+		initNextTarget = newReedSolomonEncoder(initNextTarget, rsp.dataShards, rsp.parityShards, signer)
+	}
+
 	return &slicingTarget{
 		ctx: ctx,
 		signer: signer,
diff --git a/pkg/services/object/put/slice_test.go b/pkg/services/object/put/slice_test.go
new file mode 100644
index 0000000000..4eef611d6b
--- /dev/null
+++ b/pkg/services/object/put/slice_test.go
@@ -0,0 +1,463 @@
+package putsvc
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"encoding/base64"
+	"encoding/hex"
+	"fmt"
+	"iter"
+	"slices"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/klauspost/reedsolomon"
+	combinations "github.com/mxschmitt/golang-combinations"
+	"github.com/nspcc-dev/neofs-node/internal/testutil"
+	"github.com/nspcc-dev/neofs-sdk-go/checksum"
+	cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
+	neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
+	"github.com/nspcc-dev/neofs-sdk-go/object"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"github.com/nspcc-dev/neofs-sdk-go/session"
+	sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test"
+	usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
+	"github.com/nspcc-dev/neofs-sdk-go/version"
+	"github.com/nspcc-dev/tzhash/tz"
+	"github.com/stretchr/testify/require"
+)
+
+// inMemObjects is an [internal.Target] collecting written objects in memory.
+type inMemObjects struct {
+	list []object.Object
+}
+
+// WriteHeader appends a payload-free copy of hdr to the list.
+func (x *inMemObjects) WriteHeader(hdr *object.Object) error {
+	// TODO: For linker, payload appears twice: here in header, and in Write. It's better to avoid doing this.
+	hdr = hdr.CutPayload()
+
+	// TODO: It's easy to forget, and then the data will be mutated. Try not to require data flush.
+	var cp object.Object
+	hdr.CopyTo(&cp)
+
+	x.list = append(x.list, cp)
+	return nil
+}
+
+// Write appends p to the payload of the last written object.
+func (x *inMemObjects) Write(p []byte) (int, error) {
+	last := &x.list[len(x.list)-1]
+	last.SetPayload(append(last.Payload(), p...))
+	return len(p), nil
+}
+
+// Close returns ID of the last written object.
+func (x *inMemObjects) Close() (oid.ID, error) {
+	return x.list[len(x.list)-1].GetID(), nil
+}
+
+// combos iterates over what combinations.Combinations returns for indexes 0..n-1 and k.
+func combos(n, k int) iter.Seq[[]int] {
+	s := make([]int, n)
+	for i := range s {
+		s[i] = i
+	}
+	return slices.Values(combinations.Combinations(s, k))
+}
+
+// excludeShards returns a deep copy of shards with items at indexes idsx set to nil.
+func excludeShards(shards [][]byte, idsx []int) [][]byte {
+	shardsCp := slices.Clone(shards)
+	for i, shard := range shardsCp {
+		shardsCp[i] = slices.Clone(shard)
+	}
+	for _, idx := range idsx {
+		shardsCp[idx] = nil
+	}
+	return shardsCp
+}
+
+// getAttribute returns value of the first attribute with key k, or "" if there is none.
+func getAttribute(s []object.Attribute, k string) string {
+	if ind := slices.IndexFunc(s, func(e object.Attribute) bool { return e.Key() == k }); ind >= 0 {
+		return s[ind].Value()
+	}
+	return ""
+}
+
+// assertTZHash checks that homomorphic hash is set iff must and, if so, matches the payload.
+func assertTZHash(t *testing.T, must bool, obj object.Object) {
+	cs, ok := obj.PayloadHomomorphicHash()
+	require.Equal(t, must, ok)
+	if must {
+		calc := tz.Sum(obj.Payload())
+		require.Equal(t, calc[:], cs.Value())
+	}
+}
+
+// recoverPayloadFromReedSolomonShards checks that shards tolerate loss of up to
+// rsp.parityShards items and returns first ln bytes of concatenated data shards.
+func recoverPayloadFromReedSolomonShards(t *testing.T, ln uint64, rsp reedSolomonPolicy, shardObjs []object.Object) []byte {
+	var shards [][]byte
+	for i := range shardObjs {
+		shards = append(shards, shardObjs[i].Payload())
+	}
+
+	if ln == 0 {
+		ind := slices.IndexFunc(shards, func(e []byte) bool { return len(e) > 0 })
+		require.Negative(t, ind, "non-empty payload from empty source object in Reed-Solomon shard")
+		return nil
+	}
+
+	enc, err := reedsolomon.New(rsp.dataShards, rsp.parityShards)
+	require.NoError(t, err)
+
+	ok, err := enc.Verify(shards)
+	require.NoError(t, err)
+	require.True(t, ok)
+
+	required := make([]bool, rsp.dataShards+rsp.parityShards)
+	for i := range rsp.dataShards {
+		required[i] = true
+	}
+
+	for lostCount := 1; lostCount <= rsp.parityShards; lostCount++ {
+		for lostIdxs := range combos(len(shards), lostCount) {
+			shardsCp := excludeShards(shards, lostIdxs)
+			require.NoError(t, enc.Reconstruct(shardsCp),
+				"source payload cannot be recovered from Reed-Solomon shards when no more than %d items lost (%v)", rsp.parityShards, lostIdxs)
+			require.Equal(t, shards, shardsCp, "wrong payload recovered without an error")
+
+			shardsCp = excludeShards(shards, lostIdxs)
+			require.NoError(t, enc.ReconstructSome(shardsCp, required),
+				"source payload cannot be recovered from Reed-Solomon shards when no more than %d items lost (%v)", rsp.parityShards, lostIdxs)
+			require.Equal(t, shards[:rsp.dataShards], shardsCp[:rsp.dataShards], "wrong payload recovered without an error")
+		}
+	}
+
+	for lostIdxs := range combos(len(shards), rsp.parityShards+1) {
+		require.Error(t, enc.Reconstruct(excludeShards(shards, lostIdxs)),
+			"source payload can be recovered from Reed-Solomon shards when more than %d items lost (%v)", rsp.parityShards, lostIdxs)
+		require.Error(t, enc.ReconstructSome(excludeShards(shards, lostIdxs), required),
+			"source payload can be recovered from Reed-Solomon shards when more than %d items lost (%v)", rsp.parityShards, lostIdxs)
+	}
+
+	data := make([]byte, 0, ln)
+	for i := range shards[:rsp.dataShards] {
+		data = append(data, shardObjs[i].Payload()...)
+	}
+
+	require.GreaterOrEqual(t, uint64(len(data)), ln, "source payload is truncated in Reed-Solomon shards")
+
+	require.False(t, slices.ContainsFunc(data[ln:], func(b byte) bool { return b != 0 }),
+		"source payload is aligned with non-zero byte in Reed-Solomon shards")
+
+	return data[:ln]
+}
+
+// recoverHeaderFromReedSolomonShard restores the source object header and the
+// shard index from the "__NEOFS__EC_RS_" attributes of shard.
+func recoverHeaderFromReedSolomonShard(t *testing.T, shard object.Object) (object.Object, int) {
+	// Clone: slices.DeleteFunc below filters in place and must not touch the shard's own attributes.
+	attrs := slices.Clone(shard.Attributes())
+
+	idxAttr := getAttribute(attrs, "__NEOFS__EC_RS_IDX")
+	require.NotEmpty(t, idxAttr, "missing attribute for shard index in Reed-Solomon shard")
+	idx, err := strconv.Atoi(idxAttr)
+	require.NoError(t, err, "attribute for shard index contains invalid int in Reed-Solomon shard")
+	require.GreaterOrEqual(t, idx, 0, "attribute for shard index contains negative value in Reed-Solomon shard")
+
+	srcPldLenAttr := getAttribute(attrs, "__NEOFS__EC_RS_SRC_PAYLOAD_LEN")
+	require.NotEmpty(t, srcPldLenAttr, "missing attribute for source payload len in Reed-Solomon shard")
+	srcPldLen, err := strconv.ParseUint(srcPldLenAttr, 10, 64)
+	require.NoError(t, err, "attribute for source payload len attribute contains invalid uint in Reed-Solomon shard")
+
+	srcPldSHA256Attr := getAttribute(attrs, "__NEOFS__EC_RS_SRC_PAYLOAD_HASH_SHA256")
+	require.NotEmpty(t, srcPldSHA256Attr, "missing attribute for source payload SHA256 hash in Reed-Solomon shard")
+	srcPldSHA256, err := hex.DecodeString(srcPldSHA256Attr)
+	require.NoError(t, err, "attribute for source payload SHA256 hash contains invalid HEX in Reed-Solomon shard")
+	require.Len(t, srcPldSHA256, sha256.Size, "attribute for source payload SHA256 hash contains wrong len value in Reed-Solomon shard")
+
+	srcPldTZAttr := getAttribute(attrs, "__NEOFS__EC_RS_SRC_PAYLOAD_HASH_TZ")
+	var srcPldTZ []byte
+	if srcPldTZAttr != "" {
+		srcPldTZ, err = hex.DecodeString(srcPldTZAttr)
+		require.NoError(t, err, "attribute for source payload TZ hash contains invalid HEX in Reed-Solomon shard")
+		require.Len(t, srcPldTZ, tz.Size, "attribute for source payload TZ hash contains wrong len value in Reed-Solomon shard")
+	}
+
+	idAttr := getAttribute(attrs, "__NEOFS__EC_RS_SRC_ID")
+	require.NotEmpty(t, idAttr, "missing attribute for source object ID in Reed-Solomon shard")
+	var id oid.ID
+	require.NoError(t, id.DecodeString(idAttr), "attribute for source object ID contains invalid value in Reed-Solomon shard")
+
+	srcSigAttr := getAttribute(attrs, "__NEOFS__EC_RS_SRC_SIGNATURE")
+	require.NotEmpty(t, srcSigAttr, "missing attribute for source object signature in Reed-Solomon shard")
+	binSig, err := base64.StdEncoding.DecodeString(srcSigAttr)
+	require.NoError(t, err, "attribute for source object signature contains invalid Base-64 in Reed-Solomon shard")
+	var srcSig neofscrypto.Signature
+	require.NoError(t, srcSig.Unmarshal(binSig), "attribute for source object signature contains invalid BLOB in Reed-Solomon shard")
+
+	attrs = slices.DeleteFunc(attrs, func(a object.Attribute) bool {
+		return strings.HasPrefix(a.Key(), "__NEOFS__EC_RS_")
+	})
+
+	srcHdr := *shard.CutPayload()
+	srcHdr.SetPayloadSize(srcPldLen)
+	srcHdr.SetPayloadChecksum(checksum.New(checksum.SHA256, srcPldSHA256))
+	if srcPldTZAttr != "" {
+		srcHdr.SetPayloadHomomorphicHash(checksum.New(checksum.TillichZemor, srcPldTZ))
+	}
+	srcHdr.SetAttributes(attrs...)
+	srcHdr.SetID(id)
+	srcHdr.SetSignature(&srcSig)
+
+	return srcHdr, idx
+}
+
+// recoverObjectFromReedSolomonShards verifies shards of one object and assembles the source object.
+func recoverObjectFromReedSolomonShards(t *testing.T, pldLenLimit uint64, withTZHash bool, rsp reedSolomonPolicy, shards []object.Object) object.Object {
+	for i, shard := range shards {
+		require.LessOrEqual(t, shard.PayloadSize(), pldLenLimit, "payload overflows len limit in Reed-Solomon shard #%d", i)
+		require.EqualValues(t, len(shard.Payload()), shard.PayloadSize(), "wrong payload length in Reed-Solomon shard #%d", i)
+		assertTZHash(t, withTZHash, shard)
+		require.NoError(t, shard.CheckVerificationFields(), "invalid object for Reed-Solomon shard #%d", i)
+	}
+
+	var srcHdr object.Object
+	for i := range shards {
+		srcHdrI, idx := recoverHeaderFromReedSolomonShard(t, shards[i])
+		require.EqualValues(t, i, idx, "order of shards is broken in Reed-Solomon shards")
+
+		if i == 0 {
+			srcHdr = srcHdrI
+		} else {
+			require.Equal(t, srcHdr, srcHdrI, "different source object headers in Reed-Solomon shard")
+		}
+	}
+
+	srcPld := recoverPayloadFromReedSolomonShards(t, srcHdr.PayloadSize(), rsp, shards)
+
+	srcObj := srcHdr
+	srcObj.SetPayload(srcPld)
+
+	return srcObj
+}
+
+// verifySplitShards checks split-chain invariants of shards; for a split object the linker must be last.
+func verifySplitShards(t *testing.T, pldLenLimit uint64, withTZHash bool, shards []object.Object) {
+	for i, shard := range shards {
+		require.LessOrEqual(t, shard.PayloadSize(), pldLenLimit, "payload overflows len limit in split shard #%d", i)
+		require.EqualValues(t, len(shard.Payload()), shard.PayloadSize(), "wrong payload length in split shard #%d", i)
+		assertTZHash(t, withTZHash, shard)
+		require.NoError(t, shard.CheckVerificationFields(), "invalid object for split shard #%d", i)
+	}
+
+	if len(shards) == 1 {
+		require.Zero(t, shards[0].GetParentID(), "parent ID in unsplit object")
+		require.Nil(t, shards[0].Parent(), "parent header in unsplit object")
+		return
+	}
+
+	require.GreaterOrEqual(t, len(shards), 3, "less than 3 split shards") // at least two shard + linker
+
+	dataShards := shards[:len(shards)-1] // linker is last
+
+	parHdrInLast := dataShards[len(dataShards)-1].Parent()
+	require.NotZero(t, parHdrInLast, "missing parent header in last split shard")
+	require.NotZero(t, parHdrInLast.GetID(), "missing parent object ID in last split shard")
+	require.NotZero(t, parHdrInLast.Signature(), "missing parent object signature in last split shard")
+
+	partHdrInFirst := dataShards[0].Parent()
+	require.NotZero(t, partHdrInFirst, "missing parent header in first split shard")
+	require.Zero(t, partHdrInFirst.GetID(), "parent object ID is set while should not be in first split shard")
+	require.Zero(t, partHdrInFirst.Signature(), "parent object signature is set while should not be in first split shard")
+
+	ind := slices.IndexFunc(dataShards[1:len(dataShards)-1], func(shard object.Object) bool { return !shard.GetParentID().IsZero() || shard.Parent() != nil })
+	require.Negative(t, ind, "parent header is set while should not be in intermediate split shards")
+
+	parCnr := parHdrInLast.GetContainerID()
+	require.Equal(t, parCnr, partHdrInFirst.GetContainerID(), "different parent containers in first and last split shards")
+	ind = slices.IndexFunc(shards, func(shard object.Object) bool { return shard.GetContainerID() != parCnr })
+	require.Negative(t, ind, "wrong container in split shard")
+
+	parOwner := parHdrInLast.Owner()
+	require.Equal(t, parOwner, partHdrInFirst.Owner(), "different parent owners in first and last split shards")
+	ind = slices.IndexFunc(shards, func(shard object.Object) bool { return shard.Owner() != parOwner })
+	require.Negative(t, ind, "wrong owner in split shard")
+
+	ind = slices.IndexFunc(shards, func(shard object.Object) bool { return len(shard.Attributes()) > 0 })
+	require.Negative(t, ind, "attributes are set in split shard")
+
+	require.Zero(t, dataShards[0].GetFirstID(), "first shard ID is set while should not be in first split shard")
+	require.Zero(t, dataShards[0].GetPreviousID(), "previous shard ID is set while should not be in first split shard")
+
+	for i := range dataShards[1:] {
+		require.Equal(t, dataShards[i].GetID(), dataShards[i+1].GetPreviousID(), "wrong previous shard ID in split shard #%d", i+1)
+		require.Equal(t, dataShards[0].GetID(), dataShards[i+1].GetFirstID(), "wrong first shard ID in split shard #%d", i+1)
+	}
+
+	linker := shards[len(shards)-1]
+	require.Equal(t, linker.Parent(), parHdrInLast, "different parent headers in last and link split shards")
+	require.Equal(t, object.TypeLink, linker.Type(), "wrong object type in link split shard")
+
+	require.NotEmpty(t, linker.Payload(), "empty payload in link split shard")
+	var linkerData object.Link
+	require.NoError(t, linkerData.Unmarshal(linker.Payload()), "invalid payload in link split shard")
+	linkerObjs := linkerData.Objects()
+	require.Len(t, linkerObjs, len(dataShards), "wrong number of items in link split shard")
+	for i := range linkerObjs {
+		require.Equal(t, dataShards[i].GetID(), linkerObjs[i].ObjectID(), "wrong shard ID in link split shard item #%d", i)
+		require.EqualValues(t, dataShards[i].PayloadSize(), linkerObjs[i].ObjectSize(), "wrong shard payload len in link split shard item #%d", i)
+	}
+}
+
+// recoverObjectFromSplitShards verifies split shards and assembles the parent object from them.
+func recoverObjectFromSplitShards(t *testing.T, pldLenLimit uint64, withTZHash bool, shards []object.Object) object.Object {
+	verifySplitShards(t, pldLenLimit, withTZHash, shards)
+
+	if len(shards) == 1 {
+		return shards[0]
+	}
+
+	var srcPld []byte
+	for _, shard := range shards[:len(shards)-1] { // cut linker
+		srcPld = append(srcPld, shard.Payload()...)
+	}
+
+	srcObj := *shards[len(shards)-2].Parent()
+	srcObj.SetPayload(srcPld)
+
+	return srcObj
+}
+
+// recoverSplitObjectFromReedSolomonShards restores each split shard from its chunk
+// of Reed-Solomon shards, then assembles the source object from the split shards.
+func recoverSplitObjectFromReedSolomonShards(t *testing.T, pldLenLimit uint64, withTZHash bool, rsp reedSolomonPolicy, rsShards []object.Object) object.Object {
+	rsShardsPerObject := rsp.dataShards + rsp.parityShards
+
+	var splitShards []object.Object
+	for s := range slices.Chunk(rsShards, rsShardsPerObject) {
+		splitShard := recoverObjectFromReedSolomonShards(t, pldLenLimit, withTZHash, rsp, s)
+		splitShards = append(splitShards, splitShard)
+	}
+
+	srcObj := recoverObjectFromSplitShards(t, pldLenLimit, withTZHash, splitShards)
+
+	return srcObj
+}
+
+// splitAndRecoverObject writes srcObj through a slicing target with Reed-Solomon
+// encoding, restores it from the collected shards and checks its common fields.
+func splitAndRecoverObject(t *testing.T, srcObj object.Object, pldLenLimit uint64, withTZHash bool, st *session.Object, rsp reedSolomonPolicy) object.Object {
+	const curEpoch = 123
+	signer := usertest.User()
+
+	var objs inMemObjects
+	slcr := newSlicingTarget(context.Background(), pldLenLimit, !withTZHash, signer, st, curEpoch, &objs, &rsp)
+
+	srcObj.SetOwner(signer.ID)
+
+	err := slcr.WriteHeader(srcObj.CutPayload())
+	require.NoError(t, err)
+	_, err = slcr.Write(srcObj.Payload())
+	require.NoError(t, err)
+	id, err := slcr.Close()
+	require.NoError(t, err)
+
+	recvObj := recoverSplitObjectFromReedSolomonShards(t, pldLenLimit, withTZHash, rsp, objs.list)
+
+	require.Equal(t, id, recvObj.GetID())
+	require.NotNil(t, recvObj.Version())
+	require.Equal(t, version.Current(), *recvObj.Version())
+	require.EqualValues(t, len(recvObj.Payload()), recvObj.PayloadSize(), "wrong payload length in recovered object")
+	require.NoError(t, recvObj.CheckVerificationFields(), "recovered object is invalid")
+	assertTZHash(t, withTZHash, recvObj)
+	require.EqualValues(t, curEpoch, recvObj.CreationEpoch())
+	if st != nil {
+		require.NotNil(t, recvObj.SessionToken())
+		require.Equal(t, st, recvObj.SessionToken())
+		require.Equal(t, st.Issuer(), recvObj.Owner())
+	} else {
+		require.Nil(t, recvObj.SessionToken())
+		require.Equal(t, srcObj.Owner(), recvObj.Owner())
+	}
+
+	return recvObj
+}
+
+// testReedSolomonPolicy checks object round trip for rsp: plain, with TZ hash and with session token.
+func testReedSolomonPolicy(t *testing.T, pldLenLimit, pldLen uint64, rsp reedSolomonPolicy) {
+	cnr := cidtest.ID()
+	payload := testutil.RandByteSlice(pldLen)
+	attrs := []object.Attribute{
+		object.NewAttribute("attr_1", "val_1"),
+		object.NewAttribute("attr_2", "val_2"),
+	}
+
+	var srcObj object.Object
+	srcObj.SetContainerID(cnr)
+	srcObj.SetAttributes(attrs...)
+	srcObj.SetPayload(payload)
+
+	check := func(t *testing.T, recvObj object.Object) {
+		require.Equal(t, cnr, recvObj.GetContainerID())
+		require.Equal(t, attrs, recvObj.Attributes())
+		require.True(t, bytes.Equal(payload, recvObj.Payload()))
+	}
+
+	t.Run("with TZ hash", func(t *testing.T) {
+		recvObj := splitAndRecoverObject(t, srcObj, pldLenLimit, true, nil, rsp)
+		check(t, recvObj)
+	})
+
+	t.Run("with session token", func(t *testing.T) {
+		tok := sessiontest.Object()
+		recvObj := splitAndRecoverObject(t, srcObj, pldLenLimit, false, &tok, rsp)
+		check(t, recvObj)
+	})
+
+	recvObj := splitAndRecoverObject(t, srcObj, pldLenLimit, false, nil, rsp)
+	check(t, recvObj)
+}
+
+// TestReedSolomon runs the round trip for several policies and payload sizes around the size limit.
+func TestReedSolomon(t *testing.T) {
+	const maxObjectSize = 4 << 10
+
+	for _, rsp := range []reedSolomonPolicy{
+		{dataShards: 3, parityShards: 1},
+		{dataShards: 6, parityShards: 3},
+		{dataShards: 12, parityShards: 4},
+	} {
+		t.Run(fmt.Sprintf("(%d,%d)", rsp.dataShards, rsp.parityShards), func(t *testing.T) {
+			t.Run("empty payload", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, 0, rsp)
+			})
+			t.Run("one byte", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, 1, rsp)
+			})
+			t.Run("limit-1", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, maxObjectSize-1, rsp)
+			})
+			t.Run("exactly limit", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, maxObjectSize, rsp)
+			})
+			t.Run("limit+1", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, maxObjectSize+1, rsp)
+			})
+			t.Run("limit+50%", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, maxObjectSize+maxObjectSize/2, rsp)
+			})
+			t.Run("limitX2", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, maxObjectSize*2, rsp)
+			})
+			t.Run("limitX5+1", func(t *testing.T) {
+				testReedSolomonPolicy(t, maxObjectSize, maxObjectSize*5+1, rsp)
+			})
+		})
+	}
+}
diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go
index 05d0d9fffd..7a23b9651f 100644
--- a/pkg/services/object/put/streamer.go
+++ b/pkg/services/object/put/streamer.go
@@ -133,6 +133,7 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
 				sToken,
 				p.networkState.CurrentEpoch(),
 				p.newCommonTarget(prm),
+				nil,
 			),
 			homomorphicChecksumRequired: homomorphicChecksumRequired,
 		}