Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherrypick PR #5585 to release/v20.03 #5636

Merged
merged 3 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
ops: make(map[op]*y.Closer),
}
if x.WorkerConfig.LudicrousMode {
n.ex = newExecutor()
n.ex = newExecutor(&m.Applied)
}
return n
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
})

if x.WorkerConfig.LudicrousMode {
n.ex.addEdges(ctx, m.StartTs, m.Edges)
n.ex.addEdges(ctx, proposal.Index, m.StartTs, m.Edges)
return nil
}

Expand Down Expand Up @@ -489,14 +489,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
span.Annotatef(nil, "While applying mutations: %v", err)
return err
}
if x.WorkerConfig.LudicrousMode {
ts := proposal.Mutations.StartTs
return n.commitOrAbort(proposal.Key, &pb.OracleDelta{
Txns: []*pb.TxnStatus{
{StartTs: ts, CommitTs: ts},
},
})
}

span.Annotate(nil, "Done")
return nil
}
Expand Down Expand Up @@ -1430,13 +1423,18 @@ func (n *node) calculateSnapshot(startIdx uint64, discardN int) (*pb.Snapshot, e
span.Annotatef(nil, "Error: %v", err)
return nil, err
}

var start uint64
if proposal.Mutations != nil {
start := proposal.Mutations.StartTs
start = proposal.Mutations.StartTs
if start >= minPendingStart && snapshotIdx == 0 {
snapshotIdx = entry.Index - 1
}
}
if proposal.Delta != nil {
// In ludicrous mode commitTs for any transaction is same as startTs.
if x.WorkerConfig.LudicrousMode {
maxCommitTs = x.Max(maxCommitTs, start)
} else if proposal.Delta != nil {
for _, txn := range proposal.Delta.GetTxns() {
maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)
}
Expand Down
10 changes: 8 additions & 2 deletions worker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type subMutation struct {
edges []*pb.DirectedEdge
ctx context.Context
startTs uint64
index uint64
}

type executor struct {
Expand All @@ -41,12 +42,14 @@ type executor struct {
sync.RWMutex
predChan map[string]chan *subMutation
closer *y.Closer
applied *y.WaterMark
}

func newExecutor() *executor {
func newExecutor(applied *y.WaterMark) *executor {
ex := &executor{
predChan: make(map[string]chan *subMutation),
closer: y.NewCloser(0),
applied: applied,
}
go ex.shutdown()
return ex
Expand Down Expand Up @@ -80,6 +83,7 @@ func (e *executor) processMutationCh(ch chan *subMutation) {
glog.Errorf("Error while waiting for writes: %v", err)
}

e.applied.Done(payload.index)
atomic.AddInt64(&e.pendingSize, -esize)
}
}
Expand Down Expand Up @@ -111,7 +115,7 @@ const (
executorAddEdges = "executor.addEdges"
)

func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
func (e *executor) addEdges(ctx context.Context, index, startTs uint64, edges []*pb.DirectedEdge) {
rampMeter(&e.pendingSize, maxPendingEdgesSize, executorAddEdges)

payloadMap := make(map[string]*subMutation)
Expand All @@ -122,6 +126,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
payloadMap[edge.Attr] = &subMutation{
ctx: ctx,
startTs: startTs,
index: index,
}
payload = payloadMap[edge.Attr]
}
Expand All @@ -138,6 +143,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
default:
// Closer is not closed. And we have the Lock, so sending on channel should be safe.
for attr, payload := range payloadMap {
e.applied.Begin(index)
e.getChannelUnderLock(attr) <- payload
}
}
Expand Down