Skip to content

Commit

Permalink
Add Support of TxPatch
Browse files Browse the repository at this point in the history
  • Loading branch information
updogliu committed Dec 10, 2015
1 parent 5265666 commit 641186d
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 42 deletions.
24 changes: 24 additions & 0 deletions cell_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package lmdb

import "encoding/json"

type CellKey struct {
Bucket string
Key []byte
}

func (ck CellKey) Serialize() string {
b, err := json.Marshal(ck)
if err != nil {
panic(err)
}
return string(b)
}

func DeserializeCellKey(s string) (rst CellKey) {
err := json.Unmarshal([]byte(s), &rst)
if err != nil {
panic(err)
}
return
}
48 changes: 43 additions & 5 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,20 @@ func (db *Database) TransactionalR(f func(ReadTxner)) {
}()
}

func (db *Database) TransactionalRW(f func(*ReadWriteTxn) error) (err error) {
func (db *Database) transactionalRWImpl(f func(*ReadWriteTxn) error, makingPatch bool) (
txPatch TxPatch, err error) {

txn, err := db.env.BeginTxn(nil, 0)
if err != nil { // Possible Errors: MDB_PANIC, MDB_MAP_RESIZED, MDB_READERS_FULL, ENOMEM
panic(err)
}

var panicF interface{} // panic from f
rwCtx := ReadWriteTxn{db.env, &ReadTxn{db.buckets, txn, nil}}
var dirtyKeys map[string]struct{}
if makingPatch {
dirtyKeys = make(map[string]struct{})
}
rwCtx := ReadWriteTxn{db.env, &ReadTxn{db.buckets, txn, nil}, dirtyKeys}

defer func() {
for _, itr := range rwCtx.itrs {
Expand All @@ -247,9 +253,19 @@ func (db *Database) TransactionalRW(f func(*ReadWriteTxn) error) (err error) {
rwCtx.itrs = nil

if err == nil && panicF == nil {
e := txn.Commit()
if e != nil { // Possible errors: EINVAL, ENOSPEC, EIO, ENOMEM
panic(e)
if !makingPatch {
e := txn.Commit()
if e != nil { // Possible errors: EINVAL, ENOSPEC, EIO, ENOMEM
panic(e)
}
} else {
for serializedCellkey := range rwCtx.dirtyKeys {
cellKey := DeserializeCellKey(serializedCellkey)
cell := cellState{bucket: cellKey.Bucket, key: cellKey.Key}
cell.value, cell.exists = rwCtx.Get(cellKey.Bucket, cellKey.Key)
txPatch = append(txPatch, cell)
}
txn.Abort()
}
} else {
txn.Abort()
Expand All @@ -268,3 +284,25 @@ func (db *Database) TransactionalRW(f func(*ReadWriteTxn) error) (err error) {

return
}

func (db *Database) TransactionalRW(f func(*ReadWriteTxn) error) error {
_, err := db.transactionalRWImpl(f, false)
return err
}

func (db *Database) MakeTxPatch(f func(*ReadWriteTxn) error) (TxPatch, error) {
return db.transactionalRWImpl(f, true)
}

func (db *Database) ApplyTxPatch(txPatch TxPatch) error {
return db.TransactionalRW(func(txn *ReadWriteTxn) error {
for _, cell := range txPatch {
if cell.exists {
txn.Put(cell.bucket, cell.key, cell.value)
} else {
txn.Delete(cell.bucket, cell.key)
}
}
return nil
})
}
38 changes: 19 additions & 19 deletions nested_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,42 @@ import (
)

const (
BucketName string = "Bucket0"
testBucket string = "Bucket0"
)

func SubTxns(txn *ReadWriteTxn, t *testing.T) {
if n := txn.BucketStat(BucketName).Entries; n != 0 {
if n := txn.BucketStat(testBucket).Entries; n != 0 {
t.Fatal("bucket not empty: ", n)
}

// first tx: success
txn.TransactionalRW(func(txn *ReadWriteTxn) error {
txn.Put(BucketName, []byte("txn00"), []byte("bar00"))
txn.Put(BucketName, []byte("txn01"), []byte("bar01"))
txn.Put(testBucket, []byte("txn00"), []byte("bar00"))
txn.Put(testBucket, []byte("txn01"), []byte("bar01"))
return nil
})
if n := txn.BucketStat(BucketName).Entries; n != 2 {
if n := txn.BucketStat(testBucket).Entries; n != 2 {
t.Fatalf("assertion failed. expect 2, got %d", n)
}

// second tx: fail
txn.TransactionalRW(func(txn *ReadWriteTxn) error {
txn.Put(BucketName, []byte("txn10"), []byte("bar10"))
txn.Put(BucketName, []byte("txn11"), []byte("bar11"))
txn.Put(testBucket, []byte("txn10"), []byte("bar10"))
txn.Put(testBucket, []byte("txn11"), []byte("bar11"))
return errors.New("whatever error")
})
if n := txn.BucketStat(BucketName).Entries; n != 2 {
if n := txn.BucketStat(testBucket).Entries; n != 2 {
t.Fatalf("assertion failed. expect 2, got %d", n)
}

// third tx: success
txn.TransactionalRW(func(txn *ReadWriteTxn) error {
txn.Put(BucketName, []byte("txn20"), []byte("bar20"))
txn.Put(BucketName, []byte("txn21"), []byte("bar21"))
txn.Put(BucketName, []byte("txn00"), []byte("bar99"))
txn.Put(testBucket, []byte("txn20"), []byte("bar20"))
txn.Put(testBucket, []byte("txn21"), []byte("bar21"))
txn.Put(testBucket, []byte("txn00"), []byte("bar99"))
return nil
})
if n := txn.BucketStat(BucketName).Entries; n != 4 {
if n := txn.BucketStat(testBucket).Entries; n != 4 {
t.Fatalf("assertion failed. expect 4, got %d", n)
}
}
Expand All @@ -55,7 +55,7 @@ func TestNestedTxn1(t *testing.T) {
}
defer os.RemoveAll(path)

bucketNames := []string{BucketName}
bucketNames := []string{testBucket}
db, err := Open(path, bucketNames)
defer db.Close()
if err != nil {
Expand All @@ -69,7 +69,7 @@ func TestNestedTxn1(t *testing.T) {
})

db.TransactionalR(func(txn ReadTxner) {
if n := txn.BucketStat(BucketName).Entries; n != 4 {
if n := txn.BucketStat(testBucket).Entries; n != 4 {
t.Fatalf("assertion failed. expect 4, got %d", n)
}

Expand All @@ -85,7 +85,7 @@ func TestNestedTxn1(t *testing.T) {
}

for _, c := range cases1 {
v, b := txn.Get(BucketName, []byte(c.key))
v, b := txn.Get(testBucket, []byte(c.key))
if !b {
t.Fatalf("key not found: %s", c.key)
}
Expand All @@ -96,7 +96,7 @@ func TestNestedTxn1(t *testing.T) {

cases2 := []string{"txn10", "txn11", "txn99"}
for _, key := range cases2 {
_, b := txn.Get(BucketName, []byte(key))
_, b := txn.Get(testBucket, []byte(key))
if b {
t.Fatalf("unexpected key: %s", key)
}
Expand All @@ -111,7 +111,7 @@ func TestNestedTxn2(t *testing.T) {
}
defer os.RemoveAll(path)

bucketNames := []string{BucketName}
bucketNames := []string{testBucket}
db, err := Open(path, bucketNames)
defer db.Close()
if err != nil {
Expand All @@ -125,13 +125,13 @@ func TestNestedTxn2(t *testing.T) {
})

db.TransactionalR(func(txn ReadTxner) {
if n := txn.BucketStat(BucketName).Entries; n != 0 {
if n := txn.BucketStat(testBucket).Entries; n != 0 {
t.Fatalf("assertion failed. expect 0, got %d", n)
}

cases := []string{"txn00", "txn10", "txn11", "txn99"}
for _, key := range cases {
_, b := txn.Get(BucketName, []byte(key))
_, b := txn.Get(testBucket, []byte(key))
if b {
t.Fatalf("unexpected key: %s", key)
}
Expand Down
36 changes: 36 additions & 0 deletions test_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package lmdb

import (
"reflect"
)

func MakePatchOfDb(db *Database) (rst TxPatch) {
buckets, err := db.GetExistingBuckets()
if err != nil {
panic(err)
}

db.TransactionalR(func(txn ReadTxner) {
for _, bucket := range buckets {
itr := txn.Iterate(bucket)
if itr == nil {
continue
}

for {
key, val := itr.Get()
rst = append(rst, cellState{bucket, key, true, val})
if !itr.Next() {
break
}
}
}
});
return
}

func IsEqualDb(db1 *Database, db2 *Database) bool {
patch1 := MakePatchOfDb(db1)
patch2 := MakePatchOfDb(db2)
return reflect.DeepEqual(patch1, patch2)
}
18 changes: 9 additions & 9 deletions thread_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@ func TestThread(t *testing.T) {
}
defer os.RemoveAll(path)

bucketNames := []string{BucketName}
bucketNames := []string{testBucket}
db, err := Open(path, bucketNames)
defer db.Close()
if err != nil {
panic(err)
}

db.TransactionalRW(func(txn *ReadWriteTxn) error {
txn.Put(BucketName, []byte("foo"), []byte("bar"))
txn.Put(testBucket, []byte("foo"), []byte("bar"))
return nil
})

db.TransactionalRW(func(txn *ReadWriteTxn) error {
txn.Put(BucketName, []byte("foo2"), []byte("bar2"))
txn.Put(testBucket, []byte("foo2"), []byte("bar2"))

db.TransactionalR(func(txnR ReadTxner) {
val, exist := txnR.Get(BucketName, []byte("foo"))
val, exist := txnR.Get(testBucket, []byte("foo"))
ensure.True(t, exist)
ensure.DeepEqual(t, val, []byte("bar"))

_, exist2 := txnR.Get(BucketName, []byte("foo2"))
_, exist2 := txnR.Get(testBucket, []byte("foo2"))
ensure.False(t, exist2)
})
return nil
Expand All @@ -49,7 +49,7 @@ func TestThread2(t *testing.T) {
}
defer os.RemoveAll(path)

bucketNames := []string{BucketName}
bucketNames := []string{testBucket}
db, err := Open(path, bucketNames)
defer db.Close()
if err != nil {
Expand All @@ -58,9 +58,9 @@ func TestThread2(t *testing.T) {

go db.TransactionalRW(func(txn *ReadWriteTxn) error {
t.Log("first RW txn begins")
txn.Put(BucketName, []byte("foo"), []byte("bar"))
txn.Put(testBucket, []byte("foo"), []byte("bar"))
time.Sleep(1 * time.Second)
_, exist := txn.Get(BucketName, []byte("foo1"))
_, exist := txn.Get(testBucket, []byte("foo1"))
ensure.False(t, exist)
t.Log("first RW txn ends")
return nil
Expand All @@ -69,7 +69,7 @@ func TestThread2(t *testing.T) {
time.Sleep(100 * time.Millisecond)
go db.TransactionalRW(func(txn *ReadWriteTxn) error {
t.Log("second RW txn begins")
txn.Put(BucketName, []byte("foo1"), []byte("bar1"))
txn.Put(testBucket, []byte("foo1"), []byte("bar1"))
t.Log("second RW txn ends")
return nil
})
Expand Down
10 changes: 10 additions & 0 deletions tx_patch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package lmdb

type cellState struct {
bucket string
key []byte
exists bool
value []byte
}

type TxPatch []cellState
Loading

0 comments on commit 641186d

Please sign in to comment.