Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions go/store/nbs/journal_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,15 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate boo
}

if buf, err = rdr.Peek(int(l)); err != nil {
if warningsCb != nil {
// We probably went off the end of the file. Report error and recover.
jErr := fmt.Errorf("failed to read full journal record of length %d at offset %d: %w", l, off, err)
warningsCb(jErr)
}
// Not being able to read a full record here is considered OK. Given our write protocol,
// it is expected for only a prefix of the journal to make it to storage.
//
// We still set recovered because there's a potential failure mode involving corruption
// which we would like to detect. That comes about when the length tag itself gets
// corrupted to be larger than expected, but still not larger than |journalWriterbuffSize|,
// and that length ends up taking us to EOF. But by running recovery we can detect cases where
// there are valid sync'd records after this corruption and we can still detect some cases
// where we have knowable corruption.
recovered = true
break
}
Expand Down
44 changes: 44 additions & 0 deletions go/store/nbs/journal_record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,50 @@ func TestJournalForDataLoss(t *testing.T) {
}
}

func TestJournalTruncated(t *testing.T) {
	// A journal that is cut off mid-record must process without errors:
	// given the write protocol, only a prefix of the journal making it to
	// storage is an expected condition, not corruption.
	savedGen := journalRecordTimestampGenerator
	t.Cleanup(func() {
		journalRecordTimestampGenerator = savedGen
	})
	journalRecordTimestampGenerator = testTimestampGenerator

	// 1 MB of scratch space is more than enough for the records below.
	journal := make([]byte, 1<<20)

	var pos uint32
	// writeBatch appends one root-hash record followed by 32 chunk records.
	writeBatch := func() {
		root, _ := makeRootHashRecord()
		pos += writeRootHashRecord(journal[pos:], root.address)
		for range 32 {
			chunk, _ := makeChunkRecord()
			pos += writeChunkRecord(journal[pos:], mustCompressedChunk(chunk))
		}
	}
	for range 8 {
		writeBatch()
	}
	finalRoot, _ := makeRootHashRecord()
	rootStart := pos
	pos += writeRootHashRecord(journal[pos:], finalRoot.address)

	// Truncate the tail by 0..15 bytes; every resulting prefix must process
	// cleanly and must not surface a recovery warning.
	for i := range 16 {
		var recoverErr error
		onRec := func(int64, journalRec) error { return nil }
		onWarn := func(e error) { recoverErr = e }
		_, err := processJournalRecords(t.Context(), bytes.NewReader(journal[:int(pos)-i]), true, 0, onRec, onWarn)
		require.NoError(t, err, "err should be nil, iteration %d", i)
		require.NoError(t, recoverErr, "recoverErr should be nil, iteration %d", i)
	}

	// Corrupt the final root record's length field to an oversized value and
	// append more valid records after it. Processing should now fail.
	writeUint32(journal[rootStart:], uint32(1<<20))
	for range 8 {
		writeBatch()
	}
	_, err := processJournalRecords(t.Context(), bytes.NewReader(journal[:pos]), true, 0, func(int64, journalRec) error { return nil }, nil)
	require.Error(t, err)
}

func TestJournalForDataLossOnBoundary(t *testing.T) {
r := rand.New(rand.NewSource(987654321))
// The data loss detection logic has some special cases around buffer boundaries to avoid reading all data into
Expand Down
Loading