Skip to content

Commit 1a387bc

Browse files
committed
services/object: use stream API for Get operation
Network API always had a concept of payload stream, FSTree can also provide it now after #3431. So use `GetStream` in object service to enable memory-efficient object retrieval without loading large payloads into memory. Closes #3439. Signed-off-by: Andrey Butusov <[email protected]>
1 parent 22f0daa commit 1a387bc

File tree

9 files changed

+201
-115
lines changed

9 files changed

+201
-115
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Changelog for NeoFS Node
88
### Fixed
99

1010
### Changed
11+
- Use stream API of FSTree for object service `Get` operation (#3466)
1112

1213
### Removed
1314
- `neofs-cli object head --main-only` no-op flag (#3509)

pkg/services/object/get/assemble.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (exec *execCtx) assemble() {
6666

6767
if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok {
6868
// payload of all children except the last are written, write last payload
69-
exec.writeObjectPayload(exec.collectedObject)
69+
exec.writeObjectPayload(exec.collectedObject, exec.collectedReader)
7070
}
7171
}
7272
} else if prev != nil {
@@ -77,7 +77,7 @@ func (exec *execCtx) assemble() {
7777

7878
if ok := exec.overtakePayloadInReverse(*prev); ok {
7979
// payload of all children except the last are written, write last payloa
80-
exec.writeObjectPayload(exec.collectedObject)
80+
exec.writeObjectPayload(exec.collectedObject, exec.collectedReader)
8181
}
8282
}
8383
} else {
@@ -163,12 +163,8 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK
163163
r = &rngs[i]
164164
}
165165

166-
child, ok := exec.getChild(children[i], r, !withRng && checkRight)
167-
if !ok {
168-
return
169-
}
170-
171-
if ok := exec.writeObjectPayload(child); !ok {
166+
retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight)
167+
if !retrieved || !wrote {
172168
return
173169
}
174170
}

pkg/services/object/get/assembly_v2.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (exec *execCtx) processV2Last(lastID oid.ID) {
5151
// see it for comments and optimization suggestions
5252
if ok := exec.writeCollectedHeader(); ok {
5353
if ok := exec.overtakePayloadInReverse(lastID); ok {
54-
exec.writeObjectPayload(exec.collectedObject)
54+
exec.writeObjectPayload(exec.collectedObject, exec.collectedReader)
5555
}
5656
}
5757
}
@@ -132,13 +132,11 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool {
132132
}
133133
}
134134

135-
part, ok := exec.getChild(child.ObjectID(), rngPerChild, false)
136-
if !ok {
135+
retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false)
136+
if !retrieved { // failed to fetch child -> abort whole operation
137137
return false
138138
}
139-
140-
if !exec.writeObjectPayload(part) {
141-
// we have payload, we want to send it but can't so stop here
139+
if !wrote { // payload fetch ok but writing failed -> stop further processing
142140
return true
143141
}
144142
}

pkg/services/object/get/exec.go

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"errors"
7+
"io"
78

89
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
910
"github.com/nspcc-dev/neofs-node/pkg/core/object"
@@ -14,6 +15,12 @@ import (
1415
"go.uber.org/zap"
1516
)
1617

18+
const (
19+
// streamChunkSize is the size of the chunk that is used to read/write
20+
// object payload from/to the stream.
21+
streamChunkSize = 64 * 1024 // 64 KiB
22+
)
23+
1724
type statusError struct {
1825
status int
1926
err error
@@ -35,6 +42,7 @@ type execCtx struct {
3542
log *zap.Logger
3643

3744
collectedObject *objectSDK.Object
45+
collectedReader io.ReadCloser
3846

3947
curOff uint64
4048

@@ -198,6 +206,44 @@ func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*o
198206
return child, ok
199207
}
200208

209+
// copyChild fetches child object payload and streams it directly into current exec writer.
210+
// Returns if child header was received and if full payload was successfully written.
211+
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) {
212+
log := exec.log
213+
if rng != nil {
214+
log = log.With(zap.String("child range", prettyRange(rng)))
215+
}
216+
217+
childWriter := NewDirectChildWriter(exec.prm.objWriter)
218+
219+
p := exec.prm
220+
p.common = p.common.WithLocalOnly(false)
221+
p.objWriter = childWriter
222+
p.SetRange(rng)
223+
p.addr.SetContainer(exec.containerID())
224+
p.addr.SetObject(id)
225+
226+
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng), withLogger(log))
227+
228+
obj := childWriter.Object()
229+
ok := exec.status == statusOK
230+
if ok && withHdr && !exec.isChild(obj) {
231+
exec.status = statusUndefined
232+
exec.err = errors.New("wrong child header")
233+
234+
exec.log.Debug("parent address in child object differs")
235+
}
236+
237+
if !ok {
238+
if childWriter.HeaderSet() {
239+
return true, false
240+
}
241+
return false, false
242+
}
243+
244+
return true, true
245+
}
246+
201247
func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
202248
p := exec.prm
203249
p.common = p.common.WithLocalOnly(false)
@@ -299,12 +345,18 @@ func (exec *execCtx) writeCollectedHeader() bool {
299345
return exec.status == statusOK
300346
}
301347

302-
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
348+
func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadCloser) bool {
303349
if exec.headOnly() {
304350
return true
305351
}
306352

307-
err := exec.prm.objWriter.WriteChunk(obj.Payload())
353+
var err error
354+
if reader != nil {
355+
defer func() { _ = reader.Close() }()
356+
err = exec.writeFromStream(reader)
357+
} else {
358+
err = exec.prm.objWriter.WriteChunk(obj.Payload())
359+
}
308360

309361
switch {
310362
default:
@@ -322,9 +374,29 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
322374
return err == nil
323375
}
324376

377+
// writeFromStream writes payload from stream.
378+
func (exec *execCtx) writeFromStream(reader io.ReadCloser) error {
379+
buf := make([]byte, streamChunkSize)
380+
for {
381+
n, err := reader.Read(buf)
382+
if n > 0 {
383+
if writeErr := exec.prm.objWriter.WriteChunk(buf[:n]); writeErr != nil {
384+
return writeErr
385+
}
386+
}
387+
388+
if errors.Is(err, io.EOF) {
389+
return nil
390+
}
391+
if err != nil {
392+
return err
393+
}
394+
}
395+
}
396+
325397
func (exec *execCtx) writeCollectedObject() {
326398
if ok := exec.writeCollectedHeader(); ok {
327-
exec.writeObjectPayload(exec.collectedObject)
399+
exec.writeObjectPayload(exec.collectedObject, exec.collectedReader)
328400
}
329401
}
330402

pkg/services/object/get/get_test.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package getsvc
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/rand"
67
"errors"
78
"fmt"
9+
"io"
810
"strconv"
911
"testing"
12+
"time"
1013

1114
"github.com/nspcc-dev/neofs-node/pkg/core/client"
1215
netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
@@ -113,19 +116,28 @@ func newTestClient() *testClient {
113116
}
114117
}
115118

116-
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
119+
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, io.ReadCloser, error) {
117120
v, ok := c.results[exec.address().EncodeToString()]
118121
if !ok {
119122
var errNotFound apistatus.ObjectNotFound
120123

121-
return nil, errNotFound
124+
return nil, nil, errNotFound
122125
}
123126

124127
if v.err != nil {
125-
return nil, v.err
128+
return nil, nil, v.err
126129
}
127130

128-
return cutToRange(v.obj, exec.ctxRange()), nil
131+
obj := cutToRange(v.obj, exec.ctxRange())
132+
133+
if obj != nil && len(obj.Payload()) > 0 {
134+
reader := io.NopCloser(bytes.NewReader(obj.Payload()))
135+
objWithoutPayload := obj.CutPayload()
136+
objWithoutPayload.SetPayloadSize(obj.PayloadSize())
137+
return objWithoutPayload, reader, nil
138+
}
139+
140+
return obj, nil, nil
129141
}
130142

131143
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
@@ -135,7 +147,7 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro
135147
}{obj: obj, err: err}
136148
}
137149

138-
func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
150+
func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, io.ReadCloser, error) {
139151
var (
140152
ok bool
141153
obj *objectSDK.Object
@@ -145,20 +157,29 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) {
145157
if _, ok = s.inhumed[sAddr]; ok {
146158
var errRemoved apistatus.ObjectAlreadyRemoved
147159

148-
return nil, errRemoved
160+
return nil, nil, errRemoved
149161
}
150162

151163
if info, ok := s.virtual[sAddr]; ok {
152-
return nil, objectSDK.NewSplitInfoError(info)
164+
return nil, nil, objectSDK.NewSplitInfoError(info)
153165
}
154166

155167
if obj, ok = s.phy[sAddr]; ok {
156-
return cutToRange(obj, exec.ctxRange()), nil
168+
obj = cutToRange(obj, exec.ctxRange())
169+
170+
if obj != nil && len(obj.Payload()) > 0 {
171+
reader := io.NopCloser(bytes.NewReader(obj.Payload()))
172+
objWithoutPayload := obj.CutPayload()
173+
objWithoutPayload.SetPayloadSize(obj.PayloadSize())
174+
return objWithoutPayload, reader, nil
175+
}
176+
177+
return obj, nil, nil
157178
}
158179

159180
var errNotFound apistatus.ObjectNotFound
160181

161-
return nil, errNotFound
182+
return nil, nil, errNotFound
162183
}
163184

164185
func cutToRange(o *objectSDK.Object, rng *objectSDK.Range) *objectSDK.Object {
@@ -476,7 +497,8 @@ func generateChain(ln int, cnr cid.ID) ([]*objectSDK.Object, []oid.ID, []byte) {
476497
}
477498

478499
func TestGetRemoteSmall(t *testing.T) {
479-
ctx := context.Background()
500+
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Second)
501+
t.Cleanup(cancel)
480502

481503
var cnr container.Container
482504
cnr.SetPlacementPolicy(netmaptest.PlacementPolicy())

pkg/services/object/get/local.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
func (exec *execCtx) executeLocal() {
1212
var err error
1313

14-
exec.collectedObject, err = exec.svc.localStorage.get(exec)
14+
exec.collectedObject, exec.collectedReader, err = exec.svc.localStorage.get(exec)
1515

1616
var errSplitInfo *objectSDK.SplitInfoError
1717

pkg/services/object/get/remote.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func (exec *execCtx) processNode(info client.NodeInfo) bool {
1717
return true
1818
}
1919

20-
obj, err := remoteClient.getObject(exec, info)
20+
obj, reader, err := remoteClient.getObject(exec, info)
2121

2222
var errSplitInfo *objectSDK.SplitInfoError
2323

@@ -33,13 +33,14 @@ func (exec *execCtx) processNode(info client.NodeInfo) bool {
3333
exec.status = statusOK
3434
exec.err = nil
3535

36-
// both object and err are nil only if the original
36+
// object, reader and err are nil only if the original
3737
// request was forwarded to another node and the object
3838
// has already been streamed to the requesting party,
3939
// or it is a GETRANGEHASH forwarded request whose
4040
// response is not an object
41-
if obj != nil {
41+
if obj != nil || reader != nil {
4242
exec.collectedObject = obj
43+
exec.collectedReader = reader
4344
exec.writeCollectedObject()
4445
}
4546
case errors.Is(err, apistatus.Error) && !errors.Is(err, apistatus.ErrObjectNotFound):

pkg/services/object/get/service.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package getsvc
22

33
import (
4+
"io"
5+
46
"github.com/nspcc-dev/neofs-node/pkg/core/client"
57
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
68
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
@@ -41,14 +43,14 @@ type Service struct {
4143
type Option func(*cfg)
4244

4345
type getClient interface {
44-
getObject(*execCtx, client.NodeInfo) (*object.Object, error)
46+
getObject(*execCtx, client.NodeInfo) (*object.Object, io.ReadCloser, error)
4547
}
4648

4749
type cfg struct {
4850
log *zap.Logger
4951

5052
localStorage interface {
51-
get(*execCtx) (*object.Object, error)
53+
get(*execCtx) (*object.Object, io.ReadCloser, error)
5254
}
5355

5456
clientCache interface {

0 commit comments

Comments
 (0)