Skip to content

Commit

Permalink
sqlite: ensure statement finalization on db close
Browse files Browse the repository at this point in the history
This commit adds statement tracking to the DatabaseSync class.
When a database is closed manually or via garbage collection, it
will force all associated prepared statements to be finalized.
This should mitigate "zombie" connections which can introduce
test flakiness in the CI on Windows.

PR-URL: #54014
Fixes: #54006
Reviewed-By: Moshe Atlow <[email protected]>
Reviewed-By: Tobias Nießen <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: Richard Lau <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
  • Loading branch information
cjihrig authored and targos committed Aug 14, 2024
1 parent af5fb71 commit c8fddbd
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 26 deletions.
102 changes: 79 additions & 23 deletions src/node_sqlite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,19 @@ DatabaseSync::DatabaseSync(Environment* env,
}

DatabaseSync::~DatabaseSync() {
sqlite3_close_v2(connection_);
connection_ = nullptr;
if (IsOpen()) {
FinalizeStatements();
sqlite3_close_v2(connection_);
connection_ = nullptr;
}
}

void DatabaseSync::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("location", location_);
}

bool DatabaseSync::Open() {
if (connection_ != nullptr) {
if (IsOpen()) {
node::THROW_ERR_INVALID_STATE(env(), "database is already open");
return false;
}
Expand All @@ -112,6 +115,29 @@ bool DatabaseSync::Open() {
return true;
}

void DatabaseSync::FinalizeStatements() {
for (auto stmt : statements_) {
stmt->Finalize();
}

statements_.clear();
}

void DatabaseSync::UntrackStatement(StatementSync* statement) {
auto it = statements_.find(statement);
if (it != statements_.end()) {
statements_.erase(it);
}
}

inline bool DatabaseSync::IsOpen() {
return connection_ != nullptr;
}

inline sqlite3* DatabaseSync::Connection() {
return connection_;
}

void DatabaseSync::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

Expand Down Expand Up @@ -164,8 +190,8 @@ void DatabaseSync::Close(const FunctionCallbackInfo<Value>& args) {
DatabaseSync* db;
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, db->connection_ == nullptr, "database is not open");
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");
db->FinalizeStatements();
int r = sqlite3_close_v2(db->connection_);
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
db->connection_ = nullptr;
Expand All @@ -175,8 +201,7 @@ void DatabaseSync::Prepare(const FunctionCallbackInfo<Value>& args) {
DatabaseSync* db;
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, db->connection_ == nullptr, "database is not open");
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");

if (!args[0]->IsString()) {
node::THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
Expand All @@ -188,17 +213,16 @@ void DatabaseSync::Prepare(const FunctionCallbackInfo<Value>& args) {
sqlite3_stmt* s = nullptr;
int r = sqlite3_prepare_v2(db->connection_, *sql, -1, &s, 0);
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
BaseObjectPtr<StatementSync> stmt =
StatementSync::Create(env, db->connection_, s);
BaseObjectPtr<StatementSync> stmt = StatementSync::Create(env, db, s);
db->statements_.insert(stmt.get());
args.GetReturnValue().Set(stmt->object());
}

void DatabaseSync::Exec(const FunctionCallbackInfo<Value>& args) {
DatabaseSync* db;
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, db->connection_ == nullptr, "database is not open");
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");

if (!args[0]->IsString()) {
node::THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
Expand All @@ -213,7 +237,7 @@ void DatabaseSync::Exec(const FunctionCallbackInfo<Value>& args) {

StatementSync::StatementSync(Environment* env,
Local<Object> object,
sqlite3* db,
DatabaseSync* db,
sqlite3_stmt* stmt)
: BaseObject(env, object) {
MakeWeak();
Expand All @@ -227,13 +251,25 @@ StatementSync::StatementSync(Environment* env,
}

StatementSync::~StatementSync() {
if (!IsFinalized()) {
db_->UntrackStatement(this);
Finalize();
}
}

void StatementSync::Finalize() {
sqlite3_finalize(statement_);
statement_ = nullptr;
}

inline bool StatementSync::IsFinalized() {
return statement_ == nullptr;
}

bool StatementSync::BindParams(const FunctionCallbackInfo<Value>& args) {
int r = sqlite3_clear_bindings(statement_);
CHECK_ERROR_OR_THROW(env()->isolate(), db_, r, SQLITE_OK, false);
CHECK_ERROR_OR_THROW(
env()->isolate(), db_->Connection(), r, SQLITE_OK, false);

int anon_idx = 1;
int anon_start = 0;
Expand Down Expand Up @@ -364,7 +400,8 @@ bool StatementSync::BindValue(const Local<Value>& value, const int index) {
return false;
}

CHECK_ERROR_OR_THROW(env()->isolate(), db_, r, SQLITE_OK, false);
CHECK_ERROR_OR_THROW(
env()->isolate(), db_->Connection(), r, SQLITE_OK, false);
return true;
}

Expand Down Expand Up @@ -435,8 +472,11 @@ void StatementSync::All(const FunctionCallbackInfo<Value>& args) {
StatementSync* stmt;
ASSIGN_OR_RETURN_UNWRAP(&stmt, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, stmt->IsFinalized(), "statement has been finalized");
int r = sqlite3_reset(stmt->statement_);
CHECK_ERROR_OR_THROW(env->isolate(), stmt->db_, r, SQLITE_OK, void());
CHECK_ERROR_OR_THROW(
env->isolate(), stmt->db_->Connection(), r, SQLITE_OK, void());

if (!stmt->BindParams(args)) {
return;
Expand All @@ -462,7 +502,8 @@ void StatementSync::All(const FunctionCallbackInfo<Value>& args) {
rows.emplace_back(row);
}

CHECK_ERROR_OR_THROW(env->isolate(), stmt->db_, r, SQLITE_DONE, void());
CHECK_ERROR_OR_THROW(
env->isolate(), stmt->db_->Connection(), r, SQLITE_DONE, void());
args.GetReturnValue().Set(
Array::New(env->isolate(), rows.data(), rows.size()));
}
Expand All @@ -471,8 +512,11 @@ void StatementSync::Get(const FunctionCallbackInfo<Value>& args) {
StatementSync* stmt;
ASSIGN_OR_RETURN_UNWRAP(&stmt, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, stmt->IsFinalized(), "statement has been finalized");
int r = sqlite3_reset(stmt->statement_);
CHECK_ERROR_OR_THROW(env->isolate(), stmt->db_, r, SQLITE_OK, void());
CHECK_ERROR_OR_THROW(
env->isolate(), stmt->db_->Connection(), r, SQLITE_OK, void());

if (!stmt->BindParams(args)) {
return;
Expand All @@ -482,7 +526,7 @@ void StatementSync::Get(const FunctionCallbackInfo<Value>& args) {
r = sqlite3_step(stmt->statement_);
if (r == SQLITE_DONE) return;
if (r != SQLITE_ROW) {
THROW_ERR_SQLITE_ERROR(env->isolate(), stmt->db_);
THROW_ERR_SQLITE_ERROR(env->isolate(), stmt->db_->Connection());
return;
}

Expand Down Expand Up @@ -511,8 +555,11 @@ void StatementSync::Run(const FunctionCallbackInfo<Value>& args) {
StatementSync* stmt;
ASSIGN_OR_RETURN_UNWRAP(&stmt, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, stmt->IsFinalized(), "statement has been finalized");
int r = sqlite3_reset(stmt->statement_);
CHECK_ERROR_OR_THROW(env->isolate(), stmt->db_, r, SQLITE_OK, void());
CHECK_ERROR_OR_THROW(
env->isolate(), stmt->db_->Connection(), r, SQLITE_OK, void());

if (!stmt->BindParams(args)) {
return;
Expand All @@ -521,7 +568,7 @@ void StatementSync::Run(const FunctionCallbackInfo<Value>& args) {
auto reset = OnScopeLeave([&]() { sqlite3_reset(stmt->statement_); });
r = sqlite3_step(stmt->statement_);
if (r != SQLITE_ROW && r != SQLITE_DONE) {
THROW_ERR_SQLITE_ERROR(env->isolate(), stmt->db_);
THROW_ERR_SQLITE_ERROR(env->isolate(), stmt->db_->Connection());
return;
}

Expand All @@ -530,8 +577,9 @@ void StatementSync::Run(const FunctionCallbackInfo<Value>& args) {
FIXED_ONE_BYTE_STRING(env->isolate(), "lastInsertRowid");
Local<String> changes_string =
FIXED_ONE_BYTE_STRING(env->isolate(), "changes");
sqlite3_int64 last_insert_rowid = sqlite3_last_insert_rowid(stmt->db_);
sqlite3_int64 changes = sqlite3_changes64(stmt->db_);
sqlite3_int64 last_insert_rowid =
sqlite3_last_insert_rowid(stmt->db_->Connection());
sqlite3_int64 changes = sqlite3_changes64(stmt->db_->Connection());
Local<Value> last_insert_rowid_val;
Local<Value> changes_val;

Expand All @@ -557,6 +605,8 @@ void StatementSync::SourceSQL(const FunctionCallbackInfo<Value>& args) {
StatementSync* stmt;
ASSIGN_OR_RETURN_UNWRAP(&stmt, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, stmt->IsFinalized(), "statement has been finalized");
Local<String> sql;
if (!String::NewFromUtf8(env->isolate(), sqlite3_sql(stmt->statement_))
.ToLocal(&sql)) {
Expand All @@ -569,6 +619,8 @@ void StatementSync::ExpandedSQL(const FunctionCallbackInfo<Value>& args) {
StatementSync* stmt;
ASSIGN_OR_RETURN_UNWRAP(&stmt, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, stmt->IsFinalized(), "statement has been finalized");
char* expanded = sqlite3_expanded_sql(stmt->statement_);
auto maybe_expanded = String::NewFromUtf8(env->isolate(), expanded);
sqlite3_free(expanded);
Expand All @@ -584,6 +636,8 @@ void StatementSync::SetAllowBareNamedParameters(
StatementSync* stmt;
ASSIGN_OR_RETURN_UNWRAP(&stmt, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, stmt->IsFinalized(), "statement has been finalized");

if (!args[0]->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
Expand All @@ -599,6 +653,8 @@ void StatementSync::SetReadBigInts(const FunctionCallbackInfo<Value>& args) {
StatementSync* stmt;
ASSIGN_OR_RETURN_UNWRAP(&stmt, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, stmt->IsFinalized(), "statement has been finalized");

if (!args[0]->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -640,7 +696,7 @@ Local<FunctionTemplate> StatementSync::GetConstructorTemplate(
}

BaseObjectPtr<StatementSync> StatementSync::Create(Environment* env,
sqlite3* db,
DatabaseSync* db,
sqlite3_stmt* stmt) {
Local<Object> obj;
if (!GetConstructorTemplate(env)
Expand Down
16 changes: 13 additions & 3 deletions src/node_sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
#include "util.h"

#include <map>
#include <unordered_set>

namespace node {
namespace sqlite {

class StatementSync;

class DatabaseSync : public BaseObject {
public:
DatabaseSync(Environment* env,
Expand All @@ -25,6 +28,10 @@ class DatabaseSync : public BaseObject {
static void Close(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Prepare(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Exec(const v8::FunctionCallbackInfo<v8::Value>& args);
void FinalizeStatements();
void UntrackStatement(StatementSync* statement);
bool IsOpen();
sqlite3* Connection();

SET_MEMORY_INFO_NAME(DatabaseSync)
SET_SELF_SIZE(DatabaseSync)
Expand All @@ -35,19 +42,20 @@ class DatabaseSync : public BaseObject {
~DatabaseSync() override;
std::string location_;
sqlite3* connection_;
std::unordered_set<StatementSync*> statements_;
};

class StatementSync : public BaseObject {
public:
StatementSync(Environment* env,
v8::Local<v8::Object> object,
sqlite3* db,
DatabaseSync* db,
sqlite3_stmt* stmt);
void MemoryInfo(MemoryTracker* tracker) const override;
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);
static BaseObjectPtr<StatementSync> Create(Environment* env,
sqlite3* db,
DatabaseSync* db,
sqlite3_stmt* stmt);
static void All(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Get(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand All @@ -57,13 +65,15 @@ class StatementSync : public BaseObject {
static void SetAllowBareNamedParameters(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetReadBigInts(const v8::FunctionCallbackInfo<v8::Value>& args);
void Finalize();
bool IsFinalized();

SET_MEMORY_INFO_NAME(StatementSync)
SET_SELF_SIZE(StatementSync)

private:
~StatementSync() override;
sqlite3* db_;
DatabaseSync* db_;
sqlite3_stmt* statement_;
bool use_big_ints_;
bool allow_bare_named_params_;
Expand Down

0 comments on commit c8fddbd

Please sign in to comment.