Do not implement batches using transactions #104

Merged
merged 1 commit on Oct 9, 2020
72 changes: 70 additions & 2 deletions datastore.go
@@ -44,6 +44,13 @@ type Datastore struct {
syncWrites bool
}

// Implements the datastore.Batch interface, enabling batching support for
// the badger Datastore.
type batch struct {
ds *Datastore
writeBatch *badger.WriteBatch
}

// Implements the datastore.Txn interface, enabling transaction support for
// the badger Datastore.
type txn struct {
@@ -112,6 +119,7 @@ var _ ds.Datastore = (*Datastore)(nil)
var _ ds.TxnDatastore = (*Datastore)(nil)
var _ ds.TTLDatastore = (*Datastore)(nil)
var _ ds.GCDatastore = (*Datastore)(nil)
var _ ds.Batching = (*Datastore)(nil)

// NewDatastore creates a new badger datastore.
//
@@ -388,9 +396,21 @@ func (d *Datastore) Close() error {
return d.DB.Close()
}

// Batch creates a new Batch object. This provides a way to do many writes, when
// there may be too many to fit into a single transaction.
//
// After writing to a Batch, always call Commit whether or not writing to the
Member:

This kind of contract needs to be specified at the interface level to be effective. I think your original solution of adding a Cancel method to the interface is the right way to go (we can add it here now, then add it to the interface later).

For now, we could also use runtime.SetFinalizer to ensure everything gets cleaned up eventually. The overhead should be relatively minimal.

Contributor Author:

Added a Cancel function and finalizer. Will add Cancel to the datastore.Batch interface later.
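
A minimal sketch of what the Cancel method and finalizer mentioned in this thread could look like, assuming the surrounding file's types plus an added "runtime" import; the code actually merged may differ in detail.

// Sketch only: a constructor that registers a finalizer so an abandoned
// batch eventually releases its underlying WriteBatch.
func (d *Datastore) newBatch() *batch {
	b := &batch{d, d.DB.NewWriteBatch()}
	runtime.SetFinalizer(b, func(b *batch) { b.writeBatch.Cancel() })
	return b
}

// Cancel discards the batch without writing it and clears the finalizer,
// since the WriteBatch has already been released.
func (b *batch) Cancel() error {
	b.ds.closeLk.RLock()
	defer b.ds.closeLk.RUnlock()
	if b.ds.closed {
		return ErrClosed
	}
	b.writeBatch.Cancel()
	runtime.SetFinalizer(b, nil)
	return nil
}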

// batch was completed successfully or not. This is necessary to flush any
// remaining data and free any resources associated with an incomplete
// transaction.
func (d *Datastore) Batch() (ds.Batch, error) {
-	tx, _ := d.NewTransaction(false)
-	return tx, nil
+	d.closeLk.RLock()
+	defer d.closeLk.RUnlock()
+	if d.closed {
+		return nil, ErrClosed
+	}
+
+	return &batch{d, d.DB.NewWriteBatch()}, nil
}

func (d *Datastore) CollectGarbage() (err error) {
@@ -416,6 +436,54 @@ func (d *Datastore) gcOnce() error {
return d.DB.RunValueLogGC(d.gcDiscardRatio)
}

var _ ds.Batch = (*batch)(nil)

func (b *batch) Put(key ds.Key, value []byte) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
return ErrClosed
}
return b.put(key, value)
}

func (b *batch) put(key ds.Key, value []byte) error {
return b.writeBatch.Set(key.Bytes(), value)
}

func (b *batch) Delete(key ds.Key) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
return ErrClosed
}

return b.delete(key)
}

func (b *batch) delete(key ds.Key) error {
return b.writeBatch.Delete(key.Bytes())
}

func (b *batch) Commit() error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
return ErrClosed
}

return b.commit()
}

func (b *batch) commit() error {
err := b.writeBatch.Flush()
if err != nil {
// Discard incomplete transaction held by b.writeBatch
b.writeBatch.Cancel()
}
return err
}

var _ ds.Datastore = (*txn)(nil)
var _ ds.TTLDatastore = (*txn)(nil)

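For reference, a small caller-side sketch of how the new Batch API is intended to be used; the putMany helper, its entries argument, and the import aliases are illustrative assumptions, not part of this change.

package example

import (
	ds "github.com/ipfs/go-datastore"
	badgerds "github.com/ipfs/go-ds-badger"
)

// putMany writes a set of entries through a single batch rather than a
// transaction, so the write set is not limited by badger's transaction size.
func putMany(d *badgerds.Datastore, entries map[ds.Key][]byte) error {
	b, err := d.Batch()
	if err != nil {
		return err
	}
	for k, v := range entries {
		if err := b.Put(k, v); err != nil {
			// Per the Batch doc comment above, Commit is still called on the
			// failure path so badger can free the incomplete batch's resources.
			_ = b.Commit()
			return err
		}
	}
	// Flush all buffered writes to the datastore.
	return b.Commit()
}

Calling Commit even after a failed Put follows the contract documented on Batch above; once Cancel lands in the datastore.Batch interface, that would be the more natural cleanup call.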
65 changes: 65 additions & 0 deletions ds_test.go
@@ -309,6 +309,71 @@ func TestBatching(t *testing.T) {

}

func TestBatchingRequired(t *testing.T) {
path, err := ioutil.TempDir(os.TempDir(), "testing_badger_")
if err != nil {
t.Fatal(err)
}

dsOpts := DefaultOptions
d, err := NewDatastore(path, &dsOpts)
if err != nil {
t.Fatal(err)
}
defer func() {
d.Close()
os.RemoveAll(path)
}()

const valSize = 1000

// Check that transaction fails when there are too many writes. This is
// not testing batching logic, but is here to prove that batching works
// where a transaction fails.
t.Logf("putting %d byte values until transaction overflows", valSize)
tx, err := d.NewTransaction(false)
if err != nil {
t.Fatal(err)
}
var puts int
for ; puts < 10000000; puts++ {
buf := make([]byte, valSize)
rand.Read(buf)
err = tx.Put(ds.NewKey(fmt.Sprintf("/key%d", puts)), buf)
if err != nil {
break
}
}
if err == nil {
t.Error("expected transaction to fail")
} else {
t.Logf("OK - transaction cannot handle %d puts: %s", puts, err)
}
tx.Discard()

// Check that batch succeeds with the same number of writes that caused a
// transaction to fail.
t.Logf("putting %d %d byte values using batch", puts, valSize)
b, err := d.Batch()
if err != nil {
t.Fatal(err)
}
for i := 0; i < puts; i++ {
buf := make([]byte, valSize)
rand.Read(buf)
err = b.Put(ds.NewKey(fmt.Sprintf("/key%d", i)), buf)
if err != nil {
t.Fatal(err)
}
}

err = b.Commit()
if err != nil {
t.Fatal(err)
}
}

// Tests from basic_tests from go-datastore

func TestBasicPutGet(t *testing.T) {