Skip to content

Commit 17a9b61

Browse files
authored
services/object: fix read complex objects regressions (#3568)
Closes #3562.
2 parents c61326c + 851d110 commit 17a9b61

File tree

5 files changed

+93
-59
lines changed

5 files changed

+93
-59
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Changelog for NeoFS Node
1616
- `neofs-cli object range` command now truncates file passed to `--file` (#3544)
1717
- `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544)
1818
- Alphabet nodes send basic storage income based on the new Reports API from `container` contract (#3053)
19-
- Use stream API of FSTree for object service `Get` operation (#3466)
19+
- Use stream API of FSTree for object service `Get` operation (#3466, #3568)
2020
- Use meta buckets to mark containers with GC (#3561)
2121
- Switched to local BoltDB fork based on go.etcd.io/bbolt version 1.4.3 (#3576)
2222

pkg/services/object/get/assemble.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) {
113113
seekLen := rng.GetLength()
114114
parSize := par.PayloadSize()
115115
if seekLen == 0 {
116-
seekLen = parSize
116+
seekLen = parSize - seekOff
117117
}
118118
seekTo := seekOff + seekLen
119119

@@ -175,8 +175,7 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK
175175
r = &rngs[i]
176176
}
177177

178-
retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight)
179-
if !retrieved && !wrote {
178+
if !exec.copyChild(children[i], r, !withRng && checkRight) {
180179
return
181180
}
182181
}

pkg/services/object/get/assembly_v2.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ func (exec *execCtx) processV2Last(lastID oid.ID) {
4848
}
4949

5050
if ok := exec.writeCollectedHeader(); ok {
51-
if ok := exec.overtakePayloadInReverse(lastID); ok {
52-
exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader)
53-
}
51+
exec.overtakePayloadInReverse(lastID)
5452
}
5553
}
5654

@@ -147,11 +145,7 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool {
147145
}
148146
}
149147

150-
retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false)
151-
if !retrieved { // failed to fetch child -> abort whole operation
152-
return false
153-
}
154-
if !wrote { // payload fetch ok but writing failed -> stop further processing
148+
if !exec.copyChild(child.ObjectID(), rngPerChild, false) {
155149
return true
156150
}
157151
}

pkg/services/object/get/exec.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ func (exec *execCtx) headOnly() bool {
192192
}
193193

194194
// copyChild fetches child object payload and streams it directly into current exec writer.
195-
// Returns if child header was received and if full payload was successfully written.
196-
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) {
195+
// Returns true if full payload (or requested range) was successfully written and, if requested, header validated.
196+
func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) bool {
197197
log := exec.log
198198
if rng != nil {
199199
log = log.With(zap.String("child range", prettyRange(rng)))
@@ -213,14 +213,18 @@ func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (b
213213
hdr := childWriter.hdr
214214
ok := exec.status == statusOK
215215

216-
if ok && withHdr && !exec.isChild(hdr) {
217-
exec.status = statusUndefined
218-
exec.err = errors.New("wrong child header")
219-
220-
exec.log.Debug("parent address in child object differs")
216+
if ok && withHdr {
217+
if hdr == nil {
218+
return false
219+
}
220+
if !exec.isChild(hdr) {
221+
exec.status = statusUndefined
222+
exec.err = errors.New("wrong child header")
223+
exec.log.Debug("parent address in child object differs")
224+
}
221225
}
222226

223-
return hdr != nil, ok
227+
return ok
224228
}
225229

226230
func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
@@ -339,7 +343,7 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadClo
339343
}()
340344
bufSize := uint64(streamChunkSize)
341345
if obj != nil {
342-
bufSize = min(streamChunkSize, obj.PayloadSize())
346+
bufSize = min(streamChunkSize, max(obj.PayloadSize(), 1))
343347
}
344348
err = copyPayloadStream(exec.prm.objWriter, reader, bufSize)
345349
} else {

pkg/services/object/get/util.go

Lines changed: 75 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,78 @@ type hasherWrapper struct {
5858
hash io.Writer
5959
}
6060

61+
// fallbackRangeReader wraps a range reader obtained via ObjectRangeInit and
62+
// falls back to a full GET in case apistatus.ErrObjectAccessDenied is
63+
// returned while reading.
64+
type fallbackRangeReader struct {
65+
io.ReadCloser
66+
exec *execCtx
67+
client *clientWrapper
68+
key *ecdsa.PrivateKey
69+
rng *object.Range
70+
71+
fallbackDone bool
72+
}
73+
74+
func newFallbackRangeReader(exec *execCtx, c *clientWrapper, key *ecdsa.PrivateKey, rng *object.Range, rdr io.ReadCloser) io.ReadCloser {
75+
return &fallbackRangeReader{
76+
ReadCloser: rdr,
77+
exec: exec,
78+
client: c,
79+
key: key,
80+
rng: rng,
81+
}
82+
}
83+
84+
func (f *fallbackRangeReader) Read(p []byte) (int, error) {
85+
n, err := f.ReadCloser.Read(p)
86+
if err == nil || !errors.Is(err, apistatus.ErrObjectAccessDenied) || f.fallbackDone {
87+
return n, err
88+
}
89+
90+
f.exec.log.Debug("range read access denied, falling back to full GET")
91+
f.fallbackDone = true
92+
93+
hdr, rdr, getErr := f.client.get(f.exec, f.key)
94+
if getErr != nil {
95+
return 0, fmt.Errorf("fallback GET after access denial failed: %w", getErr)
96+
}
97+
98+
pLen := hdr.PayloadSize()
99+
from := f.rng.GetOffset()
100+
ln := f.rng.GetLength()
101+
var to uint64
102+
if ln != 0 {
103+
to = from + ln
104+
} else {
105+
to = pLen
106+
}
107+
108+
if to < from || pLen < from || pLen < to {
109+
_ = rdr.Close()
110+
return 0, apistatus.ErrObjectOutOfRange
111+
}
112+
113+
if from > 0 {
114+
_, err = io.CopyN(io.Discard, rdr, int64(from))
115+
if err != nil {
116+
_ = rdr.Close()
117+
return n, fmt.Errorf("discard %d bytes in stream: %w", from, err)
118+
}
119+
}
120+
121+
f.ReadCloser = struct {
122+
io.Reader
123+
io.Closer
124+
}{
125+
Reader: io.LimitReader(rdr, int64(to-from)),
126+
Closer: rdr,
127+
}
128+
129+
// attempt to read again immediately to fill p.
130+
return f.Read(p)
131+
}
132+
61133
func NewSimpleObjectWriter() *SimpleObjectWriter {
62134
return &SimpleObjectWriter{
63135
obj: object.New(),
@@ -162,46 +234,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
162234
}
163235

164236
rdr, err := c.client.ObjectRangeInit(exec.context(), addr.Container(), id, rng.GetOffset(), ln, user.NewAutoIDSigner(*key), opts)
165-
if err == nil {
166-
return nil, rdr, nil
167-
}
168-
if !errors.Is(err, apistatus.ErrObjectAccessDenied) {
169-
return nil, nil, fmt.Errorf("init payload reading: %w", err)
170-
}
171-
// Current spec allows other storage node to deny access,
172-
// fallback to GET here.
173-
hdr, reader, err := c.get(exec, key)
174237
if err != nil {
175-
return nil, nil, err
176-
}
177-
178-
pLen := hdr.PayloadSize()
179-
from := rng.GetOffset()
180-
var to uint64
181-
if ln != 0 {
182-
to = from + ln
183-
} else {
184-
to = pLen
185-
}
186-
187-
if to < from || pLen < from || pLen < to {
188-
return nil, nil, apistatus.ErrObjectOutOfRange
189-
}
190-
191-
if from > 0 {
192-
_, err = io.CopyN(io.Discard, reader, int64(from))
193-
if err != nil {
194-
return nil, nil, fmt.Errorf("discard %d bytes in stream: %w", from, err)
195-
}
238+
return nil, nil, fmt.Errorf("init payload reading: %w", err)
196239
}
197-
198-
return nil, struct {
199-
io.Reader
200-
io.Closer
201-
}{
202-
Reader: io.LimitReader(reader, int64(to-from)),
203-
Closer: reader,
204-
}, nil
240+
// fallback to full GET in case of access denial error.
241+
return nil, newFallbackRangeReader(exec, c, key, rng, rdr), nil
205242
}
206243

207244
return c.get(exec, key)

0 commit comments

Comments
 (0)