Skip to content

Commit

Permalink
Merge pull request #8558 from jzhou77/release-6.3
Browse files Browse the repository at this point in the history
Fix potential data corruptions [release 6.3]
  • Loading branch information
jzhou77 authored Oct 25, 2022
2 parents be8b344 + 912f46d commit 1d59860
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 34 deletions.
2 changes: 1 addition & 1 deletion fdbclient/BackupContainer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1856,7 +1856,7 @@ class BackupContainerLocalDirectory : public BackupContainerFileSystem,
m_buffer = Standalone<VectorRef<uint8_t>>(old.slice(size, old.size()));

// Write the old buffer to the underlying file and update the write offset
Future<Void> r = holdWhile(old, m_file->write(old.begin(), size, m_writeOffset));
Future<Void> r = uncancellable(holdWhile(old, m_file->write(old.begin(), size, m_writeOffset)));
m_writeOffset += size;

return r;
Expand Down
4 changes: 2 additions & 2 deletions fdbclient/FileBackupAgent.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
int64_t offset,
int len) {
state Standalone<StringRef> buf = makeString(len);
int rLen = wait(file->read(mutateString(buf), len, offset));
int rLen = wait(uncancellable(holdWhile(buf, file->read(mutateString(buf), len, offset))));
if (rLen != len)
throw restore_bad_read();

Expand Down Expand Up @@ -682,7 +682,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IA
int64_t offset,
int len) {
state Standalone<StringRef> buf = makeString(len);
int rLen = wait(file->read(mutateString(buf), len, offset));
int rLen = wait(uncancellable(holdWhile(buf, file->read(mutateString(buf), len, offset))));
if (rLen != len)
throw restore_bad_read();

Expand Down
24 changes: 16 additions & 8 deletions fdbrpc/AsyncFileNonDurable.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,30 @@ ACTOR Future<Void> sendErrorOnProcess(ISimulator::ProcessInfo* process,
TaskPriority taskID);

ACTOR template <class T>
Future<T> sendErrorOnShutdown(Future<T> in) {
choose {
when(wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()))) {
throw io_error().asInjectedFault();
Future<T> sendErrorOnShutdown(Future<T> in, bool assertOnCancel = false) {
try {
choose {
when(wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()))) {
throw io_error().asInjectedFault();
}
when(T rep = wait(in)) { return rep; }
}
when(T rep = wait(in)) { return rep; }
} catch (Error& e) {
// ASSERT(e.code() != error_code_actor_cancelled || !assertOnCancel);
throw;
}
}

class AsyncFileDetachable sealed : public IAsyncFile, public ReferenceCounted<AsyncFileDetachable> {
private:
Reference<IAsyncFile> file;
Future<Void> shutdown;
bool assertOnReadWriteCancel;

public:
explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file) { shutdown = doShutdown(this); }
explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file), assertOnReadWriteCancel(true) {
shutdown = doShutdown(this);
}

ACTOR Future<Void> doShutdown(AsyncFileDetachable* self) {
wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()));
Expand All @@ -86,13 +94,13 @@ class AsyncFileDetachable sealed : public IAsyncFile, public ReferenceCounted<As
Future<int> read(void* data, int length, int64_t offset) {
if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady())
return io_error().asInjectedFault();
return sendErrorOnShutdown(file->read(data, length, offset));
return sendErrorOnShutdown(file->read(data, length, offset), assertOnReadWriteCancel);
}

Future<Void> write(void const* data, int length, int64_t offset) {
if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady())
return io_error().asInjectedFault();
return sendErrorOnShutdown(file->write(data, length, offset));
return sendErrorOnShutdown(file->write(data, length, offset), assertOnReadWriteCancel);
}

Future<Void> truncate(int64_t size) {
Expand Down
2 changes: 1 addition & 1 deletion fdbrpc/AsyncFileReadAhead.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class AsyncFileReadAheadCache : public IAsyncFile, public ReferenceCounted<Async

state Reference<CacheBlock> block(new CacheBlock(length));
try {
int len = wait(f->m_f->read(block->data, length, offset));
int len = wait(uncancellable(holdWhile(block, f->m_f->read(block->data, length, offset))));
block->len = len;
} catch (Error& e) {
f->m_max_concurrent_reads.release(1);
Expand Down
25 changes: 15 additions & 10 deletions fdbrpc/AsyncFileWriteChecker.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi

// For read() and write(), the data buffer must remain valid until the future is ready
Future<int> read(void* data, int length, int64_t offset) {
return map(m_f->read(data, length, offset), [=](int r) {
updateChecksumHistory(false, offset, r, (uint8_t*)data);
// Lambda must hold a reference to this to keep it alive until after the read
auto self = Reference<AsyncFileWriteChecker>::addRef(this);
return map(m_f->read(data, length, offset), [self, data, offset](int r) {
self->updateChecksumHistory(false, offset, r, (uint8_t*)data);
return r;
});
}
Future<Void> readZeroCopy(void** data, int* length, int64_t offset) {
return map(m_f->readZeroCopy(data, length, offset), [=](Void r) {
updateChecksumHistory(false, offset, *length, (uint8_t*)data);
// Lambda must hold a reference to this to keep it alive until after the read
auto self = Reference<AsyncFileWriteChecker>::addRef(this);
return map(m_f->readZeroCopy(data, length, offset), [self, data, length, offset](Void r) {
self->updateChecksumHistory(false, offset, *length, (uint8_t*)data);
return r;
});
}
Expand All @@ -50,12 +54,13 @@ class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFi
}

Future<Void> truncate(int64_t size) {
return map(m_f->truncate(size), [=](Void r) {
// Truncate the page checksum history if it is in use
if ((size / checksumHistoryPageSize) < checksumHistory.size()) {
int oldCapacity = checksumHistory.capacity();
checksumHistory.resize(size / checksumHistoryPageSize);
checksumHistoryBudget.get() -= (checksumHistory.capacity() - oldCapacity);
// Lambda must hold a reference to this to keep it alive until after the read
auto self = Reference<AsyncFileWriteChecker>::addRef(this);
return map(m_f->truncate(size), [self, size](Void r) {
if ((size / checksumHistoryPageSize) < self->checksumHistory.size()) {
int oldCapacity = self->checksumHistory.capacity();
self->checksumHistory.resize(size / checksumHistoryPageSize);
checksumHistoryBudget.get() -= (self->checksumHistory.capacity() - oldCapacity);
}
return r;
});
Expand Down
9 changes: 6 additions & 3 deletions fdbserver/DiskQueue.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
waitfor.push_back(self->files[1].f->write(pageData.begin(), pageData.size(), self->writingPos));
self->writingPos += pageData.size();

return waitForAll(waitfor);
return waitForAllReadyThenThrow(waitfor);
}

// Write the given data (pageData) to the queue files of self, sync data to disk, and delete the memory (pageMem)
Expand Down Expand Up @@ -645,7 +645,7 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
for (int i = 0; i < 2; i++)
if (self->files[i].size > 0)
reads.push_back(self->files[i].f->read(self->firstPages[i], sizeof(Page), 0));
wait(waitForAll(reads));
wait(waitForAllReadyThenThrow(reads));

// Determine which file comes first
if (compare(self->firstPages[1], self->firstPages[0])) {
Expand Down Expand Up @@ -733,7 +733,10 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
}

// Read nPages from pageOffset*sizeof(Page) offset in file self->files[file]
ACTOR static Future<Standalone<StringRef>> read(RawDiskQueue_TwoFiles* self, int file, int pageOffset, int nPages) {
ACTOR static UNCANCELLABLE Future<Standalone<StringRef>> read(RawDiskQueue_TwoFiles* self,
int file,
int pageOffset,
int nPages) {
state TrackMe trackMe(self);
state const size_t bytesRequested = nPages * sizeof(Page);
state Standalone<StringRef> result = makeAlignedString(sizeof(Page), bytesRequested);
Expand Down
50 changes: 41 additions & 9 deletions flow/genericactors.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ Future<Void> waitForAllReady(std::vector<Future<T>> results) {
if (i == results.size())
return Void();
try {
wait(success(results[i]));
T t = wait(results[i]);
(void)t;
} catch (...) {
}
i++;
Expand Down Expand Up @@ -392,6 +393,20 @@ Future<Void> map(FutureStream<T> input, F func, PromiseStream<std::invoke_result
return Void();
}

// X + Y will wait for X, then wait for and return the result of Y
ACTOR template <class A, class B>
Future<B> operatorPlus(Future<A> a, Future<B> b) {
A resultA = wait(a);
(void)resultA;
B resultB = wait(b);
return resultB;
}

template <class A, class B>
Future<B> operator+(Future<A> a, Future<B> b) {
return operatorPlus(a, b);
}

// Returns if the future returns true, otherwise waits forever.
ACTOR Future<Void> returnIfTrue(Future<bool> f);

Expand Down Expand Up @@ -972,21 +987,22 @@ class QuorumCallback : public Callback<T> {

private:
template <class U>
friend Future<Void> quorum(std::vector<Future<U>> const& results, int n);
friend Future<Void> quorum(const Future<U>* pItems, int itemCount, int n);
Quorum<T>* head;
QuorumCallback() = default;
QuorumCallback(Future<T> future, Quorum<T>* head) : head(head) { future.addCallbackAndClear(this); }
};

template <class T>
Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
ASSERT(n >= 0 && n <= results.size());
Future<Void> quorum(const Future<T>* pItems, int itemCount, int n) {
ASSERT(n >= 0 && n <= itemCount);

int size = Quorum<T>::sizeFor(results.size());
Quorum<T>* q = new (allocateFast(size)) Quorum<T>(n, results.size());
int size = Quorum<T>::sizeFor(itemCount);
Quorum<T>* q = new (allocateFast(size)) Quorum<T>(n, itemCount);

QuorumCallback<T>* nextCallback = q->callbacks();
for (auto& r : results) {
for (int i = 0; i < itemCount; ++i) {
auto& r = pItems[i];
if (r.isReady()) {
new (nextCallback) QuorumCallback<T>();
nextCallback->next = 0;
Expand All @@ -1001,6 +1017,11 @@ Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
return Future<Void>(q);
}

template <class T>
Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
return quorum(&results.front(), results.size(), n);
}

ACTOR template <class T>
Future<Void> smartQuorum(std::vector<Future<T>> results,
int required,
Expand All @@ -1022,6 +1043,15 @@ Future<Void> waitForAll(std::vector<Future<T>> const& results) {
return quorum(results, (int)results.size());
}

// Wait for all futures in results to be ready and then throw the first (in execution order) error
// if any of them resulted in an error.
template <class T>
Future<Void> waitForAllReadyThenThrow(std::vector<Future<T>> const& results) {
Future<Void> f = waitForAll(results);
Future<Void> fReady = waitForAllReady(results);
return fReady + f;
}

template <class T>
Future<Void> waitForAny(std::vector<Future<T>> const& results) {
if (results.empty())
Expand Down Expand Up @@ -1081,7 +1111,8 @@ Future<Void> success(Future<T> of) {
ACTOR template <class T>
Future<Void> ready(Future<T> f) {
try {
wait(success(f));
T t = wait(f);
(void)t;
} catch (...) {
}
return Void();
Expand Down Expand Up @@ -1200,7 +1231,8 @@ inline Future<Void> operator&&(Future<Void> const& lhs, Future<Void> const& rhs)
return lhs;
}

return waitForAll(std::vector<Future<Void>>{ lhs, rhs });
Future<Void> x[] = { lhs, rhs };
return quorum(x, 2, 2);
}

// error || unset -> error
Expand Down

0 comments on commit 1d59860

Please sign in to comment.