diff --git a/worker/groups.go b/worker/groups.go index 07193984a44..94b4edb5c96 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -646,11 +646,17 @@ func (g *groupi) proposeDelta(oracleDelta *intern.OracleDelta) { if !g.Node.AmLeader() { return } - // TODO (pawan) - All servers open a stream with Zero and processDelta. Why do we still have to - // propose these updates then? + + // Only the leader of a group proposes the commit proposal for a group after getting delta from + // Zero. for startTs, commitTs := range oracleDelta.Commits { + // The leader might not have yet applied the mutation and hence may not have the txn in the + // map. Its ok we can just continue, processOracleDeltaStream checks the oracle map every + // minute and calls proposeDelta. if posting.Txns().Get(startTs) == nil { - posting.Oracle().Done(startTs) + // Don't mark oracle as done here as then it would be deleted the entry from map and it + // won't be proposed to the group. This could eventually block snapshots from happening + // in a replicated cluster. continue } tctx := &api.TxnContext{StartTs: startTs, CommitTs: commitTs} @@ -658,7 +664,6 @@ func (g *groupi) proposeDelta(oracleDelta *intern.OracleDelta) { } for _, startTs := range oracleDelta.Aborts { if posting.Txns().Get(startTs) == nil { - posting.Oracle().Done(startTs) continue } tctx := &api.TxnContext{StartTs: startTs}