Skip to content

Commit c7e23e1

Browse files
committed
setting maxts after successful sending of event
1 parent 104bbd2 commit c7e23e1

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

worker/cdc_ee.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ type CDC struct {
4040
// sentTs is the timestamp till which we have send the event of txns.
4141
// There will be no event below this timestamp for which we need to send the events
4242
sentTs uint64
43-
maxTs uint64
43+
// maxSentTs is the maximum timestamp till which we have sent the events of txns.
44+
// this is helpful to maintain the state of the CDC till which we can clear the raft logs
45+
maxSentTs uint64
4446
}
4547

4648
func newCDC() *CDC {
@@ -76,7 +78,6 @@ func (cdc *CDC) updateTs(newTs uint64) {
7678
if ts >= newTs {
7779
return
7880
}
79-
glog.Infoln("ts updated to ", newTs)
8081
atomic.CompareAndSwapUint64(&cdc.sentTs, ts, newTs)
8182
}
8283

@@ -139,7 +140,7 @@ func (cdc *CDC) processCDCEvents() {
139140
// skip ahead the index to prevent uncontrolled growth of raft logs.
140141
if uint64(len(cdc.pendingTxnEvents)) > cdc.maxRecoveryEntries {
141142
glog.Info("too many pending cdc events. Skipping for now.")
142-
cdc.updateTs(cdc.maxTs)
143+
cdc.updateTs(posting.Oracle().MaxAssigned())
143144
cdc.index = last
144145
cdc.pendingTxnEvents = make(map[uint64][]CDCEvent)
145146
return
@@ -191,7 +192,6 @@ func (cdc *CDC) processCDCEvents() {
191192

192193
if proposal.Delta != nil {
193194
for _, ts := range proposal.Delta.Txns {
194-
cdc.maxTs = x.Max(cdc.maxTs, ts.StartTs)
195195
pending := cdc.pendingTxnEvents[ts.StartTs]
196196
if ts.CommitTs > 0 && len(pending) > 0 {
197197
if err := sendEvents(ts, pending); err != nil {
@@ -201,6 +201,7 @@ func (cdc *CDC) processCDCEvents() {
201201
}
202202
// delete from pending events once events are sent
203203
delete(cdc.pendingTxnEvents, ts.StartTs)
204+
cdc.maxSentTs = x.Max(cdc.maxSentTs, ts.StartTs)
204205
cdc.evaluateAndSetTs()
205206
}
206207
}
@@ -244,8 +245,7 @@ func (cdc *CDC) evaluateAndSetTs() {
244245
if cdc == nil || x.WorkerConfig.LudicrousMode {
245246
return
246247
}
247-
min := cdc.maxTs
248-
glog.Infoln(min, cdc.pendingTxnEvents)
248+
min := cdc.maxSentTs
249249
for ts := range cdc.pendingTxnEvents {
250250
if ts < min {
251251
min = ts

0 commit comments

Comments
 (0)