Skip to content

Commit dd83657

Browse files
authored
services/object: use stream API for Get operation (#3466)
Closes #3439.
2 parents adf5444 + 16adc2d commit dd83657

File tree

9 files changed

+229
-157
lines changed

9 files changed

+229
-157
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Changelog for NeoFS Node
1616
- `neofs-cli object range` command now truncates file passed to `--file` (#3544)
1717
- `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544)
1818
- Alphabet nodes send basic storage income based on the new Reports API from `container` contract (#3053)
19+
- Use stream API of FSTree for object service `Get` operation (#3466)
1920

2021
### Removed
2122
- `neofs-cli object head --main-only` no-op flag (#3509)

pkg/services/object/get/assemble.go

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (exec *execCtx) assemble() {
6666

6767
if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok {
6868
// payload of all children except the last are written, write last payload
69-
exec.writeObjectPayload(exec.collectedObject)
69+
exec.copyChild(exec.lastChildID, &exec.lastChildRange, false)
7070
}
7171
}
7272
} else if prev != nil {
@@ -76,21 +76,25 @@ func (exec *execCtx) assemble() {
7676
// * else go right-to-left with GET and compose in single object before writing
7777

7878
if ok := exec.overtakePayloadInReverse(*prev); ok {
79-
// payload of all children except the last are written, write last payloa
80-
exec.writeObjectPayload(exec.collectedObject)
79+
var rng *objectSDK.Range
80+
if exec.ctxRange() != nil {
81+
rng = &exec.lastChildRange
82+
}
83+
// payload of all children except the last are written, write last payload
84+
exec.copyChild(exec.lastChildID, rng, false)
8185
}
8286
}
8387
} else {
8488
exec.log.Debug("could not init parent from child")
8589
}
8690
}
8791

88-
func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) {
92+
func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) {
8993
exec.log.Debug("starting assembling from child", zap.Stringer("child ID", obj))
9094

91-
child, ok := exec.getChild(obj, nil, true)
95+
child, ok := exec.headChild(obj)
9296
if !ok {
93-
return
97+
return nil, nil
9498
}
9599

96100
par := child.Parent()
@@ -99,12 +103,10 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)
99103

100104
exec.log.Debug("received child with empty parent", zap.Stringer("child ID", obj))
101105

102-
return
106+
return nil, nil
103107
}
104108

105-
exec.collectedObject = par
106-
107-
var payload []byte
109+
exec.collectedHeader = par
108110

109111
if rng := exec.ctxRange(); rng != nil {
110112
seekOff := rng.GetOffset()
@@ -116,35 +118,45 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)
116118
seekTo := seekOff + seekLen
117119

118120
if seekTo < seekOff || parSize < seekOff || parSize < seekTo {
119-
var errOutOfRange apistatus.ObjectOutOfRange
120-
121-
exec.err = &errOutOfRange
121+
exec.err = apistatus.ErrObjectOutOfRange
122122
exec.status = statusAPIResponse
123123

124-
return
124+
return nil, nil
125125
}
126126

127127
childSize := child.PayloadSize()
128128

129-
exec.curOff = parSize - childSize
129+
startRight := parSize - childSize
130+
exec.curOff = startRight
130131

131132
from := uint64(0)
132-
if exec.curOff < seekOff {
133-
from = seekOff - exec.curOff
133+
if startRight < seekOff {
134+
from = seekOff - startRight
134135
}
135136

136137
to := uint64(0)
137-
if seekOff+seekLen > exec.curOff+from {
138-
to = seekOff + seekLen - exec.curOff
138+
if seekOff+seekLen > startRight+from {
139+
to = seekOff + seekLen - startRight
140+
if to > childSize {
141+
to = childSize
142+
}
139143
}
140144

141-
payload = child.Payload()[from:to]
142-
rng.SetLength(seekLen - to + from)
143-
} else {
144-
payload = child.Payload()
145+
segLen := uint64(0)
146+
if to > from {
147+
segLen = to - from
148+
}
149+
rng.SetLength(seekLen - segLen)
150+
151+
if segLen > 0 {
152+
exec.lastChildRange.SetOffset(from)
153+
exec.lastChildRange.SetLength(segLen)
154+
} else {
155+
exec.lastChildRange.SetLength(0)
156+
}
145157
}
146158

147-
exec.collectedObject.SetPayload(payload)
159+
exec.lastChildID = child.GetID()
148160

149161
idPrev := child.GetPreviousID()
150162
if !idPrev.IsZero() {
@@ -163,12 +175,8 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK
163175
r = &rngs[i]
164176
}
165177

166-
child, ok := exec.getChild(children[i], r, !withRng && checkRight)
167-
if !ok {
168-
return
169-
}
170-
171-
if ok := exec.writeObjectPayload(child); !ok {
178+
retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight)
179+
if !retrieved && !wrote {
172180
return
173181
}
174182
}

pkg/services/object/get/assembly_v2.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,37 +36,54 @@ func (exec *execCtx) processV2Split(si *objectSDK.SplitInfo) {
3636
}
3737

3838
func (exec *execCtx) processV2Last(lastID oid.ID) {
39-
lastObj, ok := exec.getChild(lastID, nil, true)
39+
lastHead, ok := exec.headChild(lastID)
4040
if !ok {
4141
exec.log.Debug("failed to read last object")
4242
return
4343
}
4444

45-
exec.collectedObject = lastObj.Parent()
45+
exec.collectedHeader = lastHead.Parent()
4646
if r := exec.ctxRange(); r != nil && r.GetLength() == 0 {
47-
r.SetLength(exec.collectedObject.PayloadSize())
47+
r.SetLength(exec.collectedHeader.PayloadSize())
4848
}
4949

50-
// copied from V1, and it has the same problems as V1;
51-
// see it for comments and optimization suggestions
5250
if ok := exec.writeCollectedHeader(); ok {
5351
if ok := exec.overtakePayloadInReverse(lastID); ok {
54-
exec.writeObjectPayload(exec.collectedObject)
52+
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
5553
}
5654
}
5755
}
5856

5957
func (exec *execCtx) processV2Link(linkID oid.ID) bool {
60-
linkObj, ok := exec.getChild(linkID, nil, true)
61-
if !ok {
58+
// need full payload of link object to parse link structure
59+
w := NewSimpleObjectWriter()
60+
61+
p := exec.prm
62+
p.common = p.common.WithLocalOnly(false)
63+
p.objWriter = w
64+
p.SetRange(nil)
65+
p.addr.SetContainer(exec.containerID())
66+
p.addr.SetObject(linkID)
67+
68+
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withLogger(exec.log))
69+
70+
if exec.status != statusOK {
6271
exec.log.Debug("failed to read link object")
6372
return false
6473
}
6574

66-
exec.collectedObject = linkObj.Parent()
75+
linkObj := w.Object()
76+
if !exec.isChild(linkObj) {
77+
exec.status = statusUndefined
78+
exec.err = errors.New("wrong child header")
79+
exec.log.Debug("parent address in link object differs")
80+
return false
81+
}
82+
83+
exec.collectedHeader = linkObj.Parent()
6784
rng := exec.ctxRange()
6885
if rng != nil && rng.GetLength() == 0 {
69-
rng.SetLength(exec.collectedObject.PayloadSize())
86+
rng.SetLength(exec.collectedHeader.PayloadSize())
7087
}
7188

7289
var link objectSDK.Link
@@ -100,9 +117,7 @@ func (exec *execCtx) processV2Link(linkID oid.ID) bool {
100117
seekTo := seekOff + seekLen
101118

102119
if seekTo < seekOff || parSize < seekOff || parSize < seekTo {
103-
var errOutOfRange apistatus.ObjectOutOfRange
104-
105-
exec.err = &errOutOfRange
120+
exec.err = apistatus.ErrObjectOutOfRange
106121
exec.status = statusAPIResponse
107122

108123
// the operation has failed but no need to continue so `true` here
@@ -132,13 +147,11 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool {
132147
}
133148
}
134149

135-
part, ok := exec.getChild(child.ObjectID(), rngPerChild, false)
136-
if !ok {
150+
retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false)
151+
if !retrieved { // failed to fetch child -> abort whole operation
137152
return false
138153
}
139-
140-
if !exec.writeObjectPayload(part) {
141-
// we have payload, we want to send it but can't so stop here
154+
if !wrote { // payload fetch ok but writing failed -> stop further processing
142155
return true
143156
}
144157
}

pkg/services/object/get/exec.go

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"errors"
7+
"io"
78

89
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
910
"github.com/nspcc-dev/neofs-node/pkg/core/object"
@@ -14,6 +15,12 @@ import (
1415
"go.uber.org/zap"
1516
)
1617

18+
const (
19+
// streamChunkSize is the size of the chunk that is used to read/write
20+
// object payload from/to the stream.
21+
streamChunkSize = 256 * 1024 // 256 KiB
22+
)
23+
1724
type statusError struct {
1825
status int
1926
err error
@@ -34,11 +41,16 @@ type execCtx struct {
3441
// If debug level is enabled, all messages also include info about processing request.
3542
log *zap.Logger
3643

37-
collectedObject *objectSDK.Object
44+
collectedHeader *objectSDK.Object
45+
collectedReader io.ReadCloser
3846

3947
curOff uint64
4048

4149
head bool
50+
51+
// range assembly (V1 split) helpers
52+
lastChildID oid.ID
53+
lastChildRange objectSDK.Range
4254
}
4355

4456
type execOption func(*execCtx)
@@ -167,35 +179,36 @@ func (exec *execCtx) headOnly() bool {
167179
return exec.head
168180
}
169181

170-
func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) {
182+
// copyChild fetches child object payload and streams it directly into current exec writer.
183+
// Returns if child header was received and if full payload was successfully written.
184+
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) {
171185
log := exec.log
172186
if rng != nil {
173187
log = log.With(zap.String("child range", prettyRange(rng)))
174188
}
175189

176-
w := NewSimpleObjectWriter()
190+
childWriter := newDirectChildWriter(exec.prm.objWriter)
177191

178192
p := exec.prm
179193
p.common = p.common.WithLocalOnly(false)
180-
p.objWriter = w
194+
p.objWriter = childWriter
181195
p.SetRange(rng)
182-
183196
p.addr.SetContainer(exec.containerID())
184197
p.addr.SetObject(id)
185198

186199
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng), withLogger(log))
187200

188-
child := w.Object()
201+
hdr := childWriter.hdr
189202
ok := exec.status == statusOK
190203

191-
if ok && withHdr && !exec.isChild(child) {
204+
if ok && withHdr && !exec.isChild(hdr) {
192205
exec.status = statusUndefined
193206
exec.err = errors.New("wrong child header")
194207

195208
exec.log.Debug("parent address in child object differs")
196209
}
197210

198-
return child, ok
211+
return hdr != nil, ok
199212
}
200213

201214
func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
@@ -280,7 +293,7 @@ func (exec *execCtx) writeCollectedHeader() bool {
280293
}
281294

282295
err := exec.prm.objWriter.WriteHeader(
283-
exec.collectedObject.CutPayload(),
296+
exec.collectedHeader.CutPayload(),
284297
)
285298

286299
switch {
@@ -299,12 +312,27 @@ func (exec *execCtx) writeCollectedHeader() bool {
299312
return exec.status == statusOK
300313
}
301314

302-
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
315+
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadCloser) bool {
303316
if exec.headOnly() {
304317
return true
305318
}
306319

307-
err := exec.prm.objWriter.WriteChunk(obj.Payload())
320+
var err error
321+
if reader != nil {
322+
defer func() {
323+
err := reader.Close()
324+
if err != nil {
325+
exec.log.Debug("error while closing payload reader", zap.Error(err))
326+
}
327+
}()
328+
bufSize := streamChunkSize
329+
if obj != nil {
330+
bufSize = min(streamChunkSize, int(obj.PayloadSize()))
331+
}
332+
err = copyPayloadStream(exec.prm.objWriter, reader, bufSize)
333+
} else {
334+
err = exec.prm.objWriter.WriteChunk(obj.Payload())
335+
}
308336

309337
switch {
310338
default:
@@ -322,9 +350,29 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
322350
return err == nil
323351
}
324352

353+
// copyPayloadStream writes payload from stream to writer.
354+
func copyPayloadStream(w ChunkWriter, r io.Reader, bufSize int) error {
355+
buf := make([]byte, bufSize)
356+
for {
357+
n, err := r.Read(buf)
358+
if n > 0 {
359+
if writeErr := w.WriteChunk(buf[:n]); writeErr != nil {
360+
return writeErr
361+
}
362+
}
363+
364+
if errors.Is(err, io.EOF) {
365+
return nil
366+
}
367+
if err != nil {
368+
return err
369+
}
370+
}
371+
}
372+
325373
func (exec *execCtx) writeCollectedObject() {
326374
if ok := exec.writeCollectedHeader(); ok {
327-
exec.writeObjectPayload(exec.collectedObject)
375+
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
328376
}
329377
}
330378

0 commit comments

Comments
 (0)