diff --git a/CHANGELOG.md b/CHANGELOG.md index 993787ce8a..227a0be82f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ Changelog for NeoFS Node - `neofs-cli object range` command now truncates file passed to `--file` (#3544) - `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544) - Alphabet nodes send basic storage income based on the new Reports API from `container` contract (#3053) -- Use stream API of FSTree for object service `Get` operation (#3466) +- Use stream API of FSTree for object service `Get` operation (#3466, #3568) - Use meta buckets to mark containers with GC (#3561) ### Removed diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 9c53b7571f..eaae4d52ea 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -113,7 +113,7 @@ func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) { seekLen := rng.GetLength() parSize := par.PayloadSize() if seekLen == 0 { - seekLen = parSize + seekLen = parSize - seekOff } seekTo := seekOff + seekLen @@ -175,8 +175,7 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK r = &rngs[i] } - retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight) - if !retrieved && !wrote { + if !exec.copyChild(children[i], r, !withRng && checkRight) { return } } diff --git a/pkg/services/object/get/assembly_v2.go b/pkg/services/object/get/assembly_v2.go index eeda650bef..9f21f9fdb5 100644 --- a/pkg/services/object/get/assembly_v2.go +++ b/pkg/services/object/get/assembly_v2.go @@ -48,9 +48,7 @@ func (exec *execCtx) processV2Last(lastID oid.ID) { } if ok := exec.writeCollectedHeader(); ok { - if ok := exec.overtakePayloadInReverse(lastID); ok { - exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader) - } + exec.overtakePayloadInReverse(lastID) } } @@ -147,11 +145,7 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool { } } - retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false) - if !retrieved { // failed to fetch child -> abort whole operation - return false - } - if !wrote { // payload fetch ok but writing failed -> stop further processing + if !exec.copyChild(child.ObjectID(), rngPerChild, false) { return true } } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index ccde7c4e56..570111383a 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -192,8 +192,8 @@ func (exec *execCtx) headOnly() bool { } // copyChild fetches child object payload and streams it directly into current exec writer. -// Returns if child header was received and if full payload was successfully written. -func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) { +// Returns true if full payload (or requested range) was successfully written and, if requested, header validated. +func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) bool { log := exec.log if rng != nil { log = log.With(zap.String("child range", prettyRange(rng))) @@ -213,14 +213,18 @@ func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (b hdr := childWriter.hdr ok := exec.status == statusOK - if ok && withHdr && !exec.isChild(hdr) { - exec.status = statusUndefined - exec.err = errors.New("wrong child header") - - exec.log.Debug("parent address in child object differs") + if ok && withHdr { + if hdr == nil { + return false + } + if !exec.isChild(hdr) { + exec.status = statusUndefined + exec.err = errors.New("wrong child header") + exec.log.Debug("parent address in child object differs") + } } - return hdr != nil, ok + return ok } func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) { @@ -339,7 +343,7 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadClo }() bufSize := uint64(streamChunkSize) if obj != nil { - bufSize = min(streamChunkSize, obj.PayloadSize()) + bufSize = min(streamChunkSize, max(obj.PayloadSize(), 1)) } err = copyPayloadStream(exec.prm.objWriter, reader, bufSize) } else { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 5c7578093f..6433a6160e 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -58,6 +58,78 @@ type hasherWrapper struct { hash io.Writer } +// fallbackRangeReader wraps a range reader obtained via ObjectRangeInit and +// falls back to a full GET in case apistatus.ErrObjectAccessDenied is +// returned while reading. +type fallbackRangeReader struct { + io.ReadCloser + exec *execCtx + client *clientWrapper + key *ecdsa.PrivateKey + rng *object.Range + + fallbackDone bool +} + +func newFallbackRangeReader(exec *execCtx, c *clientWrapper, key *ecdsa.PrivateKey, rng *object.Range, rdr io.ReadCloser) io.ReadCloser { + return &fallbackRangeReader{ + ReadCloser: rdr, + exec: exec, + client: c, + key: key, + rng: rng, + } +} + +func (f *fallbackRangeReader) Read(p []byte) (int, error) { + n, err := f.ReadCloser.Read(p) + if err == nil || !errors.Is(err, apistatus.ErrObjectAccessDenied) || f.fallbackDone { + return n, err + } + + f.exec.log.Debug("range read access denied, falling back to full GET") + f.fallbackDone = true + + hdr, rdr, getErr := f.client.get(f.exec, f.key) + if getErr != nil { + return 0, fmt.Errorf("fallback GET after access denial failed: %w", getErr) + } + + pLen := hdr.PayloadSize() + from := f.rng.GetOffset() + ln := f.rng.GetLength() + var to uint64 + if ln != 0 { + to = from + ln + } else { + to = pLen + } + + if to < from || pLen < from || pLen < to { + _ = rdr.Close() + return 0, apistatus.ErrObjectOutOfRange + } + + if from > 0 { + _, err = io.CopyN(io.Discard, rdr, int64(from)) + if err != nil { + _ = rdr.Close() + return n, fmt.Errorf("discard %d bytes in stream: %w", from, err) + } + } + + f.ReadCloser = struct { + io.Reader + io.Closer + }{ + Reader: io.LimitReader(rdr, int64(to-from)), + Closer: rdr, + } + + // attempt to read again immediately to fill p. + return f.Read(p) +} + func NewSimpleObjectWriter() *SimpleObjectWriter { return &SimpleObjectWriter{ obj: object.New(), @@ -162,46 +234,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj } rdr, err := c.client.ObjectRangeInit(exec.context(), addr.Container(), id, rng.GetOffset(), ln, user.NewAutoIDSigner(*key), opts) - if err == nil { - return nil, rdr, nil - } - if !errors.Is(err, apistatus.ErrObjectAccessDenied) { - return nil, nil, fmt.Errorf("init payload reading: %w", err) - } - // Current spec allows other storage node to deny access, - // fallback to GET here. - hdr, reader, err := c.get(exec, key) if err != nil { - return nil, nil, err - } - - pLen := hdr.PayloadSize() - from := rng.GetOffset() - var to uint64 - if ln != 0 { - to = from + ln - } else { - to = pLen - } - - if to < from || pLen < from || pLen < to { - return nil, nil, apistatus.ErrObjectOutOfRange - } - - if from > 0 { - _, err = io.CopyN(io.Discard, reader, int64(from)) - if err != nil { - return nil, nil, fmt.Errorf("discard %d bytes in stream: %w", from, err) - } + return nil, nil, fmt.Errorf("init payload reading: %w", err) } - - return nil, struct { - io.Reader - io.Closer - }{ - Reader: io.LimitReader(reader, int64(to-from)), - Closer: reader, - }, nil + // fallback to full GET in case of access denial error. + return nil, newFallbackRangeReader(exec, c, key, rng, rdr), nil } return c.get(exec, key)