diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 2a33741568ba..55aecde372a6 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -626,8 +626,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // to be skipped instead of propagating bad information (like overly // large sequence numbers). log::Reader reader(immutable_db_options_.info_log, std::move(file_reader), - &reporter, true /*checksum*/, log_number, - false /* retry_after_eof */); + &reporter, true /*checksum*/, log_number); // Determine if we should tolerate incomplete records at the tail end of the // Read all the records and add to a memtable diff --git a/db/log_reader.cc b/db/log_reader.cc index 2c57cde5d593..237fd1929486 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -24,8 +24,7 @@ Reader::Reporter::~Reporter() { Reader::Reader(std::shared_ptr info_log, std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num, - bool retry_after_eof) + Reporter* reporter, bool checksum, uint64_t log_num) : info_log_(info_log), file_(std::move(_file)), reporter_(reporter), @@ -39,7 +38,8 @@ Reader::Reader(std::shared_ptr info_log, end_of_buffer_offset_(0), log_number_(log_num), recycled_(false), - retry_after_eof_(retry_after_eof) {} + fragments_(), + in_fragmented_record_(false) {} Reader::~Reader() { delete[] backing_store_; @@ -199,6 +199,118 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, return false; } +// return true if a complete record has been read successfully. 
+bool Reader::TryReadRecord(Slice* record, std::string* scratch) { + assert(record != nullptr); + assert(scratch != nullptr); + record->clear(); + scratch->clear(); + + uint64_t prospective_record_offset = 0; + uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); + size_t drop_size = 0; + unsigned int fragment_type_or_err = 0; // Initialize to make compiler happy + Slice fragment; + while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) { + switch (fragment_type_or_err) { + case kFullType: + case kRecyclableFullType: + if (in_fragmented_record_ && !fragments_.empty()) { + ReportCorruption(fragments_.size(), "partial record without end(1)"); + } + fragments_.clear(); + *record = fragment; + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + return true; + + case kFirstType: + case kRecyclableFirstType: + if (in_fragmented_record_ && !fragments_.empty()) { + ReportCorruption(fragments_.size(), "partial record without end(2)"); + } + prospective_record_offset = physical_record_offset; + fragments_.assign(fragment.data(), fragment.size()); + in_fragmented_record_ = true; + break; + + case kMiddleType: + case kRecyclableMiddleType: + if (!in_fragmented_record_) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(1)"); + } else { + fragments_.append(fragment.data(), fragment.size()); + } + break; + + case kLastType: + case kRecyclableLastType: + if (!in_fragmented_record_) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(2)"); + } else { + fragments_.append(fragment.data(), fragment.size()); + scratch->assign(fragments_.data(), fragments_.size()); + fragments_.clear(); + *record = Slice(*scratch); + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + return true; + } + break; + + case kBadHeader: + case kEof: + case kOldRecord: + if (in_fragmented_record_) { + 
fragments_.clear(); + } + return false; + + case kBadRecord: + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + + case kBadRecordLen: + case kBadRecordChecksum: + if (recycled_) { + fragments_.clear(); + return false; + } + if (fragment_type_or_err == kBadRecordLen) { + ReportCorruption(drop_size, "bad record length"); + } else { + ReportCorruption(drop_size, "checksum mismatch"); + } + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + + default: { + char buf[40]; + snprintf(buf, sizeof(buf), "unknown record type %u", + fragment_type_or_err); + ReportCorruption( + fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0), + buf); + in_fragmented_record_ = false; + fragments_.clear(); + break; + } + } + } + return false; +} + uint64_t Reader::LastRecordOffset() { return last_record_offset_; } @@ -207,14 +319,22 @@ void Reader::UnmarkEOF() { if (read_error_) { return; } - eof_ = false; + if (eof_offset_ == 0) { + return; + } + UnmarkEOFInternal(); +} - // If retry_after_eof_ is true, we have to proceed to read anyway. 
- if (!retry_after_eof_ && eof_offset_ == 0) { +void Reader::ForceUnmarkEOF() { + if (read_error_) { return; } + eof_ = false; + UnmarkEOFInternal(); +} +void Reader::UnmarkEOFInternal() { // If the EOF was in the middle of a block (a partial block was read) we have // to read the rest of the block as ReadPhysicalRecord can only read full // blocks and expects the file position indicator to be aligned to the start @@ -292,12 +412,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) { } else if (buffer_.size() < static_cast(kBlockSize)) { eof_ = true; eof_offset_ = buffer_.size(); - TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF"); } return true; - } else if (retry_after_eof_ && !read_error_) { - UnmarkEOF(); - return !read_error_; } else { // Note that if buffer_ is non-empty, we have a truncated header at the // end of the file, which can be caused by the writer crashing in the @@ -355,24 +471,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { } } if (header_size + length > buffer_.size()) { - if (!retry_after_eof_) { - *drop_size = buffer_.size(); - buffer_.clear(); - if (!eof_) { - return kBadRecordLen; - } - // If the end of the file has been reached without reading |length| - // bytes of payload, assume the writer died in the middle of writing the - // record. Don't report a corruption unless requested. - if (*drop_size) { - return kBadHeader; - } - } else { - int r = kEof; - if (!ReadMore(drop_size, &r)) { - return r; - } - continue; + *drop_size = buffer_.size(); + buffer_.clear(); + if (!eof_) { + return kBadRecordLen; + } + // If the end of the file has been reached without reading |length| + // bytes of payload, assume the writer died in the middle of writing the + // record. Don't report a corruption unless requested. 
+ if (*drop_size) { + return kBadHeader; } return kEof; } @@ -409,5 +517,123 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { } } +bool Reader::TryReadMore(size_t* drop_size, int* error) { + if (!eof_ && !read_error_) { + // Last read was a full read, so this is a trailer to skip + buffer_.clear(); + Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + end_of_buffer_offset_ += buffer_.size(); + if (!status.ok()) { + buffer_.clear(); + ReportDrop(kBlockSize, status); + read_error_ = true; + *error = kEof; + return false; + } else if (buffer_.size() < static_cast(kBlockSize)) { + eof_ = true; + eof_offset_ = buffer_.size(); + TEST_SYNC_POINT_CALLBACK("LogReader::TryReadMore:FirstEOF", nullptr); + } + return true; + } else if (!read_error_) { + ForceUnmarkEOF(); + return !read_error_; + } else { + // Note that if buffer_ is non-empty, we have a truncated header at the + // end of the file, which can be caused by the writer crashing in the + // middle of writing the header. Unless explicitly requested we don't + // consider this an error, just report EOF. + if (buffer_.size()) { + *drop_size = buffer_.size(); + buffer_.clear(); + *error = kBadHeader; + return false; + } + buffer_.clear(); + *error = kEof; + return false; + } +} + +// return true if the caller should process the fragment_type_or_err. 
+bool Reader::TryReadFragment(Slice* fragment, size_t* drop_size, + unsigned int* fragment_type_or_err) { + assert(fragment != nullptr); + assert(drop_size != nullptr); + assert(fragment_type_or_err != nullptr); + + while (buffer_.size() < static_cast(kHeaderSize)) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + const char* header = buffer_.data(); + const uint32_t a = static_cast(header[4]) & 0xff; + const uint32_t b = static_cast(header[5]) & 0xff; + const unsigned int type = header[6]; + const uint32_t length = a | (b << 8); + int header_size = kHeaderSize; + if (type >= kRecyclableFullType && type <= kRecyclableLastType) { + if (end_of_buffer_offset_ - buffer_.size() == 0) { + recycled_ = true; + } + header_size = kRecyclableHeaderSize; + while (buffer_.size() < static_cast(kRecyclableHeaderSize)) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + const uint32_t log_num = DecodeFixed32(header + 7); + if (log_num != log_number_) { + *fragment_type_or_err = kOldRecord; + return true; + } + } + + while (header_size + length > buffer_.size()) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + + if (type == kZeroType && length == 0) { + buffer_.clear(); + *fragment_type_or_err = kBadRecord; + return true; + } + + if (checksum_) { + uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); + uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); + if (actual_crc != expected_crc) { + *drop_size = buffer_.size(); + buffer_.clear(); + 
*fragment_type_or_err = kBadRecordChecksum; + return true; + } + } + + buffer_.remove_prefix(header_size + length); + + *fragment = Slice(header + header_size, length); + *fragment_type_or_err = type; + return true; +} + } // namespace log } // namespace rocksdb diff --git a/db/log_reader.h b/db/log_reader.h index 2c4f4f059901..bd5f8f54883d 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -53,7 +53,7 @@ class Reader { Reader(std::shared_ptr info_log, // @lint-ignore TXT2 T25377293 Grandfathered in std::unique_ptr&& file, Reporter* reporter, - bool checksum, uint64_t log_num, bool retry_after_eof); + bool checksum, uint64_t log_num); ~Reader(); @@ -66,6 +66,8 @@ class Reader { WALRecoveryMode wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords); + bool TryReadRecord(Slice* record, std::string* scratch); + // Returns the physical offset of the last record returned by ReadRecord. // // Undefined before the first call to ReadRecord. @@ -76,6 +78,9 @@ class Reader { return eof_; } + // returns true if the reader has encountered read error. + bool hasReadError() const { return read_error_; } + // when we know more data has been written to the file. we can use this // function to force the reader to look again in the file. // Also aligns the file position indicator to the start of the next block @@ -83,6 +88,8 @@ class Reader { // block that was partially read. void UnmarkEOF(); + void ForceUnmarkEOF(); + SequentialFileReader* file() { return file_.get(); } private: @@ -91,6 +98,8 @@ class Reader { Reporter* const reporter_; bool const checksum_; char* const backing_store_; + + // Internal state variables used for reading records Slice buffer_; bool eof_; // Last Read() indicated EOF by returning < kBlockSize bool read_error_; // Error occurred while reading from file @@ -110,10 +119,8 @@ class Reader { // Whether this is a recycled log file bool recycled_; - // Whether retry after encountering EOF - // TODO (yanqin) add support for retry policy, e.g. 
sleep, max retry limit, - // etc. - const bool retry_after_eof_; + std::string fragments_; + bool in_fragmented_record_; // Extend record types with the following special values enum { @@ -136,9 +143,16 @@ class Reader { // Return type, or one of the preceding special values unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size); + bool TryReadFragment(Slice* result, size_t* drop_size, + unsigned int* fragment_type_or_err); + // Read some more bool ReadMore(size_t* drop_size, int *error); + bool TryReadMore(size_t* drop_size, int* error); + + void UnmarkEOFInternal(); + // Reports dropped bytes to the reporter. // buffer_ must be updated to remove the dropped bytes prior to invocation. void ReportCorruption(size_t bytes, const char* reason); diff --git a/db/log_test.cc b/db/log_test.cc index 9e8148f65396..c2f090c77543 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -43,7 +43,10 @@ static std::string RandomSkewedString(int i, Random* rnd) { return BigString(NumberString(i), rnd->Skewed(17)); } -class LogTest : public ::testing::TestWithParam { +// Param type is tuple +// get<0>(tuple): non-zero if recycling log, zero if regular log +// get<1>(tuple): true if allow retry after read EOF, false otherwise +class LogTest : public ::testing::TestWithParam> { private: class StringSource : public SequentialFile { public: @@ -53,16 +56,20 @@ class LogTest : public ::testing::TestWithParam { bool force_eof_; size_t force_eof_position_; bool returned_partial_; - explicit StringSource(Slice& contents) : - contents_(contents), - force_error_(false), - force_error_position_(0), - force_eof_(false), - force_eof_position_(0), - returned_partial_(false) { } + bool fail_after_read_partial_; + explicit StringSource(Slice& contents, bool fail_after_read_partial) + : contents_(contents), + force_error_(false), + force_error_position_(0), + force_eof_(false), + force_eof_position_(0), + returned_partial_(false), + fail_after_read_partial_(fail_after_read_partial) {} 
virtual Status Read(size_t n, Slice* result, char* scratch) override { - EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; + if (fail_after_read_partial_) { + EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; + } if (force_error_) { if (force_error_position_ >= n) { @@ -151,9 +158,8 @@ class LogTest : public ::testing::TestWithParam { Writer writer_; Reader reader_; - // Record metadata for testing initial offset functionality - static size_t initial_offset_record_sizes_[]; - uint64_t initial_offset_last_record_offsets_[4]; + protected: + bool allow_retry_read_; public: LogTest() @@ -161,19 +167,12 @@ class LogTest : public ::testing::TestWithParam { dest_holder_(test::GetWritableFileWriter( new test::StringSink(&reader_contents_), "" /* don't care */)), source_holder_(test::GetSequentialFileReader( - new StringSource(reader_contents_), "" /* file name */)), - writer_(std::move(dest_holder_), 123, GetParam()), + new StringSource(reader_contents_, !std::get<1>(GetParam())), + "" /* file name */)), + writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())), reader_(nullptr, std::move(source_holder_), &report_, - true /* checksum */, 123 /* log_number */, - false /* retry_after_eof */) { - int header_size = GetParam() ? 
kRecyclableHeaderSize : kHeaderSize; - initial_offset_last_record_offsets_[0] = 0; - initial_offset_last_record_offsets_[1] = header_size + 10000; - initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000); - initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) + - (2 * log::kBlockSize - 1000) + - 3 * header_size; - } + true /* checksum */, 123 /* log_number */), + allow_retry_read_(std::get<1>(GetParam())) {} Slice* get_reader_contents() { return &reader_contents_; } @@ -189,7 +188,13 @@ class LogTest : public ::testing::TestWithParam { WALRecoveryMode::kTolerateCorruptedTailRecords) { std::string scratch; Slice record; - if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) { + bool ret = false; + if (allow_retry_read_) { + ret = reader_.TryReadRecord(&record, &scratch); + } else { + ret = reader_.ReadRecord(&record, &scratch, wal_recovery_mode); + } + if (ret) { return record.ToString(); } else { return "EOF"; @@ -258,23 +263,8 @@ class LogTest : public ::testing::TestWithParam { return "OK"; } } - - void WriteInitialOffsetLog() { - for (int i = 0; i < 4; i++) { - std::string record(initial_offset_record_sizes_[i], - static_cast('a' + i)); - Write(record); - } - } - }; -size_t LogTest::initial_offset_record_sizes_[] = - {10000, // Two sizable records in first block - 10000, - 2 * log::kBlockSize - 1000, // Span three blocks - 1}; - TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } TEST_P(LogTest, ReadWrite) { @@ -312,7 +302,8 @@ TEST_P(LogTest, Fragmentation) { TEST_P(LogTest, MarginalTrailer) { // Make a trailer that is exactly the same length as an empty record. - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? 
kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); @@ -326,7 +317,8 @@ TEST_P(LogTest, MarginalTrailer) { TEST_P(LogTest, MarginalTrailer2) { // Make a trailer that is exactly the same length as an empty record. - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); @@ -339,7 +331,8 @@ TEST_P(LogTest, MarginalTrailer2) { } TEST_P(LogTest, ShortTrailer) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); @@ -352,7 +345,8 @@ TEST_P(LogTest, ShortTrailer) { } TEST_P(LogTest, AlignedEof) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); @@ -403,6 +397,11 @@ TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { } TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); @@ -412,13 +411,20 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { } TEST_P(LogTest, BadLength) { - int header_size = GetParam() ? 
kRecyclableHeaderSize : kHeaderSize; + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; const int kPayloadSize = kBlockSize - header_size; Write(BigString("bar", kPayloadSize)); Write("foo"); // Least significant size byte is stored in header[4]. IncrementByte(4, 1); - if (!GetParam()) { + if (!recyclable_log) { ASSERT_EQ("foo", Read()); ASSERT_EQ(kBlockSize, DroppedBytes()); ASSERT_EQ("OK", MatchError("bad record length")); @@ -428,6 +434,12 @@ TEST_P(LogTest, BadLength) { } TEST_P(LogTest, BadLengthAtEndIsIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read()); @@ -436,6 +448,12 @@ TEST_P(LogTest, BadLengthAtEndIsIgnored) { } TEST_P(LogTest, BadLengthAtEndIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. 
+ return; + } Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); @@ -447,7 +465,8 @@ TEST_P(LogTest, ChecksumMismatch) { Write("foooooo"); IncrementByte(0, 14); ASSERT_EQ("EOF", Read()); - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { ASSERT_EQ(14U, DroppedBytes()); ASSERT_EQ("OK", MatchError("checksum mismatch")); } else { @@ -458,8 +477,10 @@ TEST_P(LogTest, ChecksumMismatch) { TEST_P(LogTest, UnexpectedMiddleType) { Write("foo"); - SetByte(6, static_cast(GetParam() ? kRecyclableMiddleType : kMiddleType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte(6, static_cast(recyclable_log ? kRecyclableMiddleType + : kMiddleType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ("OK", MatchError("missing start")); @@ -467,8 +488,10 @@ TEST_P(LogTest, UnexpectedMiddleType) { TEST_P(LogTest, UnexpectedLastType) { Write("foo"); - SetByte(6, static_cast(GetParam() ? kRecyclableLastType : kLastType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte(6, + static_cast(recyclable_log ? kRecyclableLastType : kLastType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ("OK", MatchError("missing start")); @@ -477,8 +500,10 @@ TEST_P(LogTest, UnexpectedLastType) { TEST_P(LogTest, UnexpectedFullType) { Write("foo"); Write("bar"); - SetByte(6, static_cast(GetParam() ? kRecyclableFirstType : kFirstType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte( + 6, static_cast(recyclable_log ? 
kRecyclableFirstType : kFirstType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("bar", Read()); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); @@ -488,8 +513,10 @@ TEST_P(LogTest, UnexpectedFullType) { TEST_P(LogTest, UnexpectedFirstType) { Write("foo"); Write(BigString("bar", 100000)); - SetByte(6, static_cast(GetParam() ? kRecyclableFirstType : kFirstType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte( + 6, static_cast(recyclable_log ? kRecyclableFirstType : kFirstType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ(BigString("bar", 100000), Read()); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); @@ -506,6 +533,11 @@ TEST_P(LogTest, MissingLastIsIgnored) { } TEST_P(LogTest, MissingLastIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write(BigString("bar", kBlockSize)); // Remove the LAST block, including header. ShrinkSize(14); @@ -524,6 +556,11 @@ TEST_P(LogTest, PartialLastIsIgnored) { } TEST_P(LogTest, PartialLastIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write(BigString("bar", kBlockSize)); // Cause a bad record length in the LAST block. ShrinkSize(1); @@ -550,7 +587,8 @@ TEST_P(LogTest, ErrorJoinsRecords) { SetByte(offset, 'x'); } - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { ASSERT_EQ("correct", Read()); ASSERT_EQ("EOF", Read()); size_t dropped = DroppedBytes(); @@ -564,7 +602,8 @@ TEST_P(LogTest, ErrorJoinsRecords) { TEST_P(LogTest, ClearEofSingleBlock) { Write("foo"); Write("bar"); - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? 
kRecyclableHeaderSize : kHeaderSize; ForceEOF(3 + header_size + 2); ASSERT_EQ("foo", Read()); UnmarkEOF(); @@ -579,7 +618,8 @@ TEST_P(LogTest, ClearEofSingleBlock) { TEST_P(LogTest, ClearEofMultiBlock) { size_t num_full_blocks = 5; - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; size_t n = (kBlockSize - header_size) * num_full_blocks + 25; Write(BigString("foo", n)); Write(BigString("bar", n)); @@ -628,7 +668,8 @@ TEST_P(LogTest, ClearEofError2) { } TEST_P(LogTest, Recycle) { - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { return; // test is only valid for recycled logs } Write("foo"); @@ -651,7 +692,11 @@ TEST_P(LogTest, Recycle) { ASSERT_EQ("EOF", Read()); } -INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2)); +INSTANTIATE_TEST_CASE_P(bool, LogTest, + ::testing::Values(std::make_tuple(0, false), + std::make_tuple(0, true), + std::make_tuple(1, false), + std::make_tuple(1, true))); class RetriableLogTest : public ::testing::TestWithParam { private: @@ -717,8 +762,7 @@ class RetriableLogTest : public ::testing::TestWithParam { reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_)); assert(reader_ != nullptr); log_reader_.reset(new Reader(nullptr, std::move(reader_), &report_, - true /* checksum */, 123 /* log_number */, - true /* retry_after_eof */)); + true /* checksum */, 123 /* log_number */)); assert(log_reader_ != nullptr); } return s; @@ -738,14 +782,17 @@ class RetriableLogTest : public ::testing::TestWithParam { writer_->Sync(true); } - std::string Read() { - auto wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; + bool TryRead(std::string* result) { + assert(result != nullptr); + result->clear(); std::string scratch; Slice record; - if (log_reader_->ReadRecord(&record, &scratch, wal_recovery_mode)) { - 
return record.ToString(); + bool r = log_reader_->TryReadRecord(&record, &scratch); + if (r) { + result->assign(record.data(), record.size()); + return true; } else { - return "Read error"; + return false; } } }; @@ -754,12 +801,16 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) { ASSERT_OK(SetupTestEnv()); std::vector remaining_bytes_in_last_record; size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool eof = false; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( {{"RetriableLogTest::TailLog:AfterPart1", "RetriableLogTest::TailLog:BeforeReadRecord"}, - {"LogReader::ReadMore:FirstEOF", + {"LogReader::TryReadMore:FirstEOF", "RetriableLogTest::TailLog:BeforePart2"}}); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack("LogReader::TryReadMore:FirstEOF", + [&](void* /*arg*/) { eof = true; }); SyncPoint::GetInstance()->EnableProcessing(); size_t delta = header_size - 1; @@ -779,23 +830,29 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) { std::string record; port::Thread log_reader_thread([&]() { TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); - record = Read(); + while (!TryRead(&record)) { + } }); log_reader_thread.join(); log_writer_thread.join(); ASSERT_EQ("foo", record); + ASSERT_TRUE(eof); } TEST_P(RetriableLogTest, TailLog_FullHeader) { ASSERT_OK(SetupTestEnv()); std::vector remaining_bytes_in_last_record; size_t header_size = GetParam() ? 
kRecyclableHeaderSize : kHeaderSize; + bool eof = false; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( {{"RetriableLogTest::TailLog:AfterPart1", "RetriableLogTest::TailLog:BeforeReadRecord"}, - {"LogReader::ReadMore:FirstEOF", + {"LogReader::TryReadMore:FirstEOF", "RetriableLogTest::TailLog:BeforePart2"}}); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack("LogReader::TryReadMore:FirstEOF", + [&](void* /*arg*/) { eof = true; }); SyncPoint::GetInstance()->EnableProcessing(); size_t delta = header_size + 1; @@ -810,18 +867,45 @@ TEST_P(RetriableLogTest, TailLog_FullHeader) { TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1"); TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2"); Write(Slice(part2)); + ASSERT_TRUE(eof); }); std::string record; port::Thread log_reader_thread([&]() { TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); - record = Read(); + while (!TryRead(&record)) { + } }); log_reader_thread.join(); log_writer_thread.join(); ASSERT_EQ("foo", record); } +TEST_P(RetriableLogTest, NonBlockingReadFullRecord) { + // Clear all sync point callbacks even if this test does not use sync point. + // It is necessary, otherwise the execution of this test may hit a sync point + // with which a callback is registered. The registered callback may access + // some dead variable, causing segfault. + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_OK(SetupTestEnv()); + size_t header_size = GetParam() ? 
kRecyclableHeaderSize : kHeaderSize; + size_t delta = header_size - 1; + size_t old_sz = contents().size(); + Encode("foo-bar"); + size_t new_sz = contents().size(); + std::string part1 = contents().substr(old_sz, delta); + std::string part2 = + contents().substr(old_sz + delta, new_sz - old_sz - delta); + Write(Slice(part1)); + std::string record; + ASSERT_FALSE(TryRead(&record)); + ASSERT_TRUE(record.empty()); + Write(Slice(part2)); + ASSERT_TRUE(TryRead(&record)); + ASSERT_EQ("foo-bar", record); +} + INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2)); } // namespace log diff --git a/db/repair.cc b/db/repair.cc index 4e93a161cf1d..e6f94338982a 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -364,8 +364,7 @@ class Repairer { // propagating bad information (like overly large sequence // numbers). log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, - true /*enable checksum*/, log, - false /* retry_after_eof */); + true /*enable checksum*/, log); // Initialize per-column family memtables for (auto* cfd : *vset_.GetColumnFamilySet()) { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 4d6671ef66d7..4f55a30d30af 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -315,8 +315,7 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) { assert(file); currentLogReader_.reset( new log::Reader(options_->info_log, std::move(file), &reporter_, - read_options_.verify_checksums_, logFile->LogNumber(), - false /* retry_after_eof */)); + read_options_.verify_checksums_, logFile->LogNumber())); return Status::OK(); } } // namespace rocksdb diff --git a/db/version_set.cc b/db/version_set.cc index fb7542eb454f..1433e89f89e9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3557,8 +3557,7 @@ Status VersionSet::Recover( VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, - true /* 
checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; std::vector replay_buffer; @@ -3773,8 +3772,7 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { @@ -3934,8 +3932,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { diff --git a/db/wal_manager.cc b/db/wal_manager.cc index 667ecae41add..dc513e9997ef 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname, reporter.status = &status; reporter.ignore_error = !db_options_.paranoid_checks; log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, - true /*checksum*/, number, false /* retry_after_eof */); + true /*checksum*/, number); std::string scratch; Slice record; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index abd8ec1e8426..0d7238ba6cd4 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -2001,9 +2001,8 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header, // bogus input, carry on as best we can log_number = 0; } - log::Reader reader(options.info_log, std::move(wal_file_reader), &reporter, - true /* checksum */, log_number, - false /* retry_after_eof */); + log::Reader reader(options.info_log, 
std::move(wal_file_reader), + &reporter, true /* checksum */, log_number); std::string scratch; WriteBatch batch; Slice record;