diff --git a/internal/errors/object.go b/internal/errors/object.go new file mode 100644 index 0000000000..29877365f8 --- /dev/null +++ b/internal/errors/object.go @@ -0,0 +1,13 @@ +package errors + +import ( + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" +) + +// ObjectID is an object ID as error. +type ObjectID oid.ID + +// Error returns string-encoded object ID. +func (x ObjectID) Error() string { + return oid.ID(x).String() +} diff --git a/internal/errors/object_test.go b/internal/errors/object_test.go new file mode 100644 index 0000000000..b23717d8f4 --- /dev/null +++ b/internal/errors/object_test.go @@ -0,0 +1,30 @@ +package errors_test + +import ( + "errors" + "fmt" + "testing" + + ierrors "github.com/nspcc-dev/neofs-node/internal/errors" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func TestErrObjectID(t *testing.T) { + id := oidtest.ID() + err := ierrors.ObjectID(id) + + t.Run("errors.As", func(t *testing.T) { + check := func(t *testing.T, err error) { + var e ierrors.ObjectID + require.ErrorAs(t, err, &e) + require.EqualValues(t, id, e) + } + + check(t, err) + check(t, fmt.Errorf("some context: %w, %w", errors.New("any"), err)) + }) + + require.Implements(t, new(error), err) + require.EqualError(t, err, id.String()) +} diff --git a/pkg/local_object_storage/engine/ec.go b/pkg/local_object_storage/engine/ec.go new file mode 100644 index 0000000000..b003f1e881 --- /dev/null +++ b/pkg/local_object_storage/engine/ec.go @@ -0,0 +1,103 @@ +package engine + +import ( + "errors" + "io" + + iec "github.com/nspcc-dev/neofs-node/internal/ec" + ierrors "github.com/nspcc-dev/neofs-node/internal/errors" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +// GetECPart looks up for object that carries EC part produced within cnr for +// parent object and indexed by pi in the underlying metabase, checks its +// availability and reads it from the underlying BLOB storage. The result is a +// header and a payload stream that must be closed by caller after processing. +// +// If write-cache is enabled, GetECPart tries to get the object from it first. +// +// If object has expired, GetECPart returns [meta.ErrObjectIsExpired]. +// +// If object exists but tombstoned (e.g. via [StorageEngine.Inhume] or stored +// tombstone object), GetECPart returns [apistatus.ErrObjectAlreadyRemoved]. +// +// If object is marked as garbage (e.g. via [StorageEngine.MarkGarbage]), +// GetECPart returns [apistatus.ErrObjectNotFound]. +// +// If object is locked (e.g. via [StorageEngine.Lock] or stored locker object), +// GetECPart ignores expiration, tombstone and garbage marks. +func (e *StorageEngine) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, io.ReadCloser, error) { + if e.metrics != nil { + defer elapsed(e.metrics.AddGetECPartDuration)() + } + + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + if e.blockErr != nil { + return object.Object{}, nil, e.blockErr + } + + // TODO: sync placement with PUT. They should sort shards equally, but now PUT sorts by part ID. + // https://github.com/nspcc-dev/neofs-node/issues/3537 + s := e.sortShardsFn(e, oid.NewAddress(cnr, parent)) + + var partID oid.ID +loop: + for i := range s { + obj, rdr, err := s[i].shardIface.GetECPart(cnr, parent, pi) + switch { + case err == nil: + return obj, rdr, nil + case errors.Is(err, apistatus.ErrObjectAlreadyRemoved): + return object.Object{}, nil, err + case errors.Is(err, meta.ErrObjectIsExpired): + return object.Object{}, nil, apistatus.ErrObjectNotFound // like Get + case errors.As(err, (*ierrors.ObjectID)(&partID)): + if partID.IsZero() { + panic("zero object ID returned as error") + } + + e.log.Info("EC part's object ID resolved in shard but reading failed, continue bypassing metabase", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), + zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index), + zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err)) + // TODO: need report error? Same for other places. https://github.com/nspcc-dev/neofs-node/issues/3538 + + s = s[i+1:] + break loop + case errors.Is(err, apistatus.ErrObjectNotFound): + default: + e.log.Info("failed to get EC part from shard, ignore error", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), + zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index), + zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err)) + } + } + + if partID.IsZero() { + return object.Object{}, nil, apistatus.ErrObjectNotFound + } + + for i := range s { + // get an object bypassing the metabase. We can miss deletion or expiration mark. Get behaves like this, so here too. + obj, rdr, err := s[i].shardIface.GetStream(oid.NewAddress(cnr, partID), true) + switch { + case err == nil: + return *obj, rdr, nil + case errors.Is(err, apistatus.ErrObjectNotFound): + default: + e.log.Info("failed to get EC part from shard bypassing metabase, ignore error", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), + zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index), + zap.Stringer("partID", partID), + zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err)) + } + } + + return object.Object{}, nil, apistatus.ErrObjectNotFound +} diff --git a/pkg/local_object_storage/engine/ec_test.go b/pkg/local_object_storage/engine/ec_test.go new file mode 100644 index 0000000000..03fb4187db --- /dev/null +++ b/pkg/local_object_storage/engine/ec_test.go @@ -0,0 +1,396 @@ +package engine + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "testing" + "time" + + "github.com/mr-tron/base58" + iec "github.com/nspcc-dev/neofs-node/internal/ec" + ierrors "github.com/nspcc-dev/neofs-node/internal/errors" + "github.com/nspcc-dev/neofs-node/internal/testutil" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestStorageEngine_GetECPart(t *testing.T) { + cnr := cidtest.ID() + parentID := oidtest.ID() + pi := iec.PartInfo{ + RuleIndex: 123, + Index: 456, + } + + t.Run("blocked", func(t *testing.T) { + s := newEngineWithFixedShardOrder([]shardInterface{unimplementedShard{}}) // to ensure shards are not accessed + + e := errors.New("any error") + require.NoError(t, s.BlockExecution(e)) + + _, _, err := s.GetECPart(cnr, parentID, pi) + require.Equal(t, e, err) + }) + + var parentObj object.Object + parentObj.SetContainerID(cnr) + parentObj.SetID(parentID) + + partObj, err := iec.FormObjectForECPart(neofscryptotest.Signer(), parentObj, testutil.RandByteSlice(32), pi) + require.NoError(t, err) + + partID := partObj.GetID() + partAddr := oid.NewAddress(cnr, partID) + + shardOK := &mockShard{ + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {obj: partObj}, + }, + } + + t.Run("metric", func(t *testing.T) { + const sleepTime = 50 * time.Millisecond // pretty big for test, pretty fast IRL + var m testMetrics + + shardOK.getECPartSleep = sleepTime + + s := newEngineWithFixedShardOrder([]shardInterface{shardOK, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.metrics = &m + + _, _, _ = s.GetECPart(cnr, parentID, pi) + require.GreaterOrEqual(t, time.Duration(m.getECPart.Load()), sleepTime) + }) + + t.Run("zero OID error", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{&mockShard{ + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID{})}, + }, + }, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + require.PanicsWithValue(t, "zero object ID returned as error", func() { + _, _, _ = s.GetECPart(cnr, parentID, pi) + }) + + lb.AssertEmpty() + }) + + shardAlreadyRemoved := &mockShard{ + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: apistatus.ErrObjectAlreadyRemoved}, + }, + } + shardExpired := &mockShard{ + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: meta.ErrObjectIsExpired}, + }, + } + shard500 := &mockShard{ + i: 0, + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: errors.New("some shard error")}, + }, + } + shard404 := &mockShard{ + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: apistatus.ErrObjectNotFound}, + }, + getStream: map[getStreamKey]getStreamValue{ + {addr: partAddr, skipMeta: true}: {err: apistatus.ErrObjectNotFound}, + }, + } + + checkOK := func(t *testing.T, s *StorageEngine) { + hdr, rdr, err := s.GetECPart(cnr, parentID, pi) + require.NoError(t, err) + assertGetECPartOK(t, partObj, hdr, rdr) + } + checkErrorIs := func(t *testing.T, s *StorageEngine, e error) { + _, _, err := s.GetECPart(cnr, parentID, pi) + require.ErrorIs(t, err, e) + } + + t.Run("404,OK", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shardOK}) + s.log = l + + checkOK(t, s) + + lb.AssertEmpty() + }) + + t.Run("404,already removed", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shardAlreadyRemoved}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectAlreadyRemoved) + + lb.AssertEmpty() + }) + + t.Run("404,expired", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shardExpired}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEmpty() + }) + + t.Run("404,404", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shard404}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEmpty() + }) + + t.Run("internal,OK", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard500, shardOK}) + s.log = l + + checkOK(t, s) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("internal,already removed", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard500, shardAlreadyRemoved}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectAlreadyRemoved) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("internal,expired", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard500, shardExpired}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("internal,404", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard500, shard404}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("404,OID,OK", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, &mockShard{ + i: 1, + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID(partID))}, + }, + }, &mockShard{ + getStream: map[getStreamKey]getStreamValue{ + {addr: partAddr, skipMeta: true}: {obj: partObj}, + }, + }}) + s.log = l + + checkOK(t, s) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "EC part's object ID resolved in shard but reading failed, continue bypassing metabase", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("1")), + "error": "some error: " + partID.String(), + }, + }) + }) + + t.Run("404,OID,404", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, &mockShard{ + i: 1, + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID(partID))}, + }, + }, shard404}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "EC part's object ID resolved in shard but reading failed, continue bypassing metabase", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("1")), + "error": "some error: " + partID.String(), + }, + }) + }) + + t.Run("404,OID,internal", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, &mockShard{ + i: 1, + getECPart: map[getECPartKey]getECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID(partID))}, + }, + }, &mockShard{ + i: 2, + getStream: map[getStreamKey]getStreamValue{ + {addr: partAddr, skipMeta: true}: {err: errors.New("some shard error")}, + }, + }}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEqual([]testutil.LogEntry{{ + Level: zap.InfoLevel, + Message: "EC part's object ID resolved in shard but reading failed, continue bypassing metabase", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("1")), + "error": "some error: " + partID.String(), + }, + }, { + Level: zap.InfoLevel, + Message: "failed to get EC part from shard bypassing metabase, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "partID": partID.String(), + "shardID": base58.Encode([]byte("2")), + "error": "some shard error", + }, + }}) + }) + + t.Run("already removed", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shardAlreadyRemoved, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectAlreadyRemoved) + + lb.AssertEmpty() + }) + + t.Run("expired", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shardExpired, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEmpty() + }) + + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shardOK, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + checkOK(t, s) + + lb.AssertEmpty() +} + +func assertGetECPartOK(t testing.TB, exp, hdr object.Object, rdr io.ReadCloser) { + b, err := io.ReadAll(rdr) + require.NoError(t, err) + hdr.SetPayload(b) + require.Equal(t, exp, hdr) +} diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 44b1ccc40c..32b79ac90e 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -2,15 +2,20 @@ package engine import ( "errors" + "io" "sync" "sync/atomic" "time" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" "github.com/nspcc-dev/neofs-node/pkg/util" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -33,11 +38,21 @@ type StorageEngine struct { blockMtx sync.RWMutex blockErr error + + sortShardsFn func(*StorageEngine, oid.Address) []shardWrapper +} + +// interface of [shard.Shard] used by [StorageEngine] for overriding in tests. +type shardInterface interface { + ID() *shard.ID + GetStream(oid.Address, bool) (*object.Object, io.ReadCloser, error) + GetECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, io.ReadCloser, error) } type shardWrapper struct { errorCount *atomic.Uint32 *shard.Shard + shardIface shardInterface // TODO: make Shard a shardInterface } type setModeRequest struct { @@ -229,6 +244,8 @@ func New(opts ...Option) *StorageEngine { shardPools: make(map[string]util.WorkerPool), closeCh: make(chan struct{}), setModeCh: make(chan setModeRequest), + + sortShardsFn: (*StorageEngine).sortedShards, } } diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index f1ab08ed2b..9febd80e6a 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -1,12 +1,18 @@ package engine import ( + "bytes" + "errors" "fmt" + "io" "os" "path/filepath" + "strconv" "sync/atomic" "testing" + "time" + iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" @@ -17,6 +23,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/nspcc-dev/neofs-sdk-go/version" @@ -197,3 +204,175 @@ func testNewEngineWithShardNum(t *testing.T, num int) *StorageEngine { return testNewEngineWithShards(shards...) } + +func newEngineWithFixedShardOrder(ss []shardInterface) *StorageEngine { + e := New() + + ws := make([]shardWrapper, len(ss)) + + for i := range ss { + ws[i] = shardWrapper{ + shardIface: ss[i], + } + } + + e.sortShardsFn = func(*StorageEngine, oid.Address) []shardWrapper { + return ws + } + + return e +} + +type unimplementedShard struct{} + +func (unimplementedShard) ID() *shard.ID { + panic("unimplemented") +} + +func (unimplementedShard) GetStream(oid.Address, bool) (*object.Object, io.ReadCloser, error) { + panic("unimplemented") +} + +func (unimplementedShard) GetECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, io.ReadCloser, error) { + panic("unimplemented") +} + +type getECPartKey struct { + cnr cid.ID + parent oid.ID + pi iec.PartInfo +} + +type getECPartValue struct { + obj object.Object + err error +} + +type getStreamKey struct { + addr oid.Address + skipMeta bool +} + +type getStreamValue struct { + obj object.Object + err error +} + +type mockShard struct { + i int + getECPartSleep time.Duration + getECPart map[getECPartKey]getECPartValue + getStream map[getStreamKey]getStreamValue +} + +func (x *mockShard) ID() *shard.ID { + si := strconv.Itoa(x.i) + return shard.NewIDFromBytes([]byte(si)) +} + +func (x *mockShard) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, io.ReadCloser, error) { + time.Sleep(x.getECPartSleep) + val, ok := x.getECPart[getECPartKey{ + cnr: cnr, + parent: parent, + pi: pi, + }] + if !ok { + return object.Object{}, nil, errors.New("[test] unexpected object requested") + } + return *val.obj.CutPayload(), io.NopCloser(bytes.NewReader(val.obj.Payload())), val.err +} + +func (x *mockShard) GetStream(addr oid.Address, skipMeta bool) (*object.Object, io.ReadCloser, error) { + val, ok := x.getStream[getStreamKey{ + addr: addr, + skipMeta: skipMeta, + }] + if !ok { + return nil, nil, errors.New("[test] unexpected object requested") + } + return val.obj.CutPayload(), io.NopCloser(bytes.NewReader(val.obj.Payload())), val.err +} + +type unimplementedMetrics struct{} + +func (x unimplementedMetrics) AddListContainersDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddEstimateContainerSizeDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddDeleteDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddExistsDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddGetDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddHeadDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddGetStreamDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddInhumeDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddPutDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddRangeDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddSearchDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddListObjectsDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddGetECPartDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) SetObjectCounter(string, string, uint64) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddToObjectCounter(string, string, int) { + panic("unimplemented") +} + +func (x unimplementedMetrics) SetReadonly(string, bool) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddToContainerSize(string, int64) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddToPayloadCounter(string, int64) { + panic("unimplemented") +} + +type testMetrics struct { + unimplementedMetrics + getECPart atomic.Int64 +} + +func (x *testMetrics) AddGetECPartDuration(d time.Duration) { + x.getECPart.Add(int64(d)) +} diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index f166d0d8a5..ea99fbbbd5 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -17,6 +17,7 @@ type MetricRegister interface { AddRangeDuration(d time.Duration) AddSearchDuration(d time.Duration) AddListObjectsDuration(d time.Duration) + AddGetECPartDuration(d time.Duration) SetObjectCounter(shardID, objectType string, v uint64) AddToObjectCounter(shardID, objectType string, delta int) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 722ccaea07..40b594375f 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -9,6 +9,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -189,11 +190,15 @@ func generateShardID() (*shard.ID, error) { return shard.NewIDFromBytes(bin), nil } -func (e *StorageEngine) sortedShards(objAddr interface{ EncodeToString() string }) []shardWrapper { +func (e *StorageEngine) sortedShards(objAddr oid.Address) []shardWrapper { shards := e.unsortedShards() hrw.Sort(shards, hrw.WrapBytes([]byte(objAddr.EncodeToString()))) + for i := range shards { + shards[i].shardIface = shards[i].Shard + } + return shards } diff --git a/pkg/local_object_storage/shard/ec.go b/pkg/local_object_storage/shard/ec.go index 9ffe5508f9..2461854591 100644 --- a/pkg/local_object_storage/shard/ec.go +++ b/pkg/local_object_storage/shard/ec.go @@ -6,6 +6,7 @@ import ( "io" iec "github.com/nspcc-dev/neofs-node/internal/ec" + ierrors "github.com/nspcc-dev/neofs-node/internal/errors" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -18,6 +19,10 @@ import ( // availability and reads it from the underlying BLOB storage. The result is a // header and a payload stream that must be closed by caller after processing. // +// If object is found in the metabase but unreadable from the BLOB storage, +// GetECPart wraps [ierrors.ObjectID] with the object ID along with the failure +// cause. +// // If write-cache is enabled, GetECPart tries to get the object from it first. // // If object has expired, GetECPart returns [meta.ErrObjectIsExpired]. @@ -52,7 +57,7 @@ func (s *Shard) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Ob hdr, rdr, err := s.blobStor.GetStream(partAddr) if err != nil { - return object.Object{}, nil, fmt.Errorf("get from BLOB storage by ID %s: %w", partID, err) + return object.Object{}, nil, fmt.Errorf("get from BLOB storage by ID %w: %w", ierrors.ObjectID(partID), err) } return *hdr, rdr, nil diff --git a/pkg/local_object_storage/shard/ec_test.go b/pkg/local_object_storage/shard/ec_test.go index 8ba3d03fd9..7fa612190e 100644 --- a/pkg/local_object_storage/shard/ec_test.go +++ b/pkg/local_object_storage/shard/ec_test.go @@ -7,6 +7,7 @@ import ( "testing" iec "github.com/nspcc-dev/neofs-node/internal/ec" + ierrors "github.com/nspcc-dev/neofs-node/internal/errors" "github.com/nspcc-dev/neofs-node/internal/testutil" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -102,6 +103,10 @@ func TestShard_GetECPart(t *testing.T) { _, _, err := s.GetECPart(cnr, parentID, pi) require.ErrorIs(t, err, tc.err) require.ErrorContains(t, err, fmt.Sprintf("get from BLOB storage by ID %s", partID)) + + var oidErr ierrors.ObjectID + require.ErrorAs(t, err, &oidErr) + require.EqualValues(t, partID, oidErr) }) } diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 52ecfee691..f8216fb8fc 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -20,6 +20,7 @@ type ( rangeDuration prometheus.Histogram searchDuration prometheus.Histogram listObjectsDuration prometheus.Histogram + getECPartDuration prometheus.Histogram containerSize prometheus.GaugeVec payloadSize prometheus.GaugeVec @@ -114,6 +115,13 @@ func newEngineMetrics() engineMetrics { Name: "list_objects_time", Help: "Engine 'list objects' operations handling time", }) + + getECPartDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: storageNodeNameSpace, + Subsystem: engineSubsystem, + Name: "get_ec_part_time", + Help: "Engine 'get EC part' operations handling time", + }) ) var ( @@ -152,6 +160,7 @@ func newEngineMetrics() engineMetrics { rangeDuration: rangeDuration, searchDuration: searchDuration, listObjectsDuration: listObjectsDuration, + getECPartDuration: getECPartDuration, containerSize: *containerSize, payloadSize: *payloadSize, capacitySize: *capacitySize, @@ -171,6 +180,7 @@ func (m engineMetrics) register() { prometheus.MustRegister(m.rangeDuration) prometheus.MustRegister(m.searchDuration) prometheus.MustRegister(m.listObjectsDuration) + prometheus.MustRegister(m.getECPartDuration) prometheus.MustRegister(m.containerSize) prometheus.MustRegister(m.payloadSize) prometheus.MustRegister(m.capacitySize) @@ -224,6 +234,10 @@ func (m engineMetrics) AddListObjectsDuration(d time.Duration) { m.listObjectsDuration.Observe(d.Seconds()) } +func (m engineMetrics) AddGetECPartDuration(d time.Duration) { + m.getECPartDuration.Observe(d.Seconds()) +} + func (m engineMetrics) AddToContainerSize(cnrID string, size int64) { m.containerSize.With( prometheus.Labels{