Skip to content

Commit 3953660

Browse files
committed
services/object: use stream API for Get operation
The network API has always had a concept of a payload stream, and FSTree can now provide one as well after #3431. So use `GetStream` in object service to enable memory-efficient object retrieval without loading large payloads into memory. Closes #3439. Signed-off-by: Andrey Butusov <[email protected]>
1 parent adf5444 commit 3953660

File tree

9 files changed

+234
-152
lines changed

9 files changed

+234
-152
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Changelog for NeoFS Node
1616
- `neofs-cli object range` command now truncates file passed to `--file` (#3544)
1717
- `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544)
1818
- Alphabet nodes send basic storage income based on the new Reports API from `container` contract (#3053)
19+
- Use stream API of FSTree for object service `Get` operation (#3466)
1920

2021
### Removed
2122
- `neofs-cli object head --main-only` no-op flag (#3509)

pkg/services/object/get/assemble.go

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (exec *execCtx) assemble() {
6666

6767
if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok {
6868
// payload of all children except the last are written, write last payload
69-
exec.writeObjectPayload(exec.collectedObject)
69+
exec.copyChild(exec.lastChildID, &exec.lastChildRange, false)
7070
}
7171
}
7272
} else if prev != nil {
@@ -76,21 +76,25 @@ func (exec *execCtx) assemble() {
7676
// * else go right-to-left with GET and compose in single object before writing
7777

7878
if ok := exec.overtakePayloadInReverse(*prev); ok {
79-
// payload of all children except the last are written, write last payloa
80-
exec.writeObjectPayload(exec.collectedObject)
79+
var rng *objectSDK.Range
80+
if exec.ctxRange() != nil {
81+
rng = &exec.lastChildRange
82+
}
83+
// payload of all children except the last are written, write last payload
84+
exec.copyChild(exec.lastChildID, rng, false)
8185
}
8286
}
8387
} else {
8488
exec.log.Debug("could not init parent from child")
8589
}
8690
}
8791

88-
func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) {
92+
func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) {
8993
exec.log.Debug("starting assembling from child", zap.Stringer("child ID", obj))
9094

91-
child, ok := exec.getChild(obj, nil, true)
95+
child, ok := exec.headChild(obj)
9296
if !ok {
93-
return
97+
return nil, nil
9498
}
9599

96100
par := child.Parent()
@@ -99,12 +103,10 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)
99103

100104
exec.log.Debug("received child with empty parent", zap.Stringer("child ID", obj))
101105

102-
return
106+
return nil, nil
103107
}
104108

105-
exec.collectedObject = par
106-
107-
var payload []byte
109+
exec.collectedHeader = par
108110

109111
if rng := exec.ctxRange(); rng != nil {
110112
seekOff := rng.GetOffset()
@@ -121,30 +123,42 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID)
121123
exec.err = &errOutOfRange
122124
exec.status = statusAPIResponse
123125

124-
return
126+
return nil, nil
125127
}
126128

127129
childSize := child.PayloadSize()
128130

129-
exec.curOff = parSize - childSize
131+
startRight := parSize - childSize
132+
exec.curOff = startRight
130133

131134
from := uint64(0)
132-
if exec.curOff < seekOff {
133-
from = seekOff - exec.curOff
135+
if startRight < seekOff {
136+
from = seekOff - startRight
134137
}
135138

136139
to := uint64(0)
137-
if seekOff+seekLen > exec.curOff+from {
138-
to = seekOff + seekLen - exec.curOff
140+
if seekOff+seekLen > startRight+from {
141+
to = seekOff + seekLen - startRight
142+
if to > childSize {
143+
to = childSize
144+
}
139145
}
140146

141-
payload = child.Payload()[from:to]
142-
rng.SetLength(seekLen - to + from)
143-
} else {
144-
payload = child.Payload()
147+
segLen := uint64(0)
148+
if to > from {
149+
segLen = to - from
150+
}
151+
rng.SetLength(seekLen - segLen)
152+
153+
if segLen > 0 {
154+
exec.lastChildRange.SetOffset(from)
155+
exec.lastChildRange.SetLength(segLen)
156+
} else {
157+
exec.lastChildRange.SetLength(0)
158+
}
145159
}
146160

147-
exec.collectedObject.SetPayload(payload)
161+
exec.lastChildID = child.GetID()
148162

149163
idPrev := child.GetPreviousID()
150164
if !idPrev.IsZero() {
@@ -163,12 +177,8 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK
163177
r = &rngs[i]
164178
}
165179

166-
child, ok := exec.getChild(children[i], r, !withRng && checkRight)
167-
if !ok {
168-
return
169-
}
170-
171-
if ok := exec.writeObjectPayload(child); !ok {
180+
retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight)
181+
if !retrieved || !wrote {
172182
return
173183
}
174184
}

pkg/services/object/get/assembly_v2.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,37 +36,54 @@ func (exec *execCtx) processV2Split(si *objectSDK.SplitInfo) {
3636
}
3737

3838
func (exec *execCtx) processV2Last(lastID oid.ID) {
39-
lastObj, ok := exec.getChild(lastID, nil, true)
39+
lastHead, ok := exec.headChild(lastID)
4040
if !ok {
4141
exec.log.Debug("failed to read last object")
4242
return
4343
}
4444

45-
exec.collectedObject = lastObj.Parent()
45+
exec.collectedHeader = lastHead.Parent()
4646
if r := exec.ctxRange(); r != nil && r.GetLength() == 0 {
47-
r.SetLength(exec.collectedObject.PayloadSize())
47+
r.SetLength(exec.collectedHeader.PayloadSize())
4848
}
4949

50-
// copied from V1, and it has the same problems as V1;
51-
// see it for comments and optimization suggestions
5250
if ok := exec.writeCollectedHeader(); ok {
5351
if ok := exec.overtakePayloadInReverse(lastID); ok {
54-
exec.writeObjectPayload(exec.collectedObject)
52+
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
5553
}
5654
}
5755
}
5856

5957
func (exec *execCtx) processV2Link(linkID oid.ID) bool {
60-
linkObj, ok := exec.getChild(linkID, nil, true)
61-
if !ok {
58+
// need full payload of link object to parse link structure
59+
w := NewSimpleObjectWriter()
60+
61+
p := exec.prm
62+
p.common = p.common.WithLocalOnly(false)
63+
p.objWriter = w
64+
p.SetRange(nil)
65+
p.addr.SetContainer(exec.containerID())
66+
p.addr.SetObject(linkID)
67+
68+
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withLogger(exec.log))
69+
70+
if exec.status != statusOK {
6271
exec.log.Debug("failed to read link object")
6372
return false
6473
}
6574

66-
exec.collectedObject = linkObj.Parent()
75+
linkObj := w.Object()
76+
if !exec.isChild(linkObj) {
77+
exec.status = statusUndefined
78+
exec.err = errors.New("wrong child header")
79+
exec.log.Debug("parent address in link object differs")
80+
return false
81+
}
82+
83+
exec.collectedHeader = linkObj.Parent()
6784
rng := exec.ctxRange()
6885
if rng != nil && rng.GetLength() == 0 {
69-
rng.SetLength(exec.collectedObject.PayloadSize())
86+
rng.SetLength(exec.collectedHeader.PayloadSize())
7087
}
7188

7289
var link objectSDK.Link
@@ -132,13 +149,11 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool {
132149
}
133150
}
134151

135-
part, ok := exec.getChild(child.ObjectID(), rngPerChild, false)
136-
if !ok {
152+
retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false)
153+
if !retrieved { // failed to fetch child -> abort whole operation
137154
return false
138155
}
139-
140-
if !exec.writeObjectPayload(part) {
141-
// we have payload, we want to send it but can't so stop here
156+
if !wrote { // payload fetch ok but writing failed -> stop further processing
142157
return true
143158
}
144159
}

pkg/services/object/get/exec.go

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"errors"
7+
"io"
78

89
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
910
"github.com/nspcc-dev/neofs-node/pkg/core/object"
@@ -14,6 +15,12 @@ import (
1415
"go.uber.org/zap"
1516
)
1617

18+
const (
19+
// streamChunkSize is the size of the chunk that is used to read/write
20+
// object payload from/to the stream.
21+
streamChunkSize = 64 * 1024 // 64 KiB
22+
)
23+
1724
type statusError struct {
1825
status int
1926
err error
@@ -34,11 +41,16 @@ type execCtx struct {
3441
// If debug level is enabled, all messages also include info about processing request.
3542
log *zap.Logger
3643

37-
collectedObject *objectSDK.Object
44+
collectedHeader *objectSDK.Object
45+
collectedReader io.ReadCloser
3846

3947
curOff uint64
4048

4149
head bool
50+
51+
// range assembly (V1 split) helpers
52+
lastChildID oid.ID
53+
lastChildRange objectSDK.Range
4254
}
4355

4456
type execOption func(*execCtx)
@@ -167,35 +179,42 @@ func (exec *execCtx) headOnly() bool {
167179
return exec.head
168180
}
169181

170-
func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) {
182+
// copyChild fetches child object payload and streams it directly into current exec writer.
183+
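// The first result reports whether the child was retrieved (the request succeeded or at least a header was captured); the second reports whether the request completed successfully, i.e. the payload was fully written.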
184+
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) {
171185
log := exec.log
172186
if rng != nil {
173187
log = log.With(zap.String("child range", prettyRange(rng)))
174188
}
175189

176-
w := NewSimpleObjectWriter()
190+
childWriter := newDirectChildWriter(exec.prm.objWriter)
177191

178192
p := exec.prm
179193
p.common = p.common.WithLocalOnly(false)
180-
p.objWriter = w
194+
p.objWriter = childWriter
181195
p.SetRange(rng)
182-
183196
p.addr.SetContainer(exec.containerID())
184197
p.addr.SetObject(id)
185198

186199
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng), withLogger(log))
187200

188-
child := w.Object()
201+
hdr := childWriter.hdr
189202
ok := exec.status == statusOK
190-
191-
if ok && withHdr && !exec.isChild(child) {
203+
if ok && withHdr && !exec.isChild(hdr) {
192204
exec.status = statusUndefined
193205
exec.err = errors.New("wrong child header")
194206

195207
exec.log.Debug("parent address in child object differs")
196208
}
197209

198-
return child, ok
210+
if !ok {
211+
if hdr != nil {
212+
return true, false
213+
}
214+
return false, false
215+
}
216+
217+
return true, true
199218
}
200219

201220
func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
@@ -280,7 +299,7 @@ func (exec *execCtx) writeCollectedHeader() bool {
280299
}
281300

282301
err := exec.prm.objWriter.WriteHeader(
283-
exec.collectedObject.CutPayload(),
302+
exec.collectedHeader.CutPayload(),
284303
)
285304

286305
switch {
@@ -299,12 +318,27 @@ func (exec *execCtx) writeCollectedHeader() bool {
299318
return exec.status == statusOK
300319
}
301320

302-
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
321+
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadCloser) bool {
303322
if exec.headOnly() {
304323
return true
305324
}
306325

307-
err := exec.prm.objWriter.WriteChunk(obj.Payload())
326+
var err error
327+
if reader != nil {
328+
defer func() {
329+
err := reader.Close()
330+
if err != nil {
331+
exec.log.Debug("error while closing payload reader", zap.Error(err))
332+
}
333+
}()
334+
bufSize := streamChunkSize
335+
if obj != nil {
336+
bufSize = min(streamChunkSize, int(obj.PayloadSize()))
337+
}
338+
err = copyPayloadStream(exec.prm.objWriter, reader, bufSize)
339+
} else {
340+
err = exec.prm.objWriter.WriteChunk(obj.Payload())
341+
}
308342

309343
switch {
310344
default:
@@ -322,9 +356,29 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
322356
return err == nil
323357
}
324358

359+
// copyPayloadStream writes payload from stream to writer.
360+
func copyPayloadStream(w ChunkWriter, r io.Reader, bufSize int) error {
361+
buf := make([]byte, bufSize)
362+
for {
363+
n, err := r.Read(buf)
364+
if n > 0 {
365+
if writeErr := w.WriteChunk(buf[:n]); writeErr != nil {
366+
return writeErr
367+
}
368+
}
369+
370+
if errors.Is(err, io.EOF) {
371+
return nil
372+
}
373+
if err != nil {
374+
return err
375+
}
376+
}
377+
}
378+
325379
func (exec *execCtx) writeCollectedObject() {
326380
if ok := exec.writeCollectedHeader(); ok {
327-
exec.writeObjectPayload(exec.collectedObject)
381+
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
328382
}
329383
}
330384

0 commit comments

Comments
 (0)