diff --git a/server/etcdserver/txn/delete.go b/server/etcdserver/txn/delete.go new file mode 100644 index 000000000000..7d1218e8ce18 --- /dev/null +++ b/server/etcdserver/txn/delete.go @@ -0,0 +1,70 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "context" + + "go.uber.org/zap" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/pkg/v3/traceutil" + "go.etcd.io/etcd/server/v3/storage/mvcc" +) + +func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) { + ctx, trace = ensureTrace(ctx, lg, "delete_range", + traceutil.Field{Key: "key", Value: string(dr.Key)}, + traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)}, + ) + txnWrite := kv.Write(trace) + defer txnWrite.End() + resp, err = deleteRange(ctx, txnWrite, dr) + return resp, trace, err +} + +func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + resp := &pb.DeleteRangeResponse{} + resp.Header = &pb.ResponseHeader{} + end := mkGteRange(dr.RangeEnd) + + if dr.PrevKv { + rr, err := txnWrite.Range(ctx, dr.Key, end, mvcc.RangeOptions{}) + if err != nil { + return nil, err + } + if rr != nil { + resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs)) + for i := range rr.KVs { + resp.PrevKvs[i] = &rr.KVs[i] + } + } + } + + resp.Deleted, resp.Header.Revision = txnWrite.DeleteRange(dr.Key, end) + return resp, nil +} + +// mkGteRange determines if the range end is a >= range. This works around grpc +// sending empty byte strings as nil; >= is encoded in the range end as '\0'. +// If it is a GTE range, then []byte{} is returned to indicate the empty byte +// string (vs nil being no byte string). +func mkGteRange(rangeEnd []byte) []byte { + if len(rangeEnd) == 1 && rangeEnd[0] == 0 { + return []byte{} + } + return rangeEnd +} diff --git a/server/etcdserver/txn/put.go b/server/etcdserver/txn/put.go new file mode 100644 index 000000000000..8c304cc59293 --- /dev/null +++ b/server/etcdserver/txn/put.go @@ -0,0 +1,114 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "context" + + "go.uber.org/zap" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/pkg/v3/traceutil" + "go.etcd.io/etcd/server/v3/etcdserver/errors" + "go.etcd.io/etcd/server/v3/lease" + "go.etcd.io/etcd/server/v3/storage/mvcc" +) + +func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { + ctx, trace = ensureTrace(ctx, lg, "put", + traceutil.Field{Key: "key", Value: string(p.Key)}, + traceutil.Field{Key: "req_size", Value: p.Size()}, + ) + err = checkLease(lessor, p) + if err != nil { + return nil, trace, err + } + txnWrite := kv.Write(trace) + defer txnWrite.End() + prevKV, err := checkAndGetPrevKV(trace, txnWrite, p) + if err != nil { + return nil, trace, err + } + return put(ctx, txnWrite, p, prevKV), trace, nil +} + +func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest, prevKV *mvcc.RangeResult) *pb.PutResponse { + trace := traceutil.Get(ctx) + resp := &pb.PutResponse{} + resp.Header = &pb.ResponseHeader{} + val, leaseID := p.Value, lease.LeaseID(p.Lease) + + if p.IgnoreValue { + val = prevKV.KVs[0].Value + } + if p.IgnoreLease { + leaseID = lease.LeaseID(prevKV.KVs[0].Lease) + } + if p.PrevKv { + if prevKV != nil && len(prevKV.KVs) != 0 { + resp.PrevKv = &prevKV.KVs[0] + } + } + + resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID) + trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) + return resp +} + +func checkPut(trace *traceutil.Trace, txnWrite mvcc.ReadView, lessor lease.Lessor, p *pb.PutRequest) error { + err := checkLease(lessor, p) + if err != nil { + return err + } + _, err = checkAndGetPrevKV(trace, txnWrite, p) + return err +} + +func checkLease(lessor lease.Lessor, p *pb.PutRequest) error { + leaseID := lease.LeaseID(p.Lease) + if leaseID != lease.NoLease { + if l := lessor.Lookup(leaseID); l == nil { + return lease.ErrLeaseNotFound + } + } + return nil +} + +func checkAndGetPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) { + prevKV, err = getPrevKV(trace, txnWrite, p) + if err != nil { + return nil, err + } + if p.IgnoreValue || p.IgnoreLease { + if prevKV == nil || len(prevKV.KVs) == 0 { + // ignore_{lease,value} flag expects previous key-value pair + return nil, errors.ErrKeyNotFound + } + } + return prevKV, nil +} + +func getPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) { + if p.IgnoreValue || p.IgnoreLease || p.PrevKv { + trace.StepWithFunction(func() { + prevKV, err = txnWrite.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) + }, "get previous kv pair") + + if err != nil { + return nil, err + } + } + return prevKV, nil +} diff --git a/server/etcdserver/txn/range.go b/server/etcdserver/txn/range.go new file mode 100644 index 000000000000..8e9fef34ad7b --- /dev/null +++ b/server/etcdserver/txn/range.go @@ -0,0 +1,203 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "bytes" + "context" + "sort" + "time" + + "go.uber.org/zap" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/pkg/v3/traceutil" + "go.etcd.io/etcd/server/v3/storage/mvcc" +) + +func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { + ctx, trace = ensureTrace(ctx, lg, "range") + defer func(start time.Time) { + success := err == nil + RangeSecObserve(success, time.Since(start)) + }(time.Now()) + txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace) + defer txnRead.End() + resp, err = executeRange(ctx, lg, txnRead, r) + return resp, trace, err +} + +func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace := traceutil.Get(ctx) + + resp := &pb.RangeResponse{} + resp.Header = &pb.ResponseHeader{} + + limit := r.Limit + if r.SortOrder != pb.RangeRequest_NONE || + r.MinModRevision != 0 || r.MaxModRevision != 0 || + r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 { + // fetch everything; sort and truncate afterwards + limit = 0 + } + if limit > 0 { + // fetch one extra for 'more' flag + limit = limit + 1 + } + + ro := mvcc.RangeOptions{ + Limit: limit, + Rev: r.Revision, + Count: r.CountOnly, + } + + rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) + if err != nil { + return nil, err + } + + if r.MaxModRevision != 0 { + f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision } + pruneKVs(rr, f) + } + if r.MinModRevision != 0 { + f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision } + pruneKVs(rr, f) + } + if r.MaxCreateRevision != 0 { + f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision } + pruneKVs(rr, f) + } + if r.MinCreateRevision != 0 { + f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision } + pruneKVs(rr, f) + } + + sortOrder := r.SortOrder + if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE { + // Since current mvcc.Range implementation returns results + // sorted by keys in lexiographically ascending order, + // sort ASCEND by default only when target is not 'KEY' + sortOrder = pb.RangeRequest_ASCEND + } else if r.SortTarget == pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_ASCEND { + // Since current mvcc.Range implementation returns results + // sorted by keys in lexiographically ascending order, + // don't re-sort when target is 'KEY' and order is ASCEND + sortOrder = pb.RangeRequest_NONE + } + if sortOrder != pb.RangeRequest_NONE { + var sorter sort.Interface + switch { + case r.SortTarget == pb.RangeRequest_KEY: + sorter = &kvSortByKey{&kvSort{rr.KVs}} + case r.SortTarget == pb.RangeRequest_VERSION: + sorter = &kvSortByVersion{&kvSort{rr.KVs}} + case r.SortTarget == pb.RangeRequest_CREATE: + sorter = &kvSortByCreate{&kvSort{rr.KVs}} + case r.SortTarget == pb.RangeRequest_MOD: + sorter = &kvSortByMod{&kvSort{rr.KVs}} + case r.SortTarget == pb.RangeRequest_VALUE: + sorter = &kvSortByValue{&kvSort{rr.KVs}} + default: + lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget))) + } + switch { + case sortOrder == pb.RangeRequest_ASCEND: + sort.Sort(sorter) + case sortOrder == pb.RangeRequest_DESCEND: + sort.Sort(sort.Reverse(sorter)) + } + } + + if r.Limit > 0 && len(rr.KVs) > int(r.Limit) { + rr.KVs = rr.KVs[:r.Limit] + resp.More = true + } + trace.Step("filter and sort the key-value pairs") + resp.Header.Revision = rr.Rev + resp.Count = int64(rr.Count) + resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) + for i := range rr.KVs { + if r.KeysOnly { + rr.KVs[i].Value = nil + } + resp.Kvs[i] = &rr.KVs[i] + } + trace.Step("assemble the response") + return resp, nil +} + +func checkRange(rv mvcc.ReadView, req *pb.RangeRequest) error { + switch { + case req.Revision == 0: + return nil + case req.Revision > rv.Rev(): + return mvcc.ErrFutureRev + case req.Revision < rv.FirstRev(): + return mvcc.ErrCompacted + } + return nil +} + +func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) { + j := 0 + for i := range rr.KVs { + rr.KVs[j] = rr.KVs[i] + if !isPrunable(&rr.KVs[i]) { + j++ + } + } + rr.KVs = rr.KVs[:j] +} + +type kvSort struct{ kvs []mvccpb.KeyValue } + +func (s *kvSort) Swap(i, j int) { + t := s.kvs[i] + s.kvs[i] = s.kvs[j] + s.kvs[j] = t +} +func (s *kvSort) Len() int { return len(s.kvs) } + +type kvSortByKey struct{ *kvSort } + +func (s *kvSortByKey) Less(i, j int) bool { + return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0 +} + +type kvSortByVersion struct{ *kvSort } + +func (s *kvSortByVersion) Less(i, j int) bool { + return (s.kvs[i].Version - s.kvs[j].Version) < 0 +} + +type kvSortByCreate struct{ *kvSort } + +func (s *kvSortByCreate) Less(i, j int) bool { + return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0 +} + +type kvSortByMod struct{ *kvSort } + +func (s *kvSortByMod) Less(i, j int) bool { + return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0 +} + +type kvSortByValue struct{ *kvSort } + +func (s *kvSortByValue) Less(i, j int) bool { + return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0 +} diff --git a/server/etcdserver/txn/txn.go b/server/etcdserver/txn/txn.go index 22d232a864f8..b0d568d28ef9 100644 --- a/server/etcdserver/txn/txn.go +++ b/server/etcdserver/txn/txn.go @@ -18,8 +18,6 @@ import ( "bytes" "context" "fmt" - "sort" - "time" "go.uber.org/zap" @@ -27,197 +25,10 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/auth" - "go.etcd.io/etcd/server/v3/etcdserver/errors" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/mvcc" ) -func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { - ctx, trace = ensureTrace(ctx, lg, "put", - traceutil.Field{Key: "key", Value: string(p.Key)}, - traceutil.Field{Key: "req_size", Value: p.Size()}, - ) - err = checkLease(lessor, p) - if err != nil { - return nil, trace, err - } - txnWrite := kv.Write(trace) - defer txnWrite.End() - prevKV, err := checkAndGetPrevKV(trace, txnWrite, p) - if err != nil { - return nil, trace, err - } - return put(ctx, txnWrite, p, prevKV), trace, nil -} - -func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest, prevKV *mvcc.RangeResult) *pb.PutResponse { - trace := traceutil.Get(ctx) - resp := &pb.PutResponse{} - resp.Header = &pb.ResponseHeader{} - val, leaseID := p.Value, lease.LeaseID(p.Lease) - - if p.IgnoreValue { - val = prevKV.KVs[0].Value - } - if p.IgnoreLease { - leaseID = lease.LeaseID(prevKV.KVs[0].Lease) - } - if p.PrevKv { - if prevKV != nil && len(prevKV.KVs) != 0 { - resp.PrevKv = &prevKV.KVs[0] - } - } - - resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID) - trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) - return resp -} - -func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) { - ctx, trace = ensureTrace(ctx, lg, "delete_range", - traceutil.Field{Key: "key", Value: string(dr.Key)}, - traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)}, - ) - txnWrite := kv.Write(trace) - defer txnWrite.End() - resp, err = deleteRange(ctx, txnWrite, dr) - return resp, trace, err -} - -func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { - resp := &pb.DeleteRangeResponse{} - resp.Header = &pb.ResponseHeader{} - end := mkGteRange(dr.RangeEnd) - - if dr.PrevKv { - rr, err := txnWrite.Range(ctx, dr.Key, end, mvcc.RangeOptions{}) - if err != nil { - return nil, err - } - if rr != nil { - resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs)) - for i := range rr.KVs { - resp.PrevKvs[i] = &rr.KVs[i] - } - } - } - - resp.Deleted, resp.Header.Revision = txnWrite.DeleteRange(dr.Key, end) - return resp, nil -} - -func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { - ctx, trace = ensureTrace(ctx, lg, "range") - defer func(start time.Time) { - success := err == nil - RangeSecObserve(success, time.Since(start)) - }(time.Now()) - txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace) - defer txnRead.End() - resp, err = executeRange(ctx, lg, txnRead, r) - return resp, trace, err -} - -func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { - trace := traceutil.Get(ctx) - - resp := &pb.RangeResponse{} - resp.Header = &pb.ResponseHeader{} - - limit := r.Limit - if r.SortOrder != pb.RangeRequest_NONE || - r.MinModRevision != 0 || r.MaxModRevision != 0 || - r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 { - // fetch everything; sort and truncate afterwards - limit = 0 - } - if limit > 0 { - // fetch one extra for 'more' flag - limit = limit + 1 - } - - ro := mvcc.RangeOptions{ - Limit: limit, - Rev: r.Revision, - Count: r.CountOnly, - } - - rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) - if err != nil { - return nil, err - } - - if r.MaxModRevision != 0 { - f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision } - pruneKVs(rr, f) - } - if r.MinModRevision != 0 { - f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision } - pruneKVs(rr, f) - } - if r.MaxCreateRevision != 0 { - f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision } - pruneKVs(rr, f) - } - if r.MinCreateRevision != 0 { - f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision } - pruneKVs(rr, f) - } - - sortOrder := r.SortOrder - if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE { - // Since current mvcc.Range implementation returns results - // sorted by keys in lexiographically ascending order, - // sort ASCEND by default only when target is not 'KEY' - sortOrder = pb.RangeRequest_ASCEND - } else if r.SortTarget == pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_ASCEND { - // Since current mvcc.Range implementation returns results - // sorted by keys in lexiographically ascending order, - // don't re-sort when target is 'KEY' and order is ASCEND - sortOrder = pb.RangeRequest_NONE - } - if sortOrder != pb.RangeRequest_NONE { - var sorter sort.Interface - switch { - case r.SortTarget == pb.RangeRequest_KEY: - sorter = &kvSortByKey{&kvSort{rr.KVs}} - case r.SortTarget == pb.RangeRequest_VERSION: - sorter = &kvSortByVersion{&kvSort{rr.KVs}} - case r.SortTarget == pb.RangeRequest_CREATE: - sorter = &kvSortByCreate{&kvSort{rr.KVs}} - case r.SortTarget == pb.RangeRequest_MOD: - sorter = &kvSortByMod{&kvSort{rr.KVs}} - case r.SortTarget == pb.RangeRequest_VALUE: - sorter = &kvSortByValue{&kvSort{rr.KVs}} - default: - lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget))) - } - switch { - case sortOrder == pb.RangeRequest_ASCEND: - sort.Sort(sorter) - case sortOrder == pb.RangeRequest_DESCEND: - sort.Sort(sort.Reverse(sorter)) - } - } - - if r.Limit > 0 && len(rr.KVs) > int(r.Limit) { - rr.KVs = rr.KVs[:r.Limit] - resp.More = true - } - trace.Step("filter and sort the key-value pairs") - resp.Header.Revision = rr.Rev - resp.Count = int64(rr.Count) - resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) - for i := range rr.KVs { - if r.KeysOnly { - rr.KVs[i].Value = nil - } - resp.Kvs[i] = &rr.KVs[i] - } - trace.Step("assemble the response") - return resp, nil -} - func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) { ctx, trace = ensureTrace(ctx, lg, "transaction") isWrite := !IsTxnReadonly(rt) @@ -376,64 +187,6 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt return txns, nil } -func checkPut(trace *traceutil.Trace, txnWrite mvcc.ReadView, lessor lease.Lessor, p *pb.PutRequest) error { - err := checkLease(lessor, p) - if err != nil { - return err - } - _, err = checkAndGetPrevKV(trace, txnWrite, p) - return err -} - -func checkLease(lessor lease.Lessor, p *pb.PutRequest) error { - leaseID := lease.LeaseID(p.Lease) - if leaseID != lease.NoLease { - if l := lessor.Lookup(leaseID); l == nil { - return lease.ErrLeaseNotFound - } - } - return nil -} - -func checkAndGetPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) { - prevKV, err = getPrevKV(trace, txnWrite, p) - if err != nil { - return nil, err - } - if p.IgnoreValue || p.IgnoreLease { - if prevKV == nil || len(prevKV.KVs) == 0 { - // ignore_{lease,value} flag expects previous key-value pair - return nil, errors.ErrKeyNotFound - } - } - return prevKV, nil -} - -func getPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) { - if p.IgnoreValue || p.IgnoreLease || p.PrevKv { - trace.StepWithFunction(func() { - prevKV, err = txnWrite.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) - }, "get previous kv pair") - - if err != nil { - return nil, err - } - } - return prevKV, nil -} - -func checkRange(rv mvcc.ReadView, req *pb.RangeRequest) error { - switch { - case req.Revision == 0: - return nil - case req.Revision > rv.Rev(): - return mvcc.ErrFutureRev - case req.Revision < rv.FirstRev(): - return mvcc.ErrCompacted - } - return nil -} - func checkTxn(trace *traceutil.Trace, rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath []bool) (int, error) { txnCount := 0 reqs := rt.Success @@ -463,67 +216,6 @@ func checkTxn(trace *traceutil.Trace, rv mvcc.ReadView, rt *pb.TxnRequest, lesso return txnCount, nil } -// mkGteRange determines if the range end is a >= range. This works around grpc -// sending empty byte strings as nil; >= is encoded in the range end as '\0'. -// If it is a GTE range, then []byte{} is returned to indicate the empty byte -// string (vs nil being no byte string). -func mkGteRange(rangeEnd []byte) []byte { - if len(rangeEnd) == 1 && rangeEnd[0] == 0 { - return []byte{} - } - return rangeEnd -} - -func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) { - j := 0 - for i := range rr.KVs { - rr.KVs[j] = rr.KVs[i] - if !isPrunable(&rr.KVs[i]) { - j++ - } - } - rr.KVs = rr.KVs[:j] -} - -type kvSort struct{ kvs []mvccpb.KeyValue } - -func (s *kvSort) Swap(i, j int) { - t := s.kvs[i] - s.kvs[i] = s.kvs[j] - s.kvs[j] = t -} -func (s *kvSort) Len() int { return len(s.kvs) } - -type kvSortByKey struct{ *kvSort } - -func (s *kvSortByKey) Less(i, j int) bool { - return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0 -} - -type kvSortByVersion struct{ *kvSort } - -func (s *kvSortByVersion) Less(i, j int) bool { - return (s.kvs[i].Version - s.kvs[j].Version) < 0 -} - -type kvSortByCreate struct{ *kvSort } - -func (s *kvSortByCreate) Less(i, j int) bool { - return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0 -} - -type kvSortByMod struct{ *kvSort } - -func (s *kvSortByMod) Less(i, j int) bool { - return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0 -} - -type kvSortByValue struct{ *kvSort } - -func (s *kvSortByValue) Less(i, j int) bool { - return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0 -} - func compareInt64(a, b int64) int { switch { case a < b: