Skip to content

Commit

Permalink
tikv: One transaction per iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
maeb committed Nov 7, 2022
1 parent 02fc725 commit 925c5b7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
16 changes: 2 additions & 14 deletions internal/tikvidx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,12 @@ import (

"github.com/nlnwa/gowarcserver/index"
"github.com/nlnwa/gowarcserver/schema"
"github.com/tikv/client-go/v2/txnkv/transaction"
"google.golang.org/protobuf/proto"
)

// Closest returns the first closest cdx value(s).
func (db *DB) Closest(ctx context.Context, req index.ClosestRequest, res chan<- index.CdxResponse) error {
// begin transaction
tx, err := db.client.Begin()
if err != nil {
return err
}

it, err := NewIterClosest(ctx, tx, req.Key(), req.Closest())
it, err := newIterClosest(ctx, db.client, req.Key(), req.Closest())
if err != nil {
return err
}
Expand Down Expand Up @@ -61,18 +54,13 @@ func cdxFromValue(value []byte) (*schema.Cdx, error) {

func (db *DB) Search(ctx context.Context, req index.SearchRequest, res chan<- index.CdxResponse) error {
var err error
var tx *transaction.KVTxn
var it iterator

if len(req.Keys()) == 0 {
return errors.New("search request has no keys")
}
tx, err = db.client.Begin()
if err != nil {
return err
}

it, err = newIter(ctx, tx, req)
it, err = newIter(ctx, db.client, req)
if err != nil {
return err
}
Expand Down
25 changes: 20 additions & 5 deletions internal/tikvidx/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/nlnwa/gowarcserver/index"
"github.com/nlnwa/gowarcserver/timestamp"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)

Expand All @@ -30,7 +31,7 @@ type iterClosest struct {
cmp func(int64, int64) bool
}

func NewIterClosest(_ context.Context, tx *transaction.KVTxn, key string, closest string) (*iterClosest, error) {
func newIterClosest(_ context.Context, client *txnkv.Client, key string, closest string) (*iterClosest, error) {
ic := new(iterClosest)
if t, err := time.Parse(timestamp.CDX, closest); err != nil {
return nil, err
Expand All @@ -41,15 +42,23 @@ func NewIterClosest(_ context.Context, tx *transaction.KVTxn, key string, closes
ic.prefix = []byte(cdxPrefix + key)
k := []byte(cdxPrefix + key + " " + closest)

tx1, err := client.Begin()
if err != nil {
return nil, err
}
// initialize forward iterator
forward, err := tx.Iter(k, []byte(cdxEOF))
forward, err := tx1.Iter(k, []byte(cdxEOF))
if err != nil {
return nil, err
}
ic.forward = forward

tx2, err := client.Begin()
if err != nil {
return nil, err
}
// initialize backward iterator
backward, err := tx.IterReverse(k)
backward, err := tx2.IterReverse(k)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -126,20 +135,26 @@ type iterSort struct {
next <-chan maybeKV
}

func newIter(ctx context.Context, tx *transaction.KVTxn, req index.SearchRequest) (*iterSort, error) {
func newIter(ctx context.Context, client *txnkv.Client, req index.SearchRequest) (*iterSort, error) {
is := new(iterSort)

// initialize iterators
var results []chan *maybeKV
var err error
for _, key := range req.Keys() {
var tx *transaction.KVTxn
tx, err = client.Begin()
if err != nil {
break
}

k := []byte(cdxPrefix + key)
var it iterator
switch req.Sort() {
case index.SortDesc:
it, err = tx.IterReverse(k)
case index.SortClosest:
it, err = NewIterClosest(ctx, tx, key, req.Closest())
it, err = newIterClosest(ctx, client, key, req.Closest())
case index.SortAsc:
fallthrough
case index.SortNone:
Expand Down

0 comments on commit 925c5b7

Please sign in to comment.