diff --git a/CHANGELOG.md b/CHANGELOG.md index 88b6cb3f9b..dee95bac87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Changelog for NeoFS Node - `neofs-cli object range` command now truncates file passed to `--file` (#3544) - `neofs-cli object range` command now creates file with `rw-r--r--` permissions (#3544) - Alphabet nodes send basic storage income based on the new Reports API from `container` contract (#3053) +- Use stream API of FSTree for object service `Get` operation (#3466) ### Removed - `neofs-cli object head --main-only` no-op flag (#3509) diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 4bc6515f75..9c53b7571f 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -66,7 +66,7 @@ func (exec *execCtx) assemble() { if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok { // payload of all children except the last are written, write last payload - exec.writeObjectPayload(exec.collectedObject) + exec.copyChild(exec.lastChildID, &exec.lastChildRange, false) } } } else if prev != nil { @@ -76,8 +76,12 @@ func (exec *execCtx) assemble() { // * else go right-to-left with GET and compose in single object before writing if ok := exec.overtakePayloadInReverse(*prev); ok { - // payload of all children except the last are written, write last payloa - exec.writeObjectPayload(exec.collectedObject) + var rng *objectSDK.Range + if exec.ctxRange() != nil { + rng = &exec.lastChildRange + } + // payload of all children except the last are written, write last payload + exec.copyChild(exec.lastChildID, rng, false) } } } else { @@ -85,12 +89,12 @@ func (exec *execCtx) assemble() { } } -func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) { +func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) { exec.log.Debug("starting assembling from child", zap.Stringer("child ID", obj)) - child, ok := exec.getChild(obj, nil, true) + child, ok := exec.headChild(obj) if !ok { - return + return nil, nil } par := child.Parent() @@ -99,12 +103,10 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) exec.log.Debug("received child with empty parent", zap.Stringer("child ID", obj)) - return + return nil, nil } - exec.collectedObject = par - - var payload []byte + exec.collectedHeader = par if rng := exec.ctxRange(); rng != nil { seekOff := rng.GetOffset() @@ -116,35 +118,45 @@ func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) seekTo := seekOff + seekLen if seekTo < seekOff || parSize < seekOff || parSize < seekTo { - var errOutOfRange apistatus.ObjectOutOfRange - - exec.err = &errOutOfRange + exec.err = apistatus.ErrObjectOutOfRange exec.status = statusAPIResponse - return + return nil, nil } childSize := child.PayloadSize() - exec.curOff = parSize - childSize + startRight := parSize - childSize + exec.curOff = startRight from := uint64(0) - if exec.curOff < seekOff { - from = seekOff - exec.curOff + if startRight < seekOff { + from = seekOff - startRight } to := uint64(0) - if seekOff+seekLen > exec.curOff+from { - to = seekOff + seekLen - exec.curOff + if seekOff+seekLen > startRight+from { + to = seekOff + seekLen - startRight + if to > childSize { + to = childSize + } } - payload = child.Payload()[from:to] - rng.SetLength(seekLen - to + from) - } else { - payload = child.Payload() + segLen := uint64(0) + if to > from { + segLen = to - from + } + rng.SetLength(seekLen - segLen) + + if segLen > 0 { + exec.lastChildRange.SetOffset(from) + exec.lastChildRange.SetLength(segLen) + } else { + exec.lastChildRange.SetLength(0) + } } - exec.collectedObject.SetPayload(payload) + exec.lastChildID = child.GetID() idPrev := child.GetPreviousID() if !idPrev.IsZero() { @@ -163,12 +175,8 @@ func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK r = &rngs[i] } - child, ok := exec.getChild(children[i], r, !withRng && checkRight) - if !ok { - return - } - - if ok := exec.writeObjectPayload(child); !ok { + retrieved, wrote := exec.copyChild(children[i], r, !withRng && checkRight) + if !retrieved && !wrote { return } } diff --git a/pkg/services/object/get/assembly_v2.go b/pkg/services/object/get/assembly_v2.go index fa6b816762..eeda650bef 100644 --- a/pkg/services/object/get/assembly_v2.go +++ b/pkg/services/object/get/assembly_v2.go @@ -36,37 +36,54 @@ func (exec *execCtx) processV2Split(si *objectSDK.SplitInfo) { } func (exec *execCtx) processV2Last(lastID oid.ID) { - lastObj, ok := exec.getChild(lastID, nil, true) + lastHead, ok := exec.headChild(lastID) if !ok { exec.log.Debug("failed to read last object") return } - exec.collectedObject = lastObj.Parent() + exec.collectedHeader = lastHead.Parent() if r := exec.ctxRange(); r != nil && r.GetLength() == 0 { - r.SetLength(exec.collectedObject.PayloadSize()) + r.SetLength(exec.collectedHeader.PayloadSize()) } - // copied from V1, and it has the same problems as V1; - // see it for comments and optimization suggestions if ok := exec.writeCollectedHeader(); ok { if ok := exec.overtakePayloadInReverse(lastID); ok { - exec.writeObjectPayload(exec.collectedObject) + exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader) } } } func (exec *execCtx) processV2Link(linkID oid.ID) bool { - linkObj, ok := exec.getChild(linkID, nil, true) - if !ok { + // need full payload of link object to parse link structure + w := NewSimpleObjectWriter() + + p := exec.prm + p.common = p.common.WithLocalOnly(false) + p.objWriter = w + p.SetRange(nil) + p.addr.SetContainer(exec.containerID()) + p.addr.SetObject(linkID) + + exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withLogger(exec.log)) + + if exec.status != statusOK { exec.log.Debug("failed to read link object") return false } - exec.collectedObject = linkObj.Parent() + linkObj := w.Object() + if !exec.isChild(linkObj) { + exec.status = statusUndefined + exec.err = errors.New("wrong child header") + exec.log.Debug("parent address in link object differs") + return false + } + + exec.collectedHeader = linkObj.Parent() rng := exec.ctxRange() if rng != nil && rng.GetLength() == 0 { - rng.SetLength(exec.collectedObject.PayloadSize()) + rng.SetLength(exec.collectedHeader.PayloadSize()) } var link objectSDK.Link @@ -100,9 +117,7 @@ func (exec *execCtx) processV2Link(linkID oid.ID) bool { seekTo := seekOff + seekLen if seekTo < seekOff || parSize < seekOff || parSize < seekTo { - var errOutOfRange apistatus.ObjectOutOfRange - - exec.err = &errOutOfRange + exec.err = apistatus.ErrObjectOutOfRange exec.status = statusAPIResponse // the operation has failed but no need to continue so `true` here @@ -132,13 +147,11 @@ func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool { } } - part, ok := exec.getChild(child.ObjectID(), rngPerChild, false) - if !ok { + retrieved, wrote := exec.copyChild(child.ObjectID(), rngPerChild, false) + if !retrieved { // failed to fetch child -> abort whole operation return false } - - if !exec.writeObjectPayload(part) { - // we have payload, we want to send it but can't so stop here + if !wrote { // payload fetch ok but writing failed -> stop further processing return true } } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index c26ddab7ba..76b3401458 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "errors" + "io" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -14,6 +15,12 @@ import ( "go.uber.org/zap" ) +const ( + // streamChunkSize is the size of the chunk that is used to read/write + // object payload from/to the stream. + streamChunkSize = 256 * 1024 // 256 KiB +) + type statusError struct { status int err error @@ -34,11 +41,16 @@ type execCtx struct { // If debug level is enabled, all messages also include info about processing request. log *zap.Logger - collectedObject *objectSDK.Object + collectedHeader *objectSDK.Object + collectedReader io.ReadCloser curOff uint64 head bool + + // range assembly (V1 split) helpers + lastChildID oid.ID + lastChildRange objectSDK.Range } type execOption func(*execCtx) @@ -167,35 +179,36 @@ func (exec *execCtx) headOnly() bool { return exec.head } -func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) { +// copyChild fetches child object payload and streams it directly into current exec writer. +// Returns if child header was received and if full payload was successfully written. +func (exec *execCtx) copyChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (bool, bool) { log := exec.log if rng != nil { log = log.With(zap.String("child range", prettyRange(rng))) } - w := NewSimpleObjectWriter() + childWriter := newDirectChildWriter(exec.prm.objWriter) p := exec.prm p.common = p.common.WithLocalOnly(false) - p.objWriter = w + p.objWriter = childWriter p.SetRange(rng) - p.addr.SetContainer(exec.containerID()) p.addr.SetObject(id) exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng), withLogger(log)) - child := w.Object() + hdr := childWriter.hdr ok := exec.status == statusOK - if ok && withHdr && !exec.isChild(child) { + if ok && withHdr && !exec.isChild(hdr) { exec.status = statusUndefined exec.err = errors.New("wrong child header") exec.log.Debug("parent address in child object differs") } - return child, ok + return hdr != nil, ok } func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) { @@ -280,7 +293,7 @@ func (exec *execCtx) writeCollectedHeader() bool { } err := exec.prm.objWriter.WriteHeader( - exec.collectedObject.CutPayload(), + exec.collectedHeader.CutPayload(), ) switch { @@ -299,12 +312,27 @@ func (exec *execCtx) writeCollectedHeader() bool { return exec.status == statusOK } -func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { +func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object, reader io.ReadCloser) bool { if exec.headOnly() { return true } - err := exec.prm.objWriter.WriteChunk(obj.Payload()) + var err error + if reader != nil { + defer func() { + err := reader.Close() + if err != nil { + exec.log.Debug("error while closing payload reader", zap.Error(err)) + } + }() + bufSize := streamChunkSize + if obj != nil { + bufSize = min(streamChunkSize, int(obj.PayloadSize())) + } + err = copyPayloadStream(exec.prm.objWriter, reader, bufSize) + } else { + err = exec.prm.objWriter.WriteChunk(obj.Payload()) + } switch { default: @@ -322,9 +350,29 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool { return err == nil } +// copyPayloadStream writes payload from stream to writer. +func copyPayloadStream(w ChunkWriter, r io.Reader, bufSize int) error { + buf := make([]byte, bufSize) + for { + n, err := r.Read(buf) + if n > 0 { + if writeErr := w.WriteChunk(buf[:n]); writeErr != nil { + return writeErr + } + } + + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + } +} + func (exec *execCtx) writeCollectedObject() { if ok := exec.writeCollectedHeader(); ok { - exec.writeObjectPayload(exec.collectedObject) + exec.writeObjectPayload(exec.collectedHeader, exec.collectedReader) } } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index c95980549e..ae7c00393b 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -1,10 +1,12 @@ package getsvc import ( + "bytes" "context" "crypto/rand" "errors" "fmt" + "io" "strconv" "testing" @@ -89,19 +91,28 @@ func newTestClient() *testClient { } } -func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { +func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, io.ReadCloser, error) { v, ok := c.results[exec.address()] if !ok { var errNotFound apistatus.ObjectNotFound - return nil, errNotFound + return nil, nil, errNotFound } if v.err != nil { - return nil, v.err + return nil, nil, v.err } - return cutToRange(v.obj, exec.ctxRange()), nil + obj := cutToRange(v.obj, exec.ctxRange()) + + if obj != nil && len(obj.Payload()) > 0 { + reader := io.NopCloser(bytes.NewReader(obj.Payload())) + objWithoutPayload := obj.CutPayload() + objWithoutPayload.SetPayloadSize(obj.PayloadSize()) + return objWithoutPayload, reader, nil + } + + return obj, nil, nil } func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) { @@ -111,7 +122,7 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro }{obj: obj, err: err} } -func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) { +func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, io.ReadCloser, error) { var ( ok bool obj *objectSDK.Object @@ -121,20 +132,29 @@ func (s *testStorage) get(exec *execCtx) (*objectSDK.Object, error) { if _, ok = s.inhumed[sAddr]; ok { var errRemoved apistatus.ObjectAlreadyRemoved - return nil, errRemoved + return nil, nil, errRemoved } if info, ok := s.virtual[sAddr]; ok { - return nil, objectSDK.NewSplitInfoError(info) + return nil, nil, objectSDK.NewSplitInfoError(info) } if obj, ok = s.phy[sAddr]; ok { - return cutToRange(obj, exec.ctxRange()), nil + obj = cutToRange(obj, exec.ctxRange()) + + if obj != nil && len(obj.Payload()) > 0 { + reader := io.NopCloser(bytes.NewReader(obj.Payload())) + objWithoutPayload := obj.CutPayload() + objWithoutPayload.SetPayloadSize(obj.PayloadSize()) + return objWithoutPayload, reader, nil + } + + return obj, nil, nil } var errNotFound apistatus.ObjectNotFound - return nil, errNotFound + return nil, nil, errNotFound } func cutToRange(o *objectSDK.Object, rng *objectSDK.Range) *objectSDK.Object { diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 598900d696..22a7152f5a 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -11,7 +11,7 @@ import ( func (exec *execCtx) executeLocal() { var err error - exec.collectedObject, err = exec.svc.localStorage.get(exec) + exec.collectedHeader, exec.collectedReader, err = exec.svc.localStorage.get(exec) var errSplitInfo *objectSDK.SplitInfoError diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index a2b3d3a1a3..02e2b92e3d 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -17,7 +17,7 @@ func (exec *execCtx) processNode(info client.NodeInfo) bool { return true } - obj, err := remoteClient.getObject(exec, info) + obj, reader, err := remoteClient.getObject(exec, info) var errSplitInfo *objectSDK.SplitInfoError @@ -38,8 +38,9 @@ func (exec *execCtx) processNode(info client.NodeInfo) bool { // has already been streamed to the requesting party, // or it is a GETRANGEHASH forwarded request whose // response is not an object - if obj != nil { - exec.collectedObject = obj + if obj != nil || reader != nil { + exec.collectedHeader = obj + exec.collectedReader = reader exec.writeCollectedObject() } case errors.Is(err, apistatus.Error) && !errors.Is(err, apistatus.ErrObjectNotFound): diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 68f1a5ac13..1fbf28df45 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,6 +1,8 @@ package getsvc import ( + "io" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -41,14 +43,14 @@ type Service struct { type Option func(*cfg) type getClient interface { - getObject(*execCtx, client.NodeInfo) (*object.Object, error) + getObject(*execCtx, client.NodeInfo) (*object.Object, io.ReadCloser, error) } type cfg struct { log *zap.Logger localStorage interface { - get(*execCtx) (*object.Object, error) + get(*execCtx) (*object.Object, io.ReadCloser, error) } clientCache interface { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 64bf956ea7..be115e43df 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -16,11 +16,6 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/user" ) -// maxInitialBufferSize is the maximum initial buffer size for GetRange result. -// We don't want to allocate a lot of space in advance because a query can -// fail with apistatus.ObjectOutOfRange status. -const maxInitialBufferSize = 1024 * 1024 // 1 MiB - type SimpleObjectWriter struct { obj *object.Object @@ -89,14 +84,15 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) { }, nil } -func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { +func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, io.ReadCloser, error) { if exec.isForwardingEnabled() { - return exec.prm.forwarder(exec.ctx, info, c.client) + obj, err := exec.prm.forwarder(exec.ctx, info, c.client) + return obj, nil, err } key, err := exec.key() if err != nil { - return nil, err + return nil, nil, err } if exec.headOnly() { @@ -120,15 +116,15 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj hdr, err := c.client.ObjectHead(exec.context(), addr.Container(), id, user.NewAutoIDSigner(*key), opts) if err != nil { - return nil, fmt.Errorf("read object header from NeoFS: %w", err) + return nil, nil, fmt.Errorf("read object header from NeoFS: %w", err) } - return hdr, nil + return hdr, nil, nil } if rngH := exec.prmRangeHash; rngH != nil && exec.isRangeHashForwardingEnabled() { exec.prmRangeHash.forwardedRangeHashResponse, err = exec.prm.rangeForwarder(exec.ctx, info, c.client) - return nil, err + return nil, nil, err } // we don't specify payload writer because we accumulate @@ -154,53 +150,52 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj } rdr, err := c.client.ObjectRangeInit(exec.context(), addr.Container(), id, rng.GetOffset(), ln, user.NewAutoIDSigner(*key), opts) - if err != nil { - err = fmt.Errorf("init payload reading: %w", err) - } else { - if int64(ln) < 0 { - // `CopyN` expects `int64`, this check ensures that the result is positive. - // On practice this means that we can return incorrect results for objects - // with size > 8_388 Petabytes, this will be fixed later with support for streaming. - return nil, new(apistatus.ObjectOutOfRange) - } - - bufInitLen := min(ln, maxInitialBufferSize) - - w := bytes.NewBuffer(make([]byte, bufInitLen)) - _, err = io.CopyN(w, rdr, int64(ln)) - if err == nil { - return payloadOnlyObject(w.Bytes()), nil - } - err = fmt.Errorf("read payload: %w", err) + if err == nil { + return nil, rdr, nil } if !errors.Is(err, apistatus.ErrObjectAccessDenied) { - return nil, err + return nil, nil, fmt.Errorf("init payload reading: %w", err) } // Current spec allows other storage node to deny access, // fallback to GET here. - obj, err := c.get(exec, key) + hdr, reader, err := c.get(exec, key) if err != nil { - return nil, err + return nil, nil, err } - payload := obj.Payload() + pLen := hdr.PayloadSize() from := rng.GetOffset() - if ln == 0 { - ln = obj.PayloadSize() + var to uint64 + if ln != 0 { + to = from + ln + } else { + to = pLen } - to := from + ln - if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { - return nil, new(apistatus.ObjectOutOfRange) + if to < from || pLen < from || pLen < to { + return nil, nil, apistatus.ErrObjectOutOfRange } - return payloadOnlyObject(payload[from:to]), nil + if from > 0 { + _, err = io.CopyN(io.Discard, reader, int64(from)) + if err != nil { + return nil, nil, fmt.Errorf("discard %d bytes in stream: %w", from, err) + } + } + + return nil, struct { + io.Reader + io.Closer + }{ + Reader: io.LimitReader(reader, int64(to-from)), + Closer: reader, + }, nil } return c.get(exec, key) } -func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { +func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, io.ReadCloser, error) { addr := exec.address() id := addr.Object() @@ -221,55 +216,28 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec hdr, rdr, err := c.client.ObjectGetInit(exec.context(), addr.Container(), id, user.NewAutoIDSigner(*key), opts) if err != nil { - return nil, fmt.Errorf("init object reading:: %w", err) - } - - buf := make([]byte, hdr.PayloadSize()) - - _, err = rdr.Read(buf) - if err != nil && !errors.Is(err, io.EOF) { - return nil, fmt.Errorf("read payload: %w", err) + return nil, nil, fmt.Errorf("init object reader: %w", err) } - - hdr.SetPayload(buf) - - return &hdr, nil + return &hdr, rdr, nil } -func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { +func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, io.ReadCloser, error) { if exec.headOnly() { r, err := e.engine.Head(exec.address(), exec.isRaw()) if err != nil { - return nil, err + return nil, nil, err } - return r, nil + return r, nil, nil } if rng := exec.ctxRange(); rng != nil { r, err := e.engine.GetRange(exec.address(), rng.GetOffset(), rng.GetLength()) - if err != nil { - return nil, err - } - - o := object.New() - o.SetPayload(r) - - return o, nil - } - - header, reader, err := e.engine.GetStream(exec.address()) - if err != nil { - return nil, err + // TODO: use here GetRangeStream when it will be implemented + return nil, io.NopCloser(bytes.NewReader(r)), err } - defer func() { _ = reader.Close() }() - payload, err := io.ReadAll(reader) - if err != nil { - return nil, fmt.Errorf("can't read object payload: %w", err) - } - header.SetPayload(payload) - return header, nil + return e.engine.GetStream(exec.address()) } func (w *partWriter) WriteChunk(p []byte) error { @@ -280,13 +248,6 @@ func (w *partWriter) WriteHeader(o *object.Object) error { return w.headWriter.WriteHeader(o) } -func payloadOnlyObject(payload []byte) *object.Object { - obj := object.New() - obj.SetPayload(payload) - - return obj -} - func (h *hasherWrapper) WriteChunk(p []byte) error { _, err := h.hash.Write(p) return err @@ -295,3 +256,21 @@ func (h *hasherWrapper) WriteChunk(p []byte) error { func prettyRange(rng *object.Range) string { return fmt.Sprintf("[%d:%d]", rng.GetOffset(), rng.GetLength()) } + +// directChildWriter streams child object payload directly into destination ChunkWriter +// while capturing the header. +type directChildWriter struct { + hdr *object.Object + ChunkWriter +} + +func newDirectChildWriter(dest ChunkWriter) *directChildWriter { + return &directChildWriter{ + ChunkWriter: dest, + } +} + +func (w *directChildWriter) WriteHeader(obj *object.Object) error { + w.hdr = obj + return nil +}