Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/neofs-lens/internal/storage/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func listFunc(cmd *cobra.Command, _ []string) error {
defer storage.Close()

var (
addrs []objectcore.AddressWithType
addrs []objectcore.AddressWithAttributes
cursor *engine.Cursor
)
for {
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-lens/internal/storage/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func sanityCheck(cmd *cobra.Command, _ []string) error {

func checkShard(cmd *cobra.Command, sh storageShard) (int, error) {
var (
addrs []objectcore.AddressWithType
addrs []objectcore.AddressWithAttributes
cursor *meta.Cursor
err error
objectsChecked int
Expand Down
23 changes: 2 additions & 21 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func initObjectService(c *cfg) {
policer.WithMaxCapacity(c.appCfg.Policer.MaxWorkers),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithNodeLoader(c),
policer.WithNetwork(noEC{c}),
policer.WithNetwork(c),
policer.WithReplicationCooldown(c.appCfg.Policer.ReplicationCooldown),
policer.WithObjectBatchSize(c.appCfg.Policer.ObjectBatchSize),
)
Expand Down Expand Up @@ -695,32 +695,13 @@ func (o objectSource) SearchOne(ctx context.Context, cnr cid.ID, filters objectS
// IsLocalNodePublicKey implements [getsvc.NeoFSNetwork].
func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) }

// temporary wrapper until [getsvc.NeoFSNetwork] and [policer.Network]
// interfaces converge.
type noEC struct {
*cfg
}

// GetNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the policy rules.
//
// Resulting slices must not be changed.
//
// GetNodesForObject implements [policer.Network].
func (x noEC) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
nodeLists, repRules, _, err := x.cfg.GetNodesForObject(addr)
return nodeLists[:len(repRules)], repRules, err
}

// GetNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
//
// GetNodesForObject implements [getsvc.NeoFSNetwork].
// GetNodesForObject implements [getsvc.NeoFSNetwork], [policer.Network].
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) {
nodeSets, repRules, ecRules, err := c.cfgObject.containerNodes.getNodesForObject(addr)
return nodeSets, repRules, ecRules, err
Expand Down
51 changes: 51 additions & 0 deletions internal/ec/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,54 @@ func TestFormObjectForECPart(t *testing.T) {
require.Equal(t, checksum.NewTillichZemor(tz.Sum(part)), phh)
})
}

func TestDecodePartInfoFromAttributes(t *testing.T) {
t.Run("missing", func(t *testing.T) {
pi, err := iec.DecodePartInfoFromAttributes("", "")
require.NoError(t, err)
require.EqualValues(t, -1, pi.RuleIndex)
})

t.Run("failure", func(t *testing.T) {
for _, tc := range []struct {
name string
ruleIdx string
partIdx string
assertErr func(t *testing.T, err error)
}{
{name: "non-int rule index", ruleIdx: "not_an_int", partIdx: "34", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, `decode rule index: strconv.ParseUint: parsing "not_an_int": invalid syntax`)
}},
{name: "negative rule index", ruleIdx: "-12", partIdx: "34", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, `decode rule index: strconv.ParseUint: parsing "-12": invalid syntax`)
}},
{name: "rule index overflow", ruleIdx: "256", partIdx: "34", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, "rule index out of range")
}},
{name: "non-int part index", ruleIdx: "12", partIdx: "not_an_int", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, `decode part index: strconv.ParseUint: parsing "not_an_int": invalid syntax`)
}},
{name: "negative part index", ruleIdx: "12", partIdx: "-34", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, `decode part index: strconv.ParseUint: parsing "-34": invalid syntax`)
}},
{name: "part index overflow", ruleIdx: "12", partIdx: "256", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, "part index out of range")
}},
{name: "rule index without part index", ruleIdx: "12", partIdx: "", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, "rule index is set, part index is not")
}},
{name: "part index without rule index", ruleIdx: "", partIdx: "34", assertErr: func(t *testing.T, err error) {
require.EqualError(t, err, "part index is set, rule index is not")
}},
} {
t.Run(tc.name, func(t *testing.T) {
_, err := iec.DecodePartInfoFromAttributes(tc.ruleIdx, tc.partIdx)
tc.assertErr(t, err)
})
}
})

pi, err := iec.DecodePartInfoFromAttributes("12", "34")
require.NoError(t, err)
require.Equal(t, iec.PartInfo{RuleIndex: 12, Index: 34}, pi)
}
49 changes: 49 additions & 0 deletions internal/ec/objects.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ec

import (
"errors"
"fmt"
"strconv"

iobject "github.com/nspcc-dev/neofs-node/internal/object"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
Expand Down Expand Up @@ -73,3 +75,50 @@ func FormObjectForECPart(signer neofscrypto.Signer, parent object.Object, part [

return obj, nil
}

// DecodePartInfoFromAttributes decodes EC part info from given object
// attributes. It one of attributes is set, the other must be set too. If both
// are missing, DecodePartInfoFromAttributes returns [PartInfo.RuleIndex] = -1
// without error.
func DecodePartInfoFromAttributes(ruleIdxAttr, partIdxAttr string) (PartInfo, error) {
// TODO: sync with object GET server
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will it be done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in a separate issue

// TODO: sync with GetPartInfo, it does not check for numeric limits.
if ruleIdxAttr == "" {
if partIdxAttr != "" {
return PartInfo{}, errors.New("part index is set, rule index is not")
}
return PartInfo{RuleIndex: -1}, nil
}
if partIdxAttr == "" {
return PartInfo{}, errors.New("rule index is set, part index is not")
}

ruleIdx, err := decodeUint8StringToInt(ruleIdxAttr)
if err != nil {
if errors.Is(err, strconv.ErrRange) {
return PartInfo{}, errors.New("rule index out of range")
}
return PartInfo{}, fmt.Errorf("decode rule index: %w", err)
}
partIdx, err := decodeUint8StringToInt(partIdxAttr)
if err != nil {
if errors.Is(err, strconv.ErrRange) {
return PartInfo{}, errors.New("part index out of range")
}
return PartInfo{}, fmt.Errorf("decode part index: %w", err)
}

return PartInfo{
RuleIndex: ruleIdx,
Index: partIdx,
}, nil
}

// returns [strconv.ErrRange] if value >= 256.
func decodeUint8StringToInt(s string) (int, error) {
n, err := strconv.ParseUint(s, 10, 8)
if err != nil {
return 0, err
}
return int(n), nil
}
9 changes: 9 additions & 0 deletions internal/slices/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ func Indexes(n int) []int {
}
return s
}

// CollectIndex returns new slice of shallows copies of s elements with given indexes.
func CollectIndex[E any, S []E](s S, idxs ...int) S {
newS := make(S, len(idxs))
for i, idx := range idxs {
newS[i] = s[idx]
}
return newS
}
21 changes: 21 additions & 0 deletions internal/slices/index_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package slices_test

import (
"slices"
"testing"

islices "github.com/nspcc-dev/neofs-node/internal/slices"
Expand All @@ -22,3 +23,23 @@ func TestIndexCombos(t *testing.T) {
{2, 3},
})
}

func TestCollectIndex(t *testing.T) {
t.Run("nil", func(t *testing.T) {
require.Empty(t, islices.CollectIndex([]any(nil)))
})
t.Run("empty", func(t *testing.T) {
require.Empty(t, islices.CollectIndex([]any{}))
})

s := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}
sc := slices.Clone(s)

newS := islices.CollectIndex(s, 9, 7, 5, 3, 1)
require.Equal(t, []string{"9", "7", "5", "3", "1"}, newS)
require.Len(t, newS, 5)
require.EqualValues(t, 5, cap(newS))

newS[0] += "_"
require.Equal(t, sc, s)
}
14 changes: 14 additions & 0 deletions internal/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,17 @@ func RepeatElement[E any, S []E](n int, e E) S {
}
return s
}

// MaxLen returns max length in s. Returns 0 if s is empty.
func MaxLen(s []string) int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(slices.MaxFunc(s, func(a, b string) {return cmp.Compare(len(a), len(b))}))?

I'm really worried about growing islices package. Especially given that this function is used only once (at least in the first patch).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(slices.MaxFunc(s, func(a, b string) {return cmp.Compare(len(a), len(b))}))

at first I did so, and found it so inconvenient that I added MaxLen(). Such an obvious task cannot be solved by simple call to the stdlib. I can think of it as an internal implementation of a new func, but not a replacement

I'm really worried about growing islices package.

sure everybody remember https://go.dev/wiki/SliceTricks before slices was added. Hope most of the islices will be in slices someday. Maybe I can even contribute to this

if len(s) == 0 {
return 0
}
mx := len(s[0])
for i := 1; i < len(s); i++ {
if len(s[i]) > mx {
mx = len(s[i])
}
}
return mx
}
27 changes: 27 additions & 0 deletions internal/slices/slices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package slices_test

import (
"errors"
"math/rand/v2"
"slices"
"strconv"
"testing"

islices "github.com/nspcc-dev/neofs-node/internal/slices"
"github.com/nspcc-dev/neofs-node/internal/testutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -93,3 +95,28 @@ func TestRepeatElements(t *testing.T) {
})
}
}

func TestMaxLen(t *testing.T) {
t.Run("nil", func(t *testing.T) {
require.Zero(t, islices.MaxLen(nil))
})
t.Run("empty", func(t *testing.T) {
require.Zero(t, islices.MaxLen([]string{}))
})

s := make([]string, 100)
for i := range s {
s[i] = string(testutil.RandByteSlice(i))
}
require.EqualValues(t, 99, islices.MaxLen(s))

slices.Reverse(s)
require.EqualValues(t, 99, islices.MaxLen(s))

rand.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] })
require.EqualValues(t, 99, islices.MaxLen(s))

e := s[50]
s = islices.RepeatElement(len(s), e)
require.EqualValues(t, len(e), islices.MaxLen(s))
}
8 changes: 8 additions & 0 deletions internal/testutil/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ import (
)

// Nodes returns n [netmap.NodeInfo] elements with unique public keys.
//
// Each node has two network addresses with localhost IP. Ports are sequential
// starting from 10000.
func Nodes(n int) []netmap.NodeInfo {
const portsStart = 10_000
nodes := make([]netmap.NodeInfo, n)
for i := range nodes {
nodes[i].SetPublicKey([]byte("public_key_" + strconv.Itoa(i)))
nodes[i].SetNetworkEndpoints(
"localhost:"+strconv.Itoa(portsStart+2*i),
"localhost:"+strconv.Itoa(portsStart+2*i+1),
)
}
return nodes
}
13 changes: 13 additions & 0 deletions internal/testutil/neofs_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package testutil_test

import (
"slices"
"strconv"
"strings"
"testing"

"github.com/nspcc-dev/neofs-node/internal/testutil"
Expand All @@ -17,6 +20,16 @@ func TestNodes(t *testing.T) {
m := make(map[string]struct{})
for i := range s {
m[string(s[i].PublicKey())] = struct{}{}

for j, netAddr := range slices.Collect(s[i].NetworkEndpoints()) {
ps, ok := strings.CutPrefix(netAddr, "localhost:")
require.True(t, ok)

p, err := strconv.ParseUint(ps, 10, 16)
require.NoError(t, err)

require.EqualValues(t, 10_000+2*i+j, p)
}
}
require.Len(t, m, len(s))
}
10 changes: 5 additions & 5 deletions pkg/core/object/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// AddressWithType groups object address with its NeoFS
// object type.
type AddressWithType struct {
Address oid.Address
Type object.Type
// AddressWithAttributes groups object's address and its attributes.
type AddressWithAttributes struct {
Address oid.Address
Type object.Type
Attributes []string
}
9 changes: 6 additions & 3 deletions pkg/local_object_storage/engine/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ type Cursor struct {
// Does not include inhumed objects. Use cursor value from the response
// for consecutive requests (it's nil when iteration is over).
//
// Optional attrs specifies attributes to include in the result. If object does
// not have requested attribute, corresponding element in the result is empty.
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor, attrs ...string) ([]objectcore.AddressWithAttributes, *Cursor, error) {
if e.metrics != nil {
defer elapsed(e.metrics.AddListObjectsDuration)()
}

result := make([]objectcore.AddressWithType, 0, count)
result := make([]objectcore.AddressWithAttributes, 0, count)

// 1. Get available shards and sort them.
e.mtx.RLock()
Expand Down Expand Up @@ -76,7 +79,7 @@ func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor) ([]objectco
shCursor = cursor.shardCursor
}

res, shCursor, err := shardInstance.ListWithCursor(int(count), shCursor)
res, shCursor, err := shardInstance.ListWithCursor(int(count), shCursor, attrs...)
if err != nil {
continue
}
Expand Down
Loading
Loading