Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/klauspost/compress v1.17.11
github.com/klauspost/reedsolomon v1.12.4
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.2
github.com/mxschmitt/golang-combinations v1.2.0
github.com/nspcc-dev/hrw/v2 v2.0.3
github.com/nspcc-dev/locode-db v0.6.0
github.com/nspcc-dev/neo-go v0.110.0
Expand Down Expand Up @@ -63,7 +65,7 @@ require (
github.com/holiman/uint256 v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.12.4 h1:5aDr3ZGoJbgu/8+j45KtUJxzYm8k08JGtB9Wx1VQ4OA=
github.com/klauspost/reedsolomon v1.12.4/go.mod h1:d3CzOMOt0JXGIFZm1StgkyF14EYr3xneR2rNWo7NcMU=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -183,6 +185,8 @@ github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/n
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxschmitt/golang-combinations v1.2.0 h1:V5E7MncIK8Yr1SL/SpdqMuSquFsfoIs5auI7Y3n8z14=
github.com/mxschmitt/golang-combinations v1.2.0/go.mod h1:RCm5eR03B+JrBOMRDLsKZWShluXdrHu+qwhPEJ0miBM=
github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22 h1:M5Nmg1iCnbZngzIBDIlMr9vW+okFfcSMBvBlXG8r+14=
github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk=
github.com/nspcc-dev/dbft v0.3.3-0.20250321140139-7462b47e4d2d h1:Mm0bp0YRAuGfoUDPbleQ9zByJc6HTCu3B4/UBoen9cQ=
Expand Down
157 changes: 157 additions & 0 deletions pkg/services/object/put/ec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package putsvc

import (
"bytes"
"encoding/base64"
"encoding/hex"
"fmt"
"strconv"

"github.com/klauspost/reedsolomon"
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/tzhash/tz"
)

type reedSolomonEncoder struct {
dst internal.Target

dataShards int
parityShards int
signer neofscrypto.Signer

srcHdr object.Object
pldBuf bytes.Buffer
}

func newReedSolomonEncoder(dst internal.Target, dataShards, parityShards int, signer neofscrypto.Signer) internal.Target {
return &reedSolomonEncoder{
dst: dst,
dataShards: dataShards,
parityShards: parityShards,
signer: signer,
}
}

// TODO: Place in SDK.
const (
reedSolomonAttrIdx = "__NEOFS__EC_RS_IDX"
reedSolomonAttrSrcPayloadLen = "__NEOFS__EC_RS_SRC_PAYLOAD_LEN"
reedSolomonAttrSrcPayloadSHA256 = "__NEOFS__EC_RS_SRC_PAYLOAD_HASH_SHA256"
reedSolomonAttrSrcPayloadTZ = "__NEOFS__EC_RS_SRC_PAYLOAD_HASH_TZ"
reedSolomonAttrSrcID = "__NEOFS__EC_RS_SRC_ID"
reedSolomonAttrSrcSignature = "__NEOFS__EC_RS_SRC_SIGNATURE"
)

func (x *reedSolomonEncoder) WriteHeader(hdr *object.Object) error {
if cs, ok := x.srcHdr.PayloadChecksum(); ok && cs.Type() != checksum.SHA256 {
return fmt.Errorf("unexpected payloaad checksum type %v instead of %v", cs.Type(), checksum.SHA256)
}

Check warning on line 52 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L51-L52

Added lines #L51 - L52 were not covered by tests
if cs, ok := x.srcHdr.PayloadHomomorphicHash(); ok && cs.Type() != checksum.TillichZemor {
return fmt.Errorf("unexpected payloaad checksum type %v instead of %v", cs.Type(), checksum.TillichZemor)
}

Check warning on line 55 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L54-L55

Added lines #L54 - L55 were not covered by tests

x.srcHdr = *hdr
x.pldBuf.Reset()
return nil
}

func (x *reedSolomonEncoder) Write(p []byte) (int, error) {
return x.pldBuf.Write(p)
}

func (x *reedSolomonEncoder) Close() (oid.ID, error) {
if x.srcHdr.PayloadSize() == 0 {
// TODO: avoid make
return x.putShards(make([][]byte, x.dataShards+x.parityShards))
}

// TODO: Explore possibility to reset and reuse encoder for next object.
// TODO: Explore reedsolomon.Option for performance improvement.
// TODO: Compare with reedsolomon.StreamEncoder.
enc, err := reedsolomon.New(x.dataShards, x.parityShards)
if err != nil { // should never happen
return oid.ID{}, fmt.Errorf("init Reed-Solomon(%d,%d) encoder: %w", x.dataShards, x.parityShards, err)
}

Check warning on line 78 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L77-L78

Added lines #L77 - L78 were not covered by tests

shards, err := enc.Split(x.pldBuf.Bytes())
if err != nil {
return oid.ID{}, fmt.Errorf("split data into Reed-Solomon(%d,%d) shards: %w", x.dataShards, x.parityShards, err)
}

Check warning on line 83 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L82-L83

Added lines #L82 - L83 were not covered by tests

if err := enc.Encode(shards); err != nil {
return oid.ID{}, fmt.Errorf("calculate parity Reed-Solomon(%d,%d) shards: %w", x.dataShards, x.parityShards, err)
}

Check warning on line 87 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L86-L87

Added lines #L86 - L87 were not covered by tests

return x.putShards(shards)
}

func (x *reedSolomonEncoder) putShards(shards [][]byte) (oid.ID, error) {
srcSig := x.srcHdr.Signature()
srcPldSHA256, withSrcPldSHA256 := x.srcHdr.PayloadChecksum()
srcPldTZ, withSrcPldTZ := x.srcHdr.PayloadHomomorphicHash()

srcIdxAttr := object.NewAttribute(reedSolomonAttrIdx, "")
srcIDAttr := object.NewAttribute(reedSolomonAttrSrcID, x.srcHdr.GetID().String())
srcPldLenAttr := object.NewAttribute(reedSolomonAttrSrcPayloadLen, strconv.FormatUint(x.srcHdr.PayloadSize(), 10))
var srcPldSHA256Attr, srcPldTZAttr, srcSigAttr object.Attribute
if withSrcPldSHA256 {
srcPldSHA256Attr = object.NewAttribute(reedSolomonAttrSrcPayloadSHA256, hex.EncodeToString(srcPldSHA256.Value()))
}
if withSrcPldTZ {
srcPldTZAttr = object.NewAttribute(reedSolomonAttrSrcPayloadTZ, hex.EncodeToString(srcPldTZ.Value()))
}
if srcSig != nil {
srcSigAttr = object.NewAttribute(reedSolomonAttrSrcSignature, base64.StdEncoding.EncodeToString(srcSig.Marshal()))
}

for i := range shards {
shardHdr := x.srcHdr

srcIdxAttr.SetValue(strconv.Itoa(i))

attrs := append(shardHdr.Attributes(),
srcIDAttr,
srcPldLenAttr,
srcIdxAttr,
)
if withSrcPldSHA256 {
attrs = append(attrs, srcPldSHA256Attr)
}
if withSrcPldTZ {
attrs = append(attrs, srcPldTZAttr)
}
if srcSig != nil {
attrs = append(attrs, srcSigAttr)
}
shardHdr.SetAttributes(attrs...)

shardHdr.SetPayloadSize(uint64(len(shards[i])))
shardHdr.SetPayloadChecksum(object.CalculatePayloadChecksum(shards[i]))
if withSrcPldTZ {
shardHdr.SetPayloadHomomorphicHash(checksum.NewTillichZemor(tz.Sum(shards[i])))
}

if err := shardHdr.SetIDWithSignature(x.signer); err != nil {
return oid.ID{}, fmt.Errorf("finalize object header for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
}

Check warning on line 140 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L139-L140

Added lines #L139 - L140 were not covered by tests

if err := x.dst.WriteHeader(&shardHdr); err != nil {
return oid.ID{}, fmt.Errorf("write object header for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
}

Check warning on line 144 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L143-L144

Added lines #L143 - L144 were not covered by tests
// TODO: Provide way to notice that payload buffer can be retained (but not modified) to avoid excessive copy.
// io.Writer prohibits retention. Instead, consider attaching payload to instance passed to WriteHeader().
// The caller may check that its len is PayloadSize() and not copy.
if _, err := x.dst.Write(shards[i]); err != nil {
return oid.ID{}, fmt.Errorf("write object payload for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
}

Check warning on line 150 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L149-L150

Added lines #L149 - L150 were not covered by tests
if _, err := x.dst.Close(); err != nil {
return oid.ID{}, fmt.Errorf("save object for Reed-Solomon(%d,%d) shard #%d: %w", x.dataShards, x.parityShards, i, err)
}

Check warning on line 153 in pkg/services/object/put/ec.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/ec.go#L152-L153

Added lines #L152 - L153 were not covered by tests
}

return x.srcHdr.GetID(), nil
}
10 changes: 10 additions & 0 deletions pkg/services/object/put/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type slicingTarget struct {
payloadWriter *slicer.PayloadWriter
}

type reedSolomonPolicy struct {
dataShards int
parityShards int
}

// returns [internal.Target] for raw root object streamed by the client
// with payload slicing and child objects' formatting. Each ready child object
// is written into destination target constructed via the given [internal.Target].
Expand All @@ -38,7 +43,12 @@ func newSlicingTarget(
sessionToken *session.Object,
curEpoch uint64,
initNextTarget internal.Target,
rsp *reedSolomonPolicy,
) internal.Target {
if rsp != nil {
initNextTarget = newReedSolomonEncoder(initNextTarget, rsp.dataShards, rsp.parityShards, signer)
}

return &slicingTarget{
ctx: ctx,
signer: signer,
Expand Down
Loading
Loading