From 37f59721d954a64ba7a5e1f3a8111196a8a183ad Mon Sep 17 00:00:00 2001 From: zzm Date: Fri, 2 Jun 2023 14:08:11 +0800 Subject: [PATCH] [close #343]fix cdc residue metrics (#342) * fix cdc residue metrics Signed-off-by: zeminzhou * fix comments Signed-off-by: zeminzhou * fix ut Signed-off-by: zeminzhou * rerun ut Signed-off-by: zeminzhou * rerun ut Signed-off-by: zeminzhou --------- Signed-off-by: zeminzhou Co-authored-by: Ping Yu --- cdc/cdc/capture/capture.go | 2 ++ cdc/cdc/owner/owner.go | 18 ++++++++++++------ cdc/cdc/processor/manager.go | 8 ++++++++ cdc/cdc/processor/processor.go | 5 +++++ cdc/cdc/processor/processor_test.go | 5 ++++- 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/cdc/cdc/capture/capture.go b/cdc/cdc/capture/capture.go index 6e0e3210..188dc442 100644 --- a/cdc/cdc/capture/capture.go +++ b/cdc/cdc/capture/capture.go @@ -234,6 +234,7 @@ func (c *Capture) run(stdCtx context.Context) error { // (recoverable errors are intercepted in the processor tick) // so we should also stop the processor and let capture restart or exit processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval) + c.processorManager.SyncClose() log.Info("the processor routine has exited", zap.Error(processorErr)) }() wg.Add(1) @@ -326,6 +327,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval) c.setOwner(nil) log.Info("run owner exited", zap.Error(err)) + owner.CloseAllChangefeeds(ownerCtx) // TODO: fix invalid resign // When exiting normally, cancel will be called to make `owner routine` diff --git a/cdc/cdc/owner/owner.go b/cdc/cdc/owner/owner.go index aa86afc5..f2385632 100644 --- a/cdc/cdc/owner/owner.go +++ b/cdc/cdc/owner/owner.go @@ -188,17 +188,23 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) } } if atomic.LoadInt32(&o.closed) != 0 { - for changefeedID, cfReactor := range 
o.changefeeds { - ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ - ID: changefeedID, - }) - cfReactor.Close(ctx) - } + o.CloseAllChangefeeds(ctx) return state, cerror.ErrReactorFinished.GenWithStackByArgs() } return state, nil } +// CloseAllChangefeeds closes all changefeeds. +// Note: Please be careful to call this method! +func (o *Owner) CloseAllChangefeeds(ctx cdcContext.Context) { + for changefeedID, cfReactor := range o.changefeeds { + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: changefeedID, + }) + cfReactor.Close(ctx) + } +} + // EnqueueJob enqueues an admin job into an internal queue, and the Owner will handle the job in the next tick func (o *Owner) EnqueueJob(adminJob model.AdminJob) { o.pushOwnerJob(&ownerJob{ diff --git a/cdc/cdc/processor/manager.go b/cdc/cdc/processor/manager.go index 02bf7b1a..479db972 100644 --- a/cdc/cdc/processor/manager.go +++ b/cdc/cdc/processor/manager.go @@ -155,6 +155,14 @@ func (m *Manager) sendCommand(tp commandTp, payload interface{}) chan struct{} { return cmd.done } +// SyncClose closes all processors +// Note: This method must not be called with `Tick`!!! 
+func (m *Manager) SyncClose() { + for changefeedID := range m.processors { + m.closeProcessor(changefeedID) + } +} + func (m *Manager) handleCommand() error { var cmd *command select { diff --git a/cdc/cdc/processor/processor.go b/cdc/cdc/processor/processor.go index b823a5c1..74c46632 100644 --- a/cdc/cdc/processor/processor.go +++ b/cdc/cdc/processor/processor.go @@ -649,6 +649,10 @@ func (p *processor) removeKeySpan(keyspan keyspanpipeline.KeySpanPipeline, keysp } func (p *processor) Close() error { + if !p.initialized { + return nil + } + for _, tbl := range p.keyspans { tbl.Cancel() } @@ -675,6 +679,7 @@ func (p *processor) Close() error { } } + p.initialized = false return nil } diff --git a/cdc/cdc/processor/processor_test.go b/cdc/cdc/processor/processor_test.go index 00775584..ba50498b 100644 --- a/cdc/cdc/processor/processor_test.go +++ b/cdc/cdc/processor/processor_test.go @@ -42,7 +42,10 @@ func newProcessor4Test( createKeySpanPipeline func(ctx cdcContext.Context, keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo) (keyspanpipeline.KeySpanPipeline, error), ) *processor { p := newProcessor(ctx) - p.lazyInit = func(ctx cdcContext.Context) error { return nil } + p.lazyInit = func(ctx cdcContext.Context) error { + p.initialized = true + return nil + } p.sinkManager = &sink.Manager{} p.createKeySpanPipeline = createKeySpanPipeline return p