diff --git a/cmd/neofs-lens/internal/storage/list.go b/cmd/neofs-lens/internal/storage/list.go index c8d19cf120..af3235735d 100644 --- a/cmd/neofs-lens/internal/storage/list.go +++ b/cmd/neofs-lens/internal/storage/list.go @@ -34,7 +34,7 @@ func listFunc(cmd *cobra.Command, _ []string) error { defer storage.Close() var ( - addrs []objectcore.AddressWithType + addrs []objectcore.AddressWithAttributes cursor *engine.Cursor ) for { diff --git a/cmd/neofs-lens/internal/storage/sanity.go b/cmd/neofs-lens/internal/storage/sanity.go index ef283cbadf..1fea214189 100644 --- a/cmd/neofs-lens/internal/storage/sanity.go +++ b/cmd/neofs-lens/internal/storage/sanity.go @@ -142,7 +142,7 @@ func sanityCheck(cmd *cobra.Command, _ []string) error { func checkShard(cmd *cobra.Command, sh storageShard) (int, error) { var ( - addrs []objectcore.AddressWithType + addrs []objectcore.AddressWithAttributes cursor *meta.Cursor err error objectsChecked int diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index ee29a236dd..e295277388 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -204,7 +204,7 @@ func initObjectService(c *cfg) { policer.WithMaxCapacity(c.appCfg.Policer.MaxWorkers), policer.WithPool(c.cfgObject.pool.replication), policer.WithNodeLoader(c), - policer.WithNetwork(noEC{c}), + policer.WithNetwork(c), policer.WithReplicationCooldown(c.appCfg.Policer.ReplicationCooldown), policer.WithObjectBatchSize(c.appCfg.Policer.ObjectBatchSize), ) @@ -695,32 +695,13 @@ func (o objectSource) SearchOne(ctx context.Context, cnr cid.ID, filters objectS // IsLocalNodePublicKey implements [getsvc.NeoFSNetwork]. func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) } -// temporary wrapper until [getsvc.NeoFSNetwork] and [policer.Network] -// interfaces converge. -type noEC struct { - *cfg -} - -// GetNodesForObject reads storage policy of the referenced container from the -// underlying container storage, reads network map at the specified epoch from -// the underlying storage, applies the storage policy to it and returns sorted -// lists of selected storage nodes along with the policy rules. -// -// Resulting slices must not be changed. -// -// GetNodesForObject implements [policer.Network]. -func (x noEC) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { - nodeLists, repRules, _, err := x.cfg.GetNodesForObject(addr) - return nodeLists[:len(repRules)], repRules, err -} - // GetNodesForObject reads storage policy of the referenced container from the // underlying container storage, reads network map at the specified epoch from // the underlying storage, applies the storage policy to it and returns sorted // lists of selected storage nodes along with the per-list numbers of primary // object holders. Resulting slices must not be changed. // -// GetNodesForObject implements [getsvc.NeoFSNetwork]. +// GetNodesForObject implements [getsvc.NeoFSNetwork], [policer.Network]. func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) { nodeSets, repRules, ecRules, err := c.cfgObject.containerNodes.getNodesForObject(addr) return nodeSets, repRules, ecRules, err diff --git a/internal/ec/object_test.go b/internal/ec/object_test.go index 4110c5f040..ca6790d419 100644 --- a/internal/ec/object_test.go +++ b/internal/ec/object_test.go @@ -172,3 +172,54 @@ func TestFormObjectForECPart(t *testing.T) { require.Equal(t, checksum.NewTillichZemor(tz.Sum(part)), phh) }) } + +func TestDecodePartInfoFromAttributes(t *testing.T) { + t.Run("missing", func(t *testing.T) { + pi, err := iec.DecodePartInfoFromAttributes("", "") + require.NoError(t, err) + require.EqualValues(t, -1, pi.RuleIndex) + }) + + t.Run("failure", func(t *testing.T) { + for _, tc := range []struct { + name string + ruleIdx string + partIdx string + assertErr func(t *testing.T, err error) + }{ + {name: "non-int rule index", ruleIdx: "not_an_int", partIdx: "34", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, `decode rule index: strconv.ParseUint: parsing "not_an_int": invalid syntax`) + }}, + {name: "negative rule index", ruleIdx: "-12", partIdx: "34", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, `decode rule index: strconv.ParseUint: parsing "-12": invalid syntax`) + }}, + {name: "rule index overflow", ruleIdx: "256", partIdx: "34", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "rule index out of range") + }}, + {name: "non-int part index", ruleIdx: "12", partIdx: "not_an_int", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, `decode part index: strconv.ParseUint: parsing "not_an_int": invalid syntax`) + }}, + {name: "negative part index", ruleIdx: "12", partIdx: "-34", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, `decode part index: strconv.ParseUint: parsing "-34": invalid syntax`) + }}, + {name: "part index overflow", ruleIdx: "12", partIdx: "256", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "part index out of range") + }}, + {name: "rule index without part index", ruleIdx: "12", partIdx: "", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "rule index is set, part index is not") + }}, + {name: "part index without rule index", ruleIdx: "", partIdx: "34", assertErr: func(t *testing.T, err error) { + require.EqualError(t, err, "part index is set, rule index is not") + }}, + } { + t.Run(tc.name, func(t *testing.T) { + _, err := iec.DecodePartInfoFromAttributes(tc.ruleIdx, tc.partIdx) + tc.assertErr(t, err) + }) + } + }) + + pi, err := iec.DecodePartInfoFromAttributes("12", "34") + require.NoError(t, err) + require.Equal(t, iec.PartInfo{RuleIndex: 12, Index: 34}, pi) +} diff --git a/internal/ec/objects.go b/internal/ec/objects.go index 4dd4693fd4..1ea24bff4a 100644 --- a/internal/ec/objects.go +++ b/internal/ec/objects.go @@ -1,7 +1,9 @@ package ec import ( + "errors" "fmt" + "strconv" iobject "github.com/nspcc-dev/neofs-node/internal/object" "github.com/nspcc-dev/neofs-sdk-go/checksum" @@ -73,3 +75,50 @@ func FormObjectForECPart(signer neofscrypto.Signer, parent object.Object, part [ return obj, nil } + +// DecodePartInfoFromAttributes decodes EC part info from given object +// attributes. It one of attributes is set, the other must be set too. If both +// are missing, DecodePartInfoFromAttributes returns [PartInfo.RuleIndex] = -1 +// without error. +func DecodePartInfoFromAttributes(ruleIdxAttr, partIdxAttr string) (PartInfo, error) { + // TODO: sync with object GET server + // TODO: sync with GetPartInfo, it does not check for numeric limits. + if ruleIdxAttr == "" { + if partIdxAttr != "" { + return PartInfo{}, errors.New("part index is set, rule index is not") + } + return PartInfo{RuleIndex: -1}, nil + } + if partIdxAttr == "" { + return PartInfo{}, errors.New("rule index is set, part index is not") + } + + ruleIdx, err := decodeUint8StringToInt(ruleIdxAttr) + if err != nil { + if errors.Is(err, strconv.ErrRange) { + return PartInfo{}, errors.New("rule index out of range") + } + return PartInfo{}, fmt.Errorf("decode rule index: %w", err) + } + partIdx, err := decodeUint8StringToInt(partIdxAttr) + if err != nil { + if errors.Is(err, strconv.ErrRange) { + return PartInfo{}, errors.New("part index out of range") + } + return PartInfo{}, fmt.Errorf("decode part index: %w", err) + } + + return PartInfo{ + RuleIndex: ruleIdx, + Index: partIdx, + }, nil +} + +// returns [strconv.ErrRange] if value >= 256. +func decodeUint8StringToInt(s string) (int, error) { + n, err := strconv.ParseUint(s, 10, 8) + if err != nil { + return 0, err + } + return int(n), nil +} diff --git a/internal/slices/index.go b/internal/slices/index.go index 69a0107e07..006b0b0bd7 100644 --- a/internal/slices/index.go +++ b/internal/slices/index.go @@ -15,3 +15,12 @@ func Indexes(n int) []int { } return s } + +// CollectIndex returns new slice of shallows copies of s elements with given indexes. +func CollectIndex[E any, S []E](s S, idxs ...int) S { + newS := make(S, len(idxs)) + for i, idx := range idxs { + newS[i] = s[idx] + } + return newS +} diff --git a/internal/slices/index_test.go b/internal/slices/index_test.go index 404bf751bb..cb4f95eeaf 100644 --- a/internal/slices/index_test.go +++ b/internal/slices/index_test.go @@ -1,6 +1,7 @@ package slices_test import ( + "slices" "testing" islices "github.com/nspcc-dev/neofs-node/internal/slices" @@ -22,3 +23,23 @@ func TestIndexCombos(t *testing.T) { {2, 3}, }) } + +func TestCollectIndex(t *testing.T) { + t.Run("nil", func(t *testing.T) { + require.Empty(t, islices.CollectIndex([]any(nil))) + }) + t.Run("empty", func(t *testing.T) { + require.Empty(t, islices.CollectIndex([]any{})) + }) + + s := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"} + sc := slices.Clone(s) + + newS := islices.CollectIndex(s, 9, 7, 5, 3, 1) + require.Equal(t, []string{"9", "7", "5", "3", "1"}, newS) + require.Len(t, newS, 5) + require.EqualValues(t, 5, cap(newS)) + + newS[0] += "_" + require.Equal(t, sc, s) +} diff --git a/internal/slices/slices.go b/internal/slices/slices.go index 78e0877ec9..bf6ec0e053 100644 --- a/internal/slices/slices.go +++ b/internal/slices/slices.go @@ -56,3 +56,17 @@ func RepeatElement[E any, S []E](n int, e E) S { } return s } + +// MaxLen returns max length in s. Returns 0 if s is empty. +func MaxLen(s []string) int { + if len(s) == 0 { + return 0 + } + mx := len(s[0]) + for i := 1; i < len(s); i++ { + if len(s[i]) > mx { + mx = len(s[i]) + } + } + return mx +} diff --git a/internal/slices/slices_test.go b/internal/slices/slices_test.go index 6bc278b330..438ce87a6f 100644 --- a/internal/slices/slices_test.go +++ b/internal/slices/slices_test.go @@ -2,11 +2,13 @@ package slices_test import ( "errors" + "math/rand/v2" "slices" "strconv" "testing" islices "github.com/nspcc-dev/neofs-node/internal/slices" + "github.com/nspcc-dev/neofs-node/internal/testutil" "github.com/stretchr/testify/require" ) @@ -93,3 +95,28 @@ func TestRepeatElements(t *testing.T) { }) } } + +func TestMaxLen(t *testing.T) { + t.Run("nil", func(t *testing.T) { + require.Zero(t, islices.MaxLen(nil)) + }) + t.Run("empty", func(t *testing.T) { + require.Zero(t, islices.MaxLen([]string{})) + }) + + s := make([]string, 100) + for i := range s { + s[i] = string(testutil.RandByteSlice(i)) + } + require.EqualValues(t, 99, islices.MaxLen(s)) + + slices.Reverse(s) + require.EqualValues(t, 99, islices.MaxLen(s)) + + rand.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] }) + require.EqualValues(t, 99, islices.MaxLen(s)) + + e := s[50] + s = islices.RepeatElement(len(s), e) + require.EqualValues(t, len(e), islices.MaxLen(s)) +} diff --git a/internal/testutil/neofs.go b/internal/testutil/neofs.go index 57793a88ce..c4b4dd3446 100644 --- a/internal/testutil/neofs.go +++ b/internal/testutil/neofs.go @@ -7,10 +7,18 @@ import ( ) // Nodes returns n [netmap.NodeInfo] elements with unique public keys. +// +// Each node has two network addresses with localhost IP. Ports are sequential +// starting from 10000. func Nodes(n int) []netmap.NodeInfo { + const portsStart = 10_000 nodes := make([]netmap.NodeInfo, n) for i := range nodes { nodes[i].SetPublicKey([]byte("public_key_" + strconv.Itoa(i))) + nodes[i].SetNetworkEndpoints( + "localhost:"+strconv.Itoa(portsStart+2*i), + "localhost:"+strconv.Itoa(portsStart+2*i+1), + ) } return nodes } diff --git a/internal/testutil/neofs_test.go b/internal/testutil/neofs_test.go index f015d2ec96..2c0a4341cd 100644 --- a/internal/testutil/neofs_test.go +++ b/internal/testutil/neofs_test.go @@ -1,6 +1,9 @@ package testutil_test import ( + "slices" + "strconv" + "strings" "testing" "github.com/nspcc-dev/neofs-node/internal/testutil" @@ -17,6 +20,16 @@ func TestNodes(t *testing.T) { m := make(map[string]struct{}) for i := range s { m[string(s[i].PublicKey())] = struct{}{} + + for j, netAddr := range slices.Collect(s[i].NetworkEndpoints()) { + ps, ok := strings.CutPrefix(netAddr, "localhost:") + require.True(t, ok) + + p, err := strconv.ParseUint(ps, 10, 16) + require.NoError(t, err) + + require.EqualValues(t, 10_000+2*i+j, p) + } } require.Len(t, m, len(s)) } diff --git a/pkg/core/object/address.go b/pkg/core/object/address.go index a91082874e..201ab996dc 100644 --- a/pkg/core/object/address.go +++ b/pkg/core/object/address.go @@ -5,9 +5,9 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) -// AddressWithType groups object address with its NeoFS -// object type. -type AddressWithType struct { - Address oid.Address - Type object.Type +// AddressWithAttributes groups object's address and its attributes. +type AddressWithAttributes struct { + Address oid.Address + Type object.Type + Attributes []string } diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 40f0e73306..7be5f8166b 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -25,14 +25,17 @@ type Cursor struct { // Does not include inhumed objects. Use cursor value from the response // for consecutive requests (it's nil when iteration is over). // +// Optional attrs specifies attributes to include in the result. If object does +// not have requested attribute, corresponding element in the result is empty. +// // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { +func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor, attrs ...string) ([]objectcore.AddressWithAttributes, *Cursor, error) { if e.metrics != nil { defer elapsed(e.metrics.AddListObjectsDuration)() } - result := make([]objectcore.AddressWithType, 0, count) + result := make([]objectcore.AddressWithAttributes, 0, count) // 1. Get available shards and sort them. e.mtx.RLock() @@ -76,7 +79,7 @@ func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor) ([]objectco shCursor = cursor.shardCursor } - res, shCursor, err := shardInstance.ListWithCursor(int(count), shCursor) + res, shCursor, err := shardInstance.ListWithCursor(int(count), shCursor, attrs...) if err != nil { continue } diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 011e5c91a6..799be608f0 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -2,13 +2,16 @@ package engine import ( "errors" + "fmt" "os" "sort" + "strconv" "testing" "github.com/nspcc-dev/neofs-node/pkg/core/object" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/stretchr/testify/require" ) @@ -24,8 +27,8 @@ func TestListWithCursor(t *testing.T) { const total = 20 - expected := make([]object.AddressWithType, 0, total) - got := make([]object.AddressWithType, 0, total) + expected := make([]object.AddressWithAttributes, 0, total) + got := make([]object.AddressWithAttributes, 0, total) for range total { containerID := cidtest.ID() @@ -33,7 +36,7 @@ func TestListWithCursor(t *testing.T) { err := e.Put(obj, nil) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) + expected = append(expected, object.AddressWithAttributes{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) } expected = sortAddresses(expected) @@ -56,11 +59,88 @@ func TestListWithCursor(t *testing.T) { got = sortAddresses(got) require.Equal(t, expected, got) + + t.Run("attributes", func(t *testing.T) { + const containerNum = 10 + const objectsPerContainer = 10 + const totalObjects = containerNum * objectsPerContainer + const staticAttr, staticVal = "attr_static", "val_static" + const commonAttr = "attr_common" + const groupAttr = "attr_group" + + var exp []object.AddressWithAttributes + var objs []objectSDK.Object + for i := range containerNum { + cnr := cidtest.ID() + for j := range objectsPerContainer { + commonVal := strconv.Itoa(i*objectsPerContainer + j) + owner := usertest.ID() + + obj := generateObjectWithCID(cnr) + obj.SetOwner(owner) + obj.SetType(objectSDK.TypeRegular) + obj.SetAttributes( + objectSDK.NewAttribute(staticAttr, staticVal), + objectSDK.NewAttribute(commonAttr, commonVal), + ) + + var groupVal string + if j == 0 { + groupVal = strconv.Itoa(i) + addAttribute(obj, groupAttr, groupVal) + } + + objs = append(objs, *obj) + exp = append(exp, object.AddressWithAttributes{ + Address: object.AddressOf(obj), + Type: objectSDK.TypeRegular, + Attributes: []string{staticVal, commonVal, groupVal, string(owner[:])}, + }) + } + } + + for _, shardNum := range []int{1, 5, 10} { + t.Run(fmt.Sprintf("shard=%d", shardNum), func(t *testing.T) { + s := testNewEngineWithShardNum(t, shardNum) + for i := range objs { + require.NoError(t, s.Put(&objs[i], nil)) + } + + for _, count := range []uint32{ + 1, + totalObjects / 10, + totalObjects / 2, + totalObjects - 1, + totalObjects, + totalObjects + 1, + } { + t.Run(fmt.Sprintf("total=%d,count=%d", totalObjects, count), func(t *testing.T) { + collected := collectListWithCursor(t, s, count, staticAttr, commonAttr, groupAttr, "$Object:ownerID") + require.ElementsMatch(t, exp, collected) + }) + } + }) + } + }) } -func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType { +func sortAddresses(addrWithType []object.AddressWithAttributes) []object.AddressWithAttributes { sort.Slice(addrWithType, func(i, j int) bool { return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString() }) return addrWithType } + +func collectListWithCursor(t *testing.T, s *StorageEngine, count uint32, attrs ...string) []object.AddressWithAttributes { + var next, collected []object.AddressWithAttributes + var crs *Cursor + var err error + for { + next, crs, err = s.ListWithCursor(count, crs, attrs...) + collected = append(collected, next...) + if errors.Is(err, ErrEndOfListing) { + return collected + } + require.NoError(t, err) + } +} diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 7dcb132c1f..fc4d8641e9 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -9,6 +9,7 @@ import ( "github.com/nspcc-dev/bbolt" islices "github.com/nspcc-dev/neofs-node/internal/slices" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -107,6 +108,14 @@ func fillIDTypePrefix(typPrefix []byte) { copy(typPrefix[1+objectKeySize:], object.FilterType) } +func fillIDAttributePrefix(s []byte, id oid.ID, attr string) int { + s[0] = metaPrefixIDAttr + copy(s[1:], id[:]) + copy(s[1+oid.Size:], attr) + copy(s[1+oid.Size+len(attr):], objectcore.MetaAttributeDelimiter) + return attrIDFixedLen + len(attr) +} + func (db *DB) iterateExpired(tx *bbolt.Tx, curEpoch uint64, h ExpiredObjectHandler) error { var ( expStart = make([]byte, 1+len(object.AttributeExpirationEpoch)+1+1) diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index ab38fa8f3a..10db14dbf7 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/nspcc-dev/bbolt" + islices "github.com/nspcc-dev/neofs-node/internal/slices" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -19,15 +20,19 @@ var ErrEndOfListing = logicerr.New("end of object listing") type Cursor struct { bucketName []byte inBucketOffset []byte + attrsPrefix []byte } // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. // +// Optional attrs specifies attributes to include in the result. If object does +// not have requested attribute, corresponding element in the result is empty. +// // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (db *DB) ListWithCursor(count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { +func (db *DB) ListWithCursor(count int, cursor *Cursor, attrs ...string) ([]objectcore.AddressWithAttributes, *Cursor, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -37,19 +42,19 @@ func (db *DB) ListWithCursor(count int, cursor *Cursor) ([]objectcore.AddressWit var ( err error - result = make([]objectcore.AddressWithType, 0, count) + result = make([]objectcore.AddressWithAttributes, 0, count) currEpoch = db.epochState.CurrentEpoch() ) err = db.boltDB.View(func(tx *bbolt.Tx) error { - result, cursor, err = db.listWithCursor(tx, currEpoch, result, count, cursor) + result, cursor, err = db.listWithCursor(tx, currEpoch, result, count, cursor, attrs...) return err }) return result, cursor, err } -func (db *DB) listWithCursor(tx *bbolt.Tx, currEpoch uint64, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { +func (db *DB) listWithCursor(tx *bbolt.Tx, currEpoch uint64, result []objectcore.AddressWithAttributes, count int, cursor *Cursor, attrs ...string) ([]objectcore.AddressWithAttributes, *Cursor, error) { threshold := cursor == nil // threshold is a flag to ignore cursor var bucketName []byte @@ -78,7 +83,7 @@ loop: if bkt != nil { copy(rawAddr, cidRaw) result, offset, cursor = selectNFromBucket(bkt, currEpoch, graveyardBkt, garbageObjectsBkt, rawAddr, containerID, - result, count, cursor, threshold) + result, count, cursor, threshold, attrs...) } bucketName = name if len(result) >= count { @@ -114,11 +119,12 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket graveyardBkt, garbageObjectsBkt *bbolt.Bucket, // cached graveyard buckets cidRaw []byte, // container ID prefix, optimization cnt cid.ID, // container ID - to []objectcore.AddressWithType, // listing result + to []objectcore.AddressWithAttributes, // listing result limit int, // stop listing at `limit` items in result cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately -) ([]objectcore.AddressWithType, []byte, *Cursor) { + attrs ...string, +) ([]objectcore.AddressWithAttributes, []byte, *Cursor) { if cursor == nil { cursor = new(Cursor) } @@ -129,6 +135,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket offset []byte phyPrefix = mkFilterPhysicalPrefix() typePrefix = make([]byte, metaIDTypePrefixSize) + gotAttrs []string ) if containerMarkedGC(c) { @@ -169,10 +176,25 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket continue } + if len(attrs) > 0 { + if cursor.attrsPrefix == nil { + mx := islices.MaxLen(attrs) + cursor.attrsPrefix = make([]byte, attrIDFixedLen+mx) + } + + gotAttrs = make([]string, len(attrs)) + for i := range attrs { + n := fillIDAttributePrefix(cursor.attrsPrefix, obj, attrs[i]) + if k, _ := mCursor.Seek(cursor.attrsPrefix[:n]); bytes.HasPrefix(k, cursor.attrsPrefix[:n]) { + gotAttrs[i] = string(k[n:]) + } + } + } + var a oid.Address a.SetContainer(cnt) a.SetObject(obj) - to = append(to, objectcore.AddressWithType{Address: a, Type: objType}) + to = append(to, objectcore.AddressWithAttributes{Address: a, Type: objType, Attributes: gotAttrs}) count++ } diff --git a/pkg/local_object_storage/metabase/list_test.go b/pkg/local_object_storage/metabase/list_test.go index 584c40772a..8320af1ca6 100644 --- a/pkg/local_object_storage/metabase/list_test.go +++ b/pkg/local_object_storage/metabase/list_test.go @@ -2,7 +2,10 @@ package meta_test import ( "errors" + "fmt" + "slices" "sort" + "strconv" "testing" "github.com/nspcc-dev/bbolt" @@ -12,6 +15,7 @@ import ( objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/stretchr/testify/require" ) @@ -46,7 +50,7 @@ func listWithCursorPrepareDB(b *testing.B) *meta.DB { func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) { var ( - addrs []object.AddressWithType + addrs []object.AddressWithAttributes cursor *meta.Cursor err error ) @@ -66,6 +70,65 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) { } } +func BenchmarkDB_ListWithCursor_Attributes(b *testing.B) { + const attributeNum = 10 + const containerNum = 10 + const objectsPerContainer = 10 + const totalObjects = containerNum * objectsPerContainer + + attrs := make([]string, attributeNum) + for i := range attrs { + attrs[i] = "attrs_" + strconv.Itoa(i) + } + + db := newDB(b) + + for range containerNum { + cnr := cidtest.ID() + for range objectsPerContainer { + obj := generateObjectWithCID(b, cnr) + + as := make([]objectSDK.Attribute, len(attrs)) + for i := range attrs { + as[i] = objectSDK.NewAttribute(attrs[i], strconv.Itoa(i)) + } + obj.SetAttributes(as...) + + require.NoError(b, db.Put(obj)) + } + } + + benchAttributes := func(b *testing.B, attrs []string) { + for _, count := range []int{ + 1, + totalObjects / 10, + totalObjects / 2, + totalObjects - 1, + totalObjects, + totalObjects + 1, + } { + b.Run(fmt.Sprintf("total=%d,count=%d", totalObjects, count), func(b *testing.B) { + for range b.N { + require.NoError(b, traverseListWithCursor(db, count, attrs...)) + } + }) + } + } + + b.Run("all hit", func(b *testing.B) { + benchAttributes(b, attrs) + }) + + b.Run("all miss", func(b *testing.B) { + other := slices.Clone(attrs) + for i := range other { + other[i] += "_" + } + + benchAttributes(b, other) + }) +} + func TestLisObjectsWithCursor(t *testing.T) { db := newDB(t) @@ -74,7 +137,7 @@ func TestLisObjectsWithCursor(t *testing.T) { total = containers * 4 // regular + ts + child + lock ) - expected := make([]object.AddressWithType, 0, total) + expected := make([]object.AddressWithAttributes, 0, total) // fill metabase with objects for range containers { @@ -85,21 +148,21 @@ func TestLisObjectsWithCursor(t *testing.T) { obj.SetType(objectSDK.TypeRegular) err := putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular}) + expected = append(expected, object.AddressWithAttributes{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular}) // add one tombstone obj = generateObjectWithCID(t, containerID) obj.SetType(objectSDK.TypeTombstone) err = putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone}) + expected = append(expected, object.AddressWithAttributes{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone}) // add one lock obj = generateObjectWithCID(t, containerID) obj.SetType(objectSDK.TypeLock) err = putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock}) + expected = append(expected, object.AddressWithAttributes{Address: object.AddressOf(obj), Type: objectSDK.TypeLock}) // add one inhumed (do not include into expected) obj = generateObjectWithCID(t, containerID) @@ -121,14 +184,14 @@ func TestLisObjectsWithCursor(t *testing.T) { child.SetSplitID(splitID) err = putBig(db, child) require.NoError(t, err) - expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular}) + expected = append(expected, object.AddressWithAttributes{Address: object.AddressOf(child), Type: objectSDK.TypeRegular}) } expected = sortAddresses(expected) t.Run("success with various count", func(t *testing.T) { for countPerReq := 1; countPerReq <= total; countPerReq++ { - got := make([]object.AddressWithType, 0, total) + got := make([]object.AddressWithAttributes, 0, total) res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil) require.NoError(t, err, "count:%d", countPerReq) @@ -157,6 +220,62 @@ func TestLisObjectsWithCursor(t *testing.T) { _, _, err := metaListWithCursor(db, 0, nil) require.ErrorIs(t, err, meta.ErrEndOfListing) }) + + t.Run("attributes", func(t *testing.T) { + const containerNum = 10 + const objectsPerContainer = 10 + const totalObjects = containerNum * objectsPerContainer + const staticAttr, staticVal = "attr_static", "val_static" + const commonAttr = "attr_common" + const groupAttr = "attr_group" + + db := newDB(t) + + var exp []object.AddressWithAttributes + for i := range containerNum { + cnr := cidtest.ID() + for j := range objectsPerContainer { + commonVal := strconv.Itoa(i*objectsPerContainer + j) + owner := usertest.ID() + + obj := generateObjectWithCID(t, cnr) + obj.SetOwner(owner) + obj.SetType(objectSDK.TypeRegular) + obj.SetAttributes( + objectSDK.NewAttribute(staticAttr, staticVal), + objectSDK.NewAttribute(commonAttr, commonVal), + ) + + var groupVal string + if j == 0 { + groupVal = strconv.Itoa(i) + addAttribute(obj, groupAttr, groupVal) + } + + require.NoError(t, db.Put(obj)) + + exp = append(exp, object.AddressWithAttributes{ + Address: object.AddressOf(obj), + Type: objectSDK.TypeRegular, + Attributes: []string{staticVal, commonVal, groupVal, string(owner[:])}, + }) + } + } + + for _, count := range []int{ + 1, + totalObjects / 10, + totalObjects / 2, + totalObjects - 1, + totalObjects, + totalObjects + 1, + } { + t.Run(fmt.Sprintf("total=%d,count=%d", totalObjects, count), func(t *testing.T) { + collected := collectListWithCursor(t, db, count, staticAttr, commonAttr, groupAttr, "$Object:ownerID") + require.ElementsMatch(t, exp, collected) + }) + } + }) } func TestAddObjectDuringListingWithCursor(t *testing.T) { @@ -209,13 +328,41 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) { } } -func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType { +func sortAddresses(addrWithType []object.AddressWithAttributes) []object.AddressWithAttributes { sort.Slice(addrWithType, func(i, j int) bool { return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString() }) return addrWithType } -func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) { +func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithAttributes, *meta.Cursor, error) { return db.ListWithCursor(int(count), cursor) } + +func collectListWithCursor(t *testing.T, db *meta.DB, count int, attrs ...string) []object.AddressWithAttributes { + var next, collected []object.AddressWithAttributes + var crs *meta.Cursor + var err error + for { + next, crs, err = db.ListWithCursor(count, crs, attrs...) + collected = append(collected, next...) + if errors.Is(err, meta.ErrEndOfListing) { + return collected + } + require.NoError(t, err) + } +} + +func traverseListWithCursor(db *meta.DB, count int, attrs ...string) error { + var c *meta.Cursor + var err error + for { + _, c, err = db.ListWithCursor(count, c, attrs...) + if err != nil { + if errors.Is(err, meta.ErrEndOfListing) { + return nil + } + return err + } + } +} diff --git a/pkg/local_object_storage/metabase/put_test.go b/pkg/local_object_storage/metabase/put_test.go index c45fc2e4ed..92393899ac 100644 --- a/pkg/local_object_storage/metabase/put_test.go +++ b/pkg/local_object_storage/metabase/put_test.go @@ -133,7 +133,7 @@ func TestDB_Put_ObjectWithTombstone(t *testing.T) { t.Run("list with cursor", func(t *testing.T) { res, c, err := db.ListWithCursor(2, nil) require.NoError(t, err) - require.Equal(t, []object.AddressWithType{{Address: tsAddr, Type: objectSDK.TypeTombstone}}, res) + require.Equal(t, []object.AddressWithAttributes{{Address: tsAddr, Type: objectSDK.TypeTombstone}}, res) require.NotZero(t, c) _, _, err = db.ListWithCursor(1, c) @@ -216,7 +216,7 @@ func assertObjectAvailability(t *testing.T, db *meta.DB, addr oid.Address, obj o t.Run("list with cursor", func(t *testing.T) { res, c, err := db.ListWithCursor(2, nil) require.NoError(t, err) - require.Equal(t, []object.AddressWithType{{Address: addr, Type: obj.Type()}}, res) + require.Equal(t, []object.AddressWithAttributes{{Address: addr, Type: obj.Type()}}, res) require.NotZero(t, c) _, _, err = db.ListWithCursor(1, c) diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 20e7abeb55..44e1bba8a2 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -72,14 +72,17 @@ func (s *Shard) ListContainers() ([]cid.ID, error) { // cursor. Includes regular, tombstone and storage group objects. Does not // include inhumed objects. Use cursor value from response for consecutive requests. // +// Optional attrs specifies attributes to include in the result. If object does +// not have requested attribute, corresponding element in the result is empty. +// // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (s *Shard) ListWithCursor(count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { +func (s *Shard) ListWithCursor(count int, cursor *Cursor, attrs ...string) ([]objectcore.AddressWithAttributes, *Cursor, error) { if s.GetMode().NoMetabase() { return nil, nil, ErrDegradedMode } - addrs, cursor, err := s.metaBase.ListWithCursor(count, cursor) + addrs, cursor, err := s.metaBase.ListWithCursor(count, cursor, attrs...) if err != nil { return nil, nil, fmt.Errorf("could not get list of objects: %w", err) } diff --git a/pkg/local_object_storage/shard/list_test.go b/pkg/local_object_storage/shard/list_test.go index 678c873536..000215cae0 100644 --- a/pkg/local_object_storage/shard/list_test.go +++ b/pkg/local_object_storage/shard/list_test.go @@ -1,12 +1,17 @@ package shard_test import ( + "errors" + "fmt" + "strconv" "testing" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/stretchr/testify/require" ) @@ -65,3 +70,75 @@ func testShardList(t *testing.T, sh *shard.Shard) { objs[objID] = 1 } } + +func TestShard_ListWithCursor(t *testing.T) { + t.Run("attributes", func(t *testing.T) { + const containerNum = 10 + const objectsPerContainer = 10 + const totalObjects = containerNum * objectsPerContainer + const staticAttr, staticVal = "attr_static", "val_static" + const commonAttr = "attr_common" + const groupAttr = "attr_group" + + s := newShard(t, true) + + var exp []object.AddressWithAttributes + for i := range containerNum { + cnr := cidtest.ID() + for j := range objectsPerContainer { + commonVal := strconv.Itoa(i*objectsPerContainer + j) + owner := usertest.ID() + + obj := generateObjectWithCID(cnr) + obj.SetOwner(owner) + obj.SetType(objectSDK.TypeRegular) + obj.SetAttributes( + objectSDK.NewAttribute(staticAttr, staticVal), + objectSDK.NewAttribute(commonAttr, commonVal), + ) + + var groupVal string + if j == 0 { + groupVal = strconv.Itoa(i) + addAttribute(obj, groupAttr, groupVal) + } + + require.NoError(t, s.Put(obj, nil)) + + exp = append(exp, object.AddressWithAttributes{ + Address: object.AddressOf(obj), + Type: objectSDK.TypeRegular, + Attributes: []string{staticVal, commonVal, groupVal, string(owner[:])}, + }) + } + } + + for _, count := range []int{ + 1, + totalObjects / 10, + totalObjects / 2, + totalObjects - 1, + totalObjects, + totalObjects + 1, + } { + t.Run(fmt.Sprintf("total=%d,count=%d", totalObjects, count), func(t *testing.T) { + collected := collectListWithCursor(t, s, count, staticAttr, commonAttr, groupAttr, "$Object:ownerID") + require.ElementsMatch(t, exp, collected) + }) + } + }) +} + +func collectListWithCursor(t *testing.T, s *shard.Shard, count int, attrs ...string) []object.AddressWithAttributes { + var next, collected []object.AddressWithAttributes + var crs *shard.Cursor + var err error + for { + next, crs, err = s.ListWithCursor(count, crs, attrs...) + collected = append(collected, next...) + if errors.Is(err, shard.ErrEndOfListing) { + return collected + } + require.NoError(t, err) + } +} diff --git a/pkg/services/control/server/list_objects.go b/pkg/services/control/server/list_objects.go index 865a04c10c..d71e8159c3 100644 --- a/pkg/services/control/server/list_objects.go +++ b/pkg/services/control/server/list_objects.go @@ -24,7 +24,7 @@ func (s *Server) ListObjects(req *control.ListObjectsRequest, stream control.Con var ( cursor *engine.Cursor - addresses []objectcore.AddressWithType + addresses []objectcore.AddressWithAttributes ) // (Limit 4MB - 64KB for service bytes and future fields) / 89B address length = 46390 addresses can be sent const count = 46390 diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index f10c90bc1f..7d1bf4bca7 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -4,12 +4,14 @@ import ( "context" "errors" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/container" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -74,19 +76,26 @@ func (n nodeCache) atLeastOneHolder() bool { return false } -func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) { - addr := addrWithType.Address +func (p *Policer) processObject(ctx context.Context, addrWithAttrs objectcore.AddressWithAttributes) { + addr := addrWithAttrs.Address idCnr := addr.Container() idObj := addr.Object() - nn, repRules, err := p.network.GetNodesForObject(addr) + ecp, err := iec.DecodePartInfoFromAttributes(addrWithAttrs.Attributes[0], addrWithAttrs.Attributes[1]) + if err != nil { + p.log.Error("failed to decode EC part info from attributes, skip object", + zap.Stringer("object", addr), zap.Error(err)) + return + } + + nn, repRules, ecRules, err := p.network.GetNodesForObject(addr) if err != nil { p.log.Error("could not build placement vector for object", zap.Stringer("cid", idCnr), zap.Error(err), ) if container.IsErrNotFound(err) { - err = p.localStorage.Delete(addrWithType.Address) + err = p.localStorage.Delete(addrWithAttrs.Address) if err != nil { p.log.Error("could not inhume object with missing container", zap.Stringer("cid", idCnr), @@ -98,12 +107,30 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add return } + if ecp.RuleIndex >= 0 { + if len(ecRules) > 0 { + p.processECPart(ctx, addr, ecp, ecRules, nn[len(repRules):]) + return + } + // TODO: forbid to PUT such objects and drop this one? + p.log.Info("object with EC attributes in container without EC rules detected, process according to REP rules", + zap.Stringer("object", addr), zap.Int("ruleIdx", ecp.RuleIndex), zap.Int("partIdx", ecp.Index)) + } else if len(ecRules) > 0 && len(repRules) == 0 { + p.log.Info("object with lacking EC attributes detected, deleting", + zap.Stringer("object", addr)) + if err := p.localStorage.Delete(addr); err != nil { + p.log.Error("failed to delete local object with lacking EC attributes", + zap.Stringer("object", addr), zap.Error(err)) + } + return + } + c := &processPlacementContext{ - object: addrWithType, + object: addrWithAttrs, checkedNodes: newNodeCache(), } - for i := range nn { + for i := range repRules { select { case <-ctx.Done(): return @@ -158,12 +185,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add ) } - err = p.localStorage.Delete(addr) - if err != nil { - p.log.Warn("could not inhume mark redundant copy as garbage", - zap.Error(err), - ) - } + p.dropRedundantLocalObject(addr) } } @@ -178,16 +200,14 @@ type processPlacementContext struct { needLocalCopy bool // descriptor of the object for which the policy is being checked - object objectcore.AddressWithType + object objectcore.AddressWithAttributes // caches nodes which has been already processed in previous iterations checkedNodes *nodeCache } func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) { - p.cfg.mtx.RLock() - headTimeout := p.headTimeout - p.cfg.mtx.RUnlock() + headTimeout := p.getHeadTimeout() // Number of copies that are stored on maintenance nodes. var uncheckedCopies int @@ -280,12 +300,7 @@ func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext zap.Uint32("shortage", shortage), ) - var task replicator.Task - task.SetObjectAddress(plc.object.Address) - task.SetNodes(candidates) - task.SetCopiesNumber(shortage) - - p.replicator.HandleTask(ctx, task, plc.checkedNodes) + p.tryToReplicate(ctx, plc.object.Address, shortage, candidates, plc.checkedNodes) } else if uncheckedCopies > 0 { // If we have more copies than needed, but some of them are from the maintenance nodes, // save the local copy. @@ -294,3 +309,20 @@ func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext zap.Int("count", uncheckedCopies)) } } + +func (p *Policer) dropRedundantLocalObject(addr oid.Address) { + err := p.localStorage.Delete(addr) + if err != nil { + p.log.Warn("could not inhume mark redundant copy as garbage", + zap.Error(err)) + } +} + +func (p *Policer) tryToReplicate(ctx context.Context, addr oid.Address, shortage uint32, candidates []netmap.NodeInfo, res replicator.TaskResult) { + var task replicator.Task + task.SetObjectAddress(addr) + task.SetNodes(candidates) + task.SetCopiesNumber(shortage) + + p.replicator.HandleTask(ctx, task, res) +} diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go new file mode 100644 index 0000000000..38411be3fd --- /dev/null +++ b/pkg/services/policer/ec.go @@ -0,0 +1,135 @@ +package policer + +import ( + "context" + "errors" + "slices" + "time" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +func (p *Policer) processECPart(ctx context.Context, addr oid.Address, pi iec.PartInfo, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo) { + if pi.RuleIndex >= len(ecRules) { + p.log.Warn("local object with invalid EC rule index detected, deleting", + zap.Stringer("object", addr), zap.Int("ruleIdx", pi.RuleIndex), zap.Int("totalRules", len(ecRules))) + if err := p.localStorage.Delete(addr); err != nil { + p.log.Error("failed to delete local object with invalid EC rule index", + zap.Stringer("object", addr), zap.Error(err)) + } + return + } + + rule := ecRules[pi.RuleIndex] + if pi.Index >= int(rule.DataPartNum+rule.ParityPartNum) { + p.log.Warn("local object with invalid EC part index detected, deleting", + zap.Stringer("object", addr), zap.Stringer("rule", rule), zap.Int("partIdx", pi.Index)) + if err := p.localStorage.Delete(addr); err != nil { + p.log.Error("failed to delete local object with invalid EC part index", + zap.Stringer("object", addr), zap.Error(err)) + } + return + } + + p.processECPartByRule(ctx, rule, addr, pi.Index, nodeLists[pi.RuleIndex]) +} + +func (p *Policer) processECPartByRule(ctx context.Context, rule iec.Rule, addr oid.Address, partIdx int, nodes []netmap.NodeInfo) { + var candidates []netmap.NodeInfo + var maintenance bool + headTimeout := time.Duration(-1) + + for i := range iec.NodeSequenceForPart(partIdx, int(rule.DataPartNum+rule.ParityPartNum), len(nodes)) { + if p.network.IsLocalNodePublicKey(nodes[i].PublicKey()) { + if len(candidates) == 0 { + p.log.Debug("local node is optimal for EC part, hold", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx)) + return + } + break + } + + if headTimeout < 0 { + headTimeout = p.getHeadTimeout() + } + + callCtx, cancel := context.WithTimeout(ctx, headTimeout) + _, err := p.apiConns.headObject(callCtx, nodes[i], addr) + cancel() + + if err == nil { + p.log.Info("EC part header successfully received from more optimal node, drop", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx), + zap.Strings("node", slices.Collect(nodes[i].NetworkEndpoints()))) + p.dropRedundantLocalObject(addr) + return + } + + switch { + default: + p.log.Info("failed to receive EC part header from more optimal node, exclude", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx), zap.Error(err)) // error includes network addresses + case errors.Is(err, apistatus.ErrNodeUnderMaintenance): // same as for REP rules + p.log.Info("failed to receive EC part header from more optimal node due to its maintenance, continue", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx), + zap.Strings("node", slices.Collect(nodes[i].NetworkEndpoints()))) + maintenance = true + case errors.Is(err, apistatus.ErrObjectNotFound): + candidates = append(candidates, nodes[i]) + } + } + + if maintenance { + // same as for REP rules + p.log.Info("more optimal node for EC part is under maintenance, hold", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx)) + return + } + + if len(candidates) == 0 { + p.log.Info("local node is suboptimal for EC part but now there are no other candidates, hold", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx)) + return + } + + p.log.Info("local node is suboptimal for EC part, moving to more optimal node...", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx), zap.Int("candidateNum", len(candidates))) + + var repRes singleReplication + p.tryToReplicate(ctx, addr, 1, candidates, &repRes) + if repRes.done { + p.log.Info("EC part successfully moved to more optimal node, drop", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx), zap.Strings("newHolder", repRes.netAddresses)) + p.dropRedundantLocalObject(addr) + return + } + + p.log.Info("failed to move EC part to more optimal node, hold", + zap.Stringer("cid", addr.Container()), zap.Stringer("partOID", addr.Object()), + zap.Stringer("rule", rule), zap.Int("partIdx", partIdx), zap.Int("candidateNum", len(candidates))) +} + +type singleReplication struct { + done bool + netAddresses []string +} + +func (x *singleReplication) SubmitSuccessfulReplication(node netmap.NodeInfo) { + if x.done { + panic("recall") + } + x.done = true + x.netAddresses = slices.Collect(node.NetworkEndpoints()) +} diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index 0554831f46..246d6247e1 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -5,6 +5,7 @@ import ( "sync" "time" + iec "github.com/nspcc-dev/neofs-node/internal/ec" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" @@ -29,7 +30,7 @@ type replicatorIface interface { // interface of [engine.StorageEngine] used by [Policer] for overriding in tests. type localStorage interface { - ListWithCursor(uint32, *engine.Cursor) ([]objectcore.AddressWithType, *engine.Cursor, error) + ListWithCursor(uint32, *engine.Cursor, ...string) ([]objectcore.AddressWithAttributes, *engine.Cursor, error) Delete(oid.Address) error } @@ -82,17 +83,27 @@ type Network interface { IsLocalNodeInNetmap() bool // GetNodesForObject returns descriptors of storage nodes matching storage // policy of the referenced object for now. Nodes are identified by their public - // keys and can be repeated in different lists. The second value specifies the - // number (N) of primary object holders for each list (L) so: + // keys and can be repeated in different lists. First len(repRules) lists relate + // to replication, the rest len(ecRules) - to EC. + // + // repRules specifies replication rules: the number (N) of primary object + // holders for each list (L) so: // - size of each L >= N; // - first N nodes of each L are primary data holders while others (if any) // are backup. // - // GetContainerNodes callers do not change resulting slices and their elements. + // ecRules specifies erasure coding rules for all objects in the container: each + // object is split into [iec.Rule.DataPartNum] data and [iec.Rule.ParityPartNum] + // parity parts. Each i-th part most expected to be located on SN described by + // i-th list element. In general, list len is a multiple of CBF, and the part is + // expected on N with index M*i, M in [0,CBF). Then part is expected on SN + // for i+1-th part and so on. + // + // GetNodesForObject callers do not change resulting slices and their elements. // // Returns [apistatus.ErrContainerNotFound] if requested container is missing in // the network. - GetNodesForObject(oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) + GetNodesForObject(oid.Address) (nodeLists [][]netmapsdk.NodeInfo, repRules []uint, ecRules []iec.Rule, err error) // IsLocalNodePublicKey checks whether given binary-encoded public key is // assigned in the network map to a local storage node running [Policer]. IsLocalNodePublicKey([]byte) bool @@ -150,6 +161,13 @@ func New(opts ...Option) *Policer { } } +func (p *Policer) getHeadTimeout() time.Duration { + p.cfg.mtx.RLock() + headTimeout := p.headTimeout + p.cfg.mtx.RUnlock() + return headTimeout +} + // WithHeadTimeout returns option to set Head timeout of Policer. func WithHeadTimeout(v time.Duration) Option { return func(c *cfg) { diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 501e688a2c..0369d28453 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -8,10 +8,12 @@ import ( "errors" "maps" "slices" + "strconv" "sync" "testing" "time" + iec "github.com/nspcc-dev/neofs-node/internal/ec" islices "github.com/nspcc-dev/neofs-node/internal/slices" "github.com/nspcc-dev/neofs-node/internal/testutil" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -324,12 +326,12 @@ func testRepCheck(t *testing.T, rep uint, nodes []netmap.NodeInfo, localIdx int, objAddr := oid.NewAddress(cnr, objID) localNode := newTestLocalNode() - localNode.objList = []objectcore.AddressWithType{ - {Address: objAddr, Type: object.TypeRegular}, + localNode.objList = []objectcore.AddressWithAttributes{ + {Address: objAddr, Type: object.TypeRegular, Attributes: make([]string, 2)}, } mockNet := newMockNetwork() - mockNet.setObjectNodesResult(cnr, objID, slices.Clone(nodes), rep) + mockNet.setObjectNodesRepResult(cnr, objID, slices.Clone(nodes), rep) if localIdx >= 0 { mockNet.pubKey = nodes[localIdx].PublicKey() mockNet.inNetmap = true @@ -395,10 +397,370 @@ func testRepCheck(t *testing.T, rep uint, nodes []netmap.NodeInfo, localIdx int, } } +func TestPolicer_Run_EC(t *testing.T) { + const defaultCBF = 3 + cnr := cidtest.ID() + partOID := oidtest.ID() + rule := iec.Rule{ + DataPartNum: 6, + ParityPartNum: 3, + } + localObj := objectcore.AddressWithAttributes{ + Address: oid.NewAddress(cnr, partOID), + Type: object.TypeRegular, + Attributes: []string{"0", "5"}, + } + + nodes := testutil.Nodes(int(rule.DataPartNum+rule.ParityPartNum) * defaultCBF) + allOK := islices.RepeatElement(len(nodes), error(nil)) + all404 := islices.RepeatElement(len(nodes), error(apistatus.ErrObjectNotFound)) + optimalOrder := []int{ + 5, 14, 23, + 6, 15, 24, + 7, 16, 25, + 8, 17, 26, + 0, 9, 18, + 1, 10, 19, + 2, 11, 20, + 3, 12, 21, + 4, 13, 22, + } + + t.Run("invalid EC attributes in local object", func(t *testing.T) { + localObj := localObj + + for _, tc := range []struct { + name string + ruleIdx string + partIdx string + err string + }{ + {name: "non-int rule index", ruleIdx: "not_an_int", partIdx: "34", + err: `decode rule index: strconv.ParseUint: parsing "not_an_int": invalid syntax`}, + {name: "negative rule index", ruleIdx: "-12", partIdx: "34", + err: `decode rule index: strconv.ParseUint: parsing "-12": invalid syntax`}, + {name: "rule index overflow", ruleIdx: "256", partIdx: "34", + err: "rule index out of range"}, + {name: "non-int part index", ruleIdx: "12", partIdx: "not_an_int", + err: `decode part index: strconv.ParseUint: parsing "not_an_int": invalid syntax`}, + {name: "negative part index", ruleIdx: "12", partIdx: "-34", + err: `decode part index: strconv.ParseUint: parsing "-34": invalid syntax`}, + {name: "part index overflow", ruleIdx: "12", partIdx: "256", + err: "part index out of range"}, + {name: "rule index without part index", ruleIdx: "12", partIdx: "", + err: "rule index is set, part index is not"}, + {name: "part index without rule index", ruleIdx: "", partIdx: "34", + err: "part index is set, rule index is not"}, + } { + t.Run(tc.name, func(t *testing.T) { + localObj.Attributes = []string{tc.ruleIdx, tc.partIdx} + + logBuf := testECCheck(t, rule, localObj, nodes, 0, allOK, false, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.ErrorLevel, Message: "failed to decode EC part info from attributes, skip object", + Fields: map[string]any{"component": "Object Policer", "object": localObj.Address.String(), "error": tc.err}, + }) + }) + } + }) + + t.Run("EC part in non-EC container", func(t *testing.T) { + mockNet := newMockNetwork() + mockNet.setObjectNodesRepResult(localObj.Address.Container(), localObj.Address.Object(), nodes, 3) + + logBuf := testECCheckWithNetwork(t, mockNet, localObj, nodes, 0, allOK, false, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "object with EC attributes in container without EC rules detected, process according to REP rules", + Fields: map[string]any{"component": "Object Policer", "object": localObj.Address.String(), + "ruleIdx": json.Number(localObj.Attributes[0]), "partIdx": json.Number(localObj.Attributes[1])}, + }) + }) + + t.Run("part violates EC policy", func(t *testing.T) { + t.Run("no EC attributes", func(t *testing.T) { + localObj := localObj + localObj.Attributes = []string{"", ""} + + logBuf := testECCheck(t, rule, localObj, nodes, 0, allOK, true, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "object with lacking EC attributes detected, deleting", + Fields: map[string]any{"component": "Object Policer", "object": localObj.Address.String()}, + }) + }) + t.Run("too big rule index", func(t *testing.T) { + localObj := localObj + localObj.Attributes = slices.Clone(localObj.Attributes) + localObj.Attributes[0] = "1" + + logBuf := testECCheck(t, rule, localObj, nodes, 0, allOK, true, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.WarnLevel, Message: "local object with invalid EC rule index detected, deleting", + Fields: map[string]any{"component": "Object Policer", "object": localObj.Address.String(), + "ruleIdx": json.Number("1"), "totalRules": json.Number("1")}, + }) + }) + t.Run("too big part index", func(t *testing.T) { + rule := iec.Rule{DataPartNum: 17, ParityPartNum: 4} + localObj := localObj + localObj.Attributes = slices.Clone(localObj.Attributes) + localObj.Attributes[1] = "21" + + logBuf := testECCheck(t, rule, localObj, nodes, 0, allOK, true, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.WarnLevel, Message: "local object with invalid EC part index detected, deleting", + Fields: map[string]any{"component": "Object Policer", "object": localObj.Address.String(), "rule": "17/4", + "partIdx": json.Number("21")}, + }) + }) + }) + + t.Run("local node is optimal", func(t *testing.T) { + logBuf := testECCheck(t, rule, localObj, nodes, 5, all404, false, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.DebugLevel, Message: "local node is optimal for EC part, hold", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5")}, + }) + }) + + t.Run("found on more optimal node", func(t *testing.T) { + errs := slices.Clone(all404) + errs[5] = nil + + logBuf := testECCheck(t, rule, localObj, nodes, 6, errs, true, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "EC part header successfully received from more optimal node, drop", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "node": []any{"localhost:10010", "localhost:10011"}}, + }) + }) + + t.Run("not found on more optimal node", func(t *testing.T) { + candidates := islices.CollectIndex(nodes, optimalOrder[:len(optimalOrder)-1]...) + + t.Run("move failure", func(t *testing.T) { + logBuf := testECCheck(t, rule, localObj, nodes, optimalOrder[len(optimalOrder)-1], all404, false, candidates, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "local node is suboptimal for EC part, moving to more optimal node...", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "candidateNum": json.Number("26")}, + }) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "failed to move EC part to more optimal node, hold", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "candidateNum": json.Number("26")}, + }) + }) + + logBuf := testECCheck(t, rule, localObj, nodes, optimalOrder[len(optimalOrder)-1], all404, true, candidates, true) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "local node is suboptimal for EC part, moving to more optimal node...", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), "rule": "6/3", + "partIdx": json.Number("5"), "candidateNum": json.Number("26")}, + }) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "EC part successfully moved to more optimal node, drop", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), "rule": "6/3", + "partIdx": json.Number("5"), "newHolder": []any{"localhost:10010", "localhost:10011"}}, + }) + }) + + t.Run("maintenance", func(t *testing.T) { + errs := slices.Clone(all404) + errs[optimalOrder[len(optimalOrder)-3]] = apistatus.ErrNodeUnderMaintenance + + t.Run("moved", func(t *testing.T) { + errs := slices.Clone(errs) + errs[optimalOrder[len(optimalOrder)-2]] = nil + + logBuf := testECCheck(t, rule, localObj, nodes, optimalOrder[len(optimalOrder)-1], errs, true, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "failed to receive EC part header from more optimal node due to its maintenance, continue", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "node": []any{"localhost:10008", "localhost:10009"}}, + }) + }) + + logBuf := testECCheck(t, rule, localObj, nodes, optimalOrder[len(optimalOrder)-1], errs, false, nil, false) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "failed to receive EC part header from more optimal node due to its maintenance, continue", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "node": []any{"localhost:10008", "localhost:10009"}}, + }) + }) + + t.Run("various errors", func(t *testing.T) { + otherErrs := []error{ + errors.New("some error"), + apistatus.ErrServerInternal, + apistatus.ErrWrongMagicNumber, + apistatus.ErrSignatureVerification, + apistatus.ErrObjectAccessDenied, + apistatus.ErrObjectAlreadyRemoved, + apistatus.ErrContainerNotFound, + apistatus.ErrSessionTokenExpired, + } + + errs := slices.Clone(all404) + for i := range otherErrs { + errs[optimalOrder[i]] = otherErrs[i] + } + + checkErrsLog := func(logBuf *testutil.LogBuffer) { + for i := range otherErrs { + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "failed to receive EC part header from more optimal node, exclude", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), "rule": "6/3", + "partIdx": json.Number("5"), "error": otherErrs[i].Error()}, + }) + } + } + + holderField := []any{"localhost:10050", "localhost:10051"} + + testWithInContainer := func(t *testing.T, inCnr bool) { + candidates := islices.CollectIndex(nodes, optimalOrder[len(otherErrs):]...) + localIdx := -1 + candidateNum := len(nodes) - len(otherErrs) + if inCnr { + localIdx = optimalOrder[len(optimalOrder)-1] + candidates = candidates[:len(candidates)-1] + candidateNum-- + } + + candidateNumField := json.Number(strconv.Itoa(candidateNum)) + + t.Run("drop", func(t *testing.T) { + errs := slices.Clone(errs) + errs[optimalOrder[len(otherErrs)]] = nil + + logBuf := testECCheck(t, rule, localObj, nodes, localIdx, errs, true, nil, false) + checkErrsLog(logBuf) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "EC part header successfully received from more optimal node, drop", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "node": holderField}, + }) + }) + + t.Run("move failure", func(t *testing.T) { + logBuf := testECCheck(t, rule, localObj, nodes, localIdx, errs, false, candidates, false) + checkErrsLog(logBuf) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "local node is suboptimal for EC part, moving to more optimal node...", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "candidateNum": candidateNumField}, + }) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "failed to move EC part to more optimal node, hold", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "candidateNum": candidateNumField}, + }) + }) + + logBuf := testECCheck(t, rule, localObj, nodes, localIdx, errs, true, candidates, true) + checkErrsLog(logBuf) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "local node is suboptimal for EC part, moving to more optimal node...", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "candidateNum": candidateNumField}, + }) + logBuf.AssertContains(testutil.LogEntry{ + Level: zap.InfoLevel, Message: "EC part successfully moved to more optimal node, drop", + Fields: map[string]any{"component": "Object Policer", "cid": cnr.String(), "partOID": partOID.String(), + "rule": "6/3", "partIdx": json.Number("5"), "newHolder": holderField}, + }) + } + + t.Run("in container", func(t *testing.T) { + testWithInContainer(t, true) + }) + t.Run("out container", func(t *testing.T) { + testWithInContainer(t, false) + }) + }) +} + +func testECCheck(t *testing.T, rule iec.Rule, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, localIdx int, headErrs []error, expRedundant bool, expCandidates []netmap.NodeInfo, repSuccess bool) *testutil.LogBuffer { + mockNet := newMockNetwork() + mockNet.setObjectNodesECResult(localObj.Address.Container(), localObj.Address.Object(), slices.Clone(nodes), rule) + if localIdx >= 0 { + mockNet.pubKey = nodes[localIdx].PublicKey() + } + + return testECCheckWithNetwork(t, mockNet, localObj, nodes, localIdx, headErrs, expRedundant, expCandidates, repSuccess) +} + +func testECCheckWithNetwork(t *testing.T, mockNet *mockNetwork, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, + localIdx int, headErrs []error, expRedundant bool, expCandidates []netmap.NodeInfo, repSuccess bool) *testutil.LogBuffer { + require.Equal(t, len(nodes), len(headErrs)) + + wp, err := ants.NewPool(100) + require.NoError(t, err) + + localNode := newTestLocalNode() + localNode.objList = []objectcore.AddressWithAttributes{localObj} + + r := newTestReplicator(t) + r.success = repSuccess + + conns := newMockAPIConnections() + for i := range nodes { + if i != localIdx { + conns.setHeadResult(nodes[i], localObj.Address, headErrs[i]) + } + } + + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + p := New( + WithPool(wp), + WithReplicationCooldown(time.Hour), // any huge time to cancel process repeat + WithNodeLoader(nopNodeLoader{}), + WithNetwork(mockNet), + WithLogger(l), + ) + p.localStorage = localNode + p.apiConns = conns + p.replicator = r + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go p.Run(ctx) + + repTaskSubmitted := func() bool { + select { + case <-r.gotTaskCh: + return true + default: + return false + } + } + if len(expCandidates) > 0 { + require.Eventually(t, repTaskSubmitted, 3*time.Second, 30*time.Millisecond) + + var exp replicator.Task + exp.SetObjectAddress(localObj.Address) + exp.SetCopiesNumber(1) + exp.SetNodes(expCandidates) + require.Equal(t, exp, r.task) + } else { + require.Never(t, repTaskSubmitted, 3*time.Second, 30*time.Millisecond) + } + + if expRedundant { + require.Equal(t, []oid.Address{localObj.Address}, localNode.deletedObjects()) + } else { + require.Empty(t, localNode.deletedObjects()) + } + + return lb +} + type testReplicator struct { t *testing.T task replicator.Task gotTaskCh chan struct{} + success bool } func newTestReplicator(t *testing.T) *testReplicator { @@ -412,12 +774,18 @@ func (x *testReplicator) HandleTask(ctx context.Context, task replicator.Task, r require.NotNil(x.t, ctx) require.NotNil(x.t, r) + nodes := task.Nodes() + require.NotEmpty(x.t, nodes) + if x.success { + r.SubmitSuccessfulReplication(nodes[0]) + } + x.task = task close(x.gotTaskCh) } type testLocalNode struct { - objList []objectcore.AddressWithType + objList []objectcore.AddressWithAttributes delMtx sync.RWMutex del map[oid.Address]struct{} @@ -451,7 +819,7 @@ func (x *mockNetwork) IsLocalNodeInNetmap() bool { return x.inNetmap } -func (x *testLocalNode) ListWithCursor(uint32, *engine.Cursor) ([]objectcore.AddressWithType, *engine.Cursor, error) { +func (x *testLocalNode) ListWithCursor(uint32, *engine.Cursor, ...string) ([]objectcore.AddressWithAttributes, *engine.Cursor, error) { return x.objList, nil, nil } @@ -475,8 +843,9 @@ type getNodesKey struct { } type getNodesValue struct { - nodes []netmap.NodeInfo - rep uint + nodes []netmap.NodeInfo + repRules []uint + ecRules []iec.Rule } func newGetNodesKey(cnr cid.ID, obj oid.ID) getNodesKey { @@ -486,20 +855,27 @@ func newGetNodesKey(cnr cid.ID, obj oid.ID) getNodesKey { } } -func (x *mockNetwork) setObjectNodesResult(cnr cid.ID, obj oid.ID, nodes []netmap.NodeInfo, rep uint) { +func (x *mockNetwork) setObjectNodesRepResult(cnr cid.ID, obj oid.ID, nodes []netmap.NodeInfo, rep uint) { + x.getNodes[newGetNodesKey(cnr, obj)] = getNodesValue{ + nodes: nodes, + repRules: []uint{rep}, + } +} + +func (x *mockNetwork) setObjectNodesECResult(cnr cid.ID, obj oid.ID, nodes []netmap.NodeInfo, rule iec.Rule) { x.getNodes[newGetNodesKey(cnr, obj)] = getNodesValue{ - nodes: nodes, - rep: rep, + nodes: nodes, + ecRules: []iec.Rule{rule}, } } -func (x *mockNetwork) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, []uint, error) { +func (x *mockNetwork) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, []uint, []iec.Rule, error) { v, ok := x.getNodes[newGetNodesKey(addr.Container(), addr.Object())] if !ok { - return nil, nil, errors.New("[test] unexpected policy requested") + return nil, nil, nil, errors.New("[test] unexpected policy requested") } - return [][]netmap.NodeInfo{v.nodes}, []uint{v.rep}, nil + return [][]netmap.NodeInfo{v.nodes}, v.repRules, v.ecRules, nil } type nopNodeLoader struct{} diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index ac675306e1..8425561bc7 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + iec "github.com/nspcc-dev/neofs-node/internal/ec" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "go.uber.org/zap" @@ -27,7 +28,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { p.cfg.mtx.RUnlock() var ( - addrs []objectcore.AddressWithType + addrs []objectcore.AddressWithAttributes cursor *engine.Cursor err error ) @@ -41,7 +42,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { default: } - addrs, cursor, err = p.localStorage.ListWithCursor(batchSize, cursor) + addrs, cursor, err = p.localStorage.ListWithCursor(batchSize, cursor, iec.AttributeRuleIdx, iec.AttributePartIdx) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { time.Sleep(time.Second) // finished whole cycle, sleep a bit diff --git a/pkg/services/replicator/task.go b/pkg/services/replicator/task.go index 31347362d8..af2e475e07 100644 --- a/pkg/services/replicator/task.go +++ b/pkg/services/replicator/task.go @@ -36,3 +36,8 @@ func (t *Task) SetObject(obj *objectSDK.Object) { func (t *Task) SetNodes(v []netmap.NodeInfo) { t.nodes = v } + +// Nodes returns a list of potential object holders. +func (t Task) Nodes() []netmap.NodeInfo { + return t.nodes +}