Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: plumb through contexts #119

Merged
merged 1 commit into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 30 additions & 29 deletions datastore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package badger

import (
"context"
"errors"
"fmt"
"runtime"
Expand Down Expand Up @@ -210,7 +211,7 @@ func (d *Datastore) periodicGC() {
// NewTransaction starts a new transaction. The resulting transaction object
// can be mutated without incurring changes to the underlying Datastore until
// the transaction is Committed.
func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
func (d *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -226,7 +227,7 @@ func (d *Datastore) newImplicitTransaction(readOnly bool) *txn {
return &txn{d, d.DB.NewTransaction(!readOnly), true}
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -243,7 +244,7 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
return txn.commit()
}

func (d *Datastore) Sync(prefix ds.Key) error {
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -257,7 +258,7 @@ func (d *Datastore) Sync(prefix ds.Key) error {
return d.DB.Sync()
}

func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
func (d *Datastore) PutWithTTL(ctx context.Context, key ds.Key, value []byte, ttl time.Duration) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -274,7 +275,7 @@ func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) erro
return txn.commit()
}

func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
func (d *Datastore) SetTTL(ctx context.Context, key ds.Key, ttl time.Duration) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -291,7 +292,7 @@ func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
return txn.commit()
}

func (d *Datastore) GetExpiration(key ds.Key) (time.Time, error) {
func (d *Datastore) GetExpiration(ctx context.Context, key ds.Key) (time.Time, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -304,7 +305,7 @@ func (d *Datastore) GetExpiration(key ds.Key) (time.Time, error) {
return txn.getExpiration(key)
}

func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -317,7 +318,7 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return txn.get(key)
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
func (d *Datastore) Has(ctx context.Context, key ds.Key) (bool, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -330,7 +331,7 @@ func (d *Datastore) Has(key ds.Key) (bool, error) {
return txn.has(key)
}

func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -343,7 +344,7 @@ func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
return txn.getSize(key)
}

func (d *Datastore) Delete(key ds.Key) error {
func (d *Datastore) Delete(ctx context.Context, key ds.Key) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()

Expand All @@ -358,7 +359,7 @@ func (d *Datastore) Delete(key ds.Key) error {
return txn.commit()
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -374,7 +375,7 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {

// DiskUsage implements the PersistentDatastore interface.
// It returns the sum of lsm and value log files sizes in bytes.
func (d *Datastore) DiskUsage() (uint64, error) {
func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -399,7 +400,7 @@ func (d *Datastore) Close() error {

// Batch creats a new Batch object. This provides a way to do many writes, when
// there may be too many to fit into a single transaction.
func (d *Datastore) Batch() (ds.Batch, error) {
func (d *Datastore) Batch(ctx context.Context) (ds.Batch, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -417,7 +418,7 @@ func (d *Datastore) Batch() (ds.Batch, error) {
return b, nil
}

func (d *Datastore) CollectGarbage() (err error) {
func (d *Datastore) CollectGarbage(ctx context.Context) (err error) {
// The idea is to keep calling DB.RunValueLogGC() till Badger no longer has any log files
// to GC(which would be indicated by an error, please refer to Badger GC docs).
for err == nil {
Expand All @@ -444,7 +445,7 @@ func (d *Datastore) gcOnce() error {

var _ ds.Batch = (*batch)(nil)

func (b *batch) Put(key ds.Key, value []byte) error {
func (b *batch) Put(ctx context.Context, key ds.Key, value []byte) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand All @@ -457,7 +458,7 @@ func (b *batch) put(key ds.Key, value []byte) error {
return b.writeBatch.Set(key.Bytes(), value)
}

func (b *batch) Delete(key ds.Key) error {
func (b *batch) Delete(ctx context.Context, key ds.Key) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand All @@ -471,7 +472,7 @@ func (b *batch) delete(key ds.Key) error {
return b.writeBatch.Delete(key.Bytes())
}

func (b *batch) Commit() error {
func (b *batch) Commit(ctx context.Context) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand Down Expand Up @@ -511,7 +512,7 @@ func (b *batch) cancel() {
var _ ds.Datastore = (*txn)(nil)
var _ ds.TTLDatastore = (*txn)(nil)

func (t *txn) Put(key ds.Key, value []byte) error {
func (t *txn) Put(ctx context.Context, key ds.Key, value []byte) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -524,7 +525,7 @@ func (t *txn) put(key ds.Key, value []byte) error {
return t.txn.Set(key.Bytes(), value)
}

func (t *txn) Sync(prefix ds.Key) error {
func (t *txn) Sync(ctx context.Context, prefix ds.Key) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -534,7 +535,7 @@ func (t *txn) Sync(prefix ds.Key) error {
return nil
}

func (t *txn) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
func (t *txn) PutWithTTL(ctx context.Context, key ds.Key, value []byte, ttl time.Duration) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -547,7 +548,7 @@ func (t *txn) putWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
return t.txn.SetEntry(badger.NewEntry(key.Bytes(), value).WithTTL(ttl))
}

func (t *txn) GetExpiration(key ds.Key) (time.Time, error) {
func (t *txn) GetExpiration(ctx context.Context, key ds.Key) (time.Time, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -567,7 +568,7 @@ func (t *txn) getExpiration(key ds.Key) (time.Time, error) {
return time.Unix(int64(item.ExpiresAt()), 0), nil
}

func (t *txn) SetTTL(key ds.Key, ttl time.Duration) error {
func (t *txn) SetTTL(ctx context.Context, key ds.Key, ttl time.Duration) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -588,7 +589,7 @@ func (t *txn) setTTL(key ds.Key, ttl time.Duration) error {

}

func (t *txn) Get(key ds.Key) ([]byte, error) {
func (t *txn) Get(ctx context.Context, key ds.Key) ([]byte, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -610,7 +611,7 @@ func (t *txn) get(key ds.Key) ([]byte, error) {
return item.ValueCopy(nil)
}

func (t *txn) Has(key ds.Key) (bool, error) {
func (t *txn) Has(ctx context.Context, key ds.Key) (bool, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -632,7 +633,7 @@ func (t *txn) has(key ds.Key) (bool, error) {
}
}

func (t *txn) GetSize(key ds.Key) (int, error) {
func (t *txn) GetSize(ctx context.Context, key ds.Key) (int, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -654,7 +655,7 @@ func (t *txn) getSize(key ds.Key) (int, error) {
}
}

func (t *txn) Delete(key ds.Key) error {
func (t *txn) Delete(ctx context.Context, key ds.Key) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -668,7 +669,7 @@ func (t *txn) delete(key ds.Key) error {
return t.txn.Delete(key.Bytes())
}

func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
func (t *txn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down Expand Up @@ -857,7 +858,7 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) {
return qrb.Results(), nil
}

func (t *txn) Commit() error {
func (t *txn) Commit(ctx context.Context) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down Expand Up @@ -885,7 +886,7 @@ func (t *txn) close() error {
return t.txn.Commit()
}

func (t *txn) Discard() {
func (t *txn) Discard(ctx context.Context) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down
Loading