Skip to content

Commit

Permalink
[close #343]fix cdc residue metrics (#342)
Browse files Browse the repository at this point in the history
* fix cdc residue metrics

Signed-off-by: zeminzhou <[email protected]>

* fix comments

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* rerun ut

Signed-off-by: zeminzhou <[email protected]>

* rerun ut

Signed-off-by: zeminzhou <[email protected]>

---------

Signed-off-by: zeminzhou <[email protected]>
Co-authored-by: Ping Yu <[email protected]>
  • Loading branch information
zeminzhou and pingyu authored Jun 2, 2023
1 parent 1ac96c5 commit 37f5972
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cdc/cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the processor and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval)
c.processorManager.SyncClose()
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
Expand Down Expand Up @@ -326,6 +327,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval)
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
owner.CloseAllChangefeeds(ownerCtx)

// TODO: fix invalid resign
// When exiting normally, cancel will be called to make `owner routine`
Expand Down
18 changes: 12 additions & 6 deletions cdc/cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,23 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
}
}
if atomic.LoadInt32(&o.closed) != 0 {
for changefeedID, cfReactor := range o.changefeeds {
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: changefeedID,
})
cfReactor.Close(ctx)
}
o.CloseAllChangefeeds(ctx)
return state, cerror.ErrReactorFinished.GenWithStackByArgs()
}
return state, nil
}

// CloseAllCaptures close all changefeeds.
// Note: Please be careful to call this method!
func (o *Owner) CloseAllChangefeeds(ctx cdcContext.Context) {
for changefeedID, cfReactor := range o.changefeeds {
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: changefeedID,
})
cfReactor.Close(ctx)
}
}

// EnqueueJob enqueues an admin job into an internal queue, and the Owner will handle the job in the next tick
func (o *Owner) EnqueueJob(adminJob model.AdminJob) {
o.pushOwnerJob(&ownerJob{
Expand Down
8 changes: 8 additions & 0 deletions cdc/cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ func (m *Manager) sendCommand(tp commandTp, payload interface{}) chan struct{} {
return cmd.done
}

// SyncClose closes all processors
// Note: This method must not be called with `Tick`!!!
func (m *Manager) SyncClose() {
for changefeedID := range m.processors {
m.closeProcessor(changefeedID)
}
}

func (m *Manager) handleCommand() error {
var cmd *command
select {
Expand Down
5 changes: 5 additions & 0 deletions cdc/cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,10 @@ func (p *processor) removeKeySpan(keyspan keyspanpipeline.KeySpanPipeline, keysp
}

func (p *processor) Close() error {
if !p.initialized {
return nil
}

for _, tbl := range p.keyspans {
tbl.Cancel()
}
Expand All @@ -675,6 +679,7 @@ func (p *processor) Close() error {
}
}

p.initialized = false
return nil
}

Expand Down
5 changes: 4 additions & 1 deletion cdc/cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ func newProcessor4Test(
createKeySpanPipeline func(ctx cdcContext.Context, keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo) (keyspanpipeline.KeySpanPipeline, error),
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.lazyInit = func(ctx cdcContext.Context) error {
p.initialized = true
return nil
}
p.sinkManager = &sink.Manager{}
p.createKeySpanPipeline = createKeySpanPipeline
return p
Expand Down

0 comments on commit 37f5972

Please sign in to comment.