Skip to content

Commit

Permalink
auto cleanup of open iterators on db.close()
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Mar 21, 2013
1 parent cc48eef commit 727ab37
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 17 deletions.
71 changes: 69 additions & 2 deletions src/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace leveldown {

Database::Database (char* location) : location(location) {
db = NULL;
currentIteratorId = 0;
pendingCloseWorker = NULL;
};

Database::~Database () {
Expand Down Expand Up @@ -87,6 +89,19 @@ void Database::ReleaseSnapshot (const leveldb::Snapshot* snapshot) {
return db->ReleaseSnapshot(snapshot);
}

void Database::ReleaseIterator (uint32_t id) {
// called each time an Iterator is End()ed, in the main thread
// we have to remove our reference to it and if it's the last iterator
// we have to invoke a pending CloseWorker if there is one
// if there is a pending CloseWorker it means that we're waiting for
// iterators to end before we can close them
iterators.erase(id);
if (iterators.size() == 0 && pendingCloseWorker != NULL) {
AsyncQueueWorker((AsyncWorker*)pendingCloseWorker);
pendingCloseWorker = NULL;
}
}

void Database::CloseDatabase () {
delete db;
db = NULL;
Expand Down Expand Up @@ -207,7 +222,45 @@ v8::Handle<v8::Value> Database::Close (const v8::Arguments& args) {
LD_METHOD_SETUP_COMMON_ONEARG(close)

CloseWorker* worker = new CloseWorker(database, callback);
AsyncQueueWorker(worker);

if (database->iterators.size() > 0) {
// yikes, we still have iterators open! naughty naughty.
// we have to queue up a CloseWorker and manually close each of them.
// the CloseWorker will be invoked once they are all cleaned up
database->pendingCloseWorker = worker;

for (
std::map< uint32_t, v8::Persistent<v8::Object> >::iterator it
= database->iterators.begin()
; it != database->iterators.end()
; ++it) {

// for each iterator still open, first check if it's already in
// the process of ending (ended==true means an async End() is
// in progress), if not, then we call End() with an empty callback
// function and wait for it to hit ReleaseIterator() where our
// CloseWorker will be invoked

leveldown::Iterator* iterator =
node::ObjectWrap::Unwrap<leveldown::Iterator>(it->second);

if (!iterator->ended) {
v8::Local<v8::Function> end =
v8::Local<v8::Function>::Cast(it->second->Get(
v8::String::NewSymbol("end")));
v8::Local<v8::Value> argv[] = {
v8::FunctionTemplate::New()->GetFunction() // empty callback
};
v8::TryCatch try_catch;
end->Call(it->second, 1, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
}
}
} else {
AsyncQueueWorker(worker);
}

return v8::Undefined();
}
Expand Down Expand Up @@ -425,12 +478,26 @@ v8::Handle<v8::Value> Database::ApproximateSize (const v8::Arguments& args) {
v8::Handle<v8::Value> Database::Iterator (const v8::Arguments& args) {
v8::HandleScope scope;

Database* database = node::ObjectWrap::Unwrap<Database>(args.This());

v8::Local<v8::Object> optionsObj;
if (args.Length() > 0 && args[0]->IsObject()) {
optionsObj = v8::Local<v8::Object>::Cast(args[0]);
}

return scope.Close(Iterator::NewInstance(args.This(), optionsObj));
// each iterator gets a unique id for this Database, so we can
// easily store & lookup on our `iterators` map
uint32_t id = database->currentIteratorId++;
v8::Handle<v8::Object> iterator = Iterator::NewInstance(
args.This()
, v8::Number::New(id)
, optionsObj
);
// register our iterator
database->iterators[id] =
node::ObjectWrap::Unwrap<leveldown::Iterator>(iterator)->handle_;

return scope.Close(iterator);
}


Expand Down
8 changes: 7 additions & 1 deletion src/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef LD_DATABASE_H
#define LD_DATABASE_H

#include <map>
#include <node.h>

#include "leveldb/db.h"
Expand Down Expand Up @@ -56,13 +57,18 @@ class Database : public node::ObjectWrap {
void ReleaseSnapshot (const leveldb::Snapshot* snapshot);
void CloseDatabase ();
const char* Location() const;
void ReleaseIterator (uint32_t id);

private:
Database (char* location);
~Database ();

private:
leveldb::DB* db;
char* location;
uint32_t currentIteratorId;
void(*pendingCloseWorker);

std::map< uint32_t, v8::Persistent<v8::Object> > iterators;

static v8::Persistent<v8::Function> constructor;
static void WriteDoing(uv_work_t *req);
Expand Down
28 changes: 19 additions & 9 deletions src/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace leveldown {

Iterator::Iterator (
Database* database
, uint32_t id
, leveldb::Slice* start
, std::string* end
, bool reverse
Expand All @@ -25,6 +26,7 @@ Iterator::Iterator (
, bool valueAsBuffer
, v8::Persistent<v8::Value> startPtr
) : database(database)
, id(id)
, start(start)
, end(end)
, reverse(reverse)
Expand Down Expand Up @@ -104,6 +106,10 @@ void Iterator::IteratorEnd () {
dbIterator = NULL;
}

void Iterator::Release () {
database->ReleaseIterator(id);
}

void checkEndCallback (Iterator* iterator) {
iterator->nexting = false;
if (iterator->endWorker != NULL) {
Expand Down Expand Up @@ -142,7 +148,7 @@ v8::Handle<v8::Value> Iterator::Next (const v8::Arguments& args) {
iterator->nexting = true;
AsyncQueueWorker(worker);

return v8::Undefined();
return scope.Close(args.Holder());
}

v8::Handle<v8::Value> Iterator::End (const v8::Arguments& args) {
Expand Down Expand Up @@ -175,7 +181,7 @@ v8::Handle<v8::Value> Iterator::End (const v8::Arguments& args) {
AsyncQueueWorker(worker);
}

return v8::Undefined();
return scope.Close(args.Holder());
}

v8::Persistent<v8::Function> Iterator::constructor;
Expand All @@ -197,20 +203,21 @@ void Iterator::Init () {
tpl->GetFunction());
}

v8::Handle<v8::Value> Iterator::NewInstance (
v8::Handle<v8::Object> Iterator::NewInstance (
v8::Handle<v8::Object> database
, v8::Handle<v8::Number> id
, v8::Handle<v8::Object> optionsObj
) {

v8::HandleScope scope;
v8::Local<v8::Object> instance;

if (optionsObj.IsEmpty()) {
v8::Handle<v8::Value> argv[1] = { database };
instance = constructor->NewInstance(1, argv);
} else {
v8::Handle<v8::Value> argv[2] = { database, optionsObj };
v8::Handle<v8::Value> argv[2] = { database, id };
instance = constructor->NewInstance(2, argv);
} else {
v8::Handle<v8::Value> argv[3] = { database, id, optionsObj };
instance = constructor->NewInstance(3, argv);
}

return scope.Close(instance);
Expand All @@ -229,10 +236,12 @@ v8::Handle<v8::Value> Iterator::New (const v8::Arguments& args) {
std::string* end = NULL;
int limit = -1;

v8::Local<v8::Value> id = args[1];

v8::Local<v8::Object> optionsObj;

if (args.Length() > 1 && args[1]->IsObject()) {
optionsObj = v8::Local<v8::Object>::Cast(args[1]);
if (args.Length() > 1 && args[2]->IsObject()) {
optionsObj = v8::Local<v8::Object>::Cast(args[2]);

if (optionsObj->Has(option_start)
&& (node::Buffer::HasInstance(optionsObj->Get(option_start))
Expand Down Expand Up @@ -268,6 +277,7 @@ v8::Handle<v8::Value> Iterator::New (const v8::Arguments& args) {

Iterator* iterator = new Iterator(
database
, (uint32_t)id->Int32Value()
, start
, end
, reverse
Expand Down
14 changes: 9 additions & 5 deletions src/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ v8::Handle<v8::Value> CreateIterator (const v8::Arguments& args);
class Iterator : public node::ObjectWrap {
public:
static void Init ();
static v8::Handle<v8::Value> NewInstance (
static v8::Handle<v8::Object> NewInstance (
v8::Handle<v8::Object> database
, v8::Handle<v8::Number> id
, v8::Handle<v8::Object> optionsObj
);

bool IteratorNext (std::string& key, std::string& value);
leveldb::Status IteratorStatus ();
void IteratorEnd ();

Iterator (
Database* database
, uint32_t id
, leveldb::Slice* start
, std::string* end
, bool reverse
Expand All @@ -53,8 +51,14 @@ class Iterator : public node::ObjectWrap {

~Iterator ();

bool IteratorNext (std::string& key, std::string& value);
leveldb::Status IteratorStatus ();
void IteratorEnd ();
void Release ();

private:
Database* database;
uint32_t id;
leveldb::Iterator* dbIterator;
leveldb::ReadOptions* options;
leveldb::Slice* start;
Expand Down
6 changes: 6 additions & 0 deletions src/iterator_async.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,10 @@ void EndWorker::Execute () {
iterator->IteratorEnd();
}

void EndWorker::HandleOKCallback () {
iterator->Release();
v8::Local<v8::Value> argv[0];
LD_RUN_CALLBACK(callback, argv, 0);
}

} // namespace leveldown
1 change: 1 addition & 0 deletions src/iterator_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class EndWorker : public AsyncWorker {

virtual ~EndWorker ();
virtual void Execute ();
virtual void HandleOKCallback ();

private:
Iterator* iterator;
Expand Down
77 changes: 77 additions & 0 deletions test/cleanup-hanging-iterators-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
const test = require('tap').test
, testCommon = require('./common')
, leveldown = require('../')

, makeTest = function (name, testFn) {
test(name, function (t) {
testCommon.cleanup(function () {
var db = leveldown(testCommon.location())
, done = function () {
db.close(function (err) {
t.notOk(err, 'no error from close()')
testCommon.cleanup(t.end.bind(t))
})
}
db.open(function (err) {
t.notOk(err, 'no error from open()')
db.batch(
[
{ type: 'put', key: 'one', value: '1' }
, { type: 'put', key: 'two', value: '2' }
, { type: 'put', key: 'three', value: '3' }
]
, function (err) {
t.notOk(err, 'no error from batch()')
testFn(db, t, done)
}
)
})
})
})
}

makeTest('test ended iterator', function (db, t, done) {
// standard iterator with an end() properly called, easy

var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false })
it.next(function (err, key, value) {
t.notOk(err, 'no error from next()')
t.equal(key, 'one', 'correct key')
t.equal(value, '1', 'correct value')
it.end(function (err) {
t.notOk(err, 'no error from next()')
done()
})
})
})

makeTest('test non-ended iterator', function (db, t, done) {
// no end() call on our iterator, cleanup should crash Node if not handled properly
var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false })
it.next(function (err, key, value) {
t.notOk(err, 'no error from next()')
t.equal(key, 'one', 'correct key')
t.equal(value, '1', 'correct value')
done()
})
})

makeTest('test multiple non-ended iterators', function (db, t, done) {
// no end() call on our iterator, cleanup should crash Node if not handled properly
db.iterator()
db.iterator().next(function () {})
db.iterator().next(function () {})
db.iterator().next(function () {})
setTimeout(done, 50)
})

makeTest('test ending iterators', function (db, t, done) {
// no end() call on our iterator, cleanup should crash Node if not handled properly
var it1 = db.iterator().next(function () {
it1.end(function () {})
})
, it2 = db.iterator().next(function () {
it2.end(function () {})
done()
})
})
1 change: 1 addition & 0 deletions test/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var dbidx = 0

module.exports = {
location : location
, cleanup : cleanup
, lastLocation : lastLocation
, setUp : setUp
, tearDown : tearDown
Expand Down

0 comments on commit 727ab37

Please sign in to comment.