Skip to content

Commit 5af0080

Browse files
committed
services/object: use stream API for Get operation
The network API has always had a concept of a payload stream, and FSTree can provide one as well since #3431. Use `GetStream` in the object service to enable memory-efficient object retrieval without loading large payloads into memory. Closes #3439. Signed-off-by: Andrey Butusov <[email protected]>
1 parent c3c4080 commit 5af0080

File tree

9 files changed

+284
-189
lines changed

9 files changed

+284
-189
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Changelog for NeoFS Node
1212
- Stream payload without buffering to reduce memory usage in CLI `Get/Put` operations (#3535)
1313
- `neofs-cli object range` command now truncates file passed to `--file` (#3544)
1414
- `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544)
15+
- Use stream API of FSTree for object service `Get` operation (#3466)
1516

1617
### Removed
1718
- `neofs-cli object head --main-only` no-op flag (#3509)

pkg/services/object/get/assemble.go

Lines changed: 88 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -60,24 +60,25 @@ func (exec *execCtx) assemble() {
6060
exec.overtakePayloadDirectly(children, nil, true)
6161
}
6262
} else {
63-
// TODO: #1155 choose one-by-one restoring algorithm according to size
64-
// * if size > MAX => go right-to-left with HEAD and back with GET
65-
// * else go right-to-left with GET and compose in single object before writing
66-
6763
if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok {
68-
// payload of all children except the last are written, write last payload
69-
exec.writeObjectPayload(exec.collectedObject)
64+
// stream last (right-most) child slice if needed
65+
if exec.lastChildRange.GetLength() > 0 {
66+
exec.copyChild(exec.lastChildID, &exec.lastChildRange, false)
67+
}
7068
}
7169
}
7270
} else if prev != nil {
73-
if ok := exec.writeCollectedHeader(); ok {
74-
// TODO: #1155 choose one-by-one restoring algorithm according to size
75-
// * if size > MAX => go right-to-left with HEAD and back with GET
76-
// * else go right-to-left with GET and compose in single object before writing
77-
71+
if exec.ctxRange() == nil {
72+
if ok := exec.writeCollectedHeader(); ok {
73+
if ok := exec.overtakePayloadInReverse(*prev); ok {
74+
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
75+
}
76+
}
77+
} else {
7878
if ok := exec.overtakePayloadInReverse(*prev); ok {
79-
// payload of all children except the last are written, write last payload
80-
exec.writeObjectPayload(exec.collectedObject)
79+
if exec.lastChildRange.GetLength() > 0 {
80+
exec.copyChild(exec.lastChildID, &exec.lastChildRange, false)
81+
}
8182
}
8283
}
8384
} else {
@@ -88,70 +89,100 @@ func (exec *execCtx) assemble() {
8889
func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) {
8990
exec.log.Debug("starting assembling from child", zap.Stringer("child ID", obj))
9091

91-
child, ok := exec.getChild(obj, nil, true)
92+
head, ok := exec.headChild(obj)
9293
if !ok {
93-
return
94+
return nil, nil
9495
}
9596

96-
par := child.Parent()
97+
par := head.Parent()
9798
if par == nil {
9899
exec.status = statusUndefined
99-
100100
exec.log.Debug("received child with empty parent", zap.Stringer("child ID", obj))
101-
102-
return
101+
return nil, nil
103102
}
104103

105-
exec.collectedObject = par
106-
107-
var payload []byte
104+
exec.collectedHeader = par
108105

109-
if rng := exec.ctxRange(); rng != nil {
110-
seekOff := rng.GetOffset()
111-
seekLen := rng.GetLength()
112-
parSize := par.PayloadSize()
113-
if seekLen == 0 {
114-
seekLen = parSize
106+
buildPrevChain := func(h *objectSDK.Object, includeHead bool) []oid.ID {
107+
chain := make([]oid.ID, 0, 8)
108+
if includeHead {
109+
chain = append(chain, h.GetID())
115110
}
116-
seekTo := seekOff + seekLen
117-
118-
if seekTo < seekOff || parSize < seekOff || parSize < seekTo {
119-
var errOutOfRange apistatus.ObjectOutOfRange
120-
121-
exec.err = &errOutOfRange
122-
exec.status = statusAPIResponse
123-
124-
return
111+
prevID := h.GetPreviousID()
112+
for !prevID.IsZero() {
113+
ph, ok := exec.headChild(prevID)
114+
if !ok {
115+
return nil
116+
}
117+
chain = append(chain, ph.GetID())
118+
prevID = ph.GetPreviousID()
119+
}
120+
// reverse to chronological order
121+
for l, r := 0, len(chain)-1; l < r; l, r = l+1, r-1 {
122+
chain[l], chain[r] = chain[r], chain[l]
123+
}
124+
if !includeHead && len(chain) == 0 {
125+
return nil
125126
}
127+
return chain
128+
}
126129

127-
childSize := child.PayloadSize()
130+
if exec.ctxRange() == nil {
131+
if ch := head.Children(); len(ch) > 0 {
132+
return nil, ch
133+
}
134+
return nil, buildPrevChain(head, true)
135+
}
128136

129-
exec.curOff = parSize - childSize
137+
rng := exec.ctxRange()
138+
seekOff := rng.GetOffset()
139+
seekLen := rng.GetLength()
140+
parSize := par.PayloadSize()
141+
if seekLen == 0 {
142+
seekLen = parSize
143+
}
144+
seekTo := seekOff + seekLen
145+
if seekTo < seekOff || parSize < seekOff || parSize < seekTo {
146+
var errOutOfRange apistatus.ObjectOutOfRange
147+
exec.err = &errOutOfRange
148+
exec.status = statusAPIResponse
149+
return nil, nil
150+
}
130151

131-
from := uint64(0)
132-
if exec.curOff < seekOff {
133-
from = seekOff - exec.curOff
134-
}
152+
childSize := head.PayloadSize()
153+
startRight := parSize - childSize
154+
exec.curOff = startRight
135155

136-
to := uint64(0)
137-
if seekOff+seekLen > exec.curOff+from {
138-
to = seekOff + seekLen - exec.curOff
156+
from := uint64(0)
157+
if startRight < seekOff {
158+
from = seekOff - startRight
159+
}
160+
to := uint64(0)
161+
if seekOff+seekLen > startRight+from {
162+
to = seekOff + seekLen - startRight
163+
if to > childSize {
164+
to = childSize
139165
}
166+
}
167+
segLen := uint64(0)
168+
if to > from {
169+
segLen = to - from
170+
}
171+
rng.SetLength(seekLen - segLen)
140172

141-
payload = child.Payload()[from:to]
142-
rng.SetLength(seekLen - to + from)
173+
exec.lastChildID = head.GetID()
174+
if segLen > 0 {
175+
exec.lastChildRange.SetOffset(from)
176+
exec.lastChildRange.SetLength(segLen)
143177
} else {
144-
payload = child.Payload()
178+
exec.lastChildRange.SetLength(0)
145179
}
146180

147-
exec.collectedObject.SetPayload(payload)
148-
149-
idPrev := child.GetPreviousID()
150-
if !idPrev.IsZero() {
151-
return &idPrev, child.Children()
181+
if ch := head.Children(); len(ch) > 0 {
182+
return nil, ch
152183
}
153184

154-
return nil, child.Children()
185+
return nil, buildPrevChain(head, false)
155186
}
156187

157188
func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK.Range, checkRight bool) {
@@ -163,12 +194,8 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK
163194
r = &rngs[i]
164195
}
165196

166-
child, ok := exec.getChild(children[i], r, !withRng && checkRight)
167-
if !ok {
168-
return
169-
}
170-
171-
if ok := exec.writeObjectPayload(child); !ok {
197+
retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight)
198+
if !retrieved || !wrote {
172199
return
173200
}
174201
}

pkg/services/object/get/assembly_v2.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,37 +36,54 @@ func (exec *execCtx) processV2Split(si *objectSDK.SplitInfo) {
3636
}
3737

3838
func (exec *execCtx) processV2Last(lastID oid.ID) {
39-
lastObj, ok := exec.getChild(lastID, nil, true)
39+
lastHead, ok := exec.headChild(lastID)
4040
if !ok {
41-
exec.log.Debug("failed to read last object")
41+
exec.log.Debug("failed to head last object")
4242
return
4343
}
4444

45-
exec.collectedObject = lastObj.Parent()
45+
exec.collectedHeader = lastHead.Parent()
4646
if r := exec.ctxRange(); r != nil && r.GetLength() == 0 {
47-
r.SetLength(exec.collectedObject.PayloadSize())
47+
r.SetLength(exec.collectedHeader.PayloadSize())
4848
}
4949

50-
// copied from V1, and it has the same problems as V1;
51-
// see it for comments and optimization suggestions
5250
if ok := exec.writeCollectedHeader(); ok {
5351
if ok := exec.overtakePayloadInReverse(lastID); ok {
54-
exec.writeObjectPayload(exec.collectedObject)
52+
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
5553
}
5654
}
5755
}
5856

5957
func (exec *execCtx) processV2Link(linkID oid.ID) bool {
60-
linkObj, ok := exec.getChild(linkID, nil, true)
61-
if !ok {
58+
// need full payload of link object to parse link structure
59+
w := NewSimpleObjectWriter()
60+
61+
p := exec.prm
62+
p.common = p.common.WithLocalOnly(false)
63+
p.objWriter = w
64+
p.SetRange(nil)
65+
p.addr.SetContainer(exec.containerID())
66+
p.addr.SetObject(linkID)
67+
68+
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withLogger(exec.log))
69+
70+
if exec.status != statusOK {
6271
exec.log.Debug("failed to read link object")
6372
return false
6473
}
6574

66-
exec.collectedObject = linkObj.Parent()
75+
linkObj := w.Object()
76+
if !exec.isChild(linkObj) {
77+
exec.status = statusUndefined
78+
exec.err = errors.New("wrong child header")
79+
exec.log.Debug("parent address in link object differs")
80+
return false
81+
}
82+
83+
exec.collectedHeader = linkObj.Parent()
6784
rng := exec.ctxRange()
6885
if rng != nil && rng.GetLength() == 0 {
69-
rng.SetLength(exec.collectedObject.PayloadSize())
86+
rng.SetLength(exec.collectedHeader.PayloadSize())
7087
}
7188

7289
var link objectSDK.Link
@@ -132,13 +149,11 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool {
132149
}
133150
}
134151

135-
part, ok := exec.getChild(child.ObjectID(), rngPerChild, false)
136-
if !ok {
152+
retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false)
153+
if !retrieved { // failed to fetch child -> abort whole operation
137154
return false
138155
}
139-
140-
if !exec.writeObjectPayload(part) {
141-
// we have payload, we want to send it but can't so stop here
156+
if !wrote { // payload fetch ok but writing failed -> stop further processing
142157
return true
143158
}
144159
}

0 commit comments

Comments
 (0)