diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index f1e9ab4c0c..472e022cc0 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" lru "github.com/hashicorp/golang-lru/v2" + iec "github.com/nspcc-dev/neofs-node/internal/ec" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -804,6 +805,7 @@ type containerNodesSorter struct { func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets } func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts } +func (x *containerNodesSorter) ECRules() []iec.Rule { return nil } func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) { cacheKey := objectNodesCacheKey{epoch: x.curEpoch} cacheKey.addr.SetContainer(x.cnrID) diff --git a/go.mod b/go.mod index e02a3ef2da..e133f6c9d3 100644 --- a/go.mod +++ b/go.mod @@ -13,10 +13,12 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/klauspost/compress v1.17.11 + github.com/klauspost/reedsolomon v1.12.4 github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.5.0 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.2 + github.com/mxschmitt/golang-combinations v1.2.0 github.com/nspcc-dev/hrw/v2 v2.0.3 github.com/nspcc-dev/locode-db v0.6.0 github.com/nspcc-dev/neo-go v0.111.0 @@ -62,7 +64,7 @@ require ( github.com/holiman/uint256 v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect - github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/minio/sha256-simd v1.0.1 // indirect diff --git 
a/go.sum b/go.sum index 835b945279..2b73871dec 100644 --- a/go.sum +++ b/go.sum @@ -121,8 +121,10 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= -github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= -github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= +github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/reedsolomon v1.12.4 h1:5aDr3ZGoJbgu/8+j45KtUJxzYm8k08JGtB9Wx1VQ4OA= +github.com/klauspost/reedsolomon v1.12.4/go.mod h1:d3CzOMOt0JXGIFZm1StgkyF14EYr3xneR2rNWo7NcMU= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -185,6 +187,8 @@ github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/n github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxschmitt/golang-combinations v1.2.0 h1:V5E7MncIK8Yr1SL/SpdqMuSquFsfoIs5auI7Y3n8z14= +github.com/mxschmitt/golang-combinations v1.2.0/go.mod h1:RCm5eR03B+JrBOMRDLsKZWShluXdrHu+qwhPEJ0miBM= github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22 
h1:M5Nmg1iCnbZngzIBDIlMr9vW+okFfcSMBvBlXG8r+14= github.com/nspcc-dev/bbolt v0.0.0-20250612101626-5df2544a4a22/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk= github.com/nspcc-dev/dbft v0.4.0 h1:4/atD4GrrMEtrYBDiZPrPzdKZ6ws7PR/cg0M4DEdVeI= @@ -358,7 +362,6 @@ golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/ec/ec.go b/internal/ec/ec.go new file mode 100644 index 0000000000..3751ebd860 --- /dev/null +++ b/internal/ec/ec.go @@ -0,0 +1,80 @@ +package ec + +import ( + "fmt" + "slices" + "strconv" + + "github.com/klauspost/reedsolomon" +) + +// Erasure coding attributes. +const ( + AttributePrefix = "__NEOFS__EC_" + AttributeRuleIdx = AttributePrefix + "RULE_IDX" + AttributePartIdx = AttributePrefix + "PART_IDX" +) + +// Rule represents erasure coding rule for object payload's encoding and placement. +type Rule struct { + DataPartNum uint8 + ParityPartNum uint8 +} + +// String implements [fmt.Stringer]. +func (x Rule) String() string { + return strconv.FormatUint(uint64(x.DataPartNum), 10) + "/" + strconv.FormatUint(uint64(x.ParityPartNum), 10) +} + +// Encode encodes given data according to specified EC rule and returns coded +// parts. First [Rule.DataPartNum] elements are data parts, other +// [Rule.ParityPartNum] ones are parity blocks. 
+// +// All parts are the same length. If data len is not divisible by +// [Rule.DataPartNum], last data part is aligned with zeros. +// +// If data is empty, all parts are nil. +func Encode(rule Rule, data []byte) ([][]byte, error) { + if len(data) == 0 { + return make([][]byte, rule.DataPartNum+rule.ParityPartNum), nil + } + + // TODO: Explore reedsolomon.Option for performance improvement. https://github.com/nspcc-dev/neofs-node/issues/3501 + enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum)) + if err != nil { // should never happen with correct rule + return nil, fmt.Errorf("init Reed-Solomon encoder: %w", err) + } + + parts, err := enc.Split(data) + if err != nil { + return nil, fmt.Errorf("split data: %w", err) + } + + if err := enc.Encode(parts); err != nil { + return nil, fmt.Errorf("calculate Reed-Solomon parity: %w", err) + } + + return parts, nil +} + +// Decode decodes source data of known len from EC parts obtained by applying +// specified rule. +func Decode(rule Rule, dataLen uint64, parts [][]byte) ([]byte, error) { + // TODO: Explore reedsolomon.Option for performance improvement. https://github.com/nspcc-dev/neofs-node/issues/3501 + dec, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum)) + if err != nil { // should never happen with correct rule + return nil, fmt.Errorf("init Reed-Solomon decoder: %w", err) + } + + required := make([]bool, rule.DataPartNum+rule.ParityPartNum) + for i := range rule.DataPartNum { + required[i] = true + } + + if err := dec.ReconstructSome(parts, required); err != nil { + return nil, fmt.Errorf("restore Reed-Solomon: %w", err) + } + + // TODO: last part may be shorter, do not overallocate buffer. 
+ return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen], nil +} diff --git a/internal/ec/ec_test.go b/internal/ec/ec_test.go new file mode 100644 index 0000000000..bbf86e784a --- /dev/null +++ b/internal/ec/ec_test.go @@ -0,0 +1,76 @@ +package ec_test + +import ( + "testing" + + "github.com/klauspost/reedsolomon" + iec "github.com/nspcc-dev/neofs-node/internal/ec" + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/nspcc-dev/neofs-node/internal/testutil" + "github.com/stretchr/testify/require" +) + +func TestRule_String(t *testing.T) { + r := iec.Rule{ + DataPartNum: 12, + ParityPartNum: 23, + } + require.Equal(t, "12/23", r.String()) +} + +func testEncode(t *testing.T, rule iec.Rule, data []byte) { + ln := uint64(len(data)) + + parts, err := iec.Encode(rule, data) + require.NoError(t, err) + + res, err := iec.Decode(rule, ln, parts) + require.NoError(t, err) + require.Equal(t, data, res) + + for lostCount := 1; lostCount <= int(rule.ParityPartNum); lostCount++ { + for _, lostIdxs := range islices.IndexCombos(len(parts), lostCount) { + res, err := iec.Decode(rule, ln, islices.NilTwoDimSliceElements(parts, lostIdxs)) + require.NoError(t, err) + require.Equal(t, data, res) + } + } + + for _, lostIdxs := range islices.IndexCombos(len(parts), int(rule.ParityPartNum)+1) { + _, err := iec.Decode(rule, ln, islices.NilTwoDimSliceElements(parts, lostIdxs)) + require.ErrorContains(t, err, "restore Reed-Solomon") + require.ErrorIs(t, err, reedsolomon.ErrTooFewShards) + } +} + +func TestEncode(t *testing.T) { + rules := []iec.Rule{ + {DataPartNum: 3, ParityPartNum: 1}, + {DataPartNum: 12, ParityPartNum: 4}, + } + + data := testutil.RandByteSlice(4 << 10) + + t.Run("empty", func(t *testing.T) { + for _, rule := range rules { + t.Run(rule.String(), func(t *testing.T) { + test := func(t *testing.T, data []byte) { + res, err := iec.Encode(rule, []byte{}) + require.NoError(t, err) + + total := int(rule.DataPartNum + rule.ParityPartNum) + require.Len(t, 
res, total) + require.EqualValues(t, total, islices.CountNilsInTwoDimSlice(res)) + } + test(t, nil) + test(t, []byte{}) + }) + } + }) + + for _, rule := range rules { + t.Run(rule.String(), func(t *testing.T) { + testEncode(t, rule, data) + }) + } +} diff --git a/internal/ec/object_test.go b/internal/ec/object_test.go new file mode 100644 index 0000000000..4110c5f040 --- /dev/null +++ b/internal/ec/object_test.go @@ -0,0 +1,174 @@ +package ec_test + +import ( + "crypto/sha256" + "math/rand/v2" + "testing" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + "github.com/nspcc-dev/neofs-node/internal/testutil" + "github.com/nspcc-dev/neofs-sdk-go/checksum" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" + "github.com/nspcc-dev/neofs-sdk-go/version" + "github.com/nspcc-dev/tzhash/tz" + "github.com/stretchr/testify/require" +) + +func TestGetPartInfo(t *testing.T) { + var obj object.Object + otherAttr := object.NewAttribute("any_attribute", "val") + + obj.SetAttributes(otherAttr) + + t.Run("missing", func(t *testing.T) { + pi, err := iec.GetPartInfo(obj) + require.NoError(t, err) + require.EqualValues(t, -1, pi.RuleIndex) + }) + + t.Run("failure", func(t *testing.T) { + for _, tc := range []struct { + name string + attrs map[string]string + assertErr func(t *testing.T, err error) + }{ + {name: "non-int rule index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "not_an_int", "__NEOFS__EC_PART_IDX": "456"}, + assertErr: func(t *testing.T, err error) { + require.ErrorContains(t, err, "invalid index attribute __NEOFS__EC_RULE_IDX: ") + require.ErrorContains(t, err, "invalid syntax") + }, + }, + {name: "negative rule index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "-123", "__NEOFS__EC_PART_IDX": "456"}, + 
assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "invalid index attribute __NEOFS__EC_RULE_IDX: negative value -123") + }, + }, + {name: "non-int part index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "123", "__NEOFS__EC_PART_IDX": "not_an_int"}, + assertErr: func(t *testing.T, err error) { + require.ErrorContains(t, err, "invalid index attribute __NEOFS__EC_PART_IDX: ") + require.ErrorContains(t, err, "invalid syntax") + }, + }, + {name: "negative part index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "123", "__NEOFS__EC_PART_IDX": "-456"}, + assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "invalid index attribute __NEOFS__EC_PART_IDX: negative value -456") + }, + }, + {name: "rule index without part index", + attrs: map[string]string{"__NEOFS__EC_RULE_IDX": "123"}, + assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "__NEOFS__EC_RULE_IDX attribute is set while __NEOFS__EC_PART_IDX is not") + }, + }, + {name: "part index without rule index", + attrs: map[string]string{"__NEOFS__EC_PART_IDX": "456"}, + assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "__NEOFS__EC_PART_IDX attribute is set while __NEOFS__EC_RULE_IDX is not") + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + attrs := make([]object.Attribute, 0, len(tc.attrs)/2) + for k, v := range tc.attrs { + attrs = append(attrs, object.NewAttribute(k, v)) + } + + obj.SetAttributes(append([]object.Attribute{otherAttr}, attrs...)...) 
+ + _, err := iec.GetPartInfo(obj) + tc.assertErr(t, err) + }) + } + }) + + obj.SetAttributes( + otherAttr, + object.NewAttribute("__NEOFS__EC_RULE_IDX", "123"), + object.NewAttribute("__NEOFS__EC_PART_IDX", "456"), + ) + + pi, err := iec.GetPartInfo(obj) + require.NoError(t, err) + require.Equal(t, iec.PartInfo{RuleIndex: 123, Index: 456}, pi) +} + +func TestFormObjectForECPart(t *testing.T) { + ver := version.Current() + st := sessiontest.Object() + signer := neofscryptotest.Signer() + + var parent object.Object + parent.SetVersion(&ver) + parent.SetContainerID(cidtest.ID()) + parent.SetOwner(usertest.ID()) + parent.SetCreationEpoch(rand.Uint64()) + parent.SetType(object.Type(rand.Int32())) + parent.SetSessionToken(&st) + require.NoError(t, parent.SetVerificationFields(signer)) + + partInfo := iec.PartInfo{RuleIndex: 123, Index: 456} + part := testutil.RandByteSlice(32) + + t.Run("signer failure", func(t *testing.T) { + signer := neofscryptotest.FailSigner(signer) + _, sigErr := signer.Sign(nil) + require.Error(t, sigErr) + + _, err := iec.FormObjectForECPart(signer, parent, part, partInfo) + require.ErrorContains(t, err, "set verification fields: could not set signature:") + require.ErrorContains(t, err, sigErr.Error()) + }) + + obj, err := iec.FormObjectForECPart(signer, parent, part, partInfo) + require.NoError(t, err) + + require.NoError(t, obj.VerifyID()) + require.True(t, obj.VerifySignature()) + + require.True(t, obj.HasParent()) + require.NotNil(t, obj.Parent()) + require.Equal(t, parent, *obj.Parent()) + + require.Equal(t, part, obj.Payload()) + require.EqualValues(t, len(part), obj.PayloadSize()) + + pcs, ok := obj.PayloadChecksum() + require.True(t, ok) + require.Equal(t, checksum.NewSHA256(sha256.Sum256(part)), pcs) + + require.Equal(t, parent.Version(), obj.Version()) + require.Equal(t, parent.GetContainerID(), obj.GetContainerID()) + require.Equal(t, parent.Owner(), obj.Owner()) + require.Equal(t, parent.CreationEpoch(), obj.CreationEpoch()) + 
require.Equal(t, object.TypeRegular, obj.Type()) + require.Equal(t, parent.SessionToken(), obj.SessionToken()) + + _, ok = obj.PayloadHomomorphicHash() + require.False(t, ok) + + require.Len(t, obj.Attributes(), 2) + + pi, err := iec.GetPartInfo(obj) + require.NoError(t, err) + require.Equal(t, partInfo, pi) + + t.Run("with homomorphic hash", func(t *testing.T) { + anyHash := checksum.NewTillichZemor([tz.Size]byte{1, 2, 3}) + parent.SetPayloadHomomorphicHash(anyHash) + + obj, err := iec.FormObjectForECPart(signer, parent, part, partInfo) + require.NoError(t, err) + + phh, ok := obj.PayloadHomomorphicHash() + require.True(t, ok) + require.Equal(t, checksum.NewTillichZemor(tz.Sum(part)), phh) + }) +} diff --git a/internal/ec/objects.go b/internal/ec/objects.go new file mode 100644 index 0000000000..4dd4693fd4 --- /dev/null +++ b/internal/ec/objects.go @@ -0,0 +1,75 @@ +package ec + +import ( + "fmt" + + iobject "github.com/nspcc-dev/neofs-node/internal/object" + "github.com/nspcc-dev/neofs-sdk-go/checksum" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/tzhash/tz" +) + +// PartInfo groups information about single EC part produced according to some [Rule]. +type PartInfo struct { + // Index of EC rule in the container storage policy. + RuleIndex int + // Part index. + Index int +} + +// GetPartInfo fetches EC part info from given object header. If one of +// [AttributeRuleIdx] or [AttributePartIdx] attributes is set, the other must be +// set too. If both are missing, GetPartInfo returns [PartInfo.RuleIndex] = -1 +// without error. 
+func GetPartInfo(obj object.Object) (PartInfo, error) { + ruleIdx, err := iobject.GetIndexAttribute(obj, AttributeRuleIdx) + if err != nil { + return PartInfo{}, fmt.Errorf("invalid index attribute %s: %w", AttributeRuleIdx, err) + } + + partIdx, err := iobject.GetIndexAttribute(obj, AttributePartIdx) + if err != nil { + return PartInfo{}, fmt.Errorf("invalid index attribute %s: %w", AttributePartIdx, err) + } + + if ruleIdx < 0 { + if partIdx >= 0 { + return PartInfo{}, fmt.Errorf("%s attribute is set while %s is not", AttributePartIdx, AttributeRuleIdx) + } + } else if partIdx < 0 { + return PartInfo{}, fmt.Errorf("%s attribute is set while %s is not", AttributeRuleIdx, AttributePartIdx) + } + + return PartInfo{ + RuleIndex: ruleIdx, + Index: partIdx, + }, nil +} + +// FormObjectForECPart forms object for EC part produced from given parent object. +func FormObjectForECPart(signer neofscrypto.Signer, parent object.Object, part []byte, partInfo PartInfo) (object.Object, error) { + var obj object.Object + obj.SetVersion(parent.Version()) + obj.SetContainerID(parent.GetContainerID()) + obj.SetOwner(parent.Owner()) + obj.SetCreationEpoch(parent.CreationEpoch()) + obj.SetType(object.TypeRegular) + obj.SetSessionToken(parent.SessionToken()) + + obj.SetParent(&parent) + iobject.SetIntAttribute(&obj, AttributeRuleIdx, partInfo.RuleIndex) + iobject.SetIntAttribute(&obj, AttributePartIdx, partInfo.Index) + + obj.SetPayload(part) + obj.SetPayloadSize(uint64(len(part))) + if _, ok := parent.PayloadHomomorphicHash(); ok { + obj.SetPayloadHomomorphicHash(checksum.NewTillichZemor(tz.Sum(part))) + } + + if err := obj.SetVerificationFields(signer); err != nil { + return object.Object{}, fmt.Errorf("set verification fields: %w", err) + } + + return obj, nil +} diff --git a/internal/ec/policy.go b/internal/ec/policy.go new file mode 100644 index 0000000000..097e68f25b --- /dev/null +++ b/internal/ec/policy.go @@ -0,0 +1,18 @@ +package ec + +import "iter" + +// NodeSequenceForPart 
returns sorted sequence of node indexes to store object +// for EC part with index = partIdx produced by applying the rule such that +// [Rule.DataPartNum] + [Rule.ParityPartNum] = totalParts. +func NodeSequenceForPart(partIdx, totalParts, nodes int) iter.Seq[int] { + return func(yield func(int) bool) { + for shift := 0; shift <= totalParts-1; shift++ { + for i := (partIdx + shift) % totalParts; i < nodes; i += totalParts { + if !yield(i) { + return + } + } + } + } +} diff --git a/internal/ec/policy_test.go b/internal/ec/policy_test.go new file mode 100644 index 0000000000..205ff8e67d --- /dev/null +++ b/internal/ec/policy_test.go @@ -0,0 +1,41 @@ +package ec_test + +import ( + "fmt" + "slices" + "testing" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + "github.com/stretchr/testify/require" +) + +func TestNodeSequenceForPartReading(t *testing.T) { + for _, tc := range []struct { + partIdx int + totalParts int + nodes int + exp []int + }{ + {partIdx: 0, totalParts: 1, nodes: 1, exp: []int{0}}, + {partIdx: 0, totalParts: 5, nodes: 5, exp: []int{0, 1, 2, 3, 4}}, + {partIdx: 2, totalParts: 5, nodes: 5, exp: []int{2, 3, 4, 0, 1}}, + {partIdx: 4, totalParts: 5, nodes: 5, exp: []int{4, 0, 1, 2, 3}}, + {partIdx: 0, totalParts: 5, nodes: 10, exp: []int{0, 5, 1, 6, 2, 7, 3, 8, 4, 9}}, + {partIdx: 2, totalParts: 5, nodes: 10, exp: []int{2, 7, 3, 8, 4, 9, 0, 5, 1, 6}}, + {partIdx: 0, totalParts: 5, nodes: 15, exp: []int{0, 5, 10, 1, 6, 11, 2, 7, 12, 3, 8, 13, 4, 9, 14}}, + {partIdx: 2, totalParts: 5, nodes: 15, exp: []int{2, 7, 12, 3, 8, 13, 4, 9, 14, 0, 5, 10, 1, 6, 11}}, + {partIdx: 4, totalParts: 5, nodes: 15, exp: []int{4, 9, 14, 0, 5, 10, 1, 6, 11, 2, 7, 12, 3, 8, 13}}, + } { + t.Run(fmt.Sprintf("part=%d,parts=%d,nodes=%d", tc.partIdx, tc.totalParts, tc.nodes), func(t *testing.T) { + require.Equal(t, tc.exp, slices.Collect(iec.NodeSequenceForPart(tc.partIdx, tc.totalParts, tc.nodes))) + + var collected []int + for i := range iec.NodeSequenceForPart(tc.partIdx, 
tc.totalParts, tc.nodes) { + collected = append(collected, i) + break + } + require.Len(t, collected, 1) + require.Equal(t, collected[0], tc.exp[0]) + }) + } +} diff --git a/internal/object/attributes.go b/internal/object/attributes.go new file mode 100644 index 0000000000..8e6109166f --- /dev/null +++ b/internal/object/attributes.go @@ -0,0 +1,80 @@ +package object + +import ( + "errors" + "fmt" + "strconv" + + "github.com/nspcc-dev/neofs-sdk-go/object" +) + +// ErrAttributeNotFound is returned when some object attribute is not found. +var ErrAttributeNotFound = errors.New("attribute not found") + +// GetIndexAttribute looks up for specified index attribute in the given object +// header. Returns -1 if the attribute is missing. +// +// GetIndexAttribute ignores all attribute values except the first. +// +// Note that if attribute exists but negative, GetIndexAttribute returns error. +func GetIndexAttribute(hdr object.Object, attr string) (int, error) { + i, err := GetIntAttribute(hdr, attr) + if err != nil { + if errors.Is(err, ErrAttributeNotFound) { + return -1, nil + } + return 0, err + } + + if i < 0 { + return 0, fmt.Errorf("negative value %d", i) + } + + return i, nil +} + +// GetIntAttribute looks up for specified int attribute in the given object +// header. Returns [ErrAttributeNotFound] if the attribute is missing. +// +// GetIntAttribute ignores all attribute values except the first. +func GetIntAttribute(hdr object.Object, attr string) (int, error) { + if s := GetAttribute(hdr, attr); s != "" { + return strconv.Atoi(s) + } + return 0, ErrAttributeNotFound +} + +// GetAttribute looks up for specified attribute in the given object header. +// Returns empty string if the attribute is missing. +// +// GetAttribute ignores all attribute values except the first. 
+func GetAttribute(hdr object.Object, attr string) string { + attrs := hdr.Attributes() + for i := range attrs { + if attrs[i].Key() == attr { + return attrs[i].Value() + } + } + return "" +} + +// SetIntAttribute sets int value for the object attribute. If the attribute +// already exists, SetIntAttribute overwrites its value. +func SetIntAttribute(dst *object.Object, attr string, val int) { + SetAttribute(dst, attr, strconv.Itoa(val)) +} + +// SetAttribute sets value for the object attribute. If the attribute already +// exists, SetAttribute overwrites its value. +func SetAttribute(dst *object.Object, attr, val string) { + attrs := dst.Attributes() + for i := range attrs { + if attrs[i].Key() == attr { + attrs[i].SetValue(val) + dst.SetAttributes(attrs...) + return + } + } + + dst.SetAttributes(append(attrs, object.NewAttribute(attr, val))...) +} diff --git a/internal/object/attributes_test.go b/internal/object/attributes_test.go new file mode 100644 index 0000000000..88c6412dce --- /dev/null +++ b/internal/object/attributes_test.go @@ -0,0 +1,174 @@ +package object_test + +import ( + "strconv" + "testing" + + iobject "github.com/nspcc-dev/neofs-node/internal/object" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/stretchr/testify/require" +) + +func TestGetIndexAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + t.Run("missing", func(t *testing.T) { + i, err := iobject.GetIndexAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, -1, i) + }) + + t.Run("not an integer", func(t *testing.T) { + obj.SetAttributes(object.NewAttribute(attr, "not_an_int")) + + _, err := iobject.GetIndexAttribute(obj, attr) + require.ErrorContains(t, err, "invalid syntax") + }) + + t.Run("negative", func(t *testing.T) { + obj.SetAttributes(object.NewAttribute(attr, "-123")) + + _, err := iobject.GetIndexAttribute(obj, attr) + require.EqualError(t, err, "negative value -123") + }) + + obj.SetAttributes(object.NewAttribute(attr, 
"1234567890")) + + i, err := iobject.GetIndexAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, 1234567890, i) + + t.Run("multiple", func(t *testing.T) { + for _, s := range []string{ + "not_an_int", + "-1", + "2", + } { + obj.SetAttributes( + object.NewAttribute(attr, "1"), + object.NewAttribute(attr, s), + ) + + i, err := iobject.GetIndexAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, 1, i) + } + }) +} + +func TestGetIntAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + t.Run("missing", func(t *testing.T) { + _, err := iobject.GetIntAttribute(obj, attr) + require.ErrorIs(t, err, iobject.ErrAttributeNotFound) + }) + + t.Run("not an integer", func(t *testing.T) { + obj.SetAttributes(object.NewAttribute(attr, "not_an_int")) + + _, err := iobject.GetIntAttribute(obj, attr) + require.ErrorContains(t, err, "invalid syntax") + }) + + for _, tc := range []struct { + s string + i int + }{ + {s: "1234567890", i: 1234567890}, + {s: "0", i: 0}, + {s: "-1234567890", i: -1234567890}, + } { + obj.SetAttributes(object.NewAttribute(attr, tc.s)) + + i, err := iobject.GetIntAttribute(obj, attr) + require.NoError(t, err, tc.s) + require.EqualValues(t, tc.i, i) + } + + t.Run("multiple", func(t *testing.T) { + for _, s := range []string{ + "not_an_int", + "-1", + "2", + } { + obj.SetAttributes( + object.NewAttribute(attr, "1"), + object.NewAttribute(attr, s), + ) + + i, err := iobject.GetIntAttribute(obj, attr) + require.NoError(t, err) + require.EqualValues(t, 1, i) + } + }) +} + +func TestGetAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + t.Run("missing", func(t *testing.T) { + require.Empty(t, iobject.GetAttribute(obj, attr)) + }) + + obj.SetAttributes(object.NewAttribute(attr, "val")) + require.Equal(t, "val", iobject.GetAttribute(obj, attr)) + + t.Run("multiple", func(t *testing.T) { + obj.SetAttributes( + object.NewAttribute(attr, "val1"), + object.NewAttribute(attr, "val2"), + ) + 
+ require.Equal(t, "val1", iobject.GetAttribute(obj, attr)) + }) +} + +func TestSetIntAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + obj.SetAttributes(object.NewAttribute(attr+"_other", "val")) + + check := func(t *testing.T, val int) { + iobject.SetIntAttribute(&obj, attr, val) + + attrs := obj.Attributes() + require.Len(t, attrs, 2) + require.Equal(t, attr, attrs[1].Key()) + require.Equal(t, strconv.Itoa(val), attrs[1].Value()) + + got, err := iobject.GetIntAttribute(obj, attr) + require.NoError(t, err, val) + require.EqualValues(t, val, got) + } + + check(t, 1234567890) + check(t, 0) + check(t, -1234567890) +} + +func TestSetAttribute(t *testing.T) { + var obj object.Object + const attr = "attr" + + obj.SetAttributes(object.NewAttribute(attr+"_other", "val")) + + check := func(t *testing.T, val string) { + iobject.SetAttribute(&obj, attr, val) + + attrs := obj.Attributes() + require.Len(t, attrs, 2) + require.Equal(t, attr, attrs[1].Key()) + require.Equal(t, val, attrs[1].Value()) + + got := iobject.GetAttribute(obj, attr) + require.Equal(t, val, got) + } + + check(t, "val1") + check(t, "val2") +} diff --git a/internal/slices/index.go b/internal/slices/index.go new file mode 100644 index 0000000000..69a0107e07 --- /dev/null +++ b/internal/slices/index.go @@ -0,0 +1,17 @@ +package slices + +import combinations "github.com/mxschmitt/golang-combinations" + +// IndexCombos returns all combinations of n indexes taken k. +func IndexCombos(n, k int) [][]int { + return combinations.Combinations(Indexes(n), k) +} + +// Indexes returns a slice filled with n indexes. 
+func Indexes(n int) []int { + s := make([]int, n) + for i := range s { + s[i] = i + } + return s +} diff --git a/internal/slices/index_test.go b/internal/slices/index_test.go new file mode 100644 index 0000000000..404bf751bb --- /dev/null +++ b/internal/slices/index_test.go @@ -0,0 +1,24 @@ +package slices_test + +import ( + "testing" + + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/stretchr/testify/require" +) + +func TestIndexes(t *testing.T) { + require.Empty(t, islices.Indexes(0)) + require.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, islices.Indexes(10)) +} + +func TestIndexCombos(t *testing.T) { + require.ElementsMatch(t, islices.IndexCombos(4, 2), [][]int{ + {0, 1}, + {0, 2}, + {0, 3}, + {1, 2}, + {1, 3}, + {2, 3}, + }) +} diff --git a/internal/slices/slices.go b/internal/slices/slices.go new file mode 100644 index 0000000000..1e81e9e22e --- /dev/null +++ b/internal/slices/slices.go @@ -0,0 +1,38 @@ +package slices + +import "slices" + +// TwoDimSliceElementCount returns sum len for s. +func TwoDimSliceElementCount[E any](s [][]E) int { + var n int + for i := range s { + n += len(s[i]) + } + return n +} + +// NilTwoDimSliceElements returns clone of s with nil-ed given indexes. +func NilTwoDimSliceElements[T any](s [][]T, idxs []int) [][]T { + if s == nil { + return nil + } + + c := make([][]T, len(s)) + for i := range c { + if !slices.Contains(idxs, i) { + c[i] = slices.Clone(s[i]) + } + } + return c +} + +// CountNilsInTwoDimSlice counts nil elements of s. 
+func CountNilsInTwoDimSlice[T any](s [][]T) int { + var n int + for i := range s { + if s[i] == nil { + n++ + } + } + return n +} diff --git a/internal/slices/slices_test.go b/internal/slices/slices_test.go new file mode 100644 index 0000000000..0cc125218b --- /dev/null +++ b/internal/slices/slices_test.go @@ -0,0 +1,40 @@ +package slices_test + +import ( + "testing" + + islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/stretchr/testify/require" +) + +func TestTwoDimElementCount(t *testing.T) { + require.Zero(t, islices.TwoDimSliceElementCount([][]int(nil))) + require.Zero(t, islices.TwoDimSliceElementCount(make([][]int, 10))) + require.EqualValues(t, 10, islices.TwoDimSliceElementCount([][]int{ + {1}, + {2, 3}, + {4, 5, 6}, + {7, 8, 9, 10}, + })) +} + +func TestNilTwoDimSliceElements(t *testing.T) { + require.Nil(t, islices.NilTwoDimSliceElements([][]int(nil), []int{1, 2, 3})) + require.Empty(t, islices.NilTwoDimSliceElements([][]int{}, []int{1, 2, 3})) + + excl := []int{1, 3} + res := islices.NilTwoDimSliceElements([][]int{ + {1}, + {2, 3}, + {4, 5, 6}, + {7, 8, 9, 10}, + }, excl) + + require.Equal(t, [][]int{ + {1}, + nil, + {4, 5, 6}, + nil, + }, res) + require.EqualValues(t, len(excl), islices.CountNilsInTwoDimSlice(res)) +} diff --git a/pkg/services/object/put/distibuted_test.go b/pkg/services/object/put/distibuted_test.go index 679c0c4d3e..b731eafd3d 100644 --- a/pkg/services/object/put/distibuted_test.go +++ b/pkg/services/object/put/distibuted_test.go @@ -11,7 +11,6 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -32,26 +31,6 @@ func allocNodes(n []uint) [][]netmap.NodeInfo { return res } -type testContainerNodes struct { - objID oid.ID - - sortErr error - cnrNodes [][]netmap.NodeInfo - - primCounts []uint 
-} - -func (x testContainerNodes) Unsorted() [][]netmap.NodeInfo { return x.cnrNodes } - -func (x testContainerNodes) SortForObject(obj oid.ID) ([][]netmap.NodeInfo, error) { - if x.objID != obj { - return nil, errors.New("[test] unexpected object ID") - } - return x.cnrNodes, x.sortErr -} - -func (x testContainerNodes) PrimaryCounts() []uint { return x.primCounts } - type testNetwork struct { localPubKey []byte } @@ -95,15 +74,10 @@ func TestIterateNodesForObject(t *testing.T) { localPubKey: cnrNodes[0][2].PublicKey(), }, remotePool: &rwp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 3, 2}, - }, } var handlerMtx sync.Mutex var handlerCalls []nodeDesc - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 3, 2}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node) handlerMtx.Unlock() @@ -183,19 +157,14 @@ func TestIterateNodesForObject(t *testing.T) { cnrNodes := allocNodes([]uint{3, 3, 2}) cnrNodes[1][1].SetPublicKey(cnrNodes[0][1].PublicKey()) iter := placementIterator{ - log: zap.NewNop(), - neoFSNet: new(testNetwork), - remotePool: new(testWorkerPool), - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 1, 2}, - }, + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: new(testWorkerPool), linearReplNum: 4, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 1, 2}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -227,16 +196,10 @@ func TestIterateNodesForObject(t *testing.T) { log: zap.NewNop(), neoFSNet: new(testNetwork), remotePool: new(testWorkerPool), - containerNodes: testContainerNodes{ - objID: objID, - 
cnrNodes: cnrNodes, - primCounts: []uint{1, 1, 1}, - }, - broadcast: true, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{1, 1, 1}, cnrNodes, true, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -260,21 +223,6 @@ func TestIterateNodesForObject(t *testing.T) { }, key) } }) - t.Run("sort nodes for object failure", func(t *testing.T) { - objID := oidtest.ID() - iter := placementIterator{ - log: zap.NewNop(), - containerNodes: testContainerNodes{ - objID: objID, - sortErr: errors.New("any sort error"), - }, - } - err := iter.iterateNodesForObject(objID, func(nodeDesc) error { - t.Fatal("must not be called") - return nil - }) - require.EqualError(t, err, "sort container nodes for the object: any sort error") - }) t.Run("worker pool failure", func(t *testing.T) { // nodes: [A B] [C D E] [F] // policy: [2 2 1] @@ -290,15 +238,10 @@ func TestIterateNodesForObject(t *testing.T) { log: zap.NewNop(), neoFSNet: new(testNetwork), remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 2, 1}, - }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 2, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -334,15 +277,10 @@ func TestIterateNodesForObject(t *testing.T) { log: zap.NewNop(), neoFSNet: new(testNetwork), remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 4, 1}, - }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := 
iter.iterateNodesForObject(objID, []uint{2, 4, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -372,15 +310,10 @@ func TestIterateNodesForObject(t *testing.T) { log: zap.NewNop(), neoFSNet: new(testNetwork), remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 2, 1}, - }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 2, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -413,15 +346,10 @@ func TestIterateNodesForObject(t *testing.T) { log: zap.NewNop(), neoFSNet: new(testNetwork), remotePool: &wp, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 2, 1}, - }, } var handlerMtx sync.Mutex var handlerCalls [][]byte - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 2, 1}, cnrNodes, false, func(node nodeDesc) error { handlerMtx.Lock() handlerCalls = append(handlerCalls, node.info.PublicKey()) handlerMtx.Unlock() @@ -452,16 +380,11 @@ func TestIterateNodesForObject(t *testing.T) { err: errors.New("pool err"), nFail: 2, }, - containerNodes: testContainerNodes{ - objID: objID, - cnrNodes: cnrNodes, - primCounts: []uint{2, 3, 1}, - }, } blockCh := make(chan struct{}) returnCh := make(chan struct{}) go func() { - err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + err := iter.iterateNodesForObject(objID, []uint{2, 3, 1}, cnrNodes, false, func(node nodeDesc) error { <-blockCh return nil }) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index be377f46ae..f14c4e28cf 100644 --- 
a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -11,6 +11,7 @@ import ( "sync" "sync/atomic" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -32,7 +33,6 @@ type distributedTarget struct { placementIterator placementIterator obj *objectSDK.Object - objMeta object.ContentMeta networkMagicNumber uint32 fsState netmapcore.StateDetailed @@ -45,8 +45,10 @@ type distributedTarget struct { objSharedMeta []byte collectedSignatures [][]byte + containerNodes ContainerNodes localNodeInContainer bool localNodeSigner neofscrypto.Signer + sessionSigner neofscrypto.Signer // - object if localOnly // - replicate request if localNodeInContainer // - payload otherwise @@ -63,6 +65,10 @@ type distributedTarget struct { keyStorage *svcutil.KeyStorage localOnly bool + + // When object from request is an EC part, ecPart.RuleIndex is >= 0. + // Undefined when policy has no EC rules. 
+ ecPart iec.PartInfo } type nodeDesc struct { @@ -126,49 +132,99 @@ func (t *distributedTarget) Close() (oid.ID, error) { defer func() { putPayload(t.encodedObject.b) t.encodedObject.b = nil - t.collectedSignatures = nil }() t.obj.SetPayload(t.encodedObject.b[t.encodedObject.pldOff:]) - tombOrLink := t.obj.Type() == objectSDK.TypeLink || t.obj.Type() == objectSDK.TypeTombstone - - if !t.placementIterator.broadcast && len(t.obj.Children()) > 0 || tombOrLink { - // enabling extra broadcast for linking and tomb objects - t.placementIterator.broadcast = true - } + typ := t.obj.Type() + tombOrLink := typ == objectSDK.TypeLink || typ == objectSDK.TypeTombstone // v2 split link object and tombstone validations are expensive routines // and are useless if the node does not belong to the container, since // another node is responsible for the validation and may decline it, // does not matter what this node thinks about it + var objMeta object.ContentMeta if !tombOrLink || t.localNodeInContainer { var err error - if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { + if objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { return oid.ID{}, fmt.Errorf("(%T) could not validate payload content: %w", t, err) } } + err := t.saveObject(*t.obj, objMeta, t.encodedObject) + if err != nil { + return oid.ID{}, err + } + + return t.obj.GetID(), nil +} + +func (t *distributedTarget) saveObject(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error { + if t.localOnly && t.sessionSigner == nil { + return t.distributeObject(obj, objMeta, encObj, nil) + } + + objNodeLists, err := t.containerNodes.SortForObject(t.obj.GetID()) + if err != nil { + return fmt.Errorf("sort container nodes by object ID: %w", err) + } + + // TODO: handle rules in parallel. 
https://github.com/nspcc-dev/neofs-node/issues/3503 + + repRules := t.containerNodes.PrimaryCounts() + if len(repRules) > 0 { + typ := obj.Type() + broadcast := typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLink || (!t.localOnly && typ == objectSDK.TypeLock) || len(obj.Children()) > 0 + return t.distributeObject(obj, objMeta, encObj, func(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error { + return t.placementIterator.iterateNodesForObject(obj.GetID(), repRules, objNodeLists, broadcast, func(node nodeDesc) error { + return t.sendObject(obj, objMeta, encObj, node) + }) + }) + } + + if ecRules := t.containerNodes.ECRules(); len(ecRules) > 0 { + if t.ecPart.RuleIndex >= 0 { // already encoded EC part + total := int(ecRules[t.ecPart.RuleIndex].DataPartNum + ecRules[t.ecPart.RuleIndex].ParityPartNum) + nodes := objNodeLists[len(repRules)+t.ecPart.RuleIndex] + return t.saveECPart(obj, objMeta, encObj, t.ecPart.Index, total, nodes) + } + + if t.sessionSigner != nil { + if err := t.ecAndSaveObject(t.sessionSigner, obj, ecRules, objNodeLists[len(repRules):]); err != nil { + return err + } + } + } + + return nil +} + +func (t *distributedTarget) distributeObject(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject, + placementFn func(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error) error { + defer func() { + t.collectedSignatures = nil + }() + if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { - t.objSharedMeta = t.encodeCurrentObjectMetadata() + t.objSharedMeta = t.encodeObjectMetadata(obj) } - id := t.obj.GetID() + id := obj.GetID() var err error if t.localOnly { - var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID())) + var l = t.placementIterator.log.With(zap.Stringer("oid", id)) - err = t.writeObjectLocally() + err = t.writeObjectLocally(obj, objMeta, encObj) if err != nil { err = fmt.Errorf("write object locally: %w", err) svcutil.LogServiceError(l, "PUT", 
nil, err) err = errIncompletePut{singleErr: fmt.Errorf("%w (last node error: %w)", errNotEnoughNodes{required: 1}, err)} } } else { - err = t.placementIterator.iterateNodesForObject(id, t.sendObject) + err = placementFn(obj, objMeta, encObj) } if err != nil { - return oid.ID{}, err + return err } if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { @@ -176,7 +232,7 @@ func (t *distributedTarget) Close() (oid.ID, error) { defer t.metaMtx.RUnlock() if len(t.collectedSignatures) == 0 { - return oid.ID{}, fmt.Errorf("skip metadata chain submit for %s object: no signatures were collected", id) + return fmt.Errorf("skip metadata chain submit for %s object: no signatures were collected", id) } var await bool @@ -187,10 +243,10 @@ func (t *distributedTarget) Close() (oid.ID, error) { case "optimistic": await = false default: - return id, nil + return nil } - addr := object.AddressOf(t.obj) + addr := object.AddressOf(&obj) var objAccepted chan struct{} if await { objAccepted = make(chan struct{}, 1) @@ -202,14 +258,14 @@ func (t *distributedTarget) Close() (oid.ID, error) { if await { t.metaSvc.UnsubscribeFromObject(addr) } - return oid.ID{}, fmt.Errorf("failed to submit %s object meta information: %w", addr, err) + return fmt.Errorf("failed to submit %s object meta information: %w", addr, err) } if await { select { case <-t.opCtx.Done(): t.metaSvc.UnsubscribeFromObject(addr) - return oid.ID{}, fmt.Errorf("interrupted awaiting for %s object meta information: %w", addr, t.opCtx.Err()) + return fmt.Errorf("interrupted awaiting for %s object meta information: %w", addr, t.opCtx.Err()) case <-objAccepted: } } @@ -217,38 +273,38 @@ func (t *distributedTarget) Close() (oid.ID, error) { t.placementIterator.log.Debug("submitted object meta information", zap.Stringer("addr", addr)) } - return id, nil + return nil } -func (t *distributedTarget) encodeCurrentObjectMetadata() []byte { +func (t *distributedTarget) encodeObjectMetadata(obj objectSDK.Object) []byte { currBlock 
:= t.fsState.CurrentBlock() currEpochDuration := t.fsState.CurrentEpochDuration() expectedVUB := (uint64(currBlock)/currEpochDuration + 2) * currEpochDuration - firstObj := t.obj.GetFirstID() - if t.obj.HasParent() && firstObj.IsZero() { + firstObj := obj.GetFirstID() + if obj.HasParent() && firstObj.IsZero() { // object itself is the first one - firstObj = t.obj.GetID() + firstObj = obj.GetID() } var deletedObjs []oid.ID var lockedObjs []oid.ID - typ := t.obj.Type() + typ := obj.Type() switch typ { case objectSDK.TypeTombstone: - deletedObjs = append(deletedObjs, t.obj.AssociatedObject()) + deletedObjs = append(deletedObjs, obj.AssociatedObject()) case objectSDK.TypeLock: - lockedObjs = append(lockedObjs, t.obj.AssociatedObject()) + lockedObjs = append(lockedObjs, obj.AssociatedObject()) default: } - return object.EncodeReplicationMetaInfo(t.obj.GetContainerID(), t.obj.GetID(), firstObj, t.obj.GetPreviousID(), - t.obj.PayloadSize(), typ, deletedObjs, lockedObjs, expectedVUB, t.networkMagicNumber) + return object.EncodeReplicationMetaInfo(obj.GetContainerID(), obj.GetID(), firstObj, obj.GetPreviousID(), + obj.PayloadSize(), typ, deletedObjs, lockedObjs, expectedVUB, t.networkMagicNumber) } -func (t *distributedTarget) sendObject(node nodeDesc) error { +func (t *distributedTarget) sendObject(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject, node nodeDesc) error { if node.local { - if err := t.writeObjectLocally(); err != nil { + if err := t.writeObjectLocally(obj, objMeta, encObj); err != nil { return fmt.Errorf("write object locally: %w", err) } return nil @@ -260,13 +316,13 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { var sigsRaw []byte var err error - if t.encodedObject.hdrOff > 0 { - sigsRaw, err = t.transport.SendReplicationRequestToNode(t.opCtx, t.encodedObject.b, node.info) + if encObj.hdrOff > 0 { + sigsRaw, err = t.transport.SendReplicationRequestToNode(t.opCtx, encObj.b, node.info) if err != nil { err = 
fmt.Errorf("replicate object to remote node (key=%x): %w", node.info.PublicKey(), err) } } else { - err = putObjectToNode(t.opCtx, node.info, t.obj, t.keyStorage, t.clientConstructor, t.commonPrm) + err = putObjectToNode(t.opCtx, node.info, &obj, t.keyStorage, t.clientConstructor, t.commonPrm) } if err != nil { return fmt.Errorf("could not close object stream: %w", err) @@ -275,7 +331,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { // These should technically be errors, but we don't have // a complete implementation now, so errors are substituted with logs. - var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID()), + var l = t.placementIterator.log.With(zap.Stringer("oid", obj.GetID()), zap.String("node", network.StringifyGroup(node.info.AddressGroup()))) sigs, err := decodeSignatures(sigsRaw) @@ -306,8 +362,8 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { return nil } -func (t *distributedTarget) writeObjectLocally() error { - if err := putObjectLocally(t.localStorage, t.obj, t.objMeta, &t.encodedObject); err != nil { +func (t *distributedTarget) writeObjectLocally(obj objectSDK.Object, objMeta object.ContentMeta, encObj encodedObject) error { + if err := putObjectLocally(t.localStorage, &obj, objMeta, &encObj); err != nil { return err } @@ -376,28 +432,17 @@ type placementIterator struct { neoFSNet NeoFSNetwork remotePool util.WorkerPool /* request-dependent */ - containerNodes ContainerNodes // when non-zero, this setting simplifies the object's storage policy // requirements to a fixed number of object replicas to be retained linearReplNum uint - // whether to perform additional best-effort of sending the object replica to - // all reserve nodes of the container - broadcast bool } -func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) error) error { - var replCounts []uint +func (x placementIterator) 
iterateNodesForObject(obj oid.ID, replCounts []uint, nodeLists [][]netmap.NodeInfo, broadcast bool, f func(nodeDesc) error) error { var l = x.log.With(zap.Stringer("oid", obj)) - nodeLists, err := x.containerNodes.SortForObject(obj) - if err != nil { - return fmt.Errorf("sort container nodes for the object: %w", err) - } if x.linearReplNum > 0 { ns := slices.Concat(nodeLists...) nodeLists = [][]netmap.NodeInfo{ns} replCounts = []uint{x.linearReplNum} - } else { - replCounts = x.containerNodes.PrimaryCounts() } var processedNodesMtx sync.RWMutex var nextNodeGroupKeys []string @@ -441,7 +486,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er // latency and volume of "unfinished" data to be garbage-collected. Also after // the failure of any of the nodes the ability to comply with the policy // requirements may be lost. - for i := range nodeLists { + for i := range replCounts { listInd := i for { replRem := replCounts[listInd] - nodesCounters[listInd].stored @@ -450,7 +495,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er } listLen := uint(len(nodeLists[listInd])) if listLen-nodesCounters[listInd].processed < replRem { - err = errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed} + var err error = errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed} if e, _ := lastRespErr.Load().(error); e != nil { err = fmt.Errorf("%w (last node error: %w)", err, e) } @@ -472,7 +517,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er continue } if nr.desc.local = x.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { - nr.desc.info, nr.convertErr = x.convertNodeInfo(nodeLists[listInd][j]) + nr.desc.info, nr.convertErr = convertNodeInfo(nodeLists[listInd][j]) } processedNodesMtx.Lock() nodeResults[pks] = nr @@ -486,7 +531,7 @@ func (x placementIterator) 
iterateNodesForObject(obj oid.ID, f func(nodeDesc) er l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr)) if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure - err = fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", + err := fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", errNotEnoughNodes{listIndex: listInd, required: replRem, left: listLen - nodesCounters[listInd].processed - 1}, nr.convertErr) return errIncompletePut{singleErr: err} @@ -514,7 +559,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er wg.Wait() } } - if !x.broadcast { + if !broadcast { return nil } // TODO: since main part of the operation has already been completed, and @@ -532,7 +577,7 @@ broadcast: continue } if nr.desc.local = x.neoFSNet.IsLocalNodePublicKey(pk); !nr.desc.local { - nr.desc.info, nr.convertErr = x.convertNodeInfo(nodeLists[i][j]) + nr.desc.info, nr.convertErr = convertNodeInfo(nodeLists[i][j]) } processedNodesMtx.Lock() nodeResults[pks] = nr @@ -562,7 +607,7 @@ broadcast: return nil } -func (x placementIterator) convertNodeInfo(nodeInfo netmap.NodeInfo) (client.NodeInfo, error) { +func convertNodeInfo(nodeInfo netmap.NodeInfo) (client.NodeInfo, error) { var res client.NodeInfo var endpoints network.AddressGroup if err := endpoints.FromIterator(network.NodeEndpointsIterator(nodeInfo)); err != nil { diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/put/ec.go new file mode 100644 index 0000000000..7d593bde8b --- /dev/null +++ b/pkg/services/object/put/ec.go @@ -0,0 +1,133 @@ +package putsvc + +import ( + "fmt" + "math" + "slices" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" + neofscrypto 
"github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + "go.uber.org/zap" +) + +func (t *distributedTarget) ecAndSaveObject(signer neofscrypto.Signer, obj object.Object, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo) error { + for i := range ecRules { + if slices.Contains(ecRules[:i], ecRules[i]) { // has already been processed, see below + continue + } + + payloadParts, err := iec.Encode(ecRules[i], obj.Payload()) + if err != nil { + return fmt.Errorf("split object payload into EC parts for rule #%d (%s): %w", i, ecRules[i], err) + } + + if err := t.applyECRule(signer, obj, i, payloadParts, nodeLists[i]); err != nil { + return fmt.Errorf("apply EC rule #%d (%s): %w", i, ecRules[i], err) + } + + for j := i + 1; j < len(ecRules); j++ { + if ecRules[i] != ecRules[j] { + continue + } + if err := t.applyECRule(signer, obj, j, payloadParts, nodeLists[j]); err != nil { + return fmt.Errorf("apply EC rule #%d (%s): %w", j, ecRules[j], err) + } + } + } + + return nil +} + +func (t *distributedTarget) applyECRule(signer neofscrypto.Signer, obj object.Object, ruleIdx int, payloadParts [][]byte, nodeList []netmap.NodeInfo) error { + for partIdx := range payloadParts { + // TODO: each part is handled independently, so this worth concurrent execution. https://github.com/nspcc-dev/neofs-node/issues/3504 + // Note that distributeTarget.distributeObject is not thread-safe. 
+ if err := t.formAndSaveObjectForECPart(signer, obj, ruleIdx, partIdx, payloadParts, nodeList); err != nil { + return fmt.Errorf("form and save object for part %d: %w", partIdx, err) + } + } + + return nil +} + +func (t *distributedTarget) formAndSaveObjectForECPart(signer neofscrypto.Signer, obj object.Object, ruleIdx, partIdx int, payloadParts [][]byte, nodeList []netmap.NodeInfo) error { + partObj, err := iec.FormObjectForECPart(signer, obj, payloadParts[partIdx], iec.PartInfo{ + RuleIndex: ruleIdx, + Index: partIdx, + }) + if err != nil { + return fmt.Errorf("form object for part: %w", err) + } + + var encObj encodedObject + // similar to pkg/services/object/put/distributed.go:95 + if t.localNodeInContainer { + payloadLen := partObj.PayloadSize() + if payloadLen > math.MaxInt { + return fmt.Errorf("too big payload of physically stored for this server %d > %d", payloadLen, math.MaxInt) + } + + hdr := partObj + hdr.SetPayload(nil) + + if t.localOnly { + encObj, err = encodeObjectWithoutPayload(hdr, int(payloadLen)) + } else { + encObj, err = encodeReplicateRequestWithoutPayload(t.localNodeSigner, hdr, int(payloadLen), t.metainfoConsistencyAttr != "") + } + if err != nil { + return fmt.Errorf("encode object into binary: %w", err) + } + + defer putPayload(encObj.b) + + encObj.b = append(encObj.b, partObj.Payload()...) 
+ } + + if err := t.saveECPart(partObj, objectcore.ContentMeta{}, encObj, partIdx, len(payloadParts), nodeList); err != nil { + return fmt.Errorf("save part object: %w", err) + } + + return nil +} + +func (t *distributedTarget) saveECPart(part object.Object, objMeta objectcore.ContentMeta, encObj encodedObject, idx, total int, nodeList []netmap.NodeInfo) error { + return t.distributeObject(part, objMeta, encObj, func(obj object.Object, objMeta objectcore.ContentMeta, encObj encodedObject) error { + return t.distributeECPart(obj, objMeta, encObj, idx, total, nodeList) + }) +} + +func (t *distributedTarget) distributeECPart(part object.Object, objMeta objectcore.ContentMeta, enc encodedObject, partIdx, totalParts int, nodeList []netmap.NodeInfo) error { + var firstErr error + for i := range iec.NodeSequenceForPart(partIdx, totalParts, len(nodeList)) { + err := t.saveECPartOnNode(part, objMeta, enc, nodeList[i]) + if err == nil { + return nil + } + + // err contains network addresses + if firstErr == nil { + firstErr = fmt.Errorf("save on SN #%d: %w", i, err) + } else { + t.placementIterator.log.Info("failed to save EC part on reserve SN", zap.Int("nodeIdx", i), zap.Error(err)) + } + } + + return errIncompletePut{singleErr: firstErr} +} + +func (t *distributedTarget) saveECPartOnNode(obj object.Object, objMeta objectcore.ContentMeta, enc encodedObject, node netmap.NodeInfo) error { + var n nodeDesc + n.local = t.placementIterator.neoFSNet.IsLocalNodePublicKey(node.PublicKey()) + if !n.local { + var err error + if n.info, err = convertNodeInfo(node); err != nil { + return fmt.Errorf("convert node info: %w", err) + } + } + + return t.sendObject(obj, objMeta, enc, n) +} diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 106b1ed966..58761e3734 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -1,6 +1,7 @@ package putsvc import ( + iec "github.com/nspcc-dev/neofs-node/internal/ec" 
"github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" containerSDK "github.com/nspcc-dev/neofs-sdk-go/container" @@ -20,9 +21,11 @@ type PutInitPrm struct { relay func(client.NodeInfo, client.MultiAddressClient) error containerNodes ContainerNodes + ecPart iec.PartInfo localNodeInContainer bool localSignerRFC6979 neofscrypto.Signer localNodeSigner neofscrypto.Signer + sessionSigner neofscrypto.Signer } type PutChunkPrm struct { diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index b556a579ff..e29cb1f011 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -3,6 +3,7 @@ package putsvc import ( "context" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -51,6 +52,8 @@ type ContainerNodes interface { // matching storage policy of the container. Nodes are identified by their // public keys and can be repeated in different sets. // + // First PrimaryCounts() sets are for replication, the rest are for ECRules(). + // // Unsorted callers do not change resulting slices and their elements. Unsorted() [][]netmapsdk.NodeInfo // SortForObject sorts container nodes for the referenced object's storage. @@ -63,6 +66,11 @@ type ContainerNodes interface { // - first N nodes of each L are primary data holders while others (if any) // are backup. PrimaryCounts() []uint + // ECRules returns list of erasure coding rules for all objects in the + // container. Same rule may repeat. + // + // ECRules callers do not change resulting slice. 
+ ECRules() []iec.Rule } // NeoFSNetwork provides access to the NeoFS network to get information diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go index d912150be1..e3e72c9d79 100644 --- a/pkg/services/object/put/service_test.go +++ b/pkg/services/object/put/service_test.go @@ -2,16 +2,22 @@ package putsvc import ( "bytes" + "cmp" "context" "crypto/sha256" "errors" "fmt" "io" "slices" + "strconv" "sync" "testing" "github.com/google/uuid" + "github.com/klauspost/reedsolomon" + iec "github.com/nspcc-dev/neofs-node/internal/ec" + iobject "github.com/nspcc-dev/neofs-node/internal/object" + islices "github.com/nspcc-dev/neofs-node/internal/slices" "github.com/nspcc-dev/neofs-node/internal/testutil" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -68,6 +74,37 @@ func Test_Slicing_REP3(t *testing.T) { } } +func Test_Slicing_EC(t *testing.T) { + rules := []iec.Rule{ + {DataPartNum: 2, ParityPartNum: 2}, + {DataPartNum: 3, ParityPartNum: 1}, + {DataPartNum: 6, ParityPartNum: 3}, + {DataPartNum: 12, ParityPartNum: 4}, + } + + for _, tc := range []struct { + name string + ln uint64 + skip string + }{ + {name: "no payload", ln: 0}, + {name: "1B", ln: 1}, + {name: "limit-1B", ln: maxObjectSize - 1}, + {name: "exactly limit", ln: maxObjectSize}, + {name: "limit+1b", ln: maxObjectSize + 1, skip: "https://github.com/nspcc-dev/neofs-node/issues/3500"}, + {name: "limitX2", ln: maxObjectSize * 2, skip: "https://github.com/nspcc-dev/neofs-node/issues/3500"}, + {name: "limitX4-1", ln: maxObjectSize + 4 - 1, skip: "https://github.com/nspcc-dev/neofs-node/issues/3500"}, + {name: "limitX5", ln: maxObjectSize * 5, skip: "https://github.com/nspcc-dev/neofs-node/issues/3500"}, + } { + t.Run(tc.name, func(t *testing.T) { + if tc.skip != "" { + t.Skip(tc.skip) + } + testSlicingECRules(t, tc.ln, rules) + }) + } +} + func testSlicingREP3(t *testing.T, ln uint64) { const 
repNodes = 3 const cnrReserveNodes = 2 @@ -146,6 +183,75 @@ func testSlicingREP3(t *testing.T, ln uint64) { } } +func testSlicingECRules(t *testing.T, ln uint64, rules []iec.Rule) { + maxRule := slices.MaxFunc(rules, func(a, b iec.Rule) int { + return cmp.Compare(a.DataPartNum+a.ParityPartNum, b.DataPartNum+b.ParityPartNum) + }) + + maxTotalParts := int(maxRule.DataPartNum + maxRule.ParityPartNum) + const cnrReserveNodes = 2 + const outCnrNodes = 2 + + cluster := newTestClusterForRepPolicy(t, uint(maxTotalParts), cnrReserveNodes, outCnrNodes) + for i := range cluster.nodeNetworks { + // TODO: add alternative to newTestClusterForRepPolicy for EC instead + cluster.nodeNetworks[i].cnrNodes.repCounts = nil + for range len(rules) - 1 { + cluster.nodeNetworks[i].cnrNodes.unsorted = append(cluster.nodeNetworks[i].cnrNodes.unsorted, cluster.nodeNetworks[i].cnrNodes.unsorted[0]) + cluster.nodeNetworks[i].cnrNodes.sorted = append(cluster.nodeNetworks[i].cnrNodes.sorted, cluster.nodeNetworks[i].cnrNodes.sorted[0]) + } + cluster.nodeNetworks[i].cnrNodes.ecRules = rules + } + + var srcObj object.Object + srcObj.SetContainerID(cidtest.ID()) + srcObj.SetOwner(usertest.ID()) + srcObj.SetAttributes( + object.NewAttribute("attr1", "val1"), + object.NewAttribute("attr2", "val2"), + ) + + var sessionToken session.Object + sessionToken.SetID(uuid.New()) + sessionToken.SetExp(1) + sessionToken.BindContainer(cidtest.ID()) + srcObj.SetPayload(testutil.RandByteSlice(ln)) + + testThroughNode := func(t *testing.T, idx int) { + sessionToken.SetAuthKey(cluster.nodeSessions[idx].signer.Public()) + require.NoError(t, sessionToken.Sign(usertest.User())) + + storeObjectWithSession(t, cluster.nodeServices[idx], srcObj, sessionToken) + + nodeObjLists := cluster.allStoredObjects() + + var restoredObj object.Object + if ln > maxObjectSize { + restoredObj = checkAndCutSplitECObject(t, ln, sessionToken, rules, nodeObjLists) + } else { + restoredObj = checkAndCutUnsplitECObject(t, rules, nodeObjLists) + 
} + + require.Zero(t, islices.TwoDimSliceElementCount(nodeObjLists)) + + assertObjectIntegrity(t, restoredObj) + require.Equal(t, sessionToken, *restoredObj.SessionToken()) + require.Equal(t, srcObj.GetContainerID(), restoredObj.GetContainerID()) + require.Equal(t, sessionToken.Issuer(), restoredObj.Owner()) + require.EqualValues(t, currentEpoch, restoredObj.CreationEpoch()) + require.Equal(t, object.TypeRegular, restoredObj.Type()) + require.Equal(t, srcObj.Attributes(), restoredObj.Attributes()) + require.False(t, restoredObj.HasParent()) + require.True(t, bytes.Equal(srcObj.Payload(), restoredObj.Payload())) + + cluster.resetAllStoredObjects() + } + + for i := range maxTotalParts + cnrReserveNodes + outCnrNodes { + testThroughNode(t, i) + } +} + func newTestClusterForRepPolicy(t *testing.T, repNodes, cnrReserveNodes, outCnrNodes uint) *testCluster { allNodes := allocNodes([]uint{repNodes + cnrReserveNodes + outCnrNodes})[0] cnrNodes := allNodes[:repNodes+cnrReserveNodes] @@ -166,7 +272,7 @@ func newTestClusterForRepPolicy(t *testing.T, repNodes, cnrReserveNodes, outCnrN for i := range allNodes { nodeKey := neofscryptotest.ECDSAPrivateKey() - nodeWorkerPool, err := ants.NewPool(10, ants.WithNonblocking(true)) + nodeWorkerPool, err := ants.NewPool(len(cnrNodes), ants.WithNonblocking(true)) require.NoError(t, err) cluster.nodeNetworks[i] = mockNetwork{ @@ -240,6 +346,7 @@ type mockContainerNodes struct { unsorted [][]netmap.NodeInfo sorted [][]netmap.NodeInfo repCounts []uint + ecRules []iec.Rule } func (x mockContainerNodes) Unsorted() [][]netmap.NodeInfo { @@ -254,6 +361,10 @@ func (x mockContainerNodes) PrimaryCounts() []uint { return x.repCounts } +func (x mockContainerNodes) ECRules() []iec.Rule { + return x.ecRules +} + type mockMaxSize uint64 func (x mockMaxSize) MaxObjectSize() uint64 { @@ -637,3 +748,154 @@ func assertObjectIntegrity(t *testing.T, obj object.Object) { require.Zero(t, obj.SplitID()) } + +func checkAndGetObjectFromECParts(t *testing.T, limit 
uint64, rule iec.Rule, parts []object.Object) object.Object { + require.Len(t, parts, int(rule.DataPartNum+rule.ParityPartNum)) + + for _, part := range parts { + assertObjectIntegrity(t, part) + require.LessOrEqual(t, part.PayloadSize(), limit) + } + + hdr := checkAndCutParentHeaderFromECPart(t, parts[0]) + + for i := 1; i < len(parts); i++ { + hdrI := checkAndCutParentHeaderFromECPart(t, parts[i]) + require.Equal(t, hdr, hdrI) + } + + payload := checkAndGetPayloadFromECParts(t, hdr.PayloadSize(), rule, parts) + + res := hdr + res.SetPayload(payload) + + return res +} + +func checkAndGetPayloadFromECParts(t *testing.T, ln uint64, rule iec.Rule, parts []object.Object) []byte { + var payloadParts [][]byte + for i := range parts { + payloadParts = append(payloadParts, parts[i].Payload()) + } + + if ln == 0 { + require.Negative(t, slices.IndexFunc(payloadParts, func(e []byte) bool { return len(e) > 0 })) + return nil + } + + enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum)) + require.NoError(t, err) + + ok, err := enc.Verify(payloadParts) + require.NoError(t, err) + require.True(t, ok) + + required := make([]bool, rule.DataPartNum+rule.ParityPartNum) + for i := range rule.DataPartNum { + required[i] = true + } + + for lostCount := 1; lostCount <= int(rule.ParityPartNum); lostCount++ { + for _, lostIdxs := range islices.IndexCombos(len(payloadParts), lostCount) { + brokenParts := islices.NilTwoDimSliceElements(payloadParts, lostIdxs) + require.NoError(t, enc.Reconstruct(brokenParts)) + require.Equal(t, payloadParts, brokenParts) + + brokenParts = islices.NilTwoDimSliceElements(payloadParts, lostIdxs) + require.NoError(t, enc.ReconstructSome(brokenParts, required)) + require.Equal(t, payloadParts[:rule.DataPartNum], brokenParts[:rule.DataPartNum]) + } + } + + for _, lostIdxs := range islices.IndexCombos(len(payloadParts), int(rule.ParityPartNum)+1) { + require.Error(t, enc.Reconstruct(islices.NilTwoDimSliceElements(payloadParts, lostIdxs))) + 
require.Error(t, enc.ReconstructSome(islices.NilTwoDimSliceElements(payloadParts, lostIdxs), required)) + } + + payload := slices.Concat(payloadParts[:rule.DataPartNum]...) + + require.GreaterOrEqual(t, uint64(len(payload)), ln) + + require.False(t, slices.ContainsFunc(payload[ln:], func(b byte) bool { return b != 0 })) + + return payload[:ln] +} + +func checkAndCutParentHeaderFromECPart(t *testing.T, part object.Object) object.Object { + par := part.Parent() + require.NotNil(t, par) + + require.Equal(t, par.Version(), part.Version()) + require.Equal(t, par.GetContainerID(), part.GetContainerID()) + require.Equal(t, par.Owner(), part.Owner()) + require.Equal(t, par.CreationEpoch(), part.CreationEpoch()) + require.Equal(t, object.TypeRegular, part.Type()) + require.Equal(t, par.SessionToken(), part.SessionToken()) + + return *par +} + +func checkAndGetECPartInfo(t testing.TB, part object.Object) (int, int) { + ruleIdxAttr := iobject.GetAttribute(part, "__NEOFS__EC_RULE_IDX") + require.NotZero(t, ruleIdxAttr) + ruleIdx, err := strconv.Atoi(ruleIdxAttr) + require.NoError(t, err) + require.True(t, ruleIdx >= 0) + + partIdxAttr := iobject.GetAttribute(part, "__NEOFS__EC_PART_IDX") + require.NotZero(t, partIdxAttr) + partIdx, err := strconv.Atoi(partIdxAttr) + require.NoError(t, err) + require.True(t, partIdx >= 0) + + return ruleIdx, partIdx +} + +func checkAndCutSplitECObject(t *testing.T, ln uint64, sessionToken session.Object, rules []iec.Rule, nodeObjLists [][]object.Object) object.Object { + splitPartCount := splitMembersCount(maxObjectSize, ln) + + var expectedCount int + for i := range rules { + expectedCount += int(rules[i].DataPartNum+rules[i].ParityPartNum) * splitPartCount + } + + require.EqualValues(t, expectedCount, islices.TwoDimSliceElementCount(nodeObjLists)) + + var splitParts []object.Object + for range splitPartCount { + splitPart := checkAndCutUnsplitECObject(t, rules, nodeObjLists) + splitParts = append(splitParts, splitPart) + } + + restoredObj := 
assertSplitChain(t, maxObjectSize, ln, sessionToken, splitParts) + + return restoredObj +} + +func checkAndCutUnsplitECObject(t *testing.T, rules []iec.Rule, nodeObjLists [][]object.Object) object.Object { + ecParts := checkAndCutECPartsForRule(t, 0, rules[0], nodeObjLists) + restoredObj := checkAndGetObjectFromECParts(t, maxObjectSize, rules[0], ecParts) + + for i := 1; i < len(rules); i++ { + ecPartsI := checkAndCutECPartsForRule(t, i, rules[i], nodeObjLists) + restoredObjI := checkAndGetObjectFromECParts(t, maxObjectSize, rules[i], ecPartsI) + require.Equal(t, restoredObj, restoredObjI) + } + + return restoredObj +} + +func checkAndCutECPartsForRule(t *testing.T, ruleIdx int, rule iec.Rule, nodeObjLists [][]object.Object) []object.Object { + var parts []object.Object + + for i := range rule.DataPartNum + rule.ParityPartNum { + gotRuleIdx, partIdx := checkAndGetECPartInfo(t, nodeObjLists[i][0]) + require.EqualValues(t, ruleIdx, gotRuleIdx) + require.EqualValues(t, i, partIdx) + + parts = append(parts, nodeObjLists[i][0]) + nodeObjLists[i] = nodeObjLists[i][1:] + } + + return parts +} diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index a30f8b0044..ea463213e7 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -5,12 +5,12 @@ import ( "errors" "fmt" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-sdk-go/container" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" - "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/user" ) @@ -122,6 +122,8 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { } } + sessionSigner := user.NewAutoIDSigner(*sessionKey) + prm.sessionSigner = sessionSigner p.target = &validatingTarget{ fmt: p.fmtValidator, 
unpreparedObject: true, @@ -129,7 +131,7 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { p.ctx, p.maxPayloadSz, !homomorphicChecksumRequired, - user.NewAutoIDSigner(*sessionKey), + sessionSigner, sToken, p.networkState.CurrentEpoch(), p.newCommonTarget(prm), @@ -167,14 +169,32 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return fmt.Errorf("select storage nodes for the container: %w", err) } cnrNodes := prm.containerNodes.Unsorted() -nextSet: - for i := range cnrNodes { - for j := range cnrNodes[i] { - prm.localNodeInContainer = p.neoFSNet.IsLocalNodePublicKey(cnrNodes[i][j].PublicKey()) - if prm.localNodeInContainer { - break nextSet + ecRulesN := len(prm.containerNodes.ECRules()) + if ecRulesN > 0 { + ecPart, err := iec.GetPartInfo(*prm.hdr) + if err != nil { + return fmt.Errorf("get EC part info from object header: %w", err) + } + + repRulesN := len(prm.containerNodes.PrimaryCounts()) + if ecPart.Index >= 0 { + if ecPart.RuleIndex >= ecRulesN { + return fmt.Errorf("invalid EC part info in object header: EC rule idx=%d with %d rules in total", ecPart.RuleIndex, ecRulesN) + } + if prm.hdr.Signature() == nil { + return errors.New("unsigned EC part object") + } + prm.localNodeInContainer = localNodeInSet(p.neoFSNet, cnrNodes[repRulesN+ecPart.RuleIndex]) + } else { + if repRulesN == 0 && prm.hdr.Signature() != nil { + return errors.New("missing EC part info in signed object") } + prm.localNodeInContainer = localNodeInSets(p.neoFSNet, cnrNodes) } + + prm.ecPart = ecPart + } else { + prm.localNodeInContainer = localNodeInSets(p.neoFSNet, cnrNodes) } if !prm.localNodeInContainer && localOnly { return errors.New("local operation on the node not compliant with the container storage policy") @@ -199,24 +219,16 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { } } - // enable additional container broadcast on non-local operation - // if object has TOMBSTONE or LOCK type. 
- typ := prm.hdr.Type() - localOnly := prm.common.LocalOnly() - withBroadcast := !localOnly && (typ == object.TypeTombstone || typ == object.TypeLock) - return &distributedTarget{ opCtx: p.ctx, fsState: p.networkState, networkMagicNumber: p.networkMagic, metaSvc: p.metaSvc, placementIterator: placementIterator{ - log: p.log, - neoFSNet: p.neoFSNet, - remotePool: p.remotePool, - containerNodes: prm.containerNodes, - linearReplNum: uint(prm.copiesNumber), - broadcast: withBroadcast, + log: p.log, + neoFSNet: p.neoFSNet, + remotePool: p.remotePool, + linearReplNum: uint(prm.copiesNumber), }, localStorage: p.localStore, keyStorage: p.keyStorage, @@ -225,12 +237,15 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { transport: p.transport, relay: relay, fmt: p.fmtValidator, + containerNodes: prm.containerNodes, + ecPart: prm.ecPart, localNodeInContainer: prm.localNodeInContainer, localNodeSigner: prm.localNodeSigner, + sessionSigner: prm.sessionSigner, cnrClient: p.cfg.cnrClient, metainfoConsistencyAttr: metaAttribute(prm.cnr), metaSigner: prm.localSignerRFC6979, - localOnly: localOnly, + localOnly: prm.common.LocalOnly(), } } diff --git a/pkg/services/object/put/util.go b/pkg/services/object/put/util.go new file mode 100644 index 0000000000..380bcf2c14 --- /dev/null +++ b/pkg/services/object/put/util.go @@ -0,0 +1,19 @@ +package putsvc + +import ( + "slices" + + "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +func localNodeInSets(n NeoFSNetwork, ss [][]netmap.NodeInfo) bool { + return slices.ContainsFunc(ss, func(s []netmap.NodeInfo) bool { + return localNodeInSet(n, s) + }) +} + +func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool { + return slices.ContainsFunc(nodes, func(node netmap.NodeInfo) bool { + return n.IsLocalNodePublicKey(node.PublicKey()) + }) +}