Skip to content

Commit c745de8

Browse files
authored
fstree: serve HEAD from the storage (#3383)
2 parents 861016f + e1cfe2a commit c745de8

File tree

10 files changed

+600
-100
lines changed

10 files changed

+600
-100
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Changelog for NeoFS Node
44
## [Unreleased]
55

66
### Added
7+
- `Head` operation for FSTree (#3383)
78

89
### Fixed
910
- IR exponentially retries updating SN lists in the Container contract in error cases (#3344)

pkg/local_object_storage/blobstor/common/storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Storage interface {
2626
GetBytes(oid.Address) ([]byte, error)
2727
Get(oid.Address) (*objectSDK.Object, error)
2828
GetRange(oid.Address, uint64, uint64) ([]byte, error)
29+
Head(oid.Address) (*objectSDK.Object, error)
2930
Exists(oid.Address) (bool, error)
3031
Put(oid.Address, []byte) error
3132
PutBatch(map[oid.Address][]byte) error

pkg/local_object_storage/blobstor/fstree/fstree.go

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -232,14 +232,11 @@ func (t *FSTree) iterate(depth uint64, curPath []string,
232232
return nil
233233
})
234234
} else {
235-
data, err = getRawObjectBytes(addr.Object(), p)
236-
if err != nil && errors.Is(err, apistatus.ObjectNotFound{}) {
237-
continue
238-
}
239-
if err == nil {
240-
data, err = t.Decompress(data)
241-
}
235+
data, err = t.getObjectBytesByPath(addr.Object(), p)
242236
if err != nil {
237+
if errors.Is(err, apistatus.ErrObjectNotFound) {
238+
continue
239+
}
243240
if errorHandler != nil {
244241
err = errorHandler(*addr, err)
245242
if err == nil {
@@ -403,20 +400,11 @@ func (t *FSTree) GetBytes(addr oid.Address) ([]byte, error) {
403400
// getObjBytes extracts object bytes from the storage by address.
404401
func (t *FSTree) getObjBytes(addr oid.Address) ([]byte, error) {
405402
p := t.treePath(addr)
406-
data, err := getRawObjectBytes(addr.Object(), p)
407-
if err != nil {
408-
return nil, err
409-
}
410-
data, err = t.Decompress(data)
411-
if err != nil {
412-
return nil, fmt.Errorf("decompress file data %q: %w", p, err)
413-
}
414-
return data, nil
403+
return t.getObjectBytesByPath(addr.Object(), p)
415404
}
416405

417-
// getRawObjectBytes extracts raw object bytes from the storage by path. No
418-
// decompression is performed.
419-
func getRawObjectBytes(id oid.ID, p string) ([]byte, error) {
406+
// getObjectBytesByPath extracts object bytes from the storage by path.
407+
func (t *FSTree) getObjectBytesByPath(id oid.ID, p string) ([]byte, error) {
420408
f, err := os.Open(p)
421409
if err != nil {
422410
if errors.Is(err, fs.ErrNotExist) {
@@ -425,7 +413,7 @@ func getRawObjectBytes(id oid.ID, p string) ([]byte, error) {
425413
return nil, fmt.Errorf("read file %q: %w", p, err)
426414
}
427415
defer f.Close()
428-
data, err := extractCombinedObject(id, f)
416+
data, err := t.extractCombinedObject(id, f)
429417
if err != nil {
430418
if errors.Is(err, fs.ErrNotExist) {
431419
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
@@ -437,18 +425,17 @@ func getRawObjectBytes(id oid.ID, p string) ([]byte, error) {
437425

438426
// parseCombinedPrefix checks the given array for combined data prefix and
439427
// returns a subslice with OID and object length if so (nil and 0 otherwise).
440-
func parseCombinedPrefix(p [combinedDataOff]byte) ([]byte, uint32) {
428+
func parseCombinedPrefix(p []byte) ([]byte, uint32) {
441429
if p[0] != combinedPrefix || p[1] != 0 { // Only version 0 is supported now.
442430
return nil, 0
443431
}
444432
return p[combinedIDOff:combinedLengthOff],
445433
binary.BigEndian.Uint32(p[combinedLengthOff:combinedDataOff])
446434
}
447435

448-
func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
436+
func (t *FSTree) extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
449437
var (
450438
comBuf [combinedDataOff]byte
451-
data []byte
452439
isCombined bool
453440
)
454441

@@ -457,13 +444,13 @@ func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
457444
if err != nil {
458445
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
459446
if !isCombined {
460-
return comBuf[:n], nil
447+
return t.Decompress(comBuf[:n])
461448
}
462449
return nil, fs.ErrNotExist
463450
}
464451
return nil, err
465452
}
466-
thisOID, l := parseCombinedPrefix(comBuf)
453+
thisOID, l := parseCombinedPrefix(comBuf[:])
467454
if thisOID == nil {
468455
if isCombined {
469456
return nil, errors.New("malformed combined file")
@@ -476,22 +463,11 @@ func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
476463
if sz > math.MaxInt {
477464
return nil, errors.New("too large file")
478465
}
479-
data = make([]byte, int(sz))
480-
copy(data, comBuf[:])
481-
_, err = io.ReadFull(f, data[len(comBuf):])
482-
if err != nil {
483-
return nil, err
484-
}
485-
return data, nil
466+
return t.readFullObject(f, comBuf[:n], sz)
486467
}
487468
isCombined = true
488469
if bytes.Equal(thisOID, id[:]) {
489-
data = make([]byte, l)
490-
_, err = io.ReadFull(f, data)
491-
if err != nil {
492-
return nil, err
493-
}
494-
return data, nil
470+
return t.readFullObject(f, nil, int64(l))
495471
}
496472
_, err = f.Seek(int64(l), 1)
497473
if err != nil {
@@ -500,6 +476,19 @@ func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
500476
}
501477
}
502478

479+
// readFullObject reads full data of object from the file and decompresses it if necessary.
480+
func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte, error) {
481+
data := make([]byte, size)
482+
copy(data, initial)
483+
n, err := io.ReadFull(f, data[len(initial):])
484+
if err != nil {
485+
return nil, fmt.Errorf("read: %w", err)
486+
}
487+
data = data[:len(initial)+n]
488+
489+
return t.Decompress(data)
490+
}
491+
503492
// GetRange implements common.Storage.
504493
func (t *FSTree) GetRange(addr oid.Address, from uint64, length uint64) ([]byte, error) {
505494
obj, err := t.Get(addr)
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package fstree
2+
3+
import (
4+
"bytes"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"io/fs"
9+
"os"
10+
11+
"github.com/klauspost/compress/zstd"
12+
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
13+
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
14+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
15+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
16+
"github.com/nspcc-dev/neofs-sdk-go/proto/object"
17+
"github.com/nspcc-dev/neofs-sdk-go/proto/refs"
18+
"google.golang.org/protobuf/encoding/protowire"
19+
"google.golang.org/protobuf/proto"
20+
)
21+
22+
const (
23+
_ = iota
24+
fieldObjectID
25+
fieldObjectSignature
26+
fieldObjectHeader
27+
fieldObjectPayload
28+
)
29+
30+
// Head returns an object's header from the storage by address without reading the full payload.
31+
func (t *FSTree) Head(addr oid.Address) (*objectSDK.Object, error) {
32+
p := t.treePath(addr)
33+
34+
f, err := os.Open(p)
35+
if err != nil {
36+
if errors.Is(err, fs.ErrNotExist) {
37+
return nil, logicerr.Wrap(apistatus.ErrObjectNotFound)
38+
}
39+
return nil, fmt.Errorf("read file %q: %w", p, err)
40+
}
41+
defer f.Close()
42+
43+
obj, err := t.extractHeaderOnly(addr.Object(), f)
44+
if err != nil {
45+
if errors.Is(err, fs.ErrNotExist) {
46+
return nil, logicerr.Wrap(apistatus.ErrObjectNotFound)
47+
}
48+
return nil, fmt.Errorf("extract object header from %q: %w", p, err)
49+
}
50+
51+
return obj, nil
52+
}
53+
54+
// extractHeaderOnly reads the header of an object from a file.
55+
func (t *FSTree) extractHeaderOnly(id oid.ID, f *os.File) (*objectSDK.Object, error) {
56+
buf := make([]byte, objectSDK.MaxHeaderLen, 2*objectSDK.MaxHeaderLen)
57+
n, err := io.ReadFull(f, buf)
58+
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
59+
return nil, err
60+
}
61+
62+
thisOID, l := parseCombinedPrefix(buf)
63+
if thisOID == nil {
64+
return t.readHeader(f, buf[:n])
65+
}
66+
67+
offset := combinedDataOff
68+
for {
69+
if bytes.Equal(thisOID, id[:]) {
70+
size := min(offset+int(l), offset+objectSDK.MaxHeaderLen)
71+
if n < size {
72+
_, err = io.ReadFull(f, buf[n:size])
73+
if err != nil {
74+
return nil, fmt.Errorf("read up to size: %w", err)
75+
}
76+
}
77+
return t.readHeader(f, buf[offset:size])
78+
}
79+
80+
offset += int(l)
81+
if n-offset < combinedDataOff {
82+
if offset > n {
83+
_, err = f.Seek(int64(offset-n), io.SeekCurrent)
84+
if err != nil {
85+
return nil, err
86+
}
87+
}
88+
n = copy(buf, buf[min(offset, n):n])
89+
offset = 0
90+
k, err := io.ReadFull(f, buf[n:n+objectSDK.MaxHeaderLen])
91+
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
92+
return nil, fmt.Errorf("read full: %w", err)
93+
}
94+
n += k
95+
}
96+
97+
thisOID, l = parseCombinedPrefix(buf[offset:])
98+
if thisOID == nil {
99+
return nil, errors.New("malformed combined file")
100+
}
101+
102+
offset += combinedDataOff
103+
}
104+
}
105+
106+
// readHeader reads an object from the file.
107+
func (t *FSTree) readHeader(f io.Reader, initial []byte) (*objectSDK.Object, error) {
108+
var err error
109+
if len(initial) < objectSDK.MaxHeaderLen {
110+
initial, err = t.Decompress(initial)
111+
if err != nil {
112+
return nil, fmt.Errorf("decompress initial data: %w", err)
113+
}
114+
var obj objectSDK.Object
115+
err = obj.Unmarshal(initial)
116+
if err != nil {
117+
return nil, fmt.Errorf("unmarshal object: %w", err)
118+
}
119+
return obj.CutPayload(), nil
120+
}
121+
return t.readUntilPayload(f, initial)
122+
}
123+
124+
// readUntilPayload reads an object from the file until the payload field is reached.
125+
func (t *FSTree) readUntilPayload(f io.Reader, initial []byte) (*objectSDK.Object, error) {
126+
if t.IsCompressed(initial) {
127+
decoder, err := zstd.NewReader(io.MultiReader(bytes.NewReader(initial), f))
128+
if err != nil {
129+
return nil, fmt.Errorf("zstd decoder: %w", err)
130+
}
131+
defer decoder.Close()
132+
133+
buf := make([]byte, objectSDK.MaxHeaderLen)
134+
n, err := decoder.Read(buf)
135+
if err != nil && !errors.Is(err, io.EOF) {
136+
return nil, fmt.Errorf("zstd read: %w", err)
137+
}
138+
initial = buf[:n]
139+
}
140+
141+
return fastExtractHeader(initial)
142+
}
143+
144+
// fastExtractHeader extracts the header of an object from the given byte slice.
145+
func fastExtractHeader(data []byte) (*objectSDK.Object, error) {
146+
var (
147+
offset int
148+
res objectSDK.Object
149+
obj object.Object
150+
)
151+
152+
if len(data) == 0 {
153+
return nil, fmt.Errorf("empty data")
154+
}
155+
156+
for offset < len(data) {
157+
num, typ, n := protowire.ConsumeTag(data[offset:])
158+
if err := protowire.ParseError(n); err != nil {
159+
return nil, fmt.Errorf("invalid tag at offset %d: %w", offset, err)
160+
}
161+
offset += n
162+
163+
if typ != protowire.BytesType {
164+
return nil, fmt.Errorf("unexpected wire type: %v", typ)
165+
}
166+
167+
if num == fieldObjectPayload {
168+
break
169+
}
170+
171+
val, n := protowire.ConsumeBytes(data[offset:])
172+
if err := protowire.ParseError(n); err != nil {
173+
return nil, fmt.Errorf("invalid bytes field at offset %d: %w", offset, err)
174+
}
175+
offset += n
176+
177+
switch num {
178+
case fieldObjectID:
179+
obj.ObjectId = new(refs.ObjectID)
180+
err := proto.Unmarshal(val, obj.ObjectId)
181+
if err != nil {
182+
return nil, fmt.Errorf("unmarshal object ID: %w", err)
183+
}
184+
case fieldObjectSignature:
185+
obj.Signature = new(refs.Signature)
186+
err := proto.Unmarshal(val, obj.Signature)
187+
if err != nil {
188+
return nil, fmt.Errorf("unmarshal object signature: %w", err)
189+
}
190+
case fieldObjectHeader:
191+
obj.Header = new(object.Header)
192+
err := proto.Unmarshal(val, obj.Header)
193+
if err != nil {
194+
return nil, fmt.Errorf("unmarshal object header: %w", err)
195+
}
196+
return &res, res.FromProtoMessage(&obj)
197+
default:
198+
return nil, fmt.Errorf("unknown field number: %d", num)
199+
}
200+
}
201+
202+
return &res, res.FromProtoMessage(&obj)
203+
}

0 commit comments

Comments
 (0)