Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Changelog for NeoFS Node
- `neofs-cli object range` command now truncates file passed to `--file` (#3544)
- `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544)
- Alphabet nodes send basic storage income based on the new Reports API from `container` contract (#3053)
- Use stream API of FSTree for object service `Get` operation (#3466)
- Use stream API of FSTree for object service `Get` operation (#3466, #3568)
- Use meta buckets to mark containers with GC (#3561)

### Removed
Expand Down
5 changes: 2 additions & 3 deletions pkg/services/object/get/assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) {
seekLen := rng.GetLength()
parSize := par.PayloadSize()
if seekLen == 0 {
seekLen = parSize
seekLen = parSize - seekOff
}
seekTo := seekOff + seekLen

Expand Down Expand Up @@ -175,8 +175,7 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK
r = &rngs[i]
}

retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight)
if !retrieved && !wrote {
if !exec.copyChild(children[i], r, !withRng && checkRight) {
return
}
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/services/object/get/assembly_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ func (exec *execCtx) processV2Last(lastID oid.ID) {
}

if ok := exec.writeCollectedHeader(); ok {
if ok := exec.overtakePayloadInReverse(lastID); ok {
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
}
exec.overtakePayloadInReverse(lastID)
}
}

Expand Down Expand Up @@ -147,11 +145,7 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool {
}
}

retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false)
if !retrieved { // failed to fetch child -> abort whole operation
return false
}
if !wrote { // payload fetch ok but writing failed -> stop further processing
if !exec.copyChild(child.ObjectID(), rngPerChild, false) {
return true
}
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func (exec *execCtx) headOnly() bool {
}

// copyChild fetches child object payload and streams it directly into current exec writer.
// Returns if child header was received and if full payload was successfully written.
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) {
// Returns true if full payload (or requested range) was successfully written and, if requested, header validated.
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) bool {
log := exec.log
if rng != nil {
log = log.With(zap.String("child range", prettyRange(rng)))
Expand All @@ -213,14 +213,18 @@ func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (b
hdr := childWriter.hdr
ok := exec.status == statusOK

if ok && withHdr && !exec.isChild(hdr) {
exec.status = statusUndefined
exec.err = errors.New("wrong child header")

exec.log.Debug("parent address in child object differs")
if ok && withHdr {
if hdr == nil {
return false
}
if !exec.isChild(hdr) {
exec.status = statusUndefined
exec.err = errors.New("wrong child header")
exec.log.Debug("parent address in child object differs")
}
}

return hdr != nil, ok
return ok
}

func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
Expand Down Expand Up @@ -339,7 +343,7 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadClo
}()
bufSize := uint64(streamChunkSize)
if obj != nil {
bufSize = min(streamChunkSize, obj.PayloadSize())
bufSize = min(streamChunkSize, max(obj.PayloadSize(), 1))
}
err = copyPayloadStream(exec.prm.objWriter, reader, bufSize)
} else {
Expand Down
113 changes: 75 additions & 38 deletions pkg/services/object/get/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,78 @@ type hasherWrapper struct {
hash io.Writer
}

// fallbackRangeReader wraps a range reader obtained via ObjectRangeInit and
// falls back to a full GET in case apistatus.ErrObjectAccessDenied is
// returned while reading.
type fallbackRangeReader struct {
io.ReadCloser
exec *execCtx
client *clientWrapper
key *ecdsa.PrivateKey
rng *object.Range

fallbackDone bool
}

func newFallbackRangeReader(exec *execCtx, c *clientWrapper, key *ecdsa.PrivateKey, rng *object.Range, rdr io.ReadCloser) io.ReadCloser {
return &fallbackRangeReader{
ReadCloser: rdr,
exec: exec,
client: c,
key: key,
rng: rng,
}
}

func (f *fallbackRangeReader) Read(p []byte) (int, error) {
n, err := f.ReadCloser.Read(p)
if err == nil || !errors.Is(err, apistatus.ErrObjectAccessDenied) || f.fallbackDone {
return n, err
}

f.exec.log.Debug("range read access denied, falling back to full GET")
f.fallbackDone = true

hdr, rdr, getErr := f.client.get(f.exec, f.key)
if getErr != nil {
return 0, fmt.Errorf("fallback GET after access denial failed: %w", getErr)
}

pLen := hdr.PayloadSize()
from := f.rng.GetOffset()
ln := f.rng.GetLength()
var to uint64
if ln != 0 {
to = from + ln
} else {
to = pLen
}

if to < from || pLen < from || pLen < to {
_ = rdr.Close()
return 0, apistatus.ErrObjectOutOfRange
}

if from > 0 {
_, err = io.CopyN(io.Discard, rdr, int64(from))
if err != nil {
_ = rdr.Close()
return n, fmt.Errorf("discard %d bytes in stream: %w", from, err)
}
}

f.ReadCloser = struct {
io.Reader
io.Closer
}{
Reader: io.LimitReader(rdr, int64(to-from)),
Closer: rdr,
}

// attempt to read again immediately to fill p.
return f.Read(p)
}

func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.New(),
Expand Down Expand Up @@ -162,46 +234,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
}

rdr, err := c.client.ObjectRangeInit(exec.context(), addr.Container(), id, rng.GetOffset(), ln, user.NewAutoIDSigner(*key), opts)
if err == nil {
return nil, rdr, nil
}
if !errors.Is(err, apistatus.ErrObjectAccessDenied) {
return nil, nil, fmt.Errorf("init payload reading: %w", err)
}
// Current spec allows other storage node to deny access,
// fallback to GET here.
hdr, reader, err := c.get(exec, key)
if err != nil {
return nil, nil, err
}

pLen := hdr.PayloadSize()
from := rng.GetOffset()
var to uint64
if ln != 0 {
to = from + ln
} else {
to = pLen
}

if to < from || pLen < from || pLen < to {
return nil, nil, apistatus.ErrObjectOutOfRange
}

if from > 0 {
_, err = io.CopyN(io.Discard, reader, int64(from))
if err != nil {
return nil, nil, fmt.Errorf("discard %d bytes in stream: %w", from, err)
}
return nil, nil, fmt.Errorf("init payload reading: %w", err)
}

return nil, struct {
io.Reader
io.Closer
}{
Reader: io.LimitReader(reader, int64(to-from)),
Closer: reader,
}, nil
// fallback to full GET in case of access denial error.
return nil, newFallbackRangeReader(exec, c, key, rng, rdr), nil
}

return c.get(exec, key)
Expand Down
Loading