Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add context parameter to datastore interface #174

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 29 additions & 26 deletions autobatch/autobatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package autobatch

import (
"context"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)
Expand Down Expand Up @@ -34,16 +36,16 @@ func NewAutoBatching(d ds.Batching, size int) *Datastore {
}

// Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error {
func (d *Datastore) Delete(ctx context.Context, k ds.Key) error {
d.buffer[k] = op{delete: true}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
return d.Flush(ctx)
}
return nil
}

// Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) ([]byte, error) {
func (d *Datastore) Get(ctx context.Context, k ds.Key) ([]byte, error) {
o, ok := d.buffer[k]
if ok {
if o.delete {
Expand All @@ -52,22 +54,22 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {
return o.value, nil
}

return d.child.Get(k)
return d.child.Get(ctx, k)
}

// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val []byte) error {
func (d *Datastore) Put(ctx context.Context, k ds.Key, val []byte) error {
d.buffer[k] = op{value: val}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
return d.Flush(ctx)
}
return nil
}

// Sync flushes all operations on keys at or under the prefix
// from the current batch to the underlying datastore
func (d *Datastore) Sync(prefix ds.Key) error {
b, err := d.child.Batch()
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
b, err := d.child.Batch(ctx)
if err != nil {
return err
}
Expand All @@ -79,9 +81,9 @@ func (d *Datastore) Sync(prefix ds.Key) error {

var err error
if o.delete {
err = b.Delete(k)
err = b.Delete(ctx, k)
} else {
err = b.Put(k, o.value)
err = b.Put(ctx, k, o.value)
}
if err != nil {
return err
Expand All @@ -90,22 +92,22 @@ func (d *Datastore) Sync(prefix ds.Key) error {
delete(d.buffer, k)
}

return b.Commit()
return b.Commit(ctx)
}

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
b, err := d.child.Batch()
func (d *Datastore) Flush(ctx context.Context) error {
b, err := d.child.Batch(ctx)
if err != nil {
return err
}

for k, o := range d.buffer {
var err error
if o.delete {
err = b.Delete(k)
err = b.Delete(ctx, k)
} else {
err = b.Put(k, o.value)
err = b.Put(ctx, k, o.value)
}
if err != nil {
return err
Expand All @@ -114,21 +116,21 @@ func (d *Datastore) Flush() error {
// clear out buffer
d.buffer = make(map[ds.Key]op, d.maxBufferEntries)

return b.Commit()
return b.Commit(ctx)
}

// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
func (d *Datastore) Has(ctx context.Context, k ds.Key) (bool, error) {
o, ok := d.buffer[k]
if ok {
return !o.delete, nil
}

return d.child.Has(k)
return d.child.Has(ctx, k)
}

// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) {
func (d *Datastore) GetSize(ctx context.Context, k ds.Key) (int, error) {
o, ok := d.buffer[k]
if ok {
if o.delete {
Expand All @@ -137,26 +139,27 @@ func (d *Datastore) GetSize(k ds.Key) (int, error) {
return len(o.value), nil
}

return d.child.GetSize(k)
return d.child.GetSize(ctx, k)
}

// Query performs a query
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
err := d.Flush()
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
err := d.Flush(ctx)
if err != nil {
return nil, err
}

return d.child.Query(q)
return d.child.Query(ctx, q)
}

// DiskUsage implements the PersistentDatastore interface.
func (d *Datastore) DiskUsage() (uint64, error) {
return ds.DiskUsage(d.child)
func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) {
return ds.DiskUsage(ctx, d.child)
}

func (d *Datastore) Close() error {
err1 := d.Flush()
ctx := context.Background()
err1 := d.Flush(ctx)
err2 := d.child.Close()
if err1 != nil {
return err1
Expand Down
48 changes: 28 additions & 20 deletions autobatch/autobatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autobatch

import (
"bytes"
"context"
"fmt"
"testing"

Expand All @@ -14,6 +15,8 @@ func TestAutobatch(t *testing.T) {
}

func TestFlushing(t *testing.T) {
ctx := context.Background()

child := ds.NewMapDatastore()
d := NewAutoBatching(child, 16)

Expand All @@ -24,15 +27,15 @@ func TestFlushing(t *testing.T) {
v := []byte("hello world")

for _, k := range keys {
err := d.Put(k, v)
err := d.Put(ctx, k, v)
if err != nil {
t.Fatal(err)
}
}

// Get works normally.
for _, k := range keys {
val, err := d.Get(k)
val, err := d.Get(ctx, k)
if err != nil {
t.Fatal(err)
}
Expand All @@ -43,36 +46,36 @@ func TestFlushing(t *testing.T) {
}

// Not flushed
_, err := child.Get(keys[0])
_, err := child.Get(ctx, keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete works.
err = d.Delete(keys[14])
err = d.Delete(ctx, keys[14])
if err != nil {
t.Fatal(err)
}
_, err = d.Get(keys[14])
_, err = d.Get(ctx, keys[14])
if err != ds.ErrNotFound {
t.Fatal(err)
}

// Still not flushed
_, err = child.Get(keys[0])
_, err = child.Get(ctx, keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Final put flushes.
err = d.Put(ds.NewKey("test16"), v)
err = d.Put(ctx, ds.NewKey("test16"), v)
if err != nil {
t.Fatal(err)
}

// should be flushed now, try to get keys from child datastore
for _, k := range keys[:14] {
val, err := child.Get(k)
val, err := child.Get(ctx, k)
if err != nil {
t.Fatal(err)
}
Expand All @@ -83,18 +86,18 @@ func TestFlushing(t *testing.T) {
}

// Never flushed the deleted key.
_, err = child.Get(keys[14])
_, err = child.Get(ctx, keys[14])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete doesn't flush
err = d.Delete(keys[0])
err = d.Delete(ctx, keys[0])
if err != nil {
t.Fatal(err)
}

val, err := child.Get(keys[0])
val, err := child.Get(ctx, keys[0])
if err != nil {
t.Fatal(err)
}
Expand All @@ -105,22 +108,24 @@ func TestFlushing(t *testing.T) {
}

func TestSync(t *testing.T) {
ctx := context.Background()

child := ds.NewMapDatastore()
d := NewAutoBatching(child, 100)

put := func(key ds.Key) {
if err := d.Put(key, []byte(key.String())); err != nil {
if err := d.Put(ctx, key, []byte(key.String())); err != nil {
t.Fatal(err)
}
}
del := func(key ds.Key) {
if err := d.Delete(key); err != nil {
if err := d.Delete(ctx, key); err != nil {
t.Fatal(err)
}
}

get := func(d ds.Datastore, key ds.Key) {
val, err := d.Get(key)
val, err := d.Get(ctx, key)
if err != nil {
t.Fatal(err)
}
Expand All @@ -130,7 +135,7 @@ func TestSync(t *testing.T) {
}
}
invalidGet := func(d ds.Datastore, key ds.Key) {
if _, err := d.Get(key); err != ds.ErrNotFound {
if _, err := d.Get(ctx, key); err != ds.ErrNotFound {
t.Fatal("should not have found value")
}
}
Expand All @@ -146,6 +151,9 @@ func TestSync(t *testing.T) {
// For clarity comments are written as if op = Put and undoOp = Delete
func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key),
checkOp, checkUndoOp func(ds.Datastore, ds.Key)) {

ctx := context.Background()

var keys []ds.Key
keymap := make(map[ds.Key]int)
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -185,7 +193,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkUndoOp(child, ds.NewKey("0"))

// Sync the tree "0/*/*"
if err := d.Sync(ds.NewKey("0")); err != nil {
if err := d.Sync(ctx, ds.NewKey("0")); err != nil {
t.Fatal(err)
}

Expand All @@ -196,7 +204,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp)

// Sync the tree "1/1/*"
if err := d.Sync(ds.NewKey("1/1")); err != nil {
if err := d.Sync(ctx, ds.NewKey("1/1")); err != nil {
t.Fatal(err)
}

Expand All @@ -207,7 +215,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp)

// Sync the tree "3/1/1"
if err := d.Sync(ds.NewKey("3/1/1")); err != nil {
if err := d.Sync(ctx, ds.NewKey("3/1/1")); err != nil {
t.Fatal(err)
}

Expand All @@ -217,7 +225,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
// Verify no other keys were synchronized
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp)

if err := d.Sync(ds.Key{}); err != nil {
if err := d.Sync(ctx, ds.Key{}); err != nil {
t.Fatal(err)
}

Expand All @@ -231,7 +239,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
op(deletedKey)

// Sync it
if err := d.Sync(deletedKey); err != nil {
if err := d.Sync(ctx, deletedKey); err != nil {
t.Fatal(err)
}

Expand Down
Loading