Skip to content

Commit

Permalink
Merge pull request #2680 from cloudflare/kenton/sqlite-reset
Browse files Browse the repository at this point in the history
Extend SqliteDatabase with a reset() method that recreates the database
  • Loading branch information
kentonv authored Sep 12, 2024
2 parents d1bd777 + cf8c335 commit a05f580
Show file tree
Hide file tree
Showing 13 changed files with 377 additions and 83 deletions.
17 changes: 1 addition & 16 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,9 @@ jsg::Promise<void> DurableObjectStorage::deleteAll(
jsg::Lock& js, jsg::Optional<PutOptions> maybeOptions) {
auto options = configureOptions(kj::mv(maybeOptions).orDefault(PutOptions{}));

auto& context = IoContext::current();
{
// Log to get a sense of whether users are potentially depending on alarms being kept around
// after deleteAll is called.
auto getOptions = configureOptions(GetOptions{});
context.addTask(context
.awaitJs(js,
transformCacheResult(js, cache->getAlarm(getOptions), getOptions,
[](jsg::Lock&, kj::Maybe<kj::Date> alarmValue) {
if (alarmValue != kj::none) {
LOG_WARNING_PERIODICALLY("NOSENTRY deleteAll called with an alarm still set");
}
return alarmValue;
})).ignoreResult());
}

auto deleteAll = cache->deleteAll(options);

auto& context = IoContext::current();
context.addTask(updateStorageDeletes(context, currentActorMetrics(), kj::mv(deleteAll.count)));

return transformMaybeBackpressure(js, options, kj::mv(deleteAll.backpressure));
Expand Down
10 changes: 10 additions & 0 deletions src/workerd/api/sql-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,11 @@ export class DurableObjectExample {
} else if (req.url.endsWith('/streaming-ingestion')) {
await testStreamingIngestion(req, this.state.storage);
return Response.json({ ok: true });
} else if (req.url.endsWith('/deleteAll')) {
this.state.storage.put('counter', 888); // will be deleted
this.state.storage.deleteAll();
assert.strictEqual(await this.state.storage.get('counter'), undefined);
return Response.json({ ok: true });
}

throw new Error('unknown url: ' + req.url);
Expand Down Expand Up @@ -1251,6 +1256,11 @@ export default {

// Everything's still consistent.
assert.equal(await doReq('increment'), 3);

// Delete all: increments start over
await doReq('deleteAll');
assert.equal(await doReq('increment'), 1);
assert.equal(await doReq('increment'), 2);
},
};

Expand Down
52 changes: 52 additions & 0 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ void ActorSqlite::ImplicitTxn::commit() {
}
}

void ActorSqlite::ImplicitTxn::rollback() {
// Ignore redundant commit()s.
if (!committed) {
// As of this writing, rollback() is only called when the database is about to be reset.
// Preparing a statement for it would be a waste since that statement would never be executed
// more than once, since resetting requires repreparing all statements anyway. So we don't
// bother.
parent.db->run("ROLLBACK TRANSACTION");
committed = true;
}
}

ActorSqlite::ExplicitTxn::ExplicitTxn(ActorSqlite& actorSqlite): actorSqlite(actorSqlite) {
KJ_SWITCH_ONEOF(actorSqlite.currentTxn) {
KJ_CASE_ONEOF(_, NoTxn) {
Expand Down Expand Up @@ -283,6 +295,46 @@ kj::Own<ActorCacheInterface::Transaction> ActorSqlite::startTransaction() {
ActorCacheInterface::DeleteAllResults ActorSqlite::deleteAll(WriteOptions options) {
requireNotBroken();

// deleteAll() cannot be part of a transaction because it deletes the database altogether. So,
// we have to close our transactions or fail.
KJ_SWITCH_ONEOF(currentTxn) {
KJ_CASE_ONEOF(_, NoTxn) {
// good
}
KJ_CASE_ONEOF(implicit, ImplicitTxn*) {
// Whatever the implicit transaction did, it's about to be blown away anyway. Roll it back
// so we don't waste time flushing these writes anywhere.
implicit->rollback();
currentTxn = NoTxn();
}
KJ_CASE_ONEOF(exp, ExplicitTxn*) {
// Keep in mind:
//
// ctx.storage.transaction(txn => {
// txn.deleteAll(); // calls `DurableObjectTransaction::deleteAll()`
// ctx.storage.deleteAll(); // calls this method, `ActorSqlite::deleteAll()`
// });
//
// `DurableObjectTransaction::deleteAll()` throws this exception, since `deleteAll()` is not
// supported inside a transaction. Under the new SQLite-backed storage system, directly
// calling `cxt.storage` inside a transaction (as opposed to using the `txn` object) should
// still be treated as part of the transaction, and so should throw the same thing.
JSG_FAIL_REQUIRE(Error, "Cannot call deleteAll() within a transaction");
}
}

if (!deleteAllCommitScheduled) {
// We'll want to make sure the commit callback is called for the deleteAll().
commitTasks.add(outputGate.lockWhile(kj::evalLater([this]() mutable -> kj::Promise<void> {
// Don't commit if shutdown() has been called.
requireNotBroken();

deleteAllCommitScheduled = false;
return commitCallback();
})));
deleteAllCommitScheduled = true;
}

uint count = kv.deleteAll();
return {
.backpressure = kj::none,
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
Hooks& hooks = const_cast<Hooks&>(Hooks::DEFAULT));

bool isCommitScheduled() {
return !currentTxn.is<NoTxn>();
return !currentTxn.is<NoTxn>() || deleteAllCommitScheduled;
}

kj::Maybe<SqliteDatabase&> getSqliteDatabase() override {
Expand Down Expand Up @@ -101,6 +101,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
KJ_DISALLOW_COPY_AND_MOVE(ImplicitTxn);

void commit();
void rollback();

private:
ActorSqlite& parent;
Expand Down Expand Up @@ -156,6 +157,9 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// transactions should be used in the meantime.
kj::OneOf<NoTxn, ImplicitTxn*, ExplicitTxn*> currentTxn = NoTxn();

// If true, then a commit is scheduled as a result of deleteAll() having been called.
bool deleteAllCommitScheduled = false;

kj::TaskSet commitTasks;

void onWrite();
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/server/alarm-scheduler.c++
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ std::default_random_engine makeSeededRandomEngine() {
} // namespace

AlarmScheduler::AlarmScheduler(
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::PathPtr path)
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path)
: clock(clock),
timer(timer),
random(makeSeededRandomEngine()),
db([&] {
auto db = kj::heap<SqliteDatabase>(vfs, path,
auto db = kj::heap<SqliteDatabase>(vfs, kj::mv(path),
kj::WriteMode::CREATE | kj::WriteMode::MODIFY | kj::WriteMode::CREATE_PARENT);
ensureInitialized(*db);
return kj::mv(db);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/alarm-scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler {
using GetActorFn = kj::Function<kj::Own<WorkerInterface>(kj::String)>;

AlarmScheduler(
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::PathPtr path);
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path);

kj::Maybe<kj::Date> getAlarm(ActorKey actor);
bool setAlarm(ActorKey actor, kj::Date scheduledTime);
Expand Down
7 changes: 5 additions & 2 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1941,8 +1941,11 @@ public:
kj::Path({d.uniqueKey, kj::str(idPtr, ".sqlite")}),
kj::WriteMode::CREATE | kj::WriteMode::MODIFY | kj::WriteMode::CREATE_PARENT);

// Before we do anything, make sure the database is in WAL mode.
db->run("PRAGMA journal_mode=WAL;");
// Before we do anything, make sure the database is in WAL mode. We also need to
// do this after reset() is used, so register a callback for that.
auto setWalMode = [](SqliteDatabase& db) { db.run("PRAGMA journal_mode=WAL;"); };
setWalMode(*db);
db->afterReset(kj::mv(setWalMode));

return kj::heap<ActorSqlite>(kj::mv(db), outputGate,
[]() -> kj::Promise<void> { return kj::READY_NOW; }, *sqliteHooks)
Expand Down
21 changes: 20 additions & 1 deletion src/workerd/util/sqlite-kv-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,27 @@ KJ_TEST("SQLite-KV") {
kv.put("foo", "hello"_kj.asBytes());
KJ_EXPECT(list(nullptr, kj::none, kj::none, F) == "bar=def, foo=hello, qux=321");

kv.deleteAll();
// deleteAll()
KJ_EXPECT(kv.deleteAll() == 3);
KJ_EXPECT(list(nullptr, kj::none, kj::none, F) == "");

KJ_EXPECT(!kv.get("bar", [&](kj::ArrayPtr<const byte> value) {
KJ_FAIL_EXPECT("should not call callback when no match", value.asChars());
}));

kv.put("bar", "ghi"_kj.asBytes());
kv.put("corge", "garply"_kj.asBytes());

KJ_EXPECT(list(nullptr, kj::none, kj::none, F) == "bar=ghi, corge=garply");

{
bool called = false;
KJ_EXPECT(kv.get("bar", [&](kj::ArrayPtr<const byte> value) {
KJ_EXPECT(kj::str(value.asChars()) == "ghi");
called = true;
}));
KJ_EXPECT(called);
}
}

} // namespace
Expand Down
40 changes: 26 additions & 14 deletions src/workerd/util/sqlite-kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,33 @@

namespace workerd {

SqliteKv::SqliteKv(SqliteDatabase& db) {
SqliteKv::SqliteKv(SqliteDatabase& db): ResetListener(db) {
if (db.run("SELECT name FROM sqlite_master WHERE type='table' AND name='_cf_KV'").isDone()) {
// The _cf_KV table doesn't exist. Defer initialization.
state = Uninitialized{db};
state.init<Uninitialized>(Uninitialized{});
} else {
// The KV table was initialized in the past. We can go ahead and prepare our statements.
// (We don't call ensureInitialized() here because the `CREATE TABLE IF NOT EXISTS` query it
// executes would be redundant.)
state = Initialized(db);
tableCreated = true;
state.init<Initialized>(db);
}
}

SqliteKv::Initialized& SqliteKv::ensureInitialized() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(uninitialized, Uninitialized) {
auto& db = uninitialized.db;
if (!tableCreated) {
db.run(R"(
CREATE TABLE IF NOT EXISTS _cf_KV (
key TEXT PRIMARY KEY,
value BLOB
) WITHOUT ROWID;
)");

db.run(R"(
CREATE TABLE IF NOT EXISTS _cf_KV (
key TEXT PRIMARY KEY,
value BLOB
) WITHOUT ROWID;
)");
tableCreated = true;
}

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(uninitialized, Uninitialized) {
return state.init<Initialized>(db);
}
KJ_CASE_ONEOF(initialized, Initialized) {
Expand All @@ -49,8 +52,17 @@ bool SqliteKv::delete_(KeyPtr key) {
}

uint SqliteKv::deleteAll() {
auto query = ensureInitialized().stmtDeleteAll.run();
return query.changeCount();
// TODO(perf): Consider introducing a compatibility flag that causes deleteAll() to always return
// 1. Apps almost certainly don't care about the return value but historically we returned the
// count of keys deleted, so now we're stuck counting the table size for no good reason.
uint count = tableCreated ? ensureInitialized().stmtCountKeys.run().getInt(0) : 0;
db.reset();
return count;
}

void SqliteKv::beforeSqliteReset() {
// We'll need to recreate the table on the next operation.
tableCreated = false;
}

} // namespace workerd
20 changes: 13 additions & 7 deletions src/workerd/util/sqlite-kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace workerd {
// perform direct SQL queries, we can block it from accessing any table prefixed with `_cf_`.
// (Ideally this class would allow configuring the table name, but this would require a somewhat
// obnoxious amount of string allocation.)
class SqliteKv {
class SqliteKv: private SqliteDatabase::ResetListener {
public:
explicit SqliteKv(SqliteDatabase& db);

Expand Down Expand Up @@ -51,11 +51,11 @@ class SqliteKv {
// byte blobs or strings containing NUL bytes.

private:
struct Uninitialized {
SqliteDatabase& db;
};
struct Uninitialized {};

struct Initialized {
// This reference is redundant but storing it here makes the prepared statement code below
// easier to manage.
SqliteDatabase& db;

SqliteDatabase::Statement stmtGet = db.prepare(R"(
Expand Down Expand Up @@ -112,20 +112,24 @@ class SqliteKv {
ORDER BY key DESC
LIMIT ?
)");
SqliteDatabase::Statement stmtDeleteAll = db.prepare(R"(
DELETE FROM _cf_KV
SqliteDatabase::Statement stmtCountKeys = db.prepare(R"(
SELECT count(*) FROM _cf_KV
)");

Initialized(SqliteDatabase& db): db(db) {}
};

kj::OneOf<Uninitialized, Initialized> state;

// Has the _cf_KV table been created? This is separate from Uninitialized/Initialized since it
// has to be repeated after a reset, whereas the statements do not need to be recreated.
bool tableCreated = false;

Initialized& ensureInitialized();
// Make sure the KV table is created and prepared statements are ready. Not called until the
// first write.

SqliteKv(SqliteDatabase& db, bool);
void beforeSqliteReset() override;
};

// =======================================================================================
Expand All @@ -137,6 +141,7 @@ class SqliteKv {

template <typename Func>
bool SqliteKv::get(KeyPtr key, Func&& callback) {
if (!tableCreated) return 0;
auto& stmts = KJ_UNWRAP_OR(state.tryGet<Initialized>(), return false);

auto query = stmts.stmtGet.run(key);
Expand All @@ -152,6 +157,7 @@ bool SqliteKv::get(KeyPtr key, Func&& callback) {
template <typename Func>
uint SqliteKv::list(
KeyPtr begin, kj::Maybe<KeyPtr> end, kj::Maybe<uint> limit, Order order, Func&& callback) {
if (!tableCreated) return 0;
auto& stmts = KJ_UNWRAP_OR(state.tryGet<Initialized>(), return 0);

auto iterate = [&](SqliteDatabase::Query&& query) {
Expand Down
42 changes: 42 additions & 0 deletions src/workerd/util/sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -740,5 +740,47 @@ KJ_TEST("DELETE with LIMIT") {
KJ_EXPECT(q.getInt(0) == 3);
}

KJ_TEST("reset database") {
auto dir = kj::newInMemoryDirectory(kj::nullClock());
SqliteDatabase::Vfs vfs(*dir);
SqliteDatabase db(vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY);

db.run("PRAGMA journal_mode=WAL;");

db.run("CREATE TABLE things (id INTEGER PRIMARY KEY)");

db.run("INSERT INTO things VALUES (123)");
db.run("INSERT INTO things VALUES (321)");

auto stmt = db.prepare("SELECT * FROM things");

auto query = stmt.run();
KJ_ASSERT(!query.isDone());
KJ_EXPECT(query.getInt(0) == 123);

db.reset();
db.run("PRAGMA journal_mode=WAL;");

// The query was canceled.
KJ_EXPECT_THROW_MESSAGE("query canceled because reset()", query.nextRow());
KJ_EXPECT_THROW_MESSAGE("query canceled because reset()", query.getInt(0));

// The statement doesn't work because the table is gone.
KJ_EXPECT_THROW_MESSAGE("no such table: things: SQLITE_ERROR", stmt.run());

// But we can recreate it.
db.run("CREATE TABLE things (id INTEGER PRIMARY KEY)");
db.run("INSERT INTO things VALUES (456)");

// Now the statement works.
{
auto q2 = stmt.run();
KJ_ASSERT(!q2.isDone());
KJ_EXPECT(q2.getInt(0) == 456);
q2.nextRow();
KJ_EXPECT(q2.isDone());
}
}

} // namespace
} // namespace workerd
Loading

0 comments on commit a05f580

Please sign in to comment.