Skip to content
Merged

EC GET #3497

Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func initObjectService(c *cfg) {
policer.WithMaxCapacity(c.appCfg.Policer.MaxWorkers),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithNodeLoader(c),
policer.WithNetwork(c),
policer.WithNetwork(noEC{c}),
policer.WithReplicationCooldown(c.appCfg.Policer.ReplicationCooldown),
policer.WithObjectBatchSize(c.appCfg.Policer.ObjectBatchSize),
)
Expand Down Expand Up @@ -695,15 +695,35 @@ func (o objectSource) SearchOne(ctx context.Context, cnr cid.ID, filters objectS
// IsLocalNodePublicKey implements [getsvc.NeoFSNetwork].
func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) }

// temporary wrapper until [getsvc.NeoFSNetwork] and [policer.Network]
// interfaces converge.
type noEC struct {
*cfg
}

// GetNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the policy rules.
//
// Resulting slices must not be changed.
//
// GetNodesForObject implements [policer.Network].
func (x noEC) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
nodeLists, repRules, _, err := x.cfg.GetNodesForObject(addr)
return nodeLists[:len(repRules)], repRules, err
}

// GetNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
//
// GetNodesForObject implements [getsvc.NeoFSNetwork].
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
return c.cfgObject.containerNodes.getNodesForObject(addr)
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) {
nodeSets, repRules, ecRules, err := c.cfgObject.containerNodes.getNodesForObject(addr)
return nodeSets, repRules, ecRules, err
}

type netmapSourceWithNodes struct {
Expand Down Expand Up @@ -791,7 +811,7 @@ type containerNodesSorter struct {

func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
func (x *containerNodesSorter) ECRules() []iec.Rule { return nil }
func (x *containerNodesSorter) ECRules() []iec.Rule { return x.policy.ecRules }
func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
cacheKey := objectNodesCacheKey{epoch: x.curEpoch}
cacheKey.addr.SetContainer(x.cnrID)
Expand All @@ -808,6 +828,7 @@ func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo
}
}
res.repCounts = x.policy.repCounts
res.ecRules = x.policy.ecRules
res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj)
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
Expand Down
15 changes: 9 additions & 6 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

lru "github.com/hashicorp/golang-lru/v2"
iec "github.com/nspcc-dev/neofs-node/internal/ec"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
sdkcontainer "github.com/nspcc-dev/neofs-sdk-go/container"
Expand All @@ -17,6 +18,7 @@ import (
type storagePolicyRes struct {
nodeSets [][]netmapsdk.NodeInfo
repCounts []uint
ecRules []iec.Rule
err error
}

Expand Down Expand Up @@ -152,33 +154,34 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool,
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) {
curEpoch, err := x.network.Epoch()
if err != nil {
return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
return nil, nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
cacheKey := objectNodesCacheKey{curEpoch, addr}
res, ok := x.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.repCounts, res.err
return res.nodeSets, res.repCounts, res.ecRules, res.err
}
cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container())
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
// non-persistent error => do not cache
return nil, nil, fmt.Errorf("read network map by epoch: %w", err)
return nil, nil, nil, fmt.Errorf("read network map by epoch: %w", err)
}
}
res.repCounts = cnrRes.repCounts
res.ecRules = cnrRes.ecRules
res.nodeSets, res.err = x.sortContainerNodesFunc(*networkMap, cnrRes.nodeSets, addr.Object())
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
}
x.objCache.Add(cacheKey, res)
return res.nodeSets, res.repCounts, res.err
return res.nodeSets, res.repCounts, res.ecRules, res.err
}

func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) {
Expand Down
18 changes: 9 additions & 9 deletions cmd/neofs-node/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
require.NoError(t, err)

for n := 1; n < 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.ErrorIs(t, err, epochErr)
require.EqualError(t, err, "read current NeoFS epoch: any epoch error")
// such error must not be cached
Expand All @@ -534,7 +534,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
require.NoError(t, err)

for n := 1; n < 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.ErrorIs(t, err, cnrErr)
require.EqualError(t, err, "select container nodes for current epoch #42: read container by ID: any container error")
// such error must not be cached
Expand All @@ -548,7 +548,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
require.NoError(t, err)

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.ErrorIs(t, err, curNetmapErr)
require.EqualError(t, err, "select container nodes for current epoch #42: read network map by epoch: any current netmap error")
network.assertEpochCallCount(t, n)
Expand All @@ -569,7 +569,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
require.NoError(t, err)

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, fmt.Sprintf("select container nodes for current epoch #42: %v", policyErr))
network.assertEpochCallCount(t, n)
// assert results are cached
Expand All @@ -592,7 +592,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "select container nodes for current epoch #42: "+
"invalid result of container's storage policy application to the network map: "+
"diff number of storage node sets (4) and required replica descriptors (2)")
Expand Down Expand Up @@ -622,7 +622,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "select container nodes for current epoch #42: "+
"invalid result of container's storage policy application to the network map: "+
"invalid storage node set #1: number of nodes (1) is less than minimum required by the container policy (2)")
Expand All @@ -647,7 +647,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "select container nodes for current epoch #42: "+
"invalid result of container's storage policy application to the network map: "+
"diff number of storage node sets (4) and required replica descriptors (2)")
Expand Down Expand Up @@ -677,7 +677,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
_, _, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "sort container nodes for object: any sort error")
network.assertEpochCallCount(t, n)
// assert results are cached
Expand All @@ -700,7 +700,7 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
require.NoError(t, err)

for n := 1; n <= 10; n++ {
nodeLists, primCounts, err := ns.getNodesForObject(anyAddr)
nodeLists, primCounts, _, err := ns.getNodesForObject(anyAddr)
require.NoError(t, err)
require.Len(t, primCounts, 4)
require.Len(t, nodeLists, 4)
Expand Down
15 changes: 14 additions & 1 deletion internal/ec/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"

"github.com/klauspost/reedsolomon"
islices "github.com/nspcc-dev/neofs-node/internal/slices"
)

// Erasure coding attributes.
Expand Down Expand Up @@ -75,6 +76,18 @@ func Decode(rule Rule, dataLen uint64, parts [][]byte) ([]byte, error) {
return nil, fmt.Errorf("restore Reed-Solomon: %w", err)
}

if got := islices.TwoDimSliceElementCount(parts[:rule.DataPartNum]); uint64(got) < dataLen {
return nil, fmt.Errorf("sum len of received data parts is less than full len: %d < %d", got, dataLen)
}

return ConcatDataParts(rule, dataLen, parts), nil
}

// ConcatDataParts returns a new slice of dataLen bytes originating given EC
// parts according to rule.
//
// Panics if there are less than [Rule.DataPartNum] parts.
func ConcatDataParts(rule Rule, dataLen uint64, parts [][]byte) []byte {
// TODO: last part may be shorter, do not overallocate buffer.
return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen], nil
return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen]
}
43 changes: 43 additions & 0 deletions internal/ec/ec_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ec_test

import (
"bytes"
"fmt"
"testing"

"github.com/klauspost/reedsolomon"
Expand Down Expand Up @@ -74,3 +76,44 @@ func TestEncode(t *testing.T) {
})
}
}

func testConcatDataParts(t *testing.T, ln uint64) {
data := testutil.RandByteSlice(ln)

for _, rule := range []iec.Rule{
{DataPartNum: 3, ParityPartNum: 1},
{DataPartNum: 12, ParityPartNum: 4},
} {
t.Run(fmt.Sprintf("rule=%s", rule.String()), func(t *testing.T) {
var parts [][]byte
if ln > 0 {
rs, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
require.NoError(t, err)

parts, err = rs.Split(data)
require.NoError(t, err)
} else {
parts = make([][]byte, rule.DataPartNum+rule.ParityPartNum)
}

got := iec.ConcatDataParts(rule, ln, parts)
require.True(t, bytes.Equal(data, got))
})
}
}

func TestConcatDataParts(t *testing.T) {
for _, ln := range []uint64{
0,
1,
100,
1 << 10,
1<<10 + 1,
10 << 10,
10<<10 + 1,
} {
t.Run(fmt.Sprintf("len=%d", ln), func(t *testing.T) {
testConcatDataParts(t, ln)
})
}
}
25 changes: 16 additions & 9 deletions pkg/services/object/get/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ func (exec *execCtx) executeOnContainer() {

exec.log.Debug("trying to execute in container...")

nodeLists, primaryCounts, err := exec.svc.neoFSNet.GetNodesForObject(exec.address())
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug("failed to list storage nodes for the object", zap.Error(err))
return
addr := exec.address()

nodeLists := exec.nodeLists
primaryCounts := exec.repRules
if nodeLists == nil {
var err error
nodeLists, primaryCounts, _, err = exec.svc.neoFSNet.GetNodesForObject(addr)
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug("failed to list storage nodes for the object", zap.Error(err))
return
}
}

ctx, cancel := context.WithCancel(exec.context())
Expand All @@ -34,7 +41,7 @@ func (exec *execCtx) executeOnContainer() {
var j, jLim uint
primary := true

for i := 0; i < len(nodeLists); i++ { // do not use for-range!
for i := 0; i < len(primaryCounts); i++ { // do not use for-range!
if primary {
j, jLim = 0, primaryCounts[i]
} else {
Expand All @@ -60,7 +67,7 @@ func (exec *execCtx) executeOnContainer() {

mProcessedNodes[strKey] = struct{}{}

if err = endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil {
if err := endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control
exec.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
Expand All @@ -81,7 +88,7 @@ func (exec *execCtx) executeOnContainer() {
}
}

if primary && i == len(nodeLists)-1 {
if primary && i == len(primaryCounts)-1 {
// switch to reserve nodes
primary = false
i = -1
Expand Down
Loading