|
| 1 | +package engine |
| 2 | + |
| 3 | +import ( |
| 4 | + "errors" |
| 5 | + "io" |
| 6 | + |
| 7 | + iec "github.com/nspcc-dev/neofs-node/internal/ec" |
| 8 | + ierrors "github.com/nspcc-dev/neofs-node/internal/errors" |
| 9 | + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" |
| 10 | + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" |
| 11 | + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" |
| 12 | + "github.com/nspcc-dev/neofs-sdk-go/object" |
| 13 | + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" |
| 14 | + "go.uber.org/zap" |
| 15 | +) |
| 16 | + |
| 17 | +// GetECPart looks up for object that carries EC part produced within cnr for |
| 18 | +// parent object and indexed by pi in the underlying metabase, checks its |
| 19 | +// availability and reads it from the underlying BLOB storage. The result is a |
| 20 | +// header and a payload stream that must be closed by caller after processing. |
| 21 | +// |
| 22 | +// If write-cache is enabled, GetECPart tries to get the object from it first. |
| 23 | +// |
| 24 | +// If object has expired, GetECPart returns [meta.ErrObjectIsExpired]. |
| 25 | +// |
| 26 | +// If object exists but tombstoned (e.g. via [StorageEngine.Inhume] or stored |
| 27 | +// tombstone object), GetECPart returns [apistatus.ErrObjectAlreadyRemoved]. |
| 28 | +// |
| 29 | +// If object is marked as garbage (e.g. via [StorageEngine.MarkGarbage]), |
| 30 | +// GetECPart returns [apistatus.ErrObjectNotFound]. |
| 31 | +// |
| 32 | +// If object is locked (e.g. via [StorageEngine.Lock] or stored locker object), |
| 33 | +// GetECPart ignores expiration, tombstone and garbage marks. |
| 34 | +func (e *StorageEngine) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, io.ReadCloser, error) { |
| 35 | + if e.metrics != nil { |
| 36 | + defer elapsed(e.metrics.AddGetECPartDuration)() |
| 37 | + } |
| 38 | + |
| 39 | + e.blockMtx.RLock() |
| 40 | + defer e.blockMtx.RUnlock() |
| 41 | + if e.blockErr != nil { |
| 42 | + return object.Object{}, nil, e.blockErr |
| 43 | + } |
| 44 | + |
| 45 | + // TODO: sync placement with PUT. They should sort shards equally, but now PUT sorts by part ID. |
| 46 | + // https://github.com/nspcc-dev/neofs-node/issues/3537 |
| 47 | + s := e.sortShardsFn(e, oid.NewAddress(cnr, parent)) |
| 48 | + |
| 49 | + var partID oid.ID |
| 50 | +loop: |
| 51 | + for i := range s { |
| 52 | + obj, rdr, err := s[i].shardIface.GetECPart(cnr, parent, pi) |
| 53 | + switch { |
| 54 | + case err == nil: |
| 55 | + return obj, rdr, nil |
| 56 | + case errors.Is(err, apistatus.ErrObjectAlreadyRemoved): |
| 57 | + return object.Object{}, nil, err |
| 58 | + case errors.Is(err, meta.ErrObjectIsExpired): |
| 59 | + return object.Object{}, nil, apistatus.ErrObjectNotFound // like Get |
| 60 | + case errors.As(err, (*ierrors.ObjectID)(&partID)): |
| 61 | + if partID.IsZero() { |
| 62 | + panic("zero object ID returned as error") |
| 63 | + } |
| 64 | + |
| 65 | + e.log.Info("EC part's object ID resolved in shard but reading failed, continue bypassing metabase", |
| 66 | + zap.Stringer("container", cnr), zap.Stringer("parent", parent), |
| 67 | + zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index), |
| 68 | + zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err)) |
| 69 | + // TODO: need report error? Same for other places. https://github.com/nspcc-dev/neofs-node/issues/3538 |
| 70 | + |
| 71 | + s = s[i+1:] |
| 72 | + break loop |
| 73 | + case errors.Is(err, apistatus.ErrObjectNotFound): |
| 74 | + default: |
| 75 | + e.log.Info("failed to get EC part from shard, ignore error", |
| 76 | + zap.Stringer("container", cnr), zap.Stringer("parent", parent), |
| 77 | + zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index), |
| 78 | + zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err)) |
| 79 | + } |
| 80 | + } |
| 81 | + |
| 82 | + if partID.IsZero() { |
| 83 | + return object.Object{}, nil, apistatus.ErrObjectNotFound |
| 84 | + } |
| 85 | + |
| 86 | + for i := range s { |
| 87 | + // get an object bypassing the metabase. We can miss deletion or expiration mark. Get behaves like this, so here too. |
| 88 | + obj, rdr, err := s[i].shardIface.GetStream(oid.NewAddress(cnr, partID), true) |
| 89 | + switch { |
| 90 | + case err == nil: |
| 91 | + return *obj, rdr, nil |
| 92 | + case errors.Is(err, apistatus.ErrObjectNotFound): |
| 93 | + default: |
| 94 | + e.log.Info("failed to get EC part from shard bypassing metabase, ignore error", |
| 95 | + zap.Stringer("container", cnr), zap.Stringer("parent", parent), |
| 96 | + zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index), |
| 97 | + zap.Stringer("partID", partID), |
| 98 | + zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err)) |
| 99 | + } |
| 100 | + } |
| 101 | + |
| 102 | + return object.Object{}, nil, apistatus.ErrObjectNotFound |
| 103 | +} |
0 commit comments