Skip to content

Commit 17e3c07

Browse files
Storage GetStream API (#3532)
2 parents d5526f3 + 49a0d6f commit 17e3c07

File tree

12 files changed

+218
-6
lines changed

12 files changed

+218
-6
lines changed

pkg/local_object_storage/blobstor/common/storage.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"fmt"
5+
"io"
56

67
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
78
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
@@ -26,6 +27,7 @@ type Storage interface {
2627
GetBytes(oid.Address) ([]byte, error)
2728
Get(oid.Address) (*objectSDK.Object, error)
2829
GetRange(oid.Address, uint64, uint64) ([]byte, error)
30+
GetStream(oid.Address) (*objectSDK.Object, io.ReadCloser, error)
2931
Head(oid.Address) (*objectSDK.Object, error)
3032
Exists(oid.Address) (bool, error)
3133
Put(oid.Address, []byte) error

pkg/local_object_storage/blobstor/fstree/fstree.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,8 @@ func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte
494494

495495
// GetStream returns an object from the storage by address as a stream.
496496
// It returns the object with header only, and a reader for the payload.
497-
// The caller is responsible for closing the returned io.ReadCloser if it is not nil.
497+
// On success, the reader is non-nil and must be closed;
498+
// a nil reader is only returned with a non‑nil error.
498499
func (t *FSTree) GetStream(addr oid.Address) (*objectSDK.Object, io.ReadCloser, error) {
499500
obj, reader, err := t.getObjectStream(addr)
500501
if err != nil {

pkg/local_object_storage/engine/get.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package engine
22

33
import (
44
"errors"
5+
"io"
56

67
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
78
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
@@ -154,3 +155,39 @@ func (e *StorageEngine) GetBytes(addr oid.Address) ([]byte, error) {
154155
})
155156
return b, err
156157
}
158+
159+
// GetStream reads an object from local storage as a stream.
160+
//
161+
// Returns the object header and a reader for the payload.
162+
// On success, the reader is non-nil and must be closed;
163+
// a nil reader is only returned with a non‑nil error.
164+
//
165+
// Returns any error encountered that did not allow to completely read the object part.
166+
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in local storage.
167+
// Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
168+
//
169+
// Returns an error if executions are blocked (see BlockExecution).
170+
func (e *StorageEngine) GetStream(addr oid.Address) (*objectSDK.Object, io.ReadCloser, error) {
171+
if e.metrics != nil {
172+
defer elapsed(e.metrics.AddGetStreamDuration)()
173+
}
174+
175+
e.blockMtx.RLock()
176+
defer e.blockMtx.RUnlock()
177+
178+
if e.blockErr != nil {
179+
return nil, nil, e.blockErr
180+
}
181+
182+
var (
183+
err error
184+
obj *objectSDK.Object
185+
reader io.ReadCloser
186+
)
187+
188+
err = e.get(addr, func(s *shard.Shard, ignoreMetadata bool) error {
189+
obj, reader, err = s.GetStream(addr, ignoreMetadata)
190+
return err
191+
})
192+
return obj, reader, err
193+
}

pkg/local_object_storage/engine/get_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package engine
22

33
import (
4+
"io"
45
"testing"
56

67
"github.com/nspcc-dev/neofs-node/pkg/core/object"
@@ -22,3 +23,24 @@ func TestStorageEngine_GetBytes(t *testing.T) {
2223
require.NoError(t, err)
2324
require.Equal(t, objBin, b)
2425
}
26+
27+
func TestStorageEngine_GetStream(t *testing.T) {
28+
e, _, _ := newEngine(t, t.TempDir())
29+
obj := generateObjectWithCID(cidtest.ID())
30+
addr := object.AddressOf(obj)
31+
32+
objBin := obj.Payload()
33+
34+
err := e.Put(obj, nil)
35+
require.NoError(t, err)
36+
37+
header, reader, err := e.GetStream(addr)
38+
require.NoError(t, err)
39+
require.Equal(t, obj.CutPayload(), header)
40+
41+
require.NotNil(t, reader)
42+
b, err := io.ReadAll(reader)
43+
require.NoError(t, err)
44+
require.Equal(t, objBin, b)
45+
require.NoError(t, reader.Close())
46+
}

pkg/local_object_storage/engine/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type MetricRegister interface {
1111
AddExistsDuration(d time.Duration)
1212
AddGetDuration(d time.Duration)
1313
AddHeadDuration(d time.Duration)
14+
AddGetStreamDuration(d time.Duration)
1415
AddInhumeDuration(d time.Duration)
1516
AddPutDuration(d time.Duration)
1617
AddRangeDuration(d time.Duration)

pkg/local_object_storage/shard/get.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package shard
33
import (
44
"errors"
55
"fmt"
6+
"io"
67

78
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
89
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
@@ -140,3 +141,52 @@ func (s *Shard) getBytesWithMetadataLookup(addr oid.Address, skipMeta bool) ([]b
140141
}
141142
return b, err
142143
}
144+
145+
// GetStream reads an object from shard as a stream. skipMeta flag allows to fetch object from
146+
// the blobstor directly.
147+
//
148+
// Returns the object header and a reader for the payload.
149+
// On success, the reader is non-nil and must be closed;
150+
// a nil reader is only returned with a non‑nil error.
151+
//
152+
// Returns any error encountered that did not allow to completely read the object part.
153+
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
154+
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
155+
// Returns the object.ErrObjectIsExpired if the object is present but already expired.
156+
func (s *Shard) GetStream(addr oid.Address, skipMeta bool) (*objectSDK.Object, io.ReadCloser, error) {
157+
s.m.RLock()
158+
defer s.m.RUnlock()
159+
160+
var (
161+
res *objectSDK.Object
162+
reader io.ReadCloser
163+
)
164+
165+
cb := func(stor common.Storage) error {
166+
obj, r, err := stor.GetStream(addr)
167+
if err != nil {
168+
return err
169+
}
170+
res = obj
171+
reader = r
172+
return nil
173+
}
174+
175+
wc := func(c writecache.Cache) error {
176+
o, r, err := c.GetStream(addr)
177+
if err != nil {
178+
return err
179+
}
180+
res = o
181+
reader = r
182+
return nil
183+
}
184+
185+
skipMeta = skipMeta || s.info.Mode.NoMetabase()
186+
gotMeta, err := s.fetchObjectData(addr, skipMeta, cb, wc)
187+
if err != nil && gotMeta {
188+
err = fmt.Errorf("%w, %w", err, ErrMetaWithNoObject)
189+
}
190+
191+
return res, reader, err
192+
}

pkg/local_object_storage/shard/get_test.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package shard_test
33
import (
44
"bytes"
55
"errors"
6+
"io"
67
"testing"
78
"time"
89

@@ -43,6 +44,7 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
4344
require.Equal(t, obj, res)
4445

4546
testGetBytes(t, sh, addr, obj.Marshal())
47+
testGetStream(t, sh, addr, obj)
4648
})
4749

4850
t.Run("big object", func(t *testing.T) {
@@ -60,32 +62,38 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
6062
require.Equal(t, obj, res)
6163

6264
testGetBytes(t, sh, addr, obj.Marshal())
65+
testGetStream(t, sh, addr, obj)
6366
})
6467

6568
t.Run("parent object", func(t *testing.T) {
66-
obj := generateObject()
67-
addAttribute(obj, "foo", "bar")
6869
cnr := cidtest.ID()
6970
splitID := objectSDK.NewSplitID()
7071

7172
parent := generateObjectWithCID(cnr)
7273
addAttribute(parent, "parent", "attribute")
74+
parentAddr := object.AddressOf(parent)
7375

7476
child := generateObjectWithCID(cnr)
7577
child.SetParent(parent)
7678
idParent := parent.GetID()
7779
child.SetParentID(idParent)
7880
child.SetSplitID(splitID)
7981
addPayload(child, 1<<5)
82+
childAddr := object.AddressOf(child)
8083

8184
err := sh.Put(child, nil)
8285
require.NoError(t, err)
8386

84-
res, err := testGet(t, sh, object.AddressOf(child), hasWriteCache)
87+
res, err := testGet(t, sh, childAddr, hasWriteCache)
8588
require.NoError(t, err)
8689
require.True(t, binaryEqual(child, res))
8790

88-
_, err = testGet(t, sh, object.AddressOf(parent), hasWriteCache)
91+
testGetStream(t, sh, childAddr, child)
92+
93+
_, _, streamErr := sh.GetStream(parentAddr, false)
94+
require.Error(t, streamErr)
95+
_, err = testGet(t, sh, parentAddr, hasWriteCache)
96+
require.Equal(t, streamErr, err)
8997

9098
var si *objectSDK.SplitInfoError
9199
require.True(t, errors.As(err, &si))
@@ -122,6 +130,18 @@ func testGetBytes(t testing.TB, sh *shard.Shard, addr oid.Address, objBin []byte
122130
require.Equal(t, objBin, b)
123131
}
124132

133+
func testGetStream(t testing.TB, sh *shard.Shard, addr oid.Address, obj *objectSDK.Object) {
134+
header, reader, err := sh.GetStream(addr, false)
135+
require.NoError(t, err)
136+
require.Equal(t, obj.CutPayload(), header)
137+
138+
data, err := io.ReadAll(reader)
139+
require.NoError(t, err)
140+
require.Equal(t, obj.Payload(), data)
141+
142+
require.NoError(t, reader.Close())
143+
}
144+
125145
// binary equal is used when object contains empty lists in the structure and
126146
// require.Equal fails on comparing <nil> and []{} lists.
127147
func binaryEqual(a, b *objectSDK.Object) bool {

pkg/local_object_storage/writecache/get.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package writecache
22

33
import (
44
"fmt"
5+
"io"
56

67
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
78
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
@@ -50,3 +51,19 @@ func (c *cache) GetBytes(addr oid.Address) ([]byte, error) {
5051

5152
return b, nil
5253
}
54+
55+
// GetStream returns an object stream from write-cache.
56+
// On success, the reader is non-nil and must be closed;
57+
// a nil reader is only returned with a non‑nil error.
58+
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
59+
func (c *cache) GetStream(addr oid.Address) (*objectSDK.Object, io.ReadCloser, error) {
60+
if !c.objCounters.HasAddress(addr) {
61+
return nil, nil, logicerr.Wrap(apistatus.ErrObjectNotFound)
62+
}
63+
stream, reader, err := c.fsTree.GetStream(addr)
64+
if err != nil {
65+
return nil, nil, logicerr.Wrap(apistatus.ErrObjectNotFound)
66+
}
67+
68+
return stream, reader, nil
69+
}

pkg/local_object_storage/writecache/get_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package writecache
22

33
import (
4+
"io"
45
"testing"
56

7+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
8+
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
69
"github.com/stretchr/testify/require"
710
)
811

@@ -24,3 +27,34 @@ func TestCache_GetBytes(t *testing.T) {
2427
require.NoError(t, err)
2528
require.Equal(t, objBin, b)
2629
}
30+
31+
func TestCache_GetStream(t *testing.T) {
32+
const maxObjSize = 4 << 10
33+
c, _ := newCache(t)
34+
35+
testStream := func(t *testing.T, size int) {
36+
o := putObject(t, c, size)
37+
objBin := o.obj.Payload()
38+
39+
header, reader, err := c.GetStream(o.addr)
40+
require.NoError(t, err)
41+
require.Equal(t, o.obj.CutPayload(), header)
42+
43+
require.NotNil(t, reader)
44+
b, err := io.ReadAll(reader)
45+
require.NoError(t, err)
46+
require.Equal(t, objBin, b)
47+
require.NoError(t, reader.Close())
48+
}
49+
50+
testStream(t, 0)
51+
testStream(t, maxObjSize/2)
52+
testStream(t, 2*maxObjSize)
53+
54+
t.Run("not found", func(t *testing.T) {
55+
header, reader, err := c.GetStream(oidtest.Address())
56+
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
57+
require.Nil(t, header)
58+
require.Nil(t, reader)
59+
})
60+
}

pkg/local_object_storage/writecache/writecache.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package writecache
33
import (
44
"errors"
55
"fmt"
6+
"io"
67
"os"
78
"path/filepath"
89
"sync"
@@ -29,6 +30,8 @@ type Cache interface {
2930
// canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object
3031
// is missing.
3132
GetBytes(oid.Address) ([]byte, error)
33+
// GetStream returns an object and a stream to read its payload.
34+
GetStream(oid.Address) (*object.Object, io.ReadCloser, error)
3235
Head(oid.Address) (*object.Object, error)
3336
// Delete removes object referenced by the given oid.Address from the
3437
// Cache. Returns any error encountered that prevented the object to be

0 commit comments

Comments
 (0)