From 09cb92ae3bb72a658a21b9f749a50b0b1ef4ac4a Mon Sep 17 00:00:00 2001 From: lanzafame Date: Tue, 29 Jun 2021 13:57:27 +1000 Subject: [PATCH] add context parameter to datastore interface --- autobatch/autobatch.go | 55 ++++---- autobatch/autobatch_test.go | 48 ++++--- basic_ds.go | 91 ++++++------- batch.go | 14 +- datastore.go | 51 ++++---- delayed/delayed.go | 35 ++--- delayed/delayed_test.go | 7 +- examples/fs.go | 25 ++-- examples/fs_test.go | 14 +- failstore/failstore.go | 50 +++---- go.sum | 8 ++ keytransform/keytransform.go | 62 ++++----- keytransform/keytransform_test.go | 17 ++- mount/mount.go | 67 +++++----- mount/mount_test.go | 209 +++++++++++++++++++----------- namespace/example_test.go | 9 +- namespace/namespace_test.go | 24 ++-- retrystore/retrystore.go | 25 ++-- retrystore/retrystore_test.go | 25 ++-- sync/sync.go | 61 ++++----- test/basic_tests.go | 57 ++++---- test/suite.go | 7 +- test/test_util.go | 45 ++++--- 23 files changed, 569 insertions(+), 437 deletions(-) diff --git a/autobatch/autobatch.go b/autobatch/autobatch.go index 0f86764..a864b6e 100644 --- a/autobatch/autobatch.go +++ b/autobatch/autobatch.go @@ -4,6 +4,8 @@ package autobatch import ( + "context" + ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" ) @@ -34,16 +36,16 @@ func NewAutoBatching(d ds.Batching, size int) *Datastore { } // Delete deletes a key/value -func (d *Datastore) Delete(k ds.Key) error { +func (d *Datastore) Delete(ctx context.Context, k ds.Key) error { d.buffer[k] = op{delete: true} if len(d.buffer) > d.maxBufferEntries { - return d.Flush() + return d.Flush(ctx) } return nil } // Get retrieves a value given a key. -func (d *Datastore) Get(k ds.Key) ([]byte, error) { +func (d *Datastore) Get(ctx context.Context, k ds.Key) ([]byte, error) { o, ok := d.buffer[k] if ok { if o.delete { @@ -52,22 +54,22 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) { return o.value, nil } - return d.child.Get(k) + return d.child.Get(ctx, k) } // Put stores a key/value. -func (d *Datastore) Put(k ds.Key, val []byte) error { +func (d *Datastore) Put(ctx context.Context, k ds.Key, val []byte) error { d.buffer[k] = op{value: val} if len(d.buffer) > d.maxBufferEntries { - return d.Flush() + return d.Flush(ctx) } return nil } // Sync flushes all operations on keys at or under the prefix // from the current batch to the underlying datastore -func (d *Datastore) Sync(prefix ds.Key) error { - b, err := d.child.Batch() +func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error { + b, err := d.child.Batch(ctx) if err != nil { return err } @@ -79,9 +81,9 @@ func (d *Datastore) Sync(prefix ds.Key) error { var err error if o.delete { - err = b.Delete(k) + err = b.Delete(ctx, k) } else { - err = b.Put(k, o.value) + err = b.Put(ctx, k, o.value) } if err != nil { return err @@ -90,12 +92,12 @@ func (d *Datastore) Sync(prefix ds.Key) error { delete(d.buffer, k) } - return b.Commit() + return b.Commit(ctx) } // Flush flushes the current batch to the underlying datastore. 
-func (d *Datastore) Flush() error { - b, err := d.child.Batch() +func (d *Datastore) Flush(ctx context.Context) error { + b, err := d.child.Batch(ctx) if err != nil { return err } @@ -103,9 +105,9 @@ func (d *Datastore) Flush() error { for k, o := range d.buffer { var err error if o.delete { - err = b.Delete(k) + err = b.Delete(ctx, k) } else { - err = b.Put(k, o.value) + err = b.Put(ctx, k, o.value) } if err != nil { return err @@ -114,21 +116,21 @@ func (d *Datastore) Flush() error { // clear out buffer d.buffer = make(map[ds.Key]op, d.maxBufferEntries) - return b.Commit() + return b.Commit(ctx) } // Has checks if a key is stored. -func (d *Datastore) Has(k ds.Key) (bool, error) { +func (d *Datastore) Has(ctx context.Context, k ds.Key) (bool, error) { o, ok := d.buffer[k] if ok { return !o.delete, nil } - return d.child.Has(k) + return d.child.Has(ctx, k) } // GetSize implements Datastore.GetSize -func (d *Datastore) GetSize(k ds.Key) (int, error) { +func (d *Datastore) GetSize(ctx context.Context, k ds.Key) (int, error) { o, ok := d.buffer[k] if ok { if o.delete { @@ -137,26 +139,27 @@ func (d *Datastore) GetSize(k ds.Key) (int, error) { return len(o.value), nil } - return d.child.GetSize(k) + return d.child.GetSize(ctx, k) } // Query performs a query -func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { - err := d.Flush() +func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + err := d.Flush(ctx) if err != nil { return nil, err } - return d.child.Query(q) + return d.child.Query(ctx, q) } // DiskUsage implements the PersistentDatastore interface. -func (d *Datastore) DiskUsage() (uint64, error) { - return ds.DiskUsage(d.child) +func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) { + return ds.DiskUsage(ctx, d.child) } func (d *Datastore) Close() error { - err1 := d.Flush() + ctx := context.Background() + err1 := d.Flush(ctx) err2 := d.child.Close() if err1 != nil { return err1 diff --git a/autobatch/autobatch_test.go b/autobatch/autobatch_test.go index bd6fb30..6aa9020 100644 --- a/autobatch/autobatch_test.go +++ b/autobatch/autobatch_test.go @@ -2,6 +2,7 @@ package autobatch import ( "bytes" + "context" "fmt" "testing" @@ -14,6 +15,8 @@ func TestAutobatch(t *testing.T) { } func TestFlushing(t *testing.T) { + ctx := context.Background() + child := ds.NewMapDatastore() d := NewAutoBatching(child, 16) @@ -24,7 +27,7 @@ func TestFlushing(t *testing.T) { v := []byte("hello world") for _, k := range keys { - err := d.Put(k, v) + err := d.Put(ctx, k, v) if err != nil { t.Fatal(err) } @@ -32,7 +35,7 @@ func TestFlushing(t *testing.T) { // Get works normally. for _, k := range keys { - val, err := d.Get(k) + val, err := d.Get(ctx, k) if err != nil { t.Fatal(err) } @@ -43,36 +46,36 @@ func TestFlushing(t *testing.T) { } // Not flushed - _, err := child.Get(keys[0]) + _, err := child.Get(ctx, keys[0]) if err != ds.ErrNotFound { t.Fatal("shouldnt have found value") } // Delete works. - err = d.Delete(keys[14]) + err = d.Delete(ctx, keys[14]) if err != nil { t.Fatal(err) } - _, err = d.Get(keys[14]) + _, err = d.Get(ctx, keys[14]) if err != ds.ErrNotFound { t.Fatal(err) } // Still not flushed - _, err = child.Get(keys[0]) + _, err = child.Get(ctx, keys[0]) if err != ds.ErrNotFound { t.Fatal("shouldnt have found value") } // Final put flushes. 
- err = d.Put(ds.NewKey("test16"), v) + err = d.Put(ctx, ds.NewKey("test16"), v) if err != nil { t.Fatal(err) } // should be flushed now, try to get keys from child datastore for _, k := range keys[:14] { - val, err := child.Get(k) + val, err := child.Get(ctx, k) if err != nil { t.Fatal(err) } @@ -83,18 +86,18 @@ func TestFlushing(t *testing.T) { } // Never flushed the deleted key. - _, err = child.Get(keys[14]) + _, err = child.Get(ctx, keys[14]) if err != ds.ErrNotFound { t.Fatal("shouldnt have found value") } // Delete doesn't flush - err = d.Delete(keys[0]) + err = d.Delete(ctx, keys[0]) if err != nil { t.Fatal(err) } - val, err := child.Get(keys[0]) + val, err := child.Get(ctx, keys[0]) if err != nil { t.Fatal(err) } @@ -105,22 +108,24 @@ func TestFlushing(t *testing.T) { } func TestSync(t *testing.T) { + ctx := context.Background() + child := ds.NewMapDatastore() d := NewAutoBatching(child, 100) put := func(key ds.Key) { - if err := d.Put(key, []byte(key.String())); err != nil { + if err := d.Put(ctx, key, []byte(key.String())); err != nil { t.Fatal(err) } } del := func(key ds.Key) { - if err := d.Delete(key); err != nil { + if err := d.Delete(ctx, key); err != nil { t.Fatal(err) } } get := func(d ds.Datastore, key ds.Key) { - val, err := d.Get(key) + val, err := d.Get(ctx, key) if err != nil { t.Fatal(err) } @@ -130,7 +135,7 @@ func TestSync(t *testing.T) { } } invalidGet := func(d ds.Datastore, key ds.Key) { - if _, err := d.Get(key); err != ds.ErrNotFound { + if _, err := d.Get(ctx, key); err != ds.ErrNotFound { t.Fatal("should not have found value") } } @@ -146,6 +151,9 @@ func TestSync(t *testing.T) { // For clarity comments are written as if op = Put and undoOp = Delete func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key), checkOp, checkUndoOp func(ds.Datastore, ds.Key)) { + + ctx := context.Background() + var keys []ds.Key keymap := make(map[ds.Key]int) for i := 0; i < 4; i++ { @@ -185,7 +193,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke checkUndoOp(child, ds.NewKey("0")) // Sync the tree "0/*/*" - if err := d.Sync(ds.NewKey("0")); err != nil { + if err := d.Sync(ctx, ds.NewKey("0")); err != nil { t.Fatal(err) } @@ -196,7 +204,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp) // Sync the tree "1/1/*" - if err := d.Sync(ds.NewKey("1/1")); err != nil { + if err := d.Sync(ctx, ds.NewKey("1/1")); err != nil { t.Fatal(err) } @@ -207,7 +215,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp) // Sync the tree "3/1/1" - if err := d.Sync(ds.NewKey("3/1/1")); err != nil { + if err := d.Sync(ctx, ds.NewKey("3/1/1")); err != nil { t.Fatal(err) } @@ -217,7 +225,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke // Verify no other keys were synchronized checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp) - if err := d.Sync(ds.Key{}); err != nil { + if err := d.Sync(ctx, ds.Key{}); err != nil { t.Fatal(err) } @@ -231,7 +239,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke op(deletedKey) // Sync it - if err := d.Sync(deletedKey); err != nil { + if err := d.Sync(ctx, deletedKey); err != nil { t.Fatal(err) } diff --git a/basic_ds.go b/basic_ds.go index 4c85187..1fb088b 
100644 --- a/basic_ds.go +++ b/basic_ds.go @@ -1,6 +1,7 @@ package datastore import ( + "context" "log" dsq "github.com/ipfs/go-datastore/query" @@ -23,18 +24,18 @@ func NewMapDatastore() (d *MapDatastore) { } // Put implements Datastore.Put -func (d *MapDatastore) Put(key Key, value []byte) (err error) { +func (d *MapDatastore) Put(ctx context.Context, key Key, value []byte) (err error) { d.values[key] = value return nil } // Sync implements Datastore.Sync -func (d *MapDatastore) Sync(prefix Key) error { +func (d *MapDatastore) Sync(ctx context.Context, prefix Key) error { return nil } // Get implements Datastore.Get -func (d *MapDatastore) Get(key Key) (value []byte, err error) { +func (d *MapDatastore) Get(ctx context.Context, key Key) (value []byte, err error) { val, found := d.values[key] if !found { return nil, ErrNotFound @@ -43,13 +44,13 @@ func (d *MapDatastore) Get(key Key) (value []byte, err error) { } // Has implements Datastore.Has -func (d *MapDatastore) Has(key Key) (exists bool, err error) { +func (d *MapDatastore) Has(ctx context.Context, key Key) (exists bool, err error) { _, found := d.values[key] return found, nil } // GetSize implements Datastore.GetSize -func (d *MapDatastore) GetSize(key Key) (size int, err error) { +func (d *MapDatastore) GetSize(ctx context.Context, key Key) (size int, err error) { if v, found := d.values[key]; found { return len(v), nil } @@ -57,13 +58,13 @@ func (d *MapDatastore) GetSize(key Key) (size int, err error) { } // Delete implements Datastore.Delete -func (d *MapDatastore) Delete(key Key) (err error) { +func (d *MapDatastore) Delete(ctx context.Context, key Key) (err error) { delete(d.values, key) return nil } // Query implements Datastore.Query -func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) { +func (d *MapDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { re := make([]dsq.Entry, 0, len(d.values)) for k, v := range d.values { e := dsq.Entry{Key: k.String(), Size: len(v)} @@ -77,7 +78,7 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) { return r, nil } -func (d *MapDatastore) Batch() (Batch, error) { +func (d *MapDatastore) Batch(ctx context.Context) (Batch, error) { return NewBasicBatch(d), nil } @@ -96,37 +97,37 @@ func NewNullDatastore() *NullDatastore { } // Put implements Datastore.Put -func (d *NullDatastore) Put(key Key, value []byte) (err error) { +func (d *NullDatastore) Put(ctx context.Context, key Key, value []byte) (err error) { return nil } // Sync implements Datastore.Sync -func (d *NullDatastore) Sync(prefix Key) error { +func (d *NullDatastore) Sync(ctx context.Context, prefix Key) error { return nil } // Get implements Datastore.Get -func (d *NullDatastore) Get(key Key) (value []byte, err error) { +func (d *NullDatastore) Get(ctx context.Context, key Key) (value []byte, err error) { return nil, ErrNotFound } // Has implements Datastore.Has -func (d *NullDatastore) Has(key Key) (exists bool, err error) { +func (d *NullDatastore) Has(ctx context.Context, key Key) (exists bool, err error) { return false, nil } // Has implements Datastore.GetSize -func (d *NullDatastore) GetSize(key Key) (size int, err error) { +func (d *NullDatastore) GetSize(ctx context.Context, key Key) (size int, err error) { return -1, ErrNotFound } // Delete implements Datastore.Delete -func (d *NullDatastore) Delete(key Key) (err error) { +func (d *NullDatastore) Delete(ctx context.Context, key Key) (err error) { return nil } // Query implements Datastore.Query -func (d *NullDatastore) Query(q 
dsq.Query) (dsq.Results, error) { +func (d *NullDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { return dsq.ResultsWithEntries(q, nil), nil } @@ -165,50 +166,50 @@ func (d *LogDatastore) Children() []Datastore { } // Put implements Datastore.Put -func (d *LogDatastore) Put(key Key, value []byte) (err error) { +func (d *LogDatastore) Put(ctx context.Context, key Key, value []byte) (err error) { log.Printf("%s: Put %s\n", d.Name, key) // log.Printf("%s: Put %s ```%s```", d.Name, key, value) - return d.child.Put(key, value) + return d.child.Put(ctx, key, value) } // Sync implements Datastore.Sync -func (d *LogDatastore) Sync(prefix Key) error { +func (d *LogDatastore) Sync(ctx context.Context, prefix Key) error { log.Printf("%s: Sync %s\n", d.Name, prefix) - return d.child.Sync(prefix) + return d.child.Sync(ctx, prefix) } // Get implements Datastore.Get -func (d *LogDatastore) Get(key Key) (value []byte, err error) { +func (d *LogDatastore) Get(ctx context.Context, key Key) (value []byte, err error) { log.Printf("%s: Get %s\n", d.Name, key) - return d.child.Get(key) + return d.child.Get(ctx, key) } // Has implements Datastore.Has -func (d *LogDatastore) Has(key Key) (exists bool, err error) { +func (d *LogDatastore) Has(ctx context.Context, key Key) (exists bool, err error) { log.Printf("%s: Has %s\n", d.Name, key) - return d.child.Has(key) + return d.child.Has(ctx, key) } // GetSize implements Datastore.GetSize -func (d *LogDatastore) GetSize(key Key) (size int, err error) { +func (d *LogDatastore) GetSize(ctx context.Context, key Key) (size int, err error) { log.Printf("%s: GetSize %s\n", d.Name, key) - return d.child.GetSize(key) + return d.child.GetSize(ctx, key) } // Delete implements Datastore.Delete -func (d *LogDatastore) Delete(key Key) (err error) { +func (d *LogDatastore) Delete(ctx context.Context, key Key) (err error) { log.Printf("%s: Delete %s\n", d.Name, key) - return d.child.Delete(key) + return d.child.Delete(ctx, key) } // DiskUsage implements the PersistentDatastore interface. -func (d *LogDatastore) DiskUsage() (uint64, error) { +func (d *LogDatastore) DiskUsage(ctx context.Context, ) (uint64, error) { log.Printf("%s: DiskUsage\n", d.Name) - return DiskUsage(d.child) + return DiskUsage(ctx, d.child) } // Query implements Datastore.Query -func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) { +func (d *LogDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { log.Printf("%s: Query\n", d.Name) log.Printf("%s: q.Prefix: %s\n", d.Name, q.Prefix) log.Printf("%s: q.KeysOnly: %v\n", d.Name, q.KeysOnly) @@ -216,7 +217,7 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) { log.Printf("%s: q.Orders: %d\n", d.Name, len(q.Orders)) log.Printf("%s: q.Offset: %d\n", d.Name, q.Offset) - return d.child.Query(q) + return d.child.Query(ctx, q) } // LogBatch logs all accesses through the batch. 
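A minimal caller-side sketch of the context-threaded API above, using only MapDatastore from this file and a background context (illustrative usage, not code from this changeset):

package main

import (
	"context"
	"fmt"

	ds "github.com/ipfs/go-datastore"
)

func main() {
	ctx := context.Background()

	// An in-memory datastore; every call now takes the context first.
	store := ds.NewMapDatastore()
	defer store.Close()

	key := ds.NewKey("/hello")
	if err := store.Put(ctx, key, []byte("world")); err != nil {
		panic(err)
	}

	val, err := store.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s => %s\n", key, val)
}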
@@ -225,10 +226,10 @@ type LogBatch struct { child Batch } -func (d *LogDatastore) Batch() (Batch, error) { +func (d *LogDatastore) Batch(ctx context.Context, ) (Batch, error) { log.Printf("%s: Batch\n", d.Name) if bds, ok := d.child.(Batching); ok { - b, err := bds.Batch() + b, err := bds.Batch(ctx) if err != nil { return nil, err @@ -242,22 +243,22 @@ func (d *LogDatastore) Batch() (Batch, error) { } // Put implements Batch.Put -func (d *LogBatch) Put(key Key, value []byte) (err error) { +func (d *LogBatch) Put(ctx context.Context, key Key, value []byte) (err error) { log.Printf("%s: BatchPut %s\n", d.Name, key) // log.Printf("%s: Put %s ```%s```", d.Name, key, value) - return d.child.Put(key, value) + return d.child.Put(ctx, key, value) } // Delete implements Batch.Delete -func (d *LogBatch) Delete(key Key) (err error) { +func (d *LogBatch) Delete(ctx context.Context, key Key) (err error) { log.Printf("%s: BatchDelete %s\n", d.Name, key) - return d.child.Delete(key) + return d.child.Delete(ctx, key) } // Commit implements Batch.Commit -func (d *LogBatch) Commit() (err error) { +func (d *LogBatch) Commit(ctx context.Context, ) (err error) { log.Printf("%s: BatchCommit\n", d.Name) - return d.child.Commit() + return d.child.Commit(ctx) } func (d *LogDatastore) Close() error { @@ -265,23 +266,23 @@ func (d *LogDatastore) Close() error { return d.child.Close() } -func (d *LogDatastore) Check() error { +func (d *LogDatastore) Check(ctx context.Context, ) error { if c, ok := d.child.(CheckedDatastore); ok { - return c.Check() + return c.Check(ctx) } return nil } -func (d *LogDatastore) Scrub() error { +func (d *LogDatastore) Scrub(ctx context.Context, ) error { if c, ok := d.child.(ScrubbedDatastore); ok { - return c.Scrub() + return c.Scrub(ctx) } return nil } -func (d *LogDatastore) CollectGarbage() error { +func (d *LogDatastore) CollectGarbage(ctx context.Context, ) error { if c, ok := d.child.(GCDatastore); ok { - return c.CollectGarbage() + return c.CollectGarbage(ctx) } return nil } diff --git a/batch.go b/batch.go index 41e23ff..ad4f990 100644 --- a/batch.go +++ b/batch.go @@ -1,5 +1,9 @@ package datastore +import ( + "context" +) + type op struct { delete bool value []byte @@ -20,23 +24,23 @@ func NewBasicBatch(ds Datastore) Batch { } } -func (bt *basicBatch) Put(key Key, val []byte) error { +func (bt *basicBatch) Put(ctx context.Context, key Key, val []byte) error { bt.ops[key] = op{value: val} return nil } -func (bt *basicBatch) Delete(key Key) error { +func (bt *basicBatch) Delete(ctx context.Context, key Key) error { bt.ops[key] = op{delete: true} return nil } -func (bt *basicBatch) Commit() error { +func (bt *basicBatch) Commit(ctx context.Context) error { var err error for k, op := range bt.ops { if op.delete { - err = bt.target.Delete(k) + err = bt.target.Delete(ctx, k) } else { - err = bt.target.Put(k, op.value) + err = bt.target.Put(ctx, k, op.value) } if err != nil { break diff --git a/datastore.go b/datastore.go index 04ca726..0d075df 100644 --- a/datastore.go +++ b/datastore.go @@ -1,6 +1,7 @@ package datastore import ( + "context" "errors" "io" "time" @@ -40,7 +41,7 @@ type Datastore interface { // satisfy these requirements then Sync may be a no-op. // // If the prefix fails to Sync this method returns an error. - Sync(prefix Key) error + Sync(ctx context.Context, prefix Key) error io.Closer } @@ -55,29 +56,29 @@ type Write interface { // Ultimately, the lowest-level datastore will need to do some value checking // or risk getting incorrect values. 
It may also be useful to expose a more // type-safe interface to your application, and do the checking up-front. - Put(key Key, value []byte) error + Put(ctx context.Context, key Key, value []byte) error // Delete removes the value for given `key`. If the key is not in the // datastore, this method returns no error. - Delete(key Key) error + Delete(ctx context.Context, key Key) error } // Read is the read-side of the Datastore interface. type Read interface { // Get retrieves the object `value` named by `key`. // Get will return ErrNotFound if the key is not mapped to a value. - Get(key Key) (value []byte, err error) + Get(ctx context.Context, key Key) (value []byte, err error) // Has returns whether the `key` is mapped to a `value`. // In some contexts, it may be much cheaper only to check for existence of // a value, rather than retrieving the value itself. (e.g. HTTP HEAD). // The default implementation is found in `GetBackedHas`. - Has(key Key) (exists bool, err error) + Has(ctx context.Context, key Key) (exists bool, err error) // GetSize returns the size of the `value` named by `key`. // In some contexts, it may be much cheaper to only get the size of the // value rather than retrieving the value itself. - GetSize(key Key) (size int, err error) + GetSize(ctx context.Context, key Key) (size int, err error) // Query searches the datastore and returns a query result. This function // may return before the query actually runs. To wait for the query: @@ -91,7 +92,7 @@ type Read interface { // entries, _ := result.Rest() // for entry := range entries { ... } // - Query(q query.Query) (query.Results, error) + Query(ctx context.Context, q query.Query) (query.Results, error) } // Batching datastores support deferred, grouped updates to the database. @@ -103,7 +104,7 @@ type Read interface { type Batching interface { Datastore - Batch() (Batch, error) + Batch(ctx context.Context) (Batch, error) } // ErrBatchUnsupported is returned if the by Batch if the Datastore doesn't @@ -115,7 +116,7 @@ var ErrBatchUnsupported = errors.New("this datastore does not support batching") type CheckedDatastore interface { Datastore - Check() error + Check(ctx context.Context) error } // ScrubbedDatastore is an interface that should be implemented by datastores @@ -124,7 +125,7 @@ type CheckedDatastore interface { type ScrubbedDatastore interface { Datastore - Scrub() error + Scrub(ctx context.Context) error } // GCDatastore is an interface that should be implemented by datastores which @@ -132,7 +133,7 @@ type ScrubbedDatastore interface { type GCDatastore interface { Datastore - CollectGarbage() error + CollectGarbage(ctx context.Context) error } // PersistentDatastore is an interface that should be implemented by datastores @@ -141,18 +142,18 @@ type PersistentDatastore interface { Datastore // DiskUsage returns the space used by a datastore, in bytes. - DiskUsage() (uint64, error) + DiskUsage(ctx context.Context) (uint64, error) } // DiskUsage checks if a Datastore is a // PersistentDatastore and returns its DiskUsage(), // otherwise returns 0. -func DiskUsage(d Datastore) (uint64, error) { +func DiskUsage(ctx context.Context, d Datastore) (uint64, error) { persDs, ok := d.(PersistentDatastore) if !ok { return 0, nil } - return persDs.DiskUsage() + return persDs.DiskUsage(ctx) } // TTLDatastore is an interface that should be implemented by datastores that @@ -164,9 +165,9 @@ type TTLDatastore interface { // TTL encapulates the methods that deal with entries with time-to-live. 
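A hedged sketch of threading a context through the TTL methods declared just below; the concrete TTLDatastore implementation and the /sessions key layout are assumptions made for illustration:

package example

import (
	"context"
	"fmt"
	"time"

	ds "github.com/ipfs/go-datastore"
)

// PutSession stores a session blob with a ten-minute TTL and reports when
// it will expire. The TTL-capable datastore is supplied by the caller.
func PutSession(ctx context.Context, store ds.TTLDatastore, id string, data []byte) error {
	key := ds.NewKey("/sessions/" + id)
	if err := store.PutWithTTL(ctx, key, data, 10*time.Minute); err != nil {
		return err
	}
	exp, err := store.GetExpiration(ctx, key)
	if err != nil {
		return err
	}
	fmt.Println("session", id, "expires at", exp)
	return nil
}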
type TTL interface { - PutWithTTL(key Key, value []byte, ttl time.Duration) error - SetTTL(key Key, ttl time.Duration) error - GetExpiration(key Key) (time.Time, error) + PutWithTTL(ctx context.Context, key Key, value []byte, ttl time.Duration) error + SetTTL(ctx context.Context, key Key, ttl time.Duration) error + GetExpiration(ctx context.Context, key Key) (time.Time, error) } // Txn extends the Datastore type. Txns allow users to batch queries and @@ -181,12 +182,12 @@ type Txn interface { // Commit finalizes a transaction, attempting to commit it to the Datastore. // May return an error if the transaction has gone stale. The presence of an // error is an indication that the data was not committed to the Datastore. - Commit() error + Commit(ctx context.Context) error // Discard throws away changes recorded in a transaction without committing // them to the underlying Datastore. Any calls made to Discard after Commit // has been successfully called will have no effect on the transaction and // state of the Datastore, making it safe to defer. - Discard() + Discard(ctx context.Context) } // TxnDatastore is an interface that should be implemented by datastores that @@ -194,7 +195,7 @@ type Txn interface { type TxnDatastore interface { Datastore - NewTransaction(readOnly bool) (Txn, error) + NewTransaction(ctx context.Context, readOnly bool) (Txn, error) } // Errors @@ -218,8 +219,8 @@ var ErrNotFound error = &dsError{error: errors.New("datastore: key not found"), // func (*d SomeDatastore) Has(key Key) (exists bool, err error) { // return GetBackedHas(d, key) // } -func GetBackedHas(ds Read, key Key) (bool, error) { - _, err := ds.Get(key) +func GetBackedHas(ctx context.Context, ds Read, key Key) (bool, error) { + _, err := ds.Get(ctx, key) switch err { case nil: return true, nil @@ -236,8 +237,8 @@ func GetBackedHas(ds Read, key Key) (bool, error) { // func (*d SomeDatastore) GetSize(key Key) (size int, err error) { // return GetBackedSize(d, key) // } -func GetBackedSize(ds Read, key Key) (int, error) { - value, err := ds.Get(key) +func GetBackedSize(ctx context.Context, ds Read, key Key) (int, error) { + value, err := ds.Get(ctx, key) if err == nil { return len(value), nil } @@ -247,5 +248,5 @@ func GetBackedSize(ds Read, key Key) (int, error) { type Batch interface { Write - Commit() error + Commit(ctx context.Context) error } diff --git a/delayed/delayed.go b/delayed/delayed.go index f634c94..8ddaa96 100644 --- a/delayed/delayed.go +++ b/delayed/delayed.go @@ -3,6 +3,7 @@ package delayed import ( + "context" "io" ds "github.com/ipfs/go-datastore" @@ -25,56 +26,56 @@ var _ ds.Batching = (*Delayed)(nil) var _ io.Closer = (*Delayed)(nil) // Put implements the ds.Datastore interface. -func (dds *Delayed) Put(key ds.Key, value []byte) (err error) { +func (dds *Delayed) Put(ctx context.Context, key ds.Key, value []byte) (err error) { dds.delay.Wait() - return dds.ds.Put(key, value) + return dds.ds.Put(ctx, key, value) } // Sync implements Datastore.Sync -func (dds *Delayed) Sync(prefix ds.Key) error { +func (dds *Delayed) Sync(ctx context.Context, prefix ds.Key) error { dds.delay.Wait() - return dds.ds.Sync(prefix) + return dds.ds.Sync(ctx, prefix) } // Get implements the ds.Datastore interface. -func (dds *Delayed) Get(key ds.Key) (value []byte, err error) { +func (dds *Delayed) Get(ctx context.Context, key ds.Key) (value []byte, err error) { dds.delay.Wait() - return dds.ds.Get(key) + return dds.ds.Get(ctx, key) } // Has implements the ds.Datastore interface. 
-func (dds *Delayed) Has(key ds.Key) (exists bool, err error) { +func (dds *Delayed) Has(ctx context.Context, key ds.Key) (exists bool, err error) { dds.delay.Wait() - return dds.ds.Has(key) + return dds.ds.Has(ctx, key) } // GetSize implements the ds.Datastore interface. -func (dds *Delayed) GetSize(key ds.Key) (size int, err error) { +func (dds *Delayed) GetSize(ctx context.Context, key ds.Key) (size int, err error) { dds.delay.Wait() - return dds.ds.GetSize(key) + return dds.ds.GetSize(ctx, key) } // Delete implements the ds.Datastore interface. -func (dds *Delayed) Delete(key ds.Key) (err error) { +func (dds *Delayed) Delete(ctx context.Context, key ds.Key) (err error) { dds.delay.Wait() - return dds.ds.Delete(key) + return dds.ds.Delete(ctx, key) } // Query implements the ds.Datastore interface. -func (dds *Delayed) Query(q dsq.Query) (dsq.Results, error) { +func (dds *Delayed) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { dds.delay.Wait() - return dds.ds.Query(q) + return dds.ds.Query(ctx, q) } // Batch implements the ds.Batching interface. -func (dds *Delayed) Batch() (ds.Batch, error) { +func (dds *Delayed) Batch(ctx context.Context) (ds.Batch, error) { return ds.NewBasicBatch(dds), nil } // DiskUsage implements the ds.PersistentDatastore interface. -func (dds *Delayed) DiskUsage() (uint64, error) { +func (dds *Delayed) DiskUsage(ctx context.Context) (uint64, error) { dds.delay.Wait() - return ds.DiskUsage(dds.ds) + return ds.DiskUsage(ctx, dds.ds) } // Close closes the inner datastore (if it implements the io.Closer interface). diff --git a/delayed/delayed_test.go b/delayed/delayed_test.go index 4519f82..3d6987b 100644 --- a/delayed/delayed_test.go +++ b/delayed/delayed_test.go @@ -1,6 +1,7 @@ package delayed import ( + "context" "testing" "time" @@ -10,14 +11,16 @@ import ( ) func TestDelayed(t *testing.T) { + ctx := context.Background() + d := New(datastore.NewMapDatastore(), delay.Fixed(time.Second)) now := time.Now() k := datastore.NewKey("test") - err := d.Put(k, []byte("value")) + err := d.Put(ctx, k, []byte("value")) if err != nil { t.Fatal(err) } - _, err = d.Get(k) + _, err = d.Get(ctx, k) if err != nil { t.Fatal(err) } diff --git a/examples/fs.go b/examples/fs.go index 9c589cd..0f87125 100644 --- a/examples/fs.go +++ b/examples/fs.go @@ -18,6 +18,7 @@ package examples import ( + "context" "fmt" "io/ioutil" "log" @@ -51,7 +52,7 @@ func (d *Datastore) KeyFilename(key ds.Key) string { } // Put stores the given value. -func (d *Datastore) Put(key ds.Key, value []byte) (err error) { +func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { fn := d.KeyFilename(key) // mkdirall above. @@ -65,12 +66,12 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) { // Sync would ensure that any previous Puts under the prefix are written to disk. // However, they already are. 
-func (d *Datastore) Sync(prefix ds.Key) error { +func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error { return nil } // Get returns the value for given key -func (d *Datastore) Get(key ds.Key) (value []byte, err error) { +func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { fn := d.KeyFilename(key) if !isFile(fn) { return nil, ds.ErrNotFound @@ -80,16 +81,16 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) { } // Has returns whether the datastore has a value for a given key -func (d *Datastore) Has(key ds.Key) (exists bool, err error) { - return ds.GetBackedHas(d, key) +func (d *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + return ds.GetBackedHas(ctx, d, key) } -func (d *Datastore) GetSize(key ds.Key) (size int, err error) { - return ds.GetBackedSize(d, key) +func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { + return ds.GetBackedSize(ctx, d, key) } // Delete removes the value for given key -func (d *Datastore) Delete(key ds.Key) (err error) { +func (d *Datastore) Delete(ctx context.Context, key ds.Key) (err error) { fn := d.KeyFilename(key) if !isFile(fn) { return nil @@ -103,7 +104,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) { } // Query implements Datastore.Query -func (d *Datastore) Query(q query.Query) (query.Results, error) { +func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error) { results := make(chan query.Result) walkFn := func(path string, info os.FileInfo, _ error) error { @@ -119,7 +120,7 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { key := ds.NewKey(path) result.Entry.Key = key.String() if !q.KeysOnly { - result.Entry.Value, result.Error = d.Get(key) + result.Entry.Value, result.Error = d.Get(ctx, key) } results <- result } @@ -159,12 +160,12 @@ func (d *Datastore) Close() error { return nil } -func (d *Datastore) Batch() (ds.Batch, error) { +func (d *Datastore) Batch(ctx context.Context) (ds.Batch, error) { return ds.NewBasicBatch(d), nil } // DiskUsage returns the disk size used by the datastore in bytes. -func (d *Datastore) DiskUsage() (uint64, error) { +func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) { var du uint64 err := filepath.Walk(d.path, func(p string, f os.FileInfo, err error) error { if err != nil { diff --git a/examples/fs_test.go b/examples/fs_test.go index f222497..9a9c550 100644 --- a/examples/fs_test.go +++ b/examples/fs_test.go @@ -2,6 +2,7 @@ package examples import ( "bytes" + "context" "testing" . 
"gopkg.in/check.v1" @@ -35,6 +36,7 @@ func (ks *DSSuite) TestOpen(c *C) { } func (ks *DSSuite) TestBasic(c *C) { + ctx := context.Background() keys := strsToKeys([]string{ "foo", @@ -46,17 +48,17 @@ func (ks *DSSuite) TestBasic(c *C) { }) for _, k := range keys { - err := ks.ds.Put(k, []byte(k.String())) + err := ks.ds.Put(ctx, k, []byte(k.String())) c.Check(err, Equals, nil) } for _, k := range keys { - v, err := ks.ds.Get(k) + v, err := ks.ds.Get(ctx, k) c.Check(err, Equals, nil) c.Check(bytes.Equal(v, []byte(k.String())), Equals, true) } - r, err := ks.ds.Query(query.Query{Prefix: "/foo/bar/"}) + r, err := ks.ds.Query(ctx, query.Query{Prefix: "/foo/bar/"}) if err != nil { c.Check(err, Equals, nil) } @@ -87,6 +89,8 @@ func (ks *DSSuite) TestBasic(c *C) { } func (ks *DSSuite) TestDiskUsage(c *C) { + ctx := context.Background() + keys := strsToKeys([]string{ "foo", "foo/bar", @@ -100,12 +104,12 @@ func (ks *DSSuite) TestDiskUsage(c *C) { for _, k := range keys { value := []byte(k.String()) totalBytes += len(value) - err := ks.ds.Put(k, value) + err := ks.ds.Put(ctx, k, value) c.Check(err, Equals, nil) } if ps, ok := ks.ds.(ds.PersistentDatastore); ok { - if s, err := ps.DiskUsage(); s != uint64(totalBytes) || err != nil { + if s, err := ps.DiskUsage(ctx); s != uint64(totalBytes) || err != nil { c.Error("unexpected size is: ", s) } } else { diff --git a/failstore/failstore.go b/failstore/failstore.go index f790f96..a1d7ea7 100644 --- a/failstore/failstore.go +++ b/failstore/failstore.go @@ -4,6 +4,8 @@ package failstore import ( + "context" + ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" ) @@ -27,81 +29,81 @@ func NewFailstore(c ds.Datastore, efunc func(string) error) *Failstore { } // Put puts a key/value into the datastore. -func (d *Failstore) Put(k ds.Key, val []byte) error { +func (d *Failstore) Put(ctx context.Context, k ds.Key, val []byte) error { err := d.errfunc("put") if err != nil { return err } - return d.child.Put(k, val) + return d.child.Put(ctx, k, val) } // Sync implements Datastore.Sync -func (d *Failstore) Sync(prefix ds.Key) error { +func (d *Failstore) Sync(ctx context.Context, prefix ds.Key) error { err := d.errfunc("sync") if err != nil { return err } - return d.child.Sync(prefix) + return d.child.Sync(ctx, prefix) } // Get retrieves a value from the datastore. -func (d *Failstore) Get(k ds.Key) ([]byte, error) { +func (d *Failstore) Get(ctx context.Context, k ds.Key) ([]byte, error) { err := d.errfunc("get") if err != nil { return nil, err } - return d.child.Get(k) + return d.child.Get(ctx, k) } // Has returns if the datastore contains a key/value. -func (d *Failstore) Has(k ds.Key) (bool, error) { +func (d *Failstore) Has(ctx context.Context, k ds.Key) (bool, error) { err := d.errfunc("has") if err != nil { return false, err } - return d.child.Has(k) + return d.child.Has(ctx, k) } // GetSize returns the size of the value in the datastore, if present. -func (d *Failstore) GetSize(k ds.Key) (int, error) { +func (d *Failstore) GetSize(ctx context.Context, k ds.Key) (int, error) { err := d.errfunc("getsize") if err != nil { return -1, err } - return d.child.GetSize(k) + return d.child.GetSize(ctx, k) } // Delete removes a key/value from the datastore. -func (d *Failstore) Delete(k ds.Key) error { +func (d *Failstore) Delete(ctx context.Context, k ds.Key) error { err := d.errfunc("delete") if err != nil { return err } - return d.child.Delete(k) + return d.child.Delete(ctx, k) } // Query performs a query on the datastore. 
-func (d *Failstore) Query(q dsq.Query) (dsq.Results, error) { +func (d *Failstore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { err := d.errfunc("query") if err != nil { return nil, err } - return d.child.Query(q) + return d.child.Query(ctx, q) } // DiskUsage implements the PersistentDatastore interface. -func (d *Failstore) DiskUsage() (uint64, error) { +func (d *Failstore) DiskUsage(ctx context.Context) (uint64, error) { if err := d.errfunc("disk-usage"); err != nil { return 0, err } - return ds.DiskUsage(d.child) + return ds.DiskUsage(ctx, d.child) } // Close implements the Datastore interface @@ -116,12 +118,12 @@ type FailBatch struct { } // Batch returns a new Batch Failstore. -func (d *Failstore) Batch() (ds.Batch, error) { +func (d *Failstore) Batch(ctx context.Context) (ds.Batch, error) { if err := d.errfunc("batch"); err != nil { return nil, err } - b, err := d.child.(ds.Batching).Batch() + b, err := d.child.(ds.Batching).Batch(ctx) if err != nil { return nil, err } @@ -133,28 +135,28 @@ func (d *Failstore) Batch() (ds.Batch, error) { } // Put does a batch put. -func (b *FailBatch) Put(k ds.Key, val []byte) error { +func (b *FailBatch) Put(ctx context.Context, k ds.Key, val []byte) error { if err := b.dstore.errfunc("batch-put"); err != nil { return err } - return b.cb.Put(k, val) + return b.cb.Put(ctx, k, val) } // Delete does a batch delete. -func (b *FailBatch) Delete(k ds.Key) error { +func (b *FailBatch) Delete(ctx context.Context, k ds.Key) error { if err := b.dstore.errfunc("batch-delete"); err != nil { return err } - return b.cb.Delete(k) + return b.cb.Delete(ctx, k) } // Commit commits all operations in the batch. -func (b *FailBatch) Commit() error { +func (b *FailBatch) Commit(ctx context.Context) error { if err := b.dstore.errfunc("batch-commit"); err != nil { return err } - return b.cb.Commit() + return b.cb.Commit(ctx) } diff --git a/go.sum b/go.sum index 162cb11..67e7857 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= @@ -16,17 +18,21 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= 
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -39,6 +45,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -46,4 +53,5 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/keytransform/keytransform.go b/keytransform/keytransform.go index cd03487..9498c94 100644 --- a/keytransform/keytransform.go +++ b/keytransform/keytransform.go @@ -1,6 +1,8 @@ package keytransform import ( + "context" + ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" ) @@ -33,42 +35,42 @@ func (d *Datastore) Children() []ds.Datastore { } // Put stores the given value, transforming the key first. -func (d *Datastore) Put(key ds.Key, value []byte) (err error) { - return d.child.Put(d.ConvertKey(key), value) +func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { + return d.child.Put(ctx, d.ConvertKey(key), value) } // Sync implements Datastore.Sync -func (d *Datastore) Sync(prefix ds.Key) error { - return d.child.Sync(d.ConvertKey(prefix)) +func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error { + return d.child.Sync(ctx, d.ConvertKey(prefix)) } // Get returns the value for given key, transforming the key first. 
-func (d *Datastore) Get(key ds.Key) (value []byte, err error) { - return d.child.Get(d.ConvertKey(key)) +func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + return d.child.Get(ctx, d.ConvertKey(key)) } // Has returns whether the datastore has a value for a given key, transforming // the key first. -func (d *Datastore) Has(key ds.Key) (exists bool, err error) { - return d.child.Has(d.ConvertKey(key)) +func (d *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + return d.child.Has(ctx, d.ConvertKey(key)) } // GetSize returns the size of the value named by the given key, transforming // the key first. -func (d *Datastore) GetSize(key ds.Key) (size int, err error) { - return d.child.GetSize(d.ConvertKey(key)) +func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { + return d.child.GetSize(ctx, d.ConvertKey(key)) } // Delete removes the value for given key -func (d *Datastore) Delete(key ds.Key) (err error) { - return d.child.Delete(d.ConvertKey(key)) +func (d *Datastore) Delete(ctx context.Context, key ds.Key) (err error) { + return d.child.Delete(ctx, d.ConvertKey(key)) } // Query implements Query, inverting keys on the way back out. -func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { +func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { nq, cq := d.prepareQuery(q) - cqr, err := d.child.Query(cq) + cqr, err := d.child.Query(ctx, cq) if err != nil { return nil, err } @@ -194,17 +196,17 @@ func (d *Datastore) Close() error { } // DiskUsage implements the PersistentDatastore interface. -func (d *Datastore) DiskUsage() (uint64, error) { - return ds.DiskUsage(d.child) +func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) { + return ds.DiskUsage(ctx, d.child) } -func (d *Datastore) Batch() (ds.Batch, error) { +func (d *Datastore) Batch(ctx context.Context) (ds.Batch, error) { bds, ok := d.child.(ds.Batching) if !ok { return nil, ds.ErrBatchUnsupported } - childbatch, err := bds.Batch() + childbatch, err := bds.Batch(ctx) if err != nil { return nil, err } @@ -220,35 +222,35 @@ type transformBatch struct { f KeyMapping } -func (t *transformBatch) Put(key ds.Key, val []byte) error { - return t.dst.Put(t.f(key), val) +func (t *transformBatch) Put(ctx context.Context, key ds.Key, val []byte) error { + return t.dst.Put(ctx, t.f(key), val) } -func (t *transformBatch) Delete(key ds.Key) error { - return t.dst.Delete(t.f(key)) +func (t *transformBatch) Delete(ctx context.Context, key ds.Key) error { + return t.dst.Delete(ctx, t.f(key)) } -func (t *transformBatch) Commit() error { - return t.dst.Commit() +func (t *transformBatch) Commit(ctx context.Context) error { + return t.dst.Commit(ctx) } -func (d *Datastore) Check() error { +func (d *Datastore) Check(ctx context.Context) error { if c, ok := d.child.(ds.CheckedDatastore); ok { - return c.Check() + return c.Check(ctx) } return nil } -func (d *Datastore) Scrub() error { +func (d *Datastore) Scrub(ctx context.Context) error { if c, ok := d.child.(ds.ScrubbedDatastore); ok { - return c.Scrub() + return c.Scrub(ctx) } return nil } -func (d *Datastore) CollectGarbage() error { +func (d *Datastore) CollectGarbage(ctx context.Context) error { if c, ok := d.child.(ds.GCDatastore); ok { - return c.CollectGarbage() + return c.CollectGarbage(ctx) } return nil } diff --git a/keytransform/keytransform_test.go b/keytransform/keytransform_test.go index c799c92..2e5de09 100644 --- a/keytransform/keytransform_test.go +++ 
b/keytransform/keytransform_test.go @@ -2,6 +2,7 @@ package keytransform_test import ( "bytes" + "context" "sort" "testing" @@ -35,6 +36,8 @@ var pair = &kt.Pair{ } func (ks *DSSuite) TestBasic(c *C) { + ctx := context.Background() + mpds := dstest.NewTestDatastore(true) ktds := kt.Wrap(mpds, pair) @@ -48,22 +51,22 @@ func (ks *DSSuite) TestBasic(c *C) { }) for _, k := range keys { - err := ktds.Put(k, []byte(k.String())) + err := ktds.Put(ctx, k, []byte(k.String())) c.Check(err, Equals, nil) } for _, k := range keys { - v1, err := ktds.Get(k) + v1, err := ktds.Get(ctx, k) c.Check(err, Equals, nil) c.Check(bytes.Equal(v1, []byte(k.String())), Equals, true) - v2, err := mpds.Get(ds.NewKey("abc").Child(k)) + v2, err := mpds.Get(ctx, ds.NewKey("abc").Child(k)) c.Check(err, Equals, nil) c.Check(bytes.Equal(v2, []byte(k.String())), Equals, true) } run := func(d ds.Datastore, q dsq.Query) []ds.Key { - r, err := d.Query(q) + r, err := d.Query(ctx, q) c.Check(err, Equals, nil) e, err := r.Rest() @@ -89,15 +92,15 @@ func (ks *DSSuite) TestBasic(c *C) { c.Log("listA: ", listA) c.Log("listB: ", listB) - if err := ktds.Check(); err != dstest.ErrTest { + if err := ktds.Check(ctx); err != nil && err != dstest.ErrTest { c.Errorf("Unexpected Check() error: %s", err) } - if err := ktds.CollectGarbage(); err != dstest.ErrTest { + if err := ktds.CollectGarbage(ctx); err != nil && err != dstest.ErrTest { c.Errorf("Unexpected CollectGarbage() error: %s", err) } - if err := ktds.Scrub(); err != dstest.ErrTest { + if err := ktds.Scrub(ctx); err != nil && err != dstest.ErrTest { c.Errorf("Unexpected Scrub() error: %s", err) } } diff --git a/mount/mount.go b/mount/mount.go index ff1f66c..769a7e7 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -4,6 +4,7 @@ package mount import ( "container/heap" + "context" "errors" "fmt" "sort" @@ -232,23 +233,23 @@ func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest // // Returns ErrNoMount if there no datastores are mounted at the appropriate // prefix for the given key. -func (d *Datastore) Put(key ds.Key, value []byte) error { +func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error { cds, _, k := d.lookup(key) if cds == nil { return ErrNoMount } - return cds.Put(k, value) + return cds.Put(ctx, k, value) } // Sync implements Datastore.Sync -func (d *Datastore) Sync(prefix ds.Key) error { +func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error { var merr error // Sync all mount points below the prefix // Sync the mount point right at (or above) the prefix dstores, prefixes, rest := d.lookupAll(prefix) for i, suffix := range rest { - if err := dstores[i].Sync(suffix); err != nil { + if err := dstores[i].Sync(ctx, suffix); err != nil { merr = multierr.Append(merr, fmt.Errorf( "syncing datastore at %s: %w", prefixes[i].String(), @@ -261,44 +262,44 @@ func (d *Datastore) Sync(prefix ds.Key) error { } // Get returns the value associated with the key from the appropriate datastore. -func (d *Datastore) Get(key ds.Key) (value []byte, err error) { +func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { cds, _, k := d.lookup(key) if cds == nil { return nil, ds.ErrNotFound } - return cds.Get(k) + return cds.Get(ctx, k) } // Has returns the true if there exists a value associated with key in the // appropriate datastore. 
-func (d *Datastore) Has(key ds.Key) (exists bool, err error) { +func (d *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { cds, _, k := d.lookup(key) if cds == nil { return false, nil } - return cds.Has(k) + return cds.Has(ctx, k) } // Get returns the size of the value associated with the key in the appropriate // datastore. -func (d *Datastore) GetSize(key ds.Key) (size int, err error) { +func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { cds, _, k := d.lookup(key) if cds == nil { return -1, ds.ErrNotFound } - return cds.GetSize(k) + return cds.GetSize(ctx, k) } // Delete deletes the value associated with the key in the appropriate // datastore. // // Delete returns no error if there is no value associated with the given key. -func (d *Datastore) Delete(key ds.Key) error { +func (d *Datastore) Delete(ctx context.Context, key ds.Key) error { cds, _, k := d.lookup(key) if cds == nil { return nil } - return cds.Delete(k) + return cds.Delete(ctx, k) } // Query queries the appropriate mounted datastores, merging the results @@ -306,7 +307,7 @@ func (d *Datastore) Delete(key ds.Key) error { // // If a query prefix is specified, Query will avoid querying datastores mounted // outside that prefix. -func (d *Datastore) Query(master query.Query) (query.Results, error) { +func (d *Datastore) Query(ctx context.Context, master query.Query) (query.Results, error) { childQuery := query.Query{ Prefix: master.Prefix, Orders: master.Orders, @@ -330,7 +331,7 @@ func (d *Datastore) Query(master query.Query) (query.Results, error) { qi := childQuery qi.Prefix = rest.String() - results, err := dstore.Query(qi) + results, err := dstore.Query(ctx, qi) if err != nil { _ = queries.close() @@ -379,13 +380,13 @@ func (d *Datastore) Close() error { // DiskUsage returns the sum of DiskUsages for the mounted datastores. // Non PersistentDatastores will not be accounted. -func (d *Datastore) DiskUsage() (uint64, error) { +func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) { var ( merr error duTotal uint64 = 0 ) for _, d := range d.mounts { - du, err := ds.DiskUsage(d.Datastore) + du, err := ds.DiskUsage(ctx, d.Datastore) duTotal += du if err != nil { merr = multierr.Append(merr, fmt.Errorf( @@ -406,14 +407,14 @@ type mountBatch struct { } // Batch returns a batch that operates over all mounted datastores. 
-func (d *Datastore) Batch() (ds.Batch, error) { +func (d *Datastore) Batch(ctx context.Context) (ds.Batch, error) { return &mountBatch{ mounts: make(map[string]ds.Batch), d: d, }, nil } -func (mt *mountBatch) lookupBatch(key ds.Key) (ds.Batch, ds.Key, error) { +func (mt *mountBatch) lookupBatch(ctx context.Context, key ds.Key) (ds.Batch, ds.Key, error) { mt.lk.Lock() defer mt.lk.Unlock() @@ -425,7 +426,7 @@ func (mt *mountBatch) lookupBatch(key ds.Key) (ds.Batch, ds.Key, error) { return nil, ds.NewKey(""), ds.ErrBatchUnsupported } var err error - t, err = bds.Batch() + t, err = bds.Batch(ctx) if err != nil { return nil, ds.NewKey(""), err } @@ -434,31 +435,31 @@ func (mt *mountBatch) lookupBatch(key ds.Key) (ds.Batch, ds.Key, error) { return t, rest, nil } -func (mt *mountBatch) Put(key ds.Key, val []byte) error { - t, rest, err := mt.lookupBatch(key) +func (mt *mountBatch) Put(ctx context.Context, key ds.Key, val []byte) error { + t, rest, err := mt.lookupBatch(ctx, key) if err != nil { return err } - return t.Put(rest, val) + return t.Put(ctx, rest, val) } -func (mt *mountBatch) Delete(key ds.Key) error { - t, rest, err := mt.lookupBatch(key) +func (mt *mountBatch) Delete(ctx context.Context, key ds.Key) error { + t, rest, err := mt.lookupBatch(ctx, key) if err != nil { return err } - return t.Delete(rest) + return t.Delete(ctx, rest) } -func (mt *mountBatch) Commit() error { +func (mt *mountBatch) Commit(ctx context.Context) error { mt.lk.Lock() defer mt.lk.Unlock() var merr error for p, t := range mt.mounts { - if err := t.Commit(); err != nil { + if err := t.Commit(ctx); err != nil { merr = multierr.Append(merr, fmt.Errorf( "committing batch to datastore at %s: %w", p, err, @@ -468,11 +469,11 @@ func (mt *mountBatch) Commit() error { return merr } -func (d *Datastore) Check() error { +func (d *Datastore) Check(ctx context.Context) error { var merr error for _, m := range d.mounts { if c, ok := m.Datastore.(ds.CheckedDatastore); ok { - if err := c.Check(); err != nil { + if err := c.Check(ctx); err != nil { merr = multierr.Append(merr, fmt.Errorf( "checking datastore at %s: %w", m.Prefix.String(), @@ -484,11 +485,11 @@ func (d *Datastore) Check() error { return merr } -func (d *Datastore) Scrub() error { +func (d *Datastore) Scrub(ctx context.Context) error { var merr error for _, m := range d.mounts { if c, ok := m.Datastore.(ds.ScrubbedDatastore); ok { - if err := c.Scrub(); err != nil { + if err := c.Scrub(ctx); err != nil { merr = multierr.Append(merr, fmt.Errorf( "scrubbing datastore at %s: %w", m.Prefix.String(), @@ -500,11 +501,11 @@ func (d *Datastore) Scrub() error { return merr } -func (d *Datastore) CollectGarbage() error { +func (d *Datastore) CollectGarbage(ctx context.Context) error { var merr error for _, m := range d.mounts { if c, ok := m.Datastore.(ds.GCDatastore); ok { - if err := c.CollectGarbage(); err != nil { + if err := c.CollectGarbage(ctx); err != nil { merr = multierr.Append(merr, fmt.Errorf( "gc on datastore at %s: %w", m.Prefix.String(), diff --git a/mount/mount_test.go b/mount/mount_test.go index a3059b4..6859701 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -1,6 +1,7 @@ package mount_test import ( + "context" "errors" "testing" @@ -13,37 +14,43 @@ import ( ) func TestPutBadNothing(t *testing.T) { + ctx := context.Background() + m := mount.New(nil) - err := m.Put(datastore.NewKey("quux"), []byte("foobar")) + err := m.Put(ctx, datastore.NewKey("quux"), []byte("foobar")) if g, e := err, mount.ErrNoMount; g != e { t.Fatalf("Put got wrong 
error: %v != %v", g, e) } } func TestPutBadNoMount(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/redherring"), Datastore: mapds}, }) - err := m.Put(datastore.NewKey("/quux/thud"), []byte("foobar")) + err := m.Put(ctx, datastore.NewKey("/quux/thud"), []byte("foobar")) if g, e := err, mount.ErrNoMount; g != e { t.Fatalf("expected ErrNoMount, got: %v\n", g) } } func TestPut(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) - if err := m.Put(datastore.NewKey("/quux/thud"), []byte("foobar")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/quux/thud"), []byte("foobar")); err != nil { t.Fatalf("Put error: %v", err) } - buf, err := mapds.Get(datastore.NewKey("/thud")) + buf, err := mapds.Get(ctx, datastore.NewKey("/thud")) if err != nil { t.Fatalf("Get error: %v", err) } @@ -53,49 +60,57 @@ func TestPut(t *testing.T) { } func TestGetBadNothing(t *testing.T) { + ctx := context.Background() + m := mount.New([]mount.Mount{}) - _, err := m.Get(datastore.NewKey("/quux/thud")) + _, err := m.Get(ctx, datastore.NewKey("/quux/thud")) if g, e := err, datastore.ErrNotFound; g != e { t.Fatalf("expected ErrNotFound, got: %v\n", g) } } func TestGetBadNoMount(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/redherring"), Datastore: mapds}, }) - _, err := m.Get(datastore.NewKey("/quux/thud")) + _, err := m.Get(ctx, datastore.NewKey("/quux/thud")) if g, e := err, datastore.ErrNotFound; g != e { t.Fatalf("expected ErrNotFound, got: %v\n", g) } } func TestGetNotFound(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) - _, err := m.Get(datastore.NewKey("/quux/thud")) + _, err := m.Get(ctx, datastore.NewKey("/quux/thud")) if g, e := err, datastore.ErrNotFound; g != e { t.Fatalf("expected ErrNotFound, got: %v\n", g) } } func TestGet(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) - if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil { + if err := mapds.Put(ctx, datastore.NewKey("/thud"), []byte("foobar")); err != nil { t.Fatalf("Get error: %v", err) } - buf, err := m.Get(datastore.NewKey("/quux/thud")) + buf, err := m.Get(ctx, datastore.NewKey("/quux/thud")) if err != nil { t.Fatalf("Put error: %v", err) } @@ -105,9 +120,11 @@ func TestGet(t *testing.T) { } func TestHasBadNothing(t *testing.T) { + ctx := context.Background() + m := mount.New([]mount.Mount{}) - found, err := m.Has(datastore.NewKey("/quux/thud")) + found, err := m.Has(ctx, datastore.NewKey("/quux/thud")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -117,12 +134,14 @@ func TestHasBadNothing(t *testing.T) { } func TestHasBadNoMount(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/redherring"), Datastore: mapds}, }) - found, err := m.Has(datastore.NewKey("/quux/thud")) + found, err := m.Has(ctx, datastore.NewKey("/quux/thud")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -132,12 +151,14 @@ func TestHasBadNoMount(t *testing.T) { } func TestHasNotFound(t *testing.T) { + 
ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) - found, err := m.Has(datastore.NewKey("/quux/thud")) + found, err := m.Has(ctx, datastore.NewKey("/quux/thud")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -147,16 +168,18 @@ func TestHasNotFound(t *testing.T) { } func TestHas(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) - if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil { + if err := mapds.Put(ctx, datastore.NewKey("/thud"), []byte("foobar")); err != nil { t.Fatalf("Put error: %v", err) } - found, err := m.Has(datastore.NewKey("/quux/thud")) + found, err := m.Has(ctx, datastore.NewKey("/quux/thud")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -166,34 +189,38 @@ func TestHas(t *testing.T) { } func TestDeleteNotFound(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) - err := m.Delete(datastore.NewKey("/quux/thud")) + err := m.Delete(ctx, datastore.NewKey("/quux/thud")) if err != nil { t.Fatalf("expected nil, got: %v\n", err) } } func TestDelete(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) - if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil { + if err := mapds.Put(ctx, datastore.NewKey("/thud"), []byte("foobar")); err != nil { t.Fatalf("Put error: %v", err) } - err := m.Delete(datastore.NewKey("/quux/thud")) + err := m.Delete(ctx, datastore.NewKey("/quux/thud")) if err != nil { t.Fatalf("Delete error: %v", err) } // make sure it disappeared - found, err := mapds.Has(datastore.NewKey("/thud")) + found, err := mapds.Has(ctx, datastore.NewKey("/thud")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -203,17 +230,19 @@ func TestDelete(t *testing.T) { } func TestQuerySimple(t *testing.T) { + ctx := context.Background() + mapds := datastore.NewMapDatastore() m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, }) const myKey = "/quux/thud" - if err := m.Put(datastore.NewKey(myKey), []byte("foobar")); err != nil { + if err := m.Put(ctx, datastore.NewKey(myKey), []byte("foobar")); err != nil { t.Fatalf("Put error: %v", err) } - res, err := m.Query(query.Query{Prefix: "/quux"}) + res, err := m.Query(ctx, query.Query{Prefix: "/quux"}) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -241,6 +270,8 @@ func TestQuerySimple(t *testing.T) { } func TestQueryAcrossMounts(t *testing.T) { + ctx := context.Background() + mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() mapds2 := datastore.NewMapDatastore() @@ -252,25 +283,25 @@ func TestQueryAcrossMounts(t *testing.T) { {Prefix: datastore.NewKey("/"), Datastore: mapds0}, }) - if err := m.Put(datastore.NewKey("/foo/lorem"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/foo/lorem"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/bar/ipsum"), []byte("234")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/bar/ipsum"), []byte("234")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/bar/dolor"), []byte("345")); err != nil { + if err := m.Put(ctx, 
datastore.NewKey("/bar/dolor"), []byte("345")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/baz/sit"), []byte("456")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/baz/sit"), []byte("456")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/banana"), []byte("567")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/banana"), []byte("567")); err != nil { t.Fatal(err) } expect := func(prefix string, values map[string]string) { t.Helper() - res, err := m.Query(query.Query{Prefix: prefix}) + res, err := m.Query(ctx, query.Query{Prefix: prefix}) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -322,6 +353,8 @@ func TestQueryAcrossMounts(t *testing.T) { } func TestQueryAcrossMountsWithSort(t *testing.T) { + ctx := context.Background() + mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() mapds2 := datastore.NewMapDatastore() @@ -331,23 +364,23 @@ func TestQueryAcrossMountsWithSort(t *testing.T) { {Prefix: datastore.NewKey("/boo"), Datastore: mapds0}, }) - if err := m.Put(datastore.NewKey("/zoo/0"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/0"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/1"), []byte("234")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/1"), []byte("234")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/boo/9"), []byte("345")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/boo/9"), []byte("345")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/boo/3"), []byte("456")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/boo/3"), []byte("456")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/boo/5/hello"), []byte("789")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/boo/5/hello"), []byte("789")); err != nil { t.Fatal(err) } - res, err := m.Query(query.Query{Orders: []query.Order{query.OrderByKey{}}}) + res, err := m.Query(ctx, query.Query{Orders: []query.Order{query.OrderByKey{}}}) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -381,6 +414,8 @@ func TestQueryAcrossMountsWithSort(t *testing.T) { } func TestQueryLimitAcrossMountsWithSort(t *testing.T) { + ctx := context.Background() + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) @@ -390,30 +425,30 @@ func TestQueryLimitAcrossMountsWithSort(t *testing.T) { {Prefix: datastore.NewKey("/noop"), Datastore: mapds3}, }) - if err := m.Put(datastore.NewKey("/rok/0"), []byte("ghi")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/0"), []byte("ghi")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/0"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/0"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/1"), []byte("def")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/1"), []byte("def")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/1"), []byte("167")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/1"), []byte("167")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/2"), []byte("345")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/2"), []byte("345")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/3"), []byte("abc")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/3"), 
[]byte("abc")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/3"), []byte("456")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/3"), []byte("456")); err != nil { t.Fatal(err) } q := query.Query{Limit: 2, Orders: []query.Order{query.OrderByKeyDescending{}}} - res, err := m.Query(q) + res, err := m.Query(ctx, q) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -445,6 +480,8 @@ func TestQueryLimitAcrossMountsWithSort(t *testing.T) { } func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { + ctx := context.Background() + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) @@ -454,30 +491,30 @@ func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { {Prefix: datastore.NewKey("/noop"), Datastore: mapds3}, }) - if err := m.Put(datastore.NewKey("/rok/0"), []byte("ghi")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/0"), []byte("ghi")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/0"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/0"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/1"), []byte("def")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/1"), []byte("def")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/1"), []byte("167")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/1"), []byte("167")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/2"), []byte("345")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/2"), []byte("345")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/3"), []byte("abc")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/3"), []byte("abc")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/3"), []byte("456")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/3"), []byte("456")); err != nil { t.Fatal(err) } q := query.Query{Limit: 3, Offset: 2, Orders: []query.Order{query.OrderByKey{}}} - res, err := m.Query(q) + res, err := m.Query(ctx, q) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -510,6 +547,8 @@ func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { } func TestQueryFilterAcrossMountsWithSort(t *testing.T) { + ctx := context.Background() + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) @@ -519,31 +558,31 @@ func TestQueryFilterAcrossMountsWithSort(t *testing.T) { {Prefix: datastore.NewKey("/noop"), Datastore: mapds3}, }) - if err := m.Put(datastore.NewKey("/rok/0"), []byte("ghi")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/0"), []byte("ghi")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/0"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/0"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/1"), []byte("def")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/1"), []byte("def")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/1"), []byte("167")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/1"), []byte("167")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/2"), []byte("345")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/2"), 
[]byte("345")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/3"), []byte("abc")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/3"), []byte("abc")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/zoo/3"), []byte("456")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/3"), []byte("456")); err != nil { t.Fatal(err) } f := &query.FilterKeyCompare{Op: query.Equal, Key: "/rok/3"} q := query.Query{Filters: []query.Filter{f}} - res, err := m.Query(q) + res, err := m.Query(ctx, q) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -574,6 +613,8 @@ func TestQueryFilterAcrossMountsWithSort(t *testing.T) { } func TestQueryLimitAndOffsetWithNoData(t *testing.T) { + ctx := context.Background() + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) m := mount.New([]mount.Mount{ @@ -582,7 +623,7 @@ func TestQueryLimitAndOffsetWithNoData(t *testing.T) { }) q := query.Query{Limit: 4, Offset: 3} - res, err := m.Query(q) + res, err := m.Query(ctx, q) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -605,6 +646,8 @@ func TestQueryLimitAndOffsetWithNoData(t *testing.T) { } func TestQueryLimitWithNotEnoughData(t *testing.T) { + ctx := context.Background() + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) m := mount.New([]mount.Mount{ @@ -612,15 +655,15 @@ func TestQueryLimitWithNotEnoughData(t *testing.T) { {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, }) - if err := m.Put(datastore.NewKey("/zoo/0"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/0"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/1"), []byte("167")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/1"), []byte("167")); err != nil { t.Fatal(err) } q := query.Query{Limit: 4} - res, err := m.Query(q) + res, err := m.Query(ctx, q) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -646,6 +689,8 @@ func TestQueryLimitWithNotEnoughData(t *testing.T) { } func TestQueryOffsetWithNotEnoughData(t *testing.T) { + ctx := context.Background() + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) m := mount.New([]mount.Mount{ @@ -653,15 +698,15 @@ func TestQueryOffsetWithNotEnoughData(t *testing.T) { {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, }) - if err := m.Put(datastore.NewKey("/zoo/0"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/zoo/0"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/rok/1"), []byte("167")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/rok/1"), []byte("167")); err != nil { t.Fatal(err) } q := query.Query{Offset: 4} - res, err := m.Query(q) + res, err := m.Query(ctx, q) if err != nil { t.Fatalf("Query fail: %v\n", err) } @@ -684,6 +729,8 @@ func TestQueryOffsetWithNotEnoughData(t *testing.T) { } func TestLookupPrio(t *testing.T) { + ctx := context.Background() + mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() @@ -692,14 +739,14 @@ func TestLookupPrio(t *testing.T) { {Prefix: datastore.NewKey("/foo"), Datastore: mapds1}, }) - if err := m.Put(datastore.NewKey("/foo/bar"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/foo/bar"), []byte("123")); err != nil { t.Fatal(err) } - if err := m.Put(datastore.NewKey("/baz"), []byte("234")); err != nil { + if err := m.Put(ctx, 
datastore.NewKey("/baz"), []byte("234")); err != nil { t.Fatal(err) } - found, err := mapds0.Has(datastore.NewKey("/baz")) + found, err := mapds0.Has(ctx, datastore.NewKey("/baz")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -707,7 +754,7 @@ func TestLookupPrio(t *testing.T) { t.Fatalf("wrong value: %v != %v", g, e) } - found, err = mapds0.Has(datastore.NewKey("/foo/bar")) + found, err = mapds0.Has(ctx, datastore.NewKey("/foo/bar")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -715,7 +762,7 @@ func TestLookupPrio(t *testing.T) { t.Fatalf("wrong value: %v != %v", g, e) } - found, err = mapds1.Has(datastore.NewKey("/bar")) + found, err = mapds1.Has(ctx, datastore.NewKey("/bar")) if err != nil { t.Fatalf("Has error: %v", err) } @@ -725,6 +772,8 @@ func TestLookupPrio(t *testing.T) { } func TestNestedMountSync(t *testing.T) { + ctx := context.Background() + internalDSRoot := datastore.NewMapDatastore() internalDSFoo := datastore.NewMapDatastore() internalDSFooBar := datastore.NewMapDatastore() @@ -742,14 +791,14 @@ func TestNestedMountSync(t *testing.T) { addToDS := func(str string) { t.Helper() - if err := m.Put(datastore.NewKey(str), []byte(str)); err != nil { + if err := m.Put(ctx, datastore.NewKey(str), []byte(str)); err != nil { t.Fatal(err) } } checkVal := func(d datastore.Datastore, str string, expectFound bool) { t.Helper() - res, err := d.Has(datastore.NewKey(str)) + res, err := d.Has(ctx, datastore.NewKey(str)) if err != nil { t.Fatal(err) } @@ -767,7 +816,7 @@ func TestNestedMountSync(t *testing.T) { addToDS("/foo/baz") addToDS("/beep/bop") - if err := m.Sync(datastore.NewKey("/foo")); err != nil { + if err := m.Sync(ctx, datastore.NewKey("/foo")); err != nil { t.Fatal(err) } @@ -780,7 +829,7 @@ func TestNestedMountSync(t *testing.T) { addToDS("/fwop") addToDS("/bloop") - if err := m.Sync(datastore.NewKey("/fwop")); err != nil { + if err := m.Sync(ctx, datastore.NewKey("/fwop")); err != nil { t.Fatal(err) } @@ -792,11 +841,13 @@ type errQueryDS struct { datastore.NullDatastore } -func (d *errQueryDS) Query(q query.Query) (query.Results, error) { +func (d *errQueryDS) Query(ctx context.Context, q query.Query) (query.Results, error) { return nil, errors.New("test error") } func TestErrQueryClose(t *testing.T) { + ctx := context.Background() + eqds := &errQueryDS{} mds := datastore.NewMapDatastore() @@ -805,11 +856,11 @@ func TestErrQueryClose(t *testing.T) { {Prefix: datastore.NewKey("/foo"), Datastore: eqds}, }) - if err := m.Put(datastore.NewKey("/baz"), []byte("123")); err != nil { + if err := m.Put(ctx, datastore.NewKey("/baz"), []byte("123")); err != nil { t.Fatal(err) } - _, err := m.Query(query.Query{}) + _, err := m.Query(ctx, query.Query{}) if err == nil { t.Fatal("expected query to fail") return @@ -817,20 +868,22 @@ func TestErrQueryClose(t *testing.T) { } func TestMaintenanceFunctions(t *testing.T) { + ctx := context.Background() + mapds := dstest.NewTestDatastore(true) m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/"), Datastore: mapds}, }) - if err := m.Check(); err.Error() != "checking datastore at /: test error" { + if err := m.Check(ctx); err != nil && err.Error() != "checking datastore at /: test error" { t.Errorf("Unexpected Check() error: %s", err) } - if err := m.CollectGarbage(); err.Error() != "gc on datastore at /: test error" { + if err := m.CollectGarbage(ctx); err != nil && err.Error() != "gc on datastore at /: test error" { t.Errorf("Unexpected CollectGarbage() error: %s", err) } - if err := m.Scrub(); err.Error() != "scrubbing 
datastore at /: test error" { + if err := m.Scrub(ctx); err != nil && err.Error() != "scrubbing datastore at /: test error" { t.Errorf("Unexpected Scrub() error: %s", err) } } diff --git a/namespace/example_test.go b/namespace/example_test.go index 2fb234a..1d319a7 100644 --- a/namespace/example_test.go +++ b/namespace/example_test.go @@ -1,6 +1,7 @@ package namespace_test import ( + "context" "fmt" ds "github.com/ipfs/go-datastore" @@ -8,22 +9,24 @@ import ( ) func Example() { + ctx := context.Background() + mp := ds.NewMapDatastore() ns := nsds.Wrap(mp, ds.NewKey("/foo/bar")) k := ds.NewKey("/beep") v := "boop" - if err := ns.Put(k, []byte(v)); err != nil { + if err := ns.Put(ctx, k, []byte(v)); err != nil { panic(err) } fmt.Printf("ns.Put %s %s\n", k, v) - v2, _ := ns.Get(k) + v2, _ := ns.Get(ctx, k) fmt.Printf("ns.Get %s -> %s\n", k, v2) k3 := ds.NewKey("/foo/bar/beep") - v3, _ := mp.Get(k3) + v3, _ := mp.Get(ctx, k3) fmt.Printf("mp.Get %s -> %s\n", k3, v3) // Output: // ns.Put /beep boop diff --git a/namespace/namespace_test.go b/namespace/namespace_test.go index 95897ba..435a627 100644 --- a/namespace/namespace_test.go +++ b/namespace/namespace_test.go @@ -2,6 +2,7 @@ package namespace_test import ( "bytes" + "context" "sort" "testing" @@ -26,6 +27,7 @@ func (ks *DSSuite) TestBasic(c *C) { } func (ks *DSSuite) testBasic(c *C, prefix string) { + ctx := context.Background() mpds := ds.NewMapDatastore() nsds := ns.Wrap(mpds, ds.NewKey(prefix)) @@ -40,22 +42,22 @@ func (ks *DSSuite) testBasic(c *C, prefix string) { }) for _, k := range keys { - err := nsds.Put(k, []byte(k.String())) + err := nsds.Put(ctx, k, []byte(k.String())) c.Check(err, Equals, nil) } for _, k := range keys { - v1, err := nsds.Get(k) + v1, err := nsds.Get(ctx, k) c.Check(err, Equals, nil) c.Check(bytes.Equal(v1, []byte(k.String())), Equals, true) - v2, err := mpds.Get(ds.NewKey(prefix).Child(k)) + v2, err := mpds.Get(ctx, ds.NewKey(prefix).Child(k)) c.Check(err, Equals, nil) c.Check(bytes.Equal(v2, []byte(k.String())), Equals, true) } run := func(d ds.Datastore, q dsq.Query) []ds.Key { - r, err := d.Query(q) + r, err := d.Query(ctx, q) c.Check(err, Equals, nil) e, err := r.Rest() @@ -80,6 +82,8 @@ func (ks *DSSuite) testBasic(c *C, prefix string) { } func (ks *DSSuite) TestQuery(c *C) { + ctx := context.Background() + mpds := dstest.NewTestDatastore(true) nsds := ns.Wrap(mpds, ds.NewKey("/foo")) @@ -93,11 +97,11 @@ func (ks *DSSuite) TestQuery(c *C) { }) for _, k := range keys { - err := mpds.Put(k, []byte(k.String())) + err := mpds.Put(ctx, k, []byte(k.String())) c.Check(err, Equals, nil) } - qres, err := nsds.Query(dsq.Query{}) + qres, err := nsds.Query(ctx, dsq.Query{}) c.Check(err, Equals, nil) expect := []dsq.Entry{ @@ -118,7 +122,7 @@ func (ks *DSSuite) TestQuery(c *C) { err = qres.Close() c.Check(err, Equals, nil) - qres, err = nsds.Query(dsq.Query{Prefix: "bar"}) + qres, err = nsds.Query(ctx, dsq.Query{Prefix: "bar"}) c.Check(err, Equals, nil) expect = []dsq.Entry{ @@ -134,15 +138,15 @@ func (ks *DSSuite) TestQuery(c *C) { c.Check(string(ent.Value), Equals, string(expect[i].Value)) } - if err := nsds.Check(); err != dstest.ErrTest { + if err := nsds.Check(ctx); err != nil && err != dstest.ErrTest { c.Errorf("Unexpected Check() error: %s", err) } - if err := nsds.CollectGarbage(); err != dstest.ErrTest { + if err := nsds.CollectGarbage(ctx); err != nil && err != dstest.ErrTest { c.Errorf("Unexpected CollectGarbage() error: %s", err) } - if err := nsds.Scrub(); err != dstest.ErrTest { + if err := 
nsds.Scrub(ctx); err != nil && err != dstest.ErrTest { c.Errorf("Unexpected Scrub() error: %s", err) } } diff --git a/retrystore/retrystore.go b/retrystore/retrystore.go index d452758..507d199 100644 --- a/retrystore/retrystore.go +++ b/retrystore/retrystore.go @@ -3,6 +3,7 @@ package retrystore import ( + "context" "time" ds "github.com/ipfs/go-datastore" @@ -44,22 +45,22 @@ func (d *Datastore) runOp(op func() error) error { } // DiskUsage implements the PersistentDatastore interface. -func (d *Datastore) DiskUsage() (uint64, error) { +func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) { var size uint64 err := d.runOp(func() error { var err error - size, err = ds.DiskUsage(d.Batching) + size, err = ds.DiskUsage(ctx, d.Batching) return err }) return size, err } // Get retrieves a value given a key. -func (d *Datastore) Get(k ds.Key) ([]byte, error) { +func (d *Datastore) Get(ctx context.Context, k ds.Key) ([]byte, error) { var val []byte err := d.runOp(func() error { var err error - val, err = d.Batching.Get(k) + val, err = d.Batching.Get(ctx, k) return err }) @@ -67,36 +68,36 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) { } // Put stores a key/value. -func (d *Datastore) Put(k ds.Key, val []byte) error { +func (d *Datastore) Put(ctx context.Context, k ds.Key, val []byte) error { return d.runOp(func() error { - return d.Batching.Put(k, val) + return d.Batching.Put(ctx, k, val) }) } // Sync implements Datastore.Sync -func (d *Datastore) Sync(prefix ds.Key) error { +func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error { return d.runOp(func() error { - return d.Batching.Sync(prefix) + return d.Batching.Sync(ctx, prefix) }) } // Has checks if a key is stored. -func (d *Datastore) Has(k ds.Key) (bool, error) { +func (d *Datastore) Has(ctx context.Context, k ds.Key) (bool, error) { var has bool err := d.runOp(func() error { var err error - has, err = d.Batching.Has(k) + has, err = d.Batching.Has(ctx, k) return err }) return has, err } // GetSize returns the size of the value in the datastore, if present. 
-func (d *Datastore) GetSize(k ds.Key) (int, error) { +func (d *Datastore) GetSize(ctx context.Context, k ds.Key) (int, error) { var size int err := d.runOp(func() error { var err error - size, err = d.Batching.GetSize(k) + size, err = d.Batching.GetSize(ctx, k) return err }) return size, err diff --git a/retrystore/retrystore_test.go b/retrystore/retrystore_test.go index 8d6359d..ef2115b 100644 --- a/retrystore/retrystore_test.go +++ b/retrystore/retrystore_test.go @@ -1,6 +1,7 @@ package retrystore import ( + "context" "fmt" "strings" "testing" @@ -10,6 +11,8 @@ import ( ) func TestRetryFailure(t *testing.T) { + ctx := context.Background() + myErr := fmt.Errorf("this is an actual error") var count int fstore := failstore.NewFailstore(ds.NewMapDatastore(), func(op string) error { @@ -27,7 +30,7 @@ func TestRetryFailure(t *testing.T) { k := ds.NewKey("test") - _, err := rds.Get(k) + _, err := rds.Get(ctx, k) if err == nil { t.Fatal("expected this to fail") } @@ -42,6 +45,8 @@ func TestRetryFailure(t *testing.T) { } func TestRealErrorGetsThrough(t *testing.T) { + ctx := context.Background() + myErr := fmt.Errorf("this is an actual error") fstore := failstore.NewFailstore(ds.NewMapDatastore(), func(op string) error { return myErr @@ -56,23 +61,25 @@ func TestRealErrorGetsThrough(t *testing.T) { } k := ds.NewKey("test") - _, err := rds.Get(k) + _, err := rds.Get(ctx, k) if err != myErr { t.Fatal("expected my own error") } - _, err = rds.Has(k) + _, err = rds.Has(ctx, k) if err != myErr { t.Fatal("expected my own error") } - err = rds.Put(k, nil) + err = rds.Put(ctx, k, nil) if err != myErr { t.Fatal("expected my own error") } } func TestRealErrorAfterTemp(t *testing.T) { + ctx := context.Background() + myErr := fmt.Errorf("this is an actual error") tempErr := fmt.Errorf("this is a temp error") var count int @@ -94,13 +101,15 @@ func TestRealErrorAfterTemp(t *testing.T) { } k := ds.NewKey("test") - _, err := rds.Get(k) + _, err := rds.Get(ctx, k) if err != myErr { t.Fatal("expected my own error") } } func TestSuccessAfterTemp(t *testing.T) { + ctx := context.Background() + tempErr := fmt.Errorf("this is a temp error") var count int fstore := failstore.NewFailstore(ds.NewMapDatastore(), func(op string) error { @@ -123,12 +132,12 @@ func TestSuccessAfterTemp(t *testing.T) { k := ds.NewKey("test") val := []byte("foo") - err := rds.Put(k, val) + err := rds.Put(ctx, k, val) if err != nil { t.Fatal(err) } - has, err := rds.Has(k) + has, err := rds.Has(ctx, k) if err != nil { t.Fatal(err) } @@ -137,7 +146,7 @@ func TestSuccessAfterTemp(t *testing.T) { t.Fatal("should have this thing") } - out, err := rds.Get(k) + out, err := rds.Get(ctx, k) if err != nil { t.Fatal(err) } diff --git a/sync/sync.go b/sync/sync.go index 84609e3..6671bfc 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -1,6 +1,7 @@ package sync import ( + "context" "sync" ds "github.com/ipfs/go-datastore" @@ -27,56 +28,56 @@ func (d *MutexDatastore) Children() []ds.Datastore { } // Put implements Datastore.Put -func (d *MutexDatastore) Put(key ds.Key, value []byte) (err error) { +func (d *MutexDatastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { d.Lock() defer d.Unlock() - return d.child.Put(key, value) + return d.child.Put(ctx, key, value) } // Sync implements Datastore.Sync -func (d *MutexDatastore) Sync(prefix ds.Key) error { +func (d *MutexDatastore) Sync(ctx context.Context, prefix ds.Key) error { d.Lock() defer d.Unlock() - return d.child.Sync(prefix) + return d.child.Sync(ctx, prefix) } // Get implements 
Datastore.Get -func (d *MutexDatastore) Get(key ds.Key) (value []byte, err error) { +func (d *MutexDatastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { d.RLock() defer d.RUnlock() - return d.child.Get(key) + return d.child.Get(ctx, key) } // Has implements Datastore.Has -func (d *MutexDatastore) Has(key ds.Key) (exists bool, err error) { +func (d *MutexDatastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { d.RLock() defer d.RUnlock() - return d.child.Has(key) + return d.child.Has(ctx, key) } // GetSize implements Datastore.GetSize -func (d *MutexDatastore) GetSize(key ds.Key) (size int, err error) { +func (d *MutexDatastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { d.RLock() defer d.RUnlock() - return d.child.GetSize(key) + return d.child.GetSize(ctx, key) } // Delete implements Datastore.Delete -func (d *MutexDatastore) Delete(key ds.Key) (err error) { +func (d *MutexDatastore) Delete(ctx context.Context, key ds.Key) (err error) { d.Lock() defer d.Unlock() - return d.child.Delete(key) + return d.child.Delete(ctx, key) } // Query implements Datastore.Query -func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { +func (d *MutexDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { d.RLock() defer d.RUnlock() // Apply the entire query while locked. Non-sync datastores may not // allow concurrent queries. - results, err := d.child.Query(q) + results, err := d.child.Query(ctx, q) if err != nil { return nil, err } @@ -92,7 +93,7 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { return dsq.ResultsWithEntries(q, entries), nil } -func (d *MutexDatastore) Batch() (ds.Batch, error) { +func (d *MutexDatastore) Batch(ctx context.Context) (ds.Batch, error) { d.RLock() defer d.RUnlock() bds, ok := d.child.(ds.Batching) @@ -100,7 +101,7 @@ func (d *MutexDatastore) Batch() (ds.Batch, error) { return nil, ds.ErrBatchUnsupported } - b, err := bds.Batch() + b, err := bds.Batch(ctx) if err != nil { return nil, err } @@ -117,10 +118,10 @@ func (d *MutexDatastore) Close() error { } // DiskUsage implements the PersistentDatastore interface.
-func (d *MutexDatastore) DiskUsage() (uint64, error) { +func (d *MutexDatastore) DiskUsage(ctx context.Context) (uint64, error) { d.RLock() defer d.RUnlock() - return ds.DiskUsage(d.child) + return ds.DiskUsage(ctx, d.child) } type syncBatch struct { @@ -128,47 +129,47 @@ type syncBatch struct { mds *MutexDatastore } -func (b *syncBatch) Put(key ds.Key, val []byte) error { +func (b *syncBatch) Put(ctx context.Context, key ds.Key, val []byte) error { b.mds.Lock() defer b.mds.Unlock() - return b.batch.Put(key, val) + return b.batch.Put(ctx, key, val) } -func (b *syncBatch) Delete(key ds.Key) error { +func (b *syncBatch) Delete(ctx context.Context, key ds.Key) error { b.mds.Lock() defer b.mds.Unlock() - return b.batch.Delete(key) + return b.batch.Delete(ctx, key) } -func (b *syncBatch) Commit() error { +func (b *syncBatch) Commit(ctx context.Context) error { b.mds.Lock() defer b.mds.Unlock() - return b.batch.Commit() + return b.batch.Commit(ctx) } -func (d *MutexDatastore) Check() error { +func (d *MutexDatastore) Check(ctx context.Context) error { if c, ok := d.child.(ds.CheckedDatastore); ok { d.RWMutex.Lock() defer d.RWMutex.Unlock() - return c.Check() + return c.Check(ctx) } return nil } -func (d *MutexDatastore) Scrub() error { +func (d *MutexDatastore) Scrub(ctx context.Context) error { if c, ok := d.child.(ds.ScrubbedDatastore); ok { d.RWMutex.Lock() defer d.RWMutex.Unlock() - return c.Scrub() + return c.Scrub(ctx) } return nil } -func (d *MutexDatastore) CollectGarbage() error { +func (d *MutexDatastore) CollectGarbage(ctx context.Context) error { if c, ok := d.child.(ds.GCDatastore); ok { d.RWMutex.Lock() defer d.RWMutex.Unlock() - return c.CollectGarbage() + return c.CollectGarbage(ctx) } return nil } diff --git a/test/basic_tests.go b/test/basic_tests.go index 434abc5..027f2ba 100644 --- a/test/basic_tests.go +++ b/test/basic_tests.go @@ -2,6 +2,7 @@ package dstest import ( "bytes" + "context" "fmt" "math/rand" "reflect" @@ -24,15 +25,17 @@ func TestElemCount(t *testing.T) { } func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) { + ctx := context.Background() + k := dstore.NewKey("foo") val := []byte("Hello Datastore!") - err := ds.Put(k, val) + err := ds.Put(ctx, k, val) if err != nil { t.Fatal("error putting to datastore: ", err) } - have, err := ds.Has(k) + have, err := ds.Has(ctx, k) if err != nil { t.Fatal("error calling has on key we just put: ", err) } @@ -41,7 +44,7 @@ func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) { t.Fatal("should have key foo, has returned false") } - size, err := ds.GetSize(k) + size, err := ds.GetSize(ctx, k) if err != nil { t.Fatal("error getting size after put: ", err) } @@ -49,7 +52,7 @@ func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) { t.Fatalf("incorrect size: expected %d, got %d", len(val), size) } - out, err := ds.Get(k) + out, err := ds.Get(ctx, k) if err != nil { t.Fatal("error getting value after put: ", err) } @@ -58,7 +61,7 @@ func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) { t.Fatal("value received on get wasnt what we expected:", out) } - have, err = ds.Has(k) + have, err = ds.Has(ctx, k) if err != nil { t.Fatal("error calling has after get: ", err) } @@ -67,7 +70,7 @@ func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) { t.Fatal("should have key foo, has returned false") } - size, err = ds.GetSize(k) + size, err = ds.GetSize(ctx, k) if err != nil { t.Fatal("error getting size after get: ", err) } @@ -75,12 +78,12 @@ func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) {
t.Fatalf("incorrect size: expected %d, got %d", len(val), size) } - err = ds.Delete(k) + err = ds.Delete(ctx, k) if err != nil { t.Fatal("error calling delete: ", err) } - have, err = ds.Has(k) + have, err = ds.Has(ctx, k) if err != nil { t.Fatal("error calling has after delete: ", err) } @@ -89,7 +92,7 @@ func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) { t.Fatal("should not have key foo, has returned true") } - size, err = ds.GetSize(k) + size, err = ds.GetSize(ctx, k) switch err { case dstore.ErrNotFound: case nil: @@ -103,9 +106,11 @@ func SubtestBasicPutGet(t *testing.T, ds dstore.Datastore) { } func SubtestNotFounds(t *testing.T, ds dstore.Datastore) { + ctx := context.Background() + badk := dstore.NewKey("notreal") - val, err := ds.Get(badk) + val, err := ds.Get(ctx, badk) if err != dstore.ErrNotFound { t.Fatal("expected ErrNotFound for key that doesnt exist, got: ", err) } @@ -114,7 +119,7 @@ func SubtestNotFounds(t *testing.T, ds dstore.Datastore) { t.Fatal("get should always return nil for not found values") } - have, err := ds.Has(badk) + have, err := ds.Has(ctx, badk) if err != nil { t.Fatal("error calling has on not found key: ", err) } @@ -122,7 +127,7 @@ func SubtestNotFounds(t *testing.T, ds dstore.Datastore) { t.Fatal("has returned true for key we don't have") } - size, err := ds.GetSize(badk) + size, err := ds.GetSize(ctx, badk) switch err { case dstore.ErrNotFound: case nil: @@ -134,7 +139,7 @@ func SubtestNotFounds(t *testing.T, ds dstore.Datastore) { t.Fatal("expected missing size to be -1") } - err = ds.Delete(badk) + err = ds.Delete(ctx, badk) if err != nil { t.Fatal("error calling delete on not found key: ", err) } @@ -193,31 +198,33 @@ func SubtestManyKeysAndQuery(t *testing.T, ds dstore.Datastore) { } func SubtestBasicSync(t *testing.T, ds dstore.Datastore) { - if err := ds.Sync(dstore.NewKey("prefix")); err != nil { + ctx := context.Background() + + if err := ds.Sync(ctx, dstore.NewKey("prefix")); err != nil { t.Fatal(err) } - if err := ds.Put(dstore.NewKey("/prefix"), []byte("foo")); err != nil { + if err := ds.Put(ctx, dstore.NewKey("/prefix"), []byte("foo")); err != nil { t.Fatal(err) } - if err := ds.Sync(dstore.NewKey("/prefix")); err != nil { + if err := ds.Sync(ctx, dstore.NewKey("/prefix")); err != nil { t.Fatal(err) } - if err := ds.Put(dstore.NewKey("/prefix/sub"), []byte("bar")); err != nil { + if err := ds.Put(ctx, dstore.NewKey("/prefix/sub"), []byte("bar")); err != nil { t.Fatal(err) } - if err := ds.Sync(dstore.NewKey("/prefix")); err != nil { + if err := ds.Sync(ctx, dstore.NewKey("/prefix")); err != nil { t.Fatal(err) } - if err := ds.Sync(dstore.NewKey("/prefix/sub")); err != nil { + if err := ds.Sync(ctx, dstore.NewKey("/prefix/sub")); err != nil { t.Fatal(err) } - if err := ds.Sync(dstore.NewKey("")); err != nil { + if err := ds.Sync(ctx, dstore.NewKey("")); err != nil { t.Fatal(err) } } @@ -387,6 +394,8 @@ func randValue() []byte { } func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) { + ctx := context.Background() + var input []dsq.Entry for i := 0; i < count; i++ { s := fmt.Sprintf("%dkey%d", i, i) @@ -434,7 +443,7 @@ func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) { t.Logf("putting %d values", len(input)) for i, e := range input { - err := ds.Put(dstore.RawKey(e.Key), e.Value) + err := ds.Put(ctx, dstore.RawKey(e.Key), e.Value) if err != nil { t.Fatalf("error on put[%d]: %s", i, err) } @@ -442,7 +451,7 @@ func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count 
int) { t.Log("getting values back") for i, e := range input { - val, err := ds.Get(dstore.RawKey(e.Key)) + val, err := ds.Get(ctx, dstore.RawKey(e.Key)) if err != nil { t.Fatalf("error on get[%d]: %s", i, err) } @@ -453,7 +462,7 @@ func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) { } t.Log("querying values") - resp, err := ds.Query(q) + resp, err := ds.Query(ctx, q) if err != nil { t.Fatal("calling query: ", err) } @@ -495,7 +504,7 @@ func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) { t.Log("deleting all keys") for _, e := range input { - if err := ds.Delete(dstore.RawKey(e.Key)); err != nil { + if err := ds.Delete(ctx, dstore.RawKey(e.Key)); err != nil { t.Fatal(err) } } diff --git a/test/suite.go b/test/suite.go index 012f802..e1ccd66 100644 --- a/test/suite.go +++ b/test/suite.go @@ -1,6 +1,7 @@ package dstest import ( + "context" "reflect" "runtime" "testing" @@ -35,7 +36,9 @@ func getFunctionName(i interface{}) string { } func clearDs(t *testing.T, ds dstore.Datastore) { - q, err := ds.Query(query.Query{KeysOnly: true}) + ctx := context.Background() + + q, err := ds.Query(ctx, query.Query{KeysOnly: true}) if err != nil { t.Fatal(err) } @@ -44,7 +47,7 @@ func clearDs(t *testing.T, ds dstore.Datastore) { t.Fatal(err) } for _, r := range res { - if err := ds.Delete(dstore.RawKey(r.Key)); err != nil { + if err := ds.Delete(ctx, dstore.RawKey(r.Key)); err != nil { t.Fatal(err) } } diff --git a/test/test_util.go b/test/test_util.go index 3393203..04c340e 100644 --- a/test/test_util.go +++ b/test/test_util.go @@ -2,6 +2,7 @@ package dstest import ( "bytes" + "context" "encoding/base32" "errors" "math/rand" @@ -13,7 +14,9 @@ import ( var ErrTest = errors.New("test error") func RunBatchTest(t *testing.T, ds dstore.Batching) { - batch, err := ds.Batch() + ctx := context.Background() + + batch, err := ds.Batch(ctx) if err != nil { t.Fatal(err) } @@ -28,7 +31,7 @@ func RunBatchTest(t *testing.T, ds dstore.Batching) { key := dstore.NewKey(base32.StdEncoding.EncodeToString(blk[:8])) keys = append(keys, key) - err := batch.Put(key, blk) + err := batch.Put(ctx, key, blk) if err != nil { t.Fatal(err) } @@ -36,20 +39,20 @@ func RunBatchTest(t *testing.T, ds dstore.Batching) { // Ensure they are not in the datastore before committing for _, k := range keys { - _, err := ds.Get(k) + _, err := ds.Get(ctx, k) if err == nil { t.Fatal("should not have found this block") } } // commit, write them to the datastore - err = batch.Commit() + err = batch.Commit(ctx) if err != nil { t.Fatal(err) } for i, k := range keys { - blk, err := ds.Get(k) + blk, err := ds.Get(ctx, k) if err != nil { t.Fatal(err) } @@ -61,6 +64,8 @@ func RunBatchTest(t *testing.T, ds dstore.Batching) { } func RunBatchDeleteTest(t *testing.T, ds dstore.Batching) { + ctx := context.Background() + var keys []dstore.Key for i := 0; i < 20; i++ { blk := make([]byte, 16) @@ -69,30 +74,30 @@ func RunBatchDeleteTest(t *testing.T, ds dstore.Batching) { key := dstore.NewKey(base32.StdEncoding.EncodeToString(blk[:8])) keys = append(keys, key) - err := ds.Put(key, blk) + err := ds.Put(ctx, key, blk) if err != nil { t.Fatal(err) } } - batch, err := ds.Batch() + batch, err := ds.Batch(ctx) if err != nil { t.Fatal(err) } for _, k := range keys { - err := batch.Delete(k) + err := batch.Delete(ctx, k) if err != nil { t.Fatal(err) } } - err = batch.Commit() + err = batch.Commit(ctx) if err != nil { t.Fatal(err) } for _, k := range keys { - _, err := ds.Get(k) + _, err := ds.Get(ctx, k) if err == nil { 
t.Fatal("shouldnt have found block") } @@ -100,7 +105,9 @@ func RunBatchDeleteTest(t *testing.T, ds dstore.Batching) { } func RunBatchPutAndDeleteTest(t *testing.T, ds dstore.Batching) { - batch, err := ds.Batch() + ctx := context.Background() + + batch, err := ds.Batch(ctx) if err != nil { t.Fatal(err) } @@ -108,29 +115,29 @@ func RunBatchPutAndDeleteTest(t *testing.T, ds dstore.Batching) { ka := dstore.NewKey("/a") kb := dstore.NewKey("/b") - if err := batch.Put(ka, []byte{1}); err != nil { + if err := batch.Put(ctx, ka, []byte{1}); err != nil { t.Error(err) } - if err := batch.Put(kb, []byte{2}); err != nil { + if err := batch.Put(ctx, kb, []byte{2}); err != nil { t.Error(err) } - if err := batch.Delete(ka); err != nil { + if err := batch.Delete(ctx, ka); err != nil { t.Error(err) } - if err := batch.Delete(kb); err != nil { + if err := batch.Delete(ctx, kb); err != nil { t.Error(err) } - if err := batch.Put(kb, []byte{3}); err != nil { + if err := batch.Put(ctx, kb, []byte{3}); err != nil { t.Error(err) } // TODO: assert that nothing has been flushed yet? What are the semantics here? - if err := batch.Commit(); err != nil { + if err := batch.Commit(ctx); err != nil { t.Error(err) } - switch _, err := ds.Get(ka); err { + switch _, err := ds.Get(ctx, ka); err { case dstore.ErrNotFound: case nil: t.Errorf("expected to not find %s", ka) @@ -138,7 +145,7 @@ func RunBatchPutAndDeleteTest(t *testing.T, ds dstore.Batching) { t.Error(err) } - if v, err := ds.Get(kb); err != nil { + if v, err := ds.Get(ctx, kb); err != nil { t.Error(err) } else { if len(v) != 1 || v[0] != 3 {