Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, log_number,
false /* retry_after_eof */);
&reporter, true /*checksum*/, log_number);

// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable
Expand Down
282 changes: 254 additions & 28 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ Reader::Reporter::~Reporter() {

Reader::Reader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file,
Reporter* reporter, bool checksum, uint64_t log_num,
bool retry_after_eof)
Reporter* reporter, bool checksum, uint64_t log_num)
: info_log_(info_log),
file_(std::move(_file)),
reporter_(reporter),
Expand All @@ -39,7 +38,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
end_of_buffer_offset_(0),
log_number_(log_num),
recycled_(false),
retry_after_eof_(retry_after_eof) {}
fragments_(),
in_fragmented_record_(false) {}

Reader::~Reader() {
delete[] backing_store_;
Expand Down Expand Up @@ -199,6 +199,118 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
return false;
}

// Attempts to assemble the next logical record from the fragments that can be
// read right now.
//
// Returns true when a complete record was produced. In that case *record
// refers either directly to the fragment payload (unfragmented record) or to
// *scratch (record reassembled from several fragments), and
// last_record_offset_ is updated.
//
// Returns false when no complete record is available: TryReadFragment() could
// not deliver a fragment, an old (foreign log number) record was hit, or a
// bad length/checksum was seen in a recycled log. When TryReadFragment()
// itself returns false, any fragments gathered so far stay in fragments_ so
// that a later call can continue the same record.
//
// Both output parameters must be non-null; they are cleared on entry.
bool Reader::TryReadRecord(Slice* record, std::string* scratch) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();

  // NOTE(review): this is a local, so when a kFirstType fragment was consumed
  // by an earlier call that then returned false, the kLastType branch below
  // stores 0 in last_record_offset_. Fixing that needs a member variable,
  // which is outside the scope of this function.
  uint64_t prospective_record_offset = 0;
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (true) {
    // Offset of the physical record about to be parsed. It has to be
    // recomputed on every iteration: if it were sampled only once before the
    // loop, a record returned after skipping corrupted fragments would be
    // reported at the offset of the first skipped fragment.
    const uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size();
    if (!TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
      break;
    }
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        last_record_offset_ = physical_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        // Remember where this logical record starts; it is published once
        // the matching last fragment arrives.
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
          // Hand the reassembled payload to the caller through *scratch so
          // that fragments_ can be reused for the next record.
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          fragments_.clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      case kBadRecordLen:
      case kBadRecordChecksum:
        // In a recycled log these errors end the read without a corruption
        // report.
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        if (fragment_type_or_err == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}

// Returns last_record_offset_, the value TryReadRecord() stores each time it
// returns a complete record.
uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}
Expand All @@ -207,14 +319,22 @@ void Reader::UnmarkEOF() {
if (read_error_) {
return;
}

eof_ = false;
if (eof_offset_ == 0) {
return;
}
UnmarkEOFInternal();
}

// If retry_after_eof_ is true, we have to proceed to read anyway.
if (!retry_after_eof_ && eof_offset_ == 0) {
// Clears the EOF flag and always hands off to UnmarkEOFInternal(), with no
// check of eof_offset_. Does nothing once a read error has been recorded.
void Reader::ForceUnmarkEOF() {
  if (!read_error_) {
    eof_ = false;
    UnmarkEOFInternal();
  }
}

void Reader::UnmarkEOFInternal() {
// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
Expand Down Expand Up @@ -292,12 +412,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
} else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
eof_ = true;
eof_offset_ = buffer_.size();
TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF");
}
return true;
} else if (retry_after_eof_ && !read_error_) {
UnmarkEOF();
return !read_error_;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
Expand Down Expand Up @@ -355,24 +471,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
}
}
if (header_size + length > buffer_.size()) {
if (!retry_after_eof_) {
*drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
return kBadRecordLen;
}
// If the end of the file has been reached without reading |length|
// bytes of payload, assume the writer died in the middle of writing the
// record. Don't report a corruption unless requested.
if (*drop_size) {
return kBadHeader;
}
} else {
int r = kEof;
if (!ReadMore(drop_size, &r)) {
return r;
}
continue;
*drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
return kBadRecordLen;
}
// If the end of the file has been reached without reading |length|
// bytes of payload, assume the writer died in the middle of writing the
// record. Don't report a corruption unless requested.
if (*drop_size) {
return kBadHeader;
}
return kEof;
}
Expand Down Expand Up @@ -409,5 +517,123 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
}
}

bool Reader::TryReadMore(size_t* drop_size, int* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
read_error_ = true;
*error = kEof;
return false;
} else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
eof_ = true;
eof_offset_ = buffer_.size();
TEST_SYNC_POINT_CALLBACK("LogReader::TryReadMore:FirstEOF", nullptr);
}
return true;
} else if (!read_error_) {
ForceUnmarkEOF();
return !read_error_;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Unless explicitly requested we don't
// considering this an error, just report EOF.
if (buffer_.size()) {
*drop_size = buffer_.size();
buffer_.clear();
*error = kBadHeader;
return false;
}
buffer_.clear();
*error = kEof;
return false;
}
}

// Tries to parse one physical record (fragment) from buffer_, calling
// TryReadMore() whenever the buffered data is too short.
//
// Returns true when the caller should process *fragment_type_or_err: either
// a record type with its payload in *fragment, or one of kOldRecord,
// kBadRecord, kBadRecordChecksum. Returns false when no fragment can be
// produced right now; if TryReadMore() failed, its error code is stored in
// *fragment_type_or_err before returning.
bool Reader::TryReadFragment(Slice* fragment, size_t* drop_size,
                             unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  // Reads until buffer_ holds at least `wanted` bytes. Gives up (false) when
  // TryReadMore() fails or succeeds without changing the buffer size.
  const auto fill_to = [&](size_t wanted) -> bool {
    while (buffer_.size() < wanted) {
      const size_t size_before = buffer_.size();
      int read_err = kEof;
      if (!TryReadMore(drop_size, &read_err)) {
        *fragment_type_or_err = read_err;
        return false;
      }
      if (size_before == buffer_.size()) {
        return false;
      }
    }
    return true;
  };

  if (!fill_to(static_cast<size_t>(kHeaderSize))) {
    return false;
  }

  // NOTE(review): `header` is captured once and used again after later
  // TryReadMore() calls; this presumes those calls keep the already buffered
  // bytes at the same address - confirm against UnmarkEOFInternal().
  const char* header = buffer_.data();
  const uint32_t len_lo = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t len_hi = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = len_lo | (len_hi << 8);

  int header_size = kHeaderSize;
  const bool recyclable =
      type >= kRecyclableFullType && type <= kRecyclableLastType;
  if (recyclable) {
    // A recyclable record at offset 0 of the file marks the log as recycled.
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    if (!fill_to(static_cast<size_t>(kRecyclableHeaderSize))) {
      return false;
    }
    // The recyclable header carries a log number at byte 7; a mismatch is
    // reported as kOldRecord.
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  // Wait until the whole payload is buffered.
  if (!fill_to(static_cast<size_t>(header_size + length))) {
    return false;
  }

  if (type == kZeroType && length == 0) {
    // Zero type with zero length: drop the rest of the buffer and report a
    // bad record.
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    // The stored CRC covers everything from the type byte (offset 6) through
    // the end of the payload.
    const uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    const uint32_t actual_crc =
        crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  // Consume the record and expose its payload to the caller.
  buffer_.remove_prefix(header_size + length);
  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}

} // namespace log
} // namespace rocksdb
Loading