Skip to content

Commit 340a149

Browse files
committed
WIP: decoding in GET service
Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 2917378 commit 340a149

File tree

10 files changed

+624
-17
lines changed

10 files changed

+624
-17
lines changed

cmd/neofs-node/object.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,9 @@ func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) }
716716
// object holders. Resulting slices must not be changed.
717717
//
718718
// GetNodesForObject implements [getsvc.NeoFSNetwork].
719-
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
720-
return c.cfgObject.containerNodes.getNodesForObject(addr)
719+
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, []iec.Rule, error) {
720+
nodeSets, repRules, err := c.cfgObject.containerNodes.getNodesForObject(addr)
721+
return nodeSets, repRules, nil, err
721722
}
722723

723724
type netmapSourceWithNodes struct {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package engine
2+
3+
import (
4+
"fmt"
5+
6+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
7+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
8+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
9+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
10+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
11+
"go.uber.org/zap"
12+
)
13+
14+
// TODO:: docs.
15+
func (e *StorageEngine) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (objectSDK.Object, error) {
16+
// TODO: keep in sync with https://github.com/nspcc-dev/neofs-node/pull/3466.
17+
// TODO: metrics and blockErr like Get
18+
19+
// TODO: sync placement with PUT. They must sort shard equally
20+
shs := e.sortedShards(oid.NewAddress(cnr, parent))
21+
for i := range shs {
22+
obj, err := shs[i].GetECPart(cnr, parent, pi)
23+
if err == nil {
24+
return obj, nil
25+
}
26+
// TODO: debug if 404
27+
e.log.Info("failed to get EC part from shard", zap.Stringer("shardID", shs[i].ID()), zap.Stringer("container", cnr), zap.Stringer("parent", parent),
28+
zap.Int("ruleIdx", pi.RuleIndex), zap.Int("partIdx", pi.Index), zap.Error(err))
29+
// FIXME: some errors like 'akready removed' must abort
30+
}
31+
32+
return objectSDK.Object{}, fmt.Errorf("%w: all shards failed", apistatus.ErrObjectNotFound)
33+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package engine
2+
3+
import (
4+
"strconv"
5+
"testing"
6+
7+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
8+
"github.com/nspcc-dev/neofs-node/internal/testutil"
9+
"github.com/nspcc-dev/neofs-sdk-go/checksum"
10+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
11+
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
12+
objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
13+
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
14+
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestStorageEngine_GetECPartByIdx(t *testing.T) {
19+
for _, shardNum := range []int{1, 5} {
20+
t.Run("shards="+strconv.Itoa(shardNum), func(t *testing.T) {
21+
testGetECPart(t, shardNum)
22+
})
23+
}
24+
}
25+
26+
func testGetECPart(t *testing.T, shardNum int) {
27+
cnr := cidtest.ID()
28+
parentID := oidtest.ID()
29+
const ruleIdx = 123
30+
const partIdx = 456
31+
32+
var parent objectsdk.Object
33+
parent.SetContainerID(cnr)
34+
parent.SetID(parentID)
35+
parent.SetOwner(usertest.ID())
36+
parent.SetPayloadChecksum(checksum.NewSHA256([32]byte(testutil.RandByteSlice(32))))
37+
38+
part := parent
39+
part.SetID(oidtest.OtherID(parentID))
40+
part.SetParent(&parent)
41+
part.SetAttributes(
42+
objectsdk.NewAttribute("__NEOFS__EC_RULE_IDX", strconv.Itoa(ruleIdx)),
43+
objectsdk.NewAttribute("__NEOFS__EC_PART_IDX", strconv.Itoa(partIdx)),
44+
)
45+
46+
s := testNewEngineWithShardNum(t, shardNum)
47+
48+
checkMissingIdxs := func(t *testing.T, ruleIdx, partIdx int) {
49+
_, err := s.GetECPart(cnr, parentID, iec.PartInfo{
50+
RuleIndex: ruleIdx,
51+
Index: partIdx,
52+
})
53+
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
54+
}
55+
56+
checkMissingIdxs(t, ruleIdx-1, partIdx-1)
57+
checkMissingIdxs(t, ruleIdx-1, partIdx)
58+
checkMissingIdxs(t, ruleIdx-1, partIdx+1)
59+
checkMissingIdxs(t, ruleIdx, partIdx-1)
60+
checkMissingIdxs(t, ruleIdx, partIdx)
61+
checkMissingIdxs(t, ruleIdx, partIdx+1)
62+
checkMissingIdxs(t, ruleIdx+1, partIdx-1)
63+
checkMissingIdxs(t, ruleIdx+1, partIdx)
64+
checkMissingIdxs(t, ruleIdx+1, partIdx+1)
65+
66+
require.NoError(t, s.Put(&part, nil))
67+
68+
checkMissingIdxs(t, ruleIdx-1, partIdx-1)
69+
checkMissingIdxs(t, ruleIdx-1, partIdx)
70+
checkMissingIdxs(t, ruleIdx-1, partIdx+1)
71+
checkMissingIdxs(t, ruleIdx, partIdx-1)
72+
checkMissingIdxs(t, ruleIdx, partIdx+1)
73+
checkMissingIdxs(t, ruleIdx+1, partIdx-1)
74+
checkMissingIdxs(t, ruleIdx+1, partIdx)
75+
checkMissingIdxs(t, ruleIdx+1, partIdx+1)
76+
77+
got, err := s.GetECPart(cnr, parentID, iec.PartInfo{
78+
RuleIndex: ruleIdx,
79+
Index: partIdx,
80+
})
81+
require.NoError(t, err)
82+
require.Equal(t, part, got)
83+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package meta
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"slices"
7+
"strconv"
8+
9+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
10+
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
11+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
12+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
13+
"github.com/nspcc-dev/neofs-sdk-go/object"
14+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
15+
"go.etcd.io/bbolt"
16+
)
17+
18+
// TODO: docs.
19+
func (db *DB) ResolveECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (oid.ID, error) {
20+
// FIXME: check degraded
21+
// TODO: check already removed case;. https://github.com/nspcc-dev/neofs-node/issues/3502
22+
var res oid.ID
23+
err := db.boltDB.View(func(tx *bbolt.Tx) error {
24+
var err error
25+
res, err = db.resolveECPartTx(tx, cnr, parent, pi)
26+
return err
27+
})
28+
return res, err
29+
}
30+
31+
func (db *DB) resolveECPartTx(tx *bbolt.Tx, cnr cid.ID, parent oid.ID, pi iec.PartInfo) (oid.ID, error) {
32+
metaBkt := tx.Bucket(metaBucketKey(cnr))
33+
if metaBkt == nil {
34+
return oid.ID{}, fmt.Errorf("%w: missing meta bucket", apistatus.ErrObjectNotFound)
35+
}
36+
metaBktCursor := metaBkt.Cursor()
37+
38+
pref := slices.Concat([]byte{metaPrefixAttrIDPlain}, []byte(object.FilterParentID), objectcore.MetaAttributeDelimiter,
39+
parent[:], objectcore.MetaAttributeDelimiter,
40+
)
41+
k, _ := metaBktCursor.Seek(pref)
42+
partID, ok := bytes.CutPrefix(k, pref)
43+
if !ok {
44+
return oid.ID{}, fmt.Errorf("%w: not found by index %s", apistatus.ErrObjectNotFound, object.FilterParentID)
45+
}
46+
if len(partID) != oid.Size {
47+
return oid.ID{}, invalidMetaBucketKeyErr(k, fmt.Errorf("wrong OID len %d", len(partID)))
48+
}
49+
50+
// TODO: make one buffer for all keys
51+
k = slices.Concat([]byte{metaPrefixIDAttr}, partID, []byte(iec.AttributeRuleIdx), objectcore.MetaAttributeDelimiter, []byte(strconv.Itoa(pi.RuleIndex)))
52+
if metaBkt.Get(k) == nil {
53+
return oid.ID{}, fmt.Errorf("%w: not found by index %s", apistatus.ErrObjectNotFound, iec.AttributeRuleIdx)
54+
}
55+
56+
k = slices.Concat([]byte{metaPrefixIDAttr}, partID, []byte(iec.AttributePartIdx), objectcore.MetaAttributeDelimiter, []byte(strconv.Itoa(pi.Index)))
57+
if metaBkt.Get(k) == nil {
58+
return oid.ID{}, fmt.Errorf("%w: not found by index %s", apistatus.ErrObjectNotFound, iec.AttributePartIdx)
59+
}
60+
61+
return oid.ID(partID), nil
62+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package shard
2+
3+
import (
4+
"fmt"
5+
6+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
7+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
8+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
9+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
10+
"go.uber.org/zap"
11+
)
12+
13+
// TODO: docs.
14+
// TODO: keep in sync with https://github.com/nspcc-dev/neofs-node/pull/3466.
15+
func (s *Shard) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (objectSDK.Object, error) {
16+
// FIXME: metabase can be disabled
17+
partID, err := s.metaBase.ResolveECPart(cnr, parent, pi)
18+
if err != nil {
19+
return objectSDK.Object{}, fmt.Errorf("resolve part ID in metabase: %w", err)
20+
}
21+
22+
partAddr := oid.NewAddress(cnr, partID)
23+
if s.hasWriteCache() {
24+
obj, err := s.writeCache.Get(partAddr)
25+
if err == nil {
26+
return *obj, nil
27+
}
28+
29+
// TODO: debug if 404
30+
s.log.Info("failed to get EC part from write-cache, trying BLOB storage...", zap.Stringer("partAddr", partAddr), zap.Error(err))
31+
}
32+
33+
obj, err := s.blobStor.Get(partAddr)
34+
if err != nil {
35+
return objectSDK.Object{}, fmt.Errorf("get from BLOB storage by ID %s: %w", partID, err)
36+
}
37+
38+
return *obj, nil
39+
}

pkg/services/object/get/container.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ func (exec *execCtx) executeOnContainer() {
1717

1818
exec.log.Debug("trying to execute in container...")
1919

20-
nodeLists, primaryCounts, err := exec.svc.neoFSNet.GetNodesForObject(exec.address())
20+
addr := exec.address()
21+
22+
nodeLists, primaryCounts, _, err := exec.svc.neoFSNet.GetNodesForObject(addr)
2123
if err != nil {
2224
exec.status = statusUndefined
2325
exec.err = err
@@ -34,7 +36,7 @@ func (exec *execCtx) executeOnContainer() {
3436
var j, jLim uint
3537
primary := true
3638

37-
for i := 0; i < len(nodeLists); i++ { // do not use for-range!
39+
for i := 0; i < len(primaryCounts); i++ { // do not use for-range!
3840
if primary {
3941
j, jLim = 0, primaryCounts[i]
4042
} else {
@@ -81,7 +83,7 @@ func (exec *execCtx) executeOnContainer() {
8183
}
8284
}
8385

84-
if primary && i == len(nodeLists)-1 {
86+
if primary && i == len(primaryCounts)-1 {
8587
// switch to reserve nodes
8688
primary = false
8789
i = -1

0 commit comments

Comments
 (0)