Skip to content

Commit 3c09860

Browse files
authored
fstree: serve Stream from the storage (#3431)
2 parents 81afcb3 + 2cfea63 commit 3c09860

File tree

9 files changed

+236
-51
lines changed

9 files changed

+236
-51
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Changelog for NeoFS Node
55

66
### Added
77
- `Head` operation for FSTree (#3383)
8+
- `GetStream` operation for FSTree (#3431)
89

910
### Fixed
1011
- IR exponentially retries updating SN lists in the Container contract in error cases (#3344)

cmd/neofs-cli/modules/acl/extended/create.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ Action is 'allow' or 'deny'.
2727
Operation is an object service verb: 'get', 'head', 'put', 'search', 'delete', 'getrange', or 'getrangehash'.
2828
2929
Filter consists of <typ>:<key><match><value>
30-
Typ is 'obj' for object applied filter or 'req' for request applied filter.
31-
Key is a valid unicode string corresponding to object or request header key.
30+
Typ is 'obj' for object applied filter or 'req' for request applied filter.
31+
Key is a valid unicode string corresponding to object or request header key.
3232
Well-known system object headers start with '$Object:' prefix.
3333
User defined headers start without prefix.
3434
Read more about filter keys at github.com/nspcc-dev/neofs-api/blob/master/proto-docs/acl.md#message-eaclrecordfilter
@@ -38,10 +38,10 @@ Filter consists of <typ>:<key><match><value>
3838
'>' | '>=' | '<' | '<=' for integer comparison.
3939
Value is a valid unicode string corresponding to object or request header value. Numeric filters must have base-10 integer values.
4040
41-
Target is
42-
'user' for container owner,
41+
Target is
42+
'user' for container owner,
4343
'system' for Storage nodes in container and Inner Ring nodes,
44-
'others' for all other request senders,
44+
'others' for all other request senders,
4545
'address:<adr1>,<adr2>,...' for exact request sender, where <adr> is a base58 25-byte address. Example: NSiVJYZej4XsxG5CUpdwn7VRQk8iiiDMPM.
4646
4747
When both '--rule' and '--file' arguments are used, '--rule' records will be placed higher in resulting extended ACL table.

cmd/neofs-cli/modules/container/create.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ var (
3333
var createContainerCmd = &cobra.Command{
3434
Use: "create",
3535
Short: "Create new container",
36-
Long: `Create new container and register it in the NeoFS.
36+
Long: `Create new container and register it in the NeoFS.
3737
It will be stored in FS chain when inner ring will accepts it.`,
3838
Args: cobra.NoArgs,
3939
RunE: func(cmd *cobra.Command, _ []string) error {

cmd/neofs-cli/modules/container/delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
var deleteContainerCmd = &cobra.Command{
1919
Use: "delete",
2020
Short: "Delete existing container",
21-
Long: `Delete existing container.
21+
Long: `Delete existing container.
2222
Only owner of the container has a permission to remove container.`,
2323
Args: cobra.NoArgs,
2424
RunE: func(cmd *cobra.Command, _ []string) error {

cmd/neofs-cli/modules/session/create.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ var createCmd = &cobra.Command{
3232
Short: "Create session token",
3333
Long: `Create session token.
3434
35-
Default lifetime of session token is ` + strconv.Itoa(defaultLifetime) + ` epochs
36-
if none of --` + commonflags.ExpireAt + ` or --` + commonflags.Lifetime + ` flags is specified.
35+
Default lifetime of session token is ` + strconv.Itoa(defaultLifetime) + ` epochs
36+
if none of --` + commonflags.ExpireAt + ` or --` + commonflags.Lifetime + ` flags is specified.
3737
`,
3838
Args: cobra.NoArgs,
3939
RunE: createSession,

pkg/local_object_storage/blobstor/fstree/fstree.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,18 @@ func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte
489489
return t.Decompress(data)
490490
}
491491

492+
// GetStream returns an object from the storage by address as a stream.
493+
// It returns the object with header only, and a reader for the payload.
494+
// The caller is responsible for closing the returned io.ReadCloser if it is not nil.
495+
func (t *FSTree) GetStream(addr oid.Address) (*objectSDK.Object, io.ReadCloser, error) {
496+
obj, reader, err := t.getObjectStream(addr)
497+
if err != nil {
498+
return nil, nil, err
499+
}
500+
501+
return obj, reader, nil
502+
}
503+
492504
// GetRange implements common.Storage.
493505
func (t *FSTree) GetRange(addr oid.Address, from uint64, length uint64) ([]byte, error) {
494506
obj, err := t.Get(addr)
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package fstree
2+
3+
import (
4+
"crypto/rand"
5+
"fmt"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
"testing"
10+
11+
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
12+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
13+
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestGetStream(t *testing.T) {
18+
tree := New(WithPath(t.TempDir()))
19+
20+
payloadSizes := []int{
21+
1,
22+
1024,
23+
1024 * 1024,
24+
}
25+
26+
t.Run("object not found", func(t *testing.T) {
27+
addr := oidtest.Address()
28+
obj, reader, err := tree.GetStream(addr)
29+
require.Error(t, err)
30+
require.Nil(t, obj)
31+
require.Nil(t, reader)
32+
})
33+
34+
testStream := func(t *testing.T, size int) {
35+
payload := make([]byte, size)
36+
_, err := rand.Read(payload)
37+
require.NoError(t, err)
38+
39+
addr := oidtest.Address()
40+
obj := objectSDK.New()
41+
obj.SetID(addr.Object())
42+
obj.SetPayload(payload)
43+
44+
require.NoError(t, tree.Put(addr, obj.Marshal()))
45+
46+
retrievedObj, reader, err := tree.GetStream(addr)
47+
require.NoError(t, err)
48+
require.NotNil(t, retrievedObj)
49+
require.Equal(t, obj.CutPayload(), retrievedObj)
50+
51+
require.NotNil(t, reader)
52+
streamedPayload, err := io.ReadAll(reader)
53+
require.NoError(t, err)
54+
require.Equal(t, payload, streamedPayload)
55+
require.NoError(t, reader.Close())
56+
}
57+
58+
t.Run("different objects", func(t *testing.T) {
59+
for _, size := range payloadSizes {
60+
t.Run(fmt.Sprint(size), func(t *testing.T) {
61+
testStream(t, size)
62+
})
63+
}
64+
})
65+
66+
t.Run("compressed object", func(t *testing.T) {
67+
compress := compression.Config{Enabled: true}
68+
require.NoError(t, compress.Init())
69+
tree.Config = &compress
70+
71+
for _, size := range payloadSizes {
72+
t.Run(fmt.Sprint(size), func(t *testing.T) {
73+
testStream(t, size)
74+
})
75+
}
76+
})
77+
}
78+
79+
func TestGetStreamAfterErrors(t *testing.T) {
80+
tree := New(WithPath(t.TempDir()))
81+
82+
t.Run("corrupt header", func(t *testing.T) {
83+
addr := oidtest.Address()
84+
85+
objPath := tree.treePath(addr)
86+
require.NoError(t, os.MkdirAll(filepath.Dir(objPath), 0755))
87+
88+
f, err := os.Create(objPath)
89+
require.NoError(t, err)
90+
_, err = f.Write([]byte("corrupt data that isn't a valid object"))
91+
require.NoError(t, err)
92+
require.NoError(t, f.Close())
93+
94+
obj, reader, err := tree.GetStream(addr)
95+
require.Error(t, err)
96+
require.Nil(t, obj)
97+
require.Nil(t, reader)
98+
})
99+
100+
t.Run("corrupt compressed data", func(t *testing.T) {
101+
compress := compression.Config{Enabled: true}
102+
require.NoError(t, compress.Init())
103+
tree.Config = &compress
104+
105+
addr := oidtest.Address()
106+
obj := objectSDK.New()
107+
obj.SetID(addr.Object())
108+
payload := []byte("test payload")
109+
obj.SetPayload(payload)
110+
111+
require.NoError(t, tree.Put(addr, obj.Marshal()))
112+
113+
objPath := tree.treePath(addr)
114+
115+
f, err := os.OpenFile(objPath, os.O_WRONLY|os.O_APPEND, 0644)
116+
require.NoError(t, err)
117+
_, err = f.Write([]byte("corruption at the end"))
118+
require.NoError(t, err)
119+
require.NoError(t, f.Close())
120+
121+
_, _, err = tree.GetStream(addr)
122+
require.Error(t, err)
123+
})
124+
}

0 commit comments

Comments
 (0)