Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Changelog for NeoFS Node
- `neofs-cli object range` command now truncates file passed to `--file` (#3544)
- `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544)
- Alphabet nodes send basic storage income based on the new Reports API from `container` contract (#3053)
- Use stream API of FSTree for object service `Get` operation (#3466)

### Removed
- `neofs-cli object head --main-only` no-op flag (#3509)
Expand Down
68 changes: 38 additions & 30 deletions pkg/services/object/get/assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (exec *execCtx) assemble() {

if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok {
// payload of all children except the last are written, write last payload
exec.writeObjectPayload(exec.collectedObject)
exec.copyChild(exec.lastChildID, &exec.lastChildRange, false)
}
}
} else if prev != nil {
Expand All @@ -76,21 +76,25 @@ func (exec *execCtx) assemble() {
// * else go right-to-left with GET and compose in single object before writing

if ok := exec.overtakePayloadInReverse(*prev); ok {
// payload of all children except the last are written, write last payloa
exec.writeObjectPayload(exec.collectedObject)
var rng *objectSDK.Range
if exec.ctxRange() != nil {
rng = &exec.lastChildRange
}
// payload of all children except the last are written, write last payload
exec.copyChild(exec.lastChildID, rng, false)
}
}
} else {
exec.log.Debug("could not init parent from child")
}
}

func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) {
func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) {
exec.log.Debug("starting assembling from child", zap.Stringer("child ID", obj))

child, ok := exec.getChild(obj, nil, true)
child, ok := exec.headChild(obj)
if !ok {
return
return nil, nil
}

par := child.Parent()
Expand All @@ -99,12 +103,10 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)

exec.log.Debug("received child with empty parent", zap.Stringer("child ID", obj))

return
return nil, nil
}

exec.collectedObject = par

var payload []byte
exec.collectedHeader = par

if rng := exec.ctxRange(); rng != nil {
seekOff := rng.GetOffset()
Expand All @@ -116,35 +118,45 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)
seekTo := seekOff + seekLen

if seekTo < seekOff || parSize < seekOff || parSize < seekTo {
var errOutOfRange apistatus.ObjectOutOfRange

exec.err = &errOutOfRange
exec.err = apistatus.ErrObjectOutOfRange
exec.status = statusAPIResponse

return
return nil, nil
}

childSize := child.PayloadSize()

exec.curOff = parSize - childSize
startRight := parSize - childSize
exec.curOff = startRight

from := uint64(0)
if exec.curOff < seekOff {
from = seekOff - exec.curOff
if startRight < seekOff {
from = seekOff - startRight
}

to := uint64(0)
if seekOff+seekLen > exec.curOff+from {
to = seekOff + seekLen - exec.curOff
if seekOff+seekLen > startRight+from {
to = seekOff + seekLen - startRight
if to > childSize {
to = childSize
}
}

payload = child.Payload()[from:to]
rng.SetLength(seekLen - to + from)
} else {
payload = child.Payload()
segLen := uint64(0)
if to > from {
segLen = to - from
}
rng.SetLength(seekLen - segLen)

if segLen > 0 {
exec.lastChildRange.SetOffset(from)
exec.lastChildRange.SetLength(segLen)
} else {
exec.lastChildRange.SetLength(0)
}
}

exec.collectedObject.SetPayload(payload)
exec.lastChildID = child.GetID()

idPrev := child.GetPreviousID()
if !idPrev.IsZero() {
Expand All @@ -163,12 +175,8 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK
r = &rngs[i]
}

child, ok := exec.getChild(children[i], r, !withRng && checkRight)
if !ok {
return
}

if ok := exec.writeObjectPayload(child); !ok {
retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight)
if !retrieved && !wrote {
return
}
}
Expand Down
49 changes: 31 additions & 18 deletions pkg/services/object/get/assembly_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,37 +36,54 @@ func (exec *execCtx) processV2Split(si *objectSDK.SplitInfo) {
}

func (exec *execCtx) processV2Last(lastID oid.ID) {
lastObj, ok := exec.getChild(lastID, nil, true)
lastHead, ok := exec.headChild(lastID)
if !ok {
exec.log.Debug("failed to read last object")
return
}

exec.collectedObject = lastObj.Parent()
exec.collectedHeader = lastHead.Parent()
if r := exec.ctxRange(); r != nil && r.GetLength() == 0 {
r.SetLength(exec.collectedObject.PayloadSize())
r.SetLength(exec.collectedHeader.PayloadSize())
}

// copied from V1, and it has the same problems as V1;
// see it for comments and optimization suggestions
if ok := exec.writeCollectedHeader(); ok {
if ok := exec.overtakePayloadInReverse(lastID); ok {
exec.writeObjectPayload(exec.collectedObject)
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
}
}
}

func (exec *execCtx) processV2Link(linkID oid.ID) bool {
linkObj, ok := exec.getChild(linkID, nil, true)
if !ok {
// need full payload of link object to parse link structure
w := NewSimpleObjectWriter()

p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.objWriter = w
p.SetRange(nil)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(linkID)

exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withLogger(exec.log))

if exec.status != statusOK {
exec.log.Debug("failed to read link object")
return false
}

exec.collectedObject = linkObj.Parent()
linkObj := w.Object()
if !exec.isChild(linkObj) {
exec.status = statusUndefined
exec.err = errors.New("wrong child header")
exec.log.Debug("parent address in link object differs")
return false
}

exec.collectedHeader = linkObj.Parent()
rng := exec.ctxRange()
if rng != nil && rng.GetLength() == 0 {
rng.SetLength(exec.collectedObject.PayloadSize())
rng.SetLength(exec.collectedHeader.PayloadSize())
}

var link objectSDK.Link
Expand Down Expand Up @@ -100,9 +117,7 @@ func (exec *execCtx) processV2Link(linkID oid.ID) bool {
seekTo := seekOff + seekLen

if seekTo < seekOff || parSize < seekOff || parSize < seekTo {
var errOutOfRange apistatus.ObjectOutOfRange

exec.err = &errOutOfRange
exec.err = apistatus.ErrObjectOutOfRange
exec.status = statusAPIResponse

// the operation has failed but no need to continue so `true` here
Expand Down Expand Up @@ -132,13 +147,11 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool {
}
}

part, ok := exec.getChild(child.ObjectID(), rngPerChild, false)
if !ok {
retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false)
if !retrieved { // failed to fetch child -> abort whole operation
return false
}

if !exec.writeObjectPayload(part) {
// we have payload, we want to send it but can't so stop here
if !wrote { // payload fetch ok but writing failed -> stop further processing
return true
}
}
Expand Down
72 changes: 60 additions & 12 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"errors"
"io"

clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
Expand All @@ -14,6 +15,12 @@ import (
"go.uber.org/zap"
)

const (
// streamChunkSize is the size of the chunk that is used to read/write
// object payload from/to the stream.
streamChunkSize = 256 * 1024 // 256 KiB
)

type statusError struct {
status int
err error
Expand All @@ -34,11 +41,16 @@ type execCtx struct {
// If debug level is enabled, all messages also include info about processing request.
log *zap.Logger

collectedObject *objectSDK.Object
collectedHeader *objectSDK.Object
collectedReader io.ReadCloser

curOff uint64

head bool

// range assembly (V1 split) helpers
lastChildID oid.ID
lastChildRange objectSDK.Range
}

type execOption func(*execCtx)
Expand Down Expand Up @@ -167,35 +179,36 @@ func (exec *execCtx) headOnly() bool {
return exec.head
}

func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) {
// copyChild fetches child object payload and streams it directly into current exec writer.
// Returns if child header was received and if full payload was successfully written.
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) {
log := exec.log
if rng != nil {
log = log.With(zap.String("child range", prettyRange(rng)))
}

w := NewSimpleObjectWriter()
childWriter := newDirectChildWriter(exec.prm.objWriter)

p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.objWriter = w
p.objWriter = childWriter
p.SetRange(rng)

p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)

exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng), withLogger(log))

child := w.Object()
hdr := childWriter.hdr
ok := exec.status == statusOK

if ok && withHdr && !exec.isChild(child) {
if ok && withHdr && !exec.isChild(hdr) {
exec.status = statusUndefined
exec.err = errors.New("wrong child header")

exec.log.Debug("parent address in child object differs")
}

return child, ok
return hdr != nil, ok
}

func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
Expand Down Expand Up @@ -280,7 +293,7 @@ func (exec *execCtx) writeCollectedHeader() bool {
}

err := exec.prm.objWriter.WriteHeader(
exec.collectedObject.CutPayload(),
exec.collectedHeader.CutPayload(),
)

switch {
Expand All @@ -299,12 +312,27 @@ func (exec *execCtx) writeCollectedHeader() bool {
return exec.status == statusOK
}

func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadCloser) bool {
if exec.headOnly() {
return true
}

err := exec.prm.objWriter.WriteChunk(obj.Payload())
var err error
if reader != nil {
defer func() {
err := reader.Close()
if err != nil {
exec.log.Debug("error while closing payload reader", zap.Error(err))
}
}()
bufSize := streamChunkSize
if obj != nil {
bufSize = min(streamChunkSize, int(obj.PayloadSize()))
}
err = copyPayloadStream(exec.prm.objWriter, reader, bufSize)
} else {
err = exec.prm.objWriter.WriteChunk(obj.Payload())
}

switch {
default:
Expand All @@ -322,9 +350,29 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
return err == nil
}

// copyPayloadStream writes payload from stream to writer.
func copyPayloadStream(w ChunkWriter, r io.Reader, bufSize int) error {
buf := make([]byte, bufSize)
for {
n, err := r.Read(buf)
if n > 0 {
if writeErr := w.WriteChunk(buf[:n]); writeErr != nil {
return writeErr
}
}

if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return err
}
}
}

func (exec *execCtx) writeCollectedObject() {
if ok := exec.writeCollectedHeader(); ok {
exec.writeObjectPayload(exec.collectedObject)
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
}
}

Expand Down
Loading
Loading