Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fix BlockReader#SkipNext & add SourceOffset property #491

Merged
merged 1 commit into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 57 additions & 36 deletions v2/block_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package car

import (
"errors"
"fmt"
"io"

Expand All @@ -23,6 +24,7 @@
// Used internally only, by BlockReader.Next during iteration over blocks.
r io.Reader
offset uint64
v1offset uint64
readerSize int64
opts Options
}
Expand Down Expand Up @@ -80,7 +82,8 @@
if _, err := rs.Seek(int64(v2h.DataOffset)-PragmaSize-HeaderSize, io.SeekCurrent); err != nil {
return nil, err
}
br.offset = uint64(v2h.DataOffset)
br.v1offset = uint64(v2h.DataOffset)
br.offset = br.v1offset
br.readerSize = int64(v2h.DataOffset + v2h.DataSize)

// Set br.r to a LimitReader reading from r limited to dataSize.
Expand All @@ -96,6 +99,8 @@
return nil, fmt.Errorf("invalid data payload header version; expected 1, got %v", header.Version)
}
br.Roots = header.Roots
hs, _ := carv1.HeaderSize(header)
br.offset += hs
default:
// Otherwise, error out with invalid version since only versions 1 or 2 are expected.
return nil, fmt.Errorf("invalid car version: %d", br.Version)
Expand Down Expand Up @@ -136,10 +141,22 @@
return blocks.NewBlockWithCid(data, c)
}

// BlockMetadata contains metadata about a block's section in a CAR file/stream.
//
// There are two offsets for the block data which will be the same if the
// original CAR is a CARv1, but will differ if the original CAR is a CARv2. In
// the case of a CARv2, SourceOffset will be the offset from the beginning of
the file/stream, and Offset will be the offset from the beginning of the CARv1
// payload container within the CARv2.
//
// Offset is useful for index generation which requires an offset from the CARv1
// payload; while SourceOffset is useful for direct block reads out of the
// source file/stream regardless of version.
type BlockMetadata struct {
cid.Cid
Offset uint64
Size uint64
Offset uint64 // Offset of the block data in the container CARv1
SourceOffset uint64 // SourceOffset is the offset of block data in the source file/stream
Size uint64
}

// SkipNext jumps over the next block, returning metadata about what it is (the CID, offset, and size).
Expand All @@ -148,24 +165,33 @@
// If the underlying reader used by the BlockReader is actually a ReadSeeker, this method will attempt to
// seek over the underlying data rather than reading it into memory.
func (br *BlockReader) SkipNext() (*BlockMetadata, error) {
sctSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize)
sectionSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize)
if err != nil {
return nil, err
}

if sctSize == 0 {
_, _, err := cid.CidFromBytes([]byte{})
if sectionSize == 0 {
_, _, err := cid.CidFromBytes([]byte{}) // generate zero-byte CID error
if err == nil {
panic("expected zero-byte CID error")

Check warning on line 175 in v2/block_reader.go

View check run for this annotation

Codecov / codecov/patch

v2/block_reader.go#L175

Added line #L175 was not covered by tests
}
return nil, err
}

cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sctSize)))
lenSize := uint64(varint.UvarintSize(sectionSize))

cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sectionSize)))
if err != nil {
return nil, err
}

blkSize := sctSize - uint64(cidSize)
blockSize := sectionSize - uint64(cidSize)
blockOffset := br.offset + lenSize + uint64(cidSize)

// move our reader forward; either by seeking or slurping

if brs, ok := br.r.(io.ReadSeeker); ok {
// carv1 and we don't know the size, so work it out and cache it
// carv1 and we don't know the size, so work it out and cache it so we
// can use it to determine over-reads
if br.readerSize == -1 {
cur, err := brs.Seek(0, io.SeekCurrent)
if err != nil {
Expand All @@ -180,42 +206,37 @@
return nil, err
}
}
// seek.
finalOffset, err := brs.Seek(int64(blkSize), io.SeekCurrent)

// seek forward past the block data
finalOffset, err := brs.Seek(int64(blockSize), io.SeekCurrent)
if err != nil {
return nil, err
}
if finalOffset != int64(br.offset)+int64(sctSize)+int64(varint.UvarintSize(sctSize)) {
return nil, fmt.Errorf("unexpected length")
if finalOffset != int64(br.offset)+int64(lenSize)+int64(sectionSize) {
return nil, errors.New("unexpected length")

Check warning on line 216 in v2/block_reader.go

View check run for this annotation

Codecov / codecov/patch

v2/block_reader.go#L216

Added line #L216 was not covered by tests
}
if finalOffset > br.readerSize {
return nil, io.ErrUnexpectedEOF
}
br.offset = uint64(finalOffset)
return &BlockMetadata{
c,
uint64(finalOffset) - sctSize - uint64(varint.UvarintSize(sctSize)),
blkSize,
}, nil
}

// read to end.
readCnt, err := io.CopyN(io.Discard, br.r, int64(blkSize))
if err != nil {
if err == io.EOF {
return nil, io.ErrUnexpectedEOF
} else { // just a reader, we need to slurp the block bytes
readCnt, err := io.CopyN(io.Discard, br.r, int64(blockSize))
if err != nil {
if err == io.EOF {
return nil, io.ErrUnexpectedEOF
}
return nil, err

Check warning on line 227 in v2/block_reader.go

View check run for this annotation

Codecov / codecov/patch

v2/block_reader.go#L224-L227

Added lines #L224 - L227 were not covered by tests
}
if readCnt != int64(blockSize) {
return nil, errors.New("unexpected length")

Check warning on line 230 in v2/block_reader.go

View check run for this annotation

Codecov / codecov/patch

v2/block_reader.go#L230

Added line #L230 was not covered by tests
}
return nil, err
}
if readCnt != int64(blkSize) {
return nil, fmt.Errorf("unexpected length")
}
origOffset := br.offset
br.offset += uint64(varint.UvarintSize(sctSize)) + sctSize

br.offset = blockOffset + blockSize

return &BlockMetadata{
c,
origOffset,
blkSize,
Cid: c,
Offset: blockOffset - br.v1offset,
SourceOffset: blockOffset,
Size: blockSize,
}, nil
}
149 changes: 149 additions & 0 deletions v2/block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package car_test

import (
"bytes"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"os"
"testing"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/internal/carv1"
Expand Down Expand Up @@ -229,6 +231,153 @@ func TestMaxHeaderLength(t *testing.T) {
require.EqualError(t, err, "invalid header data, length of read beyond allowable maximum")
}

// TestBlockReader builds a CARv1, a CARv2, and a padded CARv2 all containing
// the same 100 random blocks, then verifies that Next and SkipNext agree on
// CIDs, sizes, and both Offset (CARv1-relative) and SourceOffset
// (stream-relative) across seekable and non-seekable readers.
func TestBlockReader(t *testing.T) {
	req := require.New(t)

	// prepare a CARv1 with 100 blocks
	roots := []cid.Cid{cid.MustParse("bafyrgqhai26anf3i7pips7q22coa4sz2fr4gk4q4sqdtymvvjyginfzaqewveaeqdh524nsktaq43j65v22xxrybrtertmcfxufdam3da3hbk")}
	blks := make([]struct {
		block      blocks.Block
		dataOffset uint64
	}, 100)
	v1buf := new(bytes.Buffer)
	// check the header write; previously this error was silently dropped
	req.NoError(carv1.WriteHeader(&carv1.CarHeader{Roots: roots, Version: 1}, v1buf))
	vb := make([]byte, 2)
	for i := 0; i < 100; i++ {
		blk := randBlock(100 + i) // we should cross the varint two-byte boundary in here somewhere
		vn := varint.PutUvarint(vb, uint64(len(blk.Cid().Bytes())+len(blk.RawData())))
		n, err := v1buf.Write(vb[:vn])
		req.NoError(err)
		req.Equal(n, vn)
		n, err = v1buf.Write(blk.Cid().Bytes())
		req.NoError(err)
		req.Equal(len(blk.Cid().Bytes()), n)
		// record the offset of the block data itself (after varint + CID)
		blks[i] = struct {
			block      blocks.Block
			dataOffset uint64
		}{block: blk, dataOffset: uint64(v1buf.Len())}
		n, err = v1buf.Write(blk.RawData())
		req.NoError(err)
		req.Equal(len(blk.RawData()), n)
	}

	// wrap the CARv1 payload in a plain CARv2 container
	v2buf := new(bytes.Buffer)
	n, err := v2buf.Write(carv2.Pragma)
	req.NoError(err)
	req.Equal(len(carv2.Pragma), n)
	v2Header := carv2.NewHeader(uint64(v1buf.Len()))
	ni, err := v2Header.WriteTo(v2buf)
	req.NoError(err)
	req.Equal(carv2.HeaderSize, int(ni))
	n, err = v2buf.Write(v1buf.Bytes())
	req.NoError(err)
	req.Equal(v1buf.Len(), n)

	// wrap the CARv1 payload in a CARv2 container with 100 bytes of padding
	// between the header and the data payload
	v2padbuf := new(bytes.Buffer)
	n, err = v2padbuf.Write(carv2.Pragma)
	req.NoError(err)
	req.Equal(len(carv2.Pragma), n)
	v2Header = carv2.NewHeader(uint64(v1buf.Len()))
	// pad with 100 bytes
	v2Header.DataOffset += 100
	ni, err = v2Header.WriteTo(v2padbuf)
	req.NoError(err)
	req.Equal(carv2.HeaderSize, int(ni))
	// check the padding write; previously this error was silently dropped
	n, err = v2padbuf.Write(make([]byte, 100))
	req.NoError(err)
	req.Equal(100, n)
	n, err = v2padbuf.Write(v1buf.Bytes())
	req.NoError(err)
	req.Equal(v1buf.Len(), n)

	for _, testCase := range []struct {
		name     string
		reader   func() io.Reader
		v1offset uint64
	}{
		{
			name:   "v1",
			reader: func() io.Reader { return &readerOnly{bytes.NewReader(v1buf.Bytes())} },
		},
		{
			name:     "v2",
			reader:   func() io.Reader { return &readerOnly{bytes.NewReader(v2buf.Bytes())} },
			v1offset: uint64(carv2.PragmaSize + carv2.HeaderSize),
		},
		{
			name:     "v2 padded",
			reader:   func() io.Reader { return &readerOnly{bytes.NewReader(v2padbuf.Bytes())} },
			v1offset: uint64(carv2.PragmaSize+carv2.HeaderSize) + 100,
		},
		{
			name:   "v1 w/ReadSeeker",
			reader: func() io.Reader { return bytes.NewReader(v1buf.Bytes()) },
		},
		{
			name:     "v2 w/ReadSeeker",
			reader:   func() io.Reader { return bytes.NewReader(v2buf.Bytes()) },
			v1offset: uint64(carv2.PragmaSize + carv2.HeaderSize),
		},
		{
			name:     "v2 padded w/ReadSeeker",
			reader:   func() io.Reader { return bytes.NewReader(v2padbuf.Bytes()) },
			v1offset: uint64(carv2.PragmaSize+carv2.HeaderSize) + 100,
		},
	} {
		t.Run(testCase.name, func(t *testing.T) {
			req := require.New(t)

			// first pass: read every block with Next and compare content
			car, err := carv2.NewBlockReader(testCase.reader())
			req.NoError(err)
			req.ElementsMatch(roots, car.Roots)

			for i := 0; i < 100; i++ {
				blk, err := car.Next()
				req.NoError(err)
				req.Equal(blks[i].block.Cid(), blk.Cid())
				req.Equal(blks[i].block.RawData(), blk.RawData())
			}
			_, err = car.Next()
			req.ErrorIs(err, io.EOF)

			// second pass: skip every block and compare metadata
			car, err = carv2.NewBlockReader(testCase.reader())
			req.NoError(err)
			req.ElementsMatch(roots, car.Roots)

			for i := 0; i < 100; i++ {
				blk, err := car.SkipNext()
				req.NoError(err)
				req.Equal(blks[i].block.Cid(), blk.Cid)
				req.Equal(uint64(len(blks[i].block.RawData())), blk.Size)
				req.Equal(blks[i].dataOffset, blk.Offset, "block #%d", i)
				req.Equal(blks[i].dataOffset+testCase.v1offset, blk.SourceOffset)
			}
			_, err = car.Next()
			req.ErrorIs(err, io.EOF)
		})
	}
}

type readerOnly struct {
r io.Reader
}

func (r readerOnly) Read(b []byte) (int, error) {
return r.r.Read(b)
}

// randBlock builds a test block of l random bytes, identified by a CIDv1 with
// the raw codec and a SHA2-512 multihash. It panics on any failure, which is
// acceptable for a test-only helper.
func randBlock(l int) blocks.Block {
	data := make([]byte, l)
	// previously the error from rand.Read was silently ignored; crypto/rand
	// should not fail, but surface it loudly if it ever does
	if _, err := rand.Read(data); err != nil {
		panic(err)
	}
	h, err := mh.Sum(data, mh.SHA2_512, -1)
	if err != nil {
		panic(err)
	}
	blk, err := blocks.NewBlockWithCid(data, cid.NewCidV1(cid.Raw, h))
	if err != nil {
		panic(err)
	}
	return blk
}

func requireReaderFromPath(t *testing.T, path string) io.Reader {
f, err := os.Open(path)
require.NoError(t, err)
Expand Down