diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go index a4f07cdf60b..e6cc6a7ff7a 100644 --- a/go/vt/worker/split_diff.go +++ b/go/vt/worker/split_diff.go @@ -96,6 +96,8 @@ func (sdw *SplitDiffWorker) StatusAsHTML() template.HTML { switch state { case WorkerStateDiff: result += "Running...
\n" + case WorkerStateDiffWillFail: + result += "Running - have already found differences...
\n" case WorkerStateDone: result += "Success.
\n" } @@ -112,6 +114,8 @@ func (sdw *SplitDiffWorker) StatusAsText() string { switch state { case WorkerStateDiff: result += "Running...\n" + case WorkerStateDiffWillFail: + result += "Running - have already found differences...\n" case WorkerStateDone: result += "Success.\n" } @@ -400,7 +404,9 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema( shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) cancel() - rec.RecordError(err) + if err != nil { + sdw.markAsWillFail(rec, err) + } sdw.wr.Logger().Infof("Got schema from destination %v", sdw.destinationAlias) wg.Done() }() @@ -411,7 +417,9 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema( shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) cancel() - rec.RecordError(err) + if err != nil { + sdw.markAsWillFail(rec, err) + } sdw.wr.Logger().Infof("Got schema from source %v", sdw.sourceAlias) wg.Done() }() @@ -495,7 +503,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { } if err != nil { newErr := vterrors.Wrap(err, "TableScan(ByKeyRange?)(source) failed") - rec.RecordError(newErr) + sdw.markAsWillFail(rec, newErr) sdw.wr.Logger().Errorf("%v", newErr) return } @@ -511,7 +519,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { } if err != nil { newErr := vterrors.Wrap(err, "TableScan(ByKeyRange?)(destination) failed") - rec.RecordError(newErr) + sdw.markAsWillFail(rec, newErr) sdw.wr.Logger().Errorf("%v", newErr) return } @@ -521,7 +529,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { differ, err := NewRowDiffer(sourceQueryResultReader, destinationQueryResultReader, tableDefinition) if err != nil { newErr := vterrors.Wrap(err, "NewRowDiffer() failed") - rec.RecordError(newErr) + sdw.markAsWillFail(rec, newErr) sdw.wr.Logger().Errorf("%v", newErr) return } @@ -530,12 +538,12 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { report, err := differ.Go(sdw.wr.Logger()) if err != nil { newErr := fmt.Errorf("Differ.Go failed: %v", err.Error()) - rec.RecordError(newErr) + sdw.markAsWillFail(rec, newErr) sdw.wr.Logger().Errorf("%v", newErr) } else { if report.HasDifferences() { err := fmt.Errorf("Table %v has differences: %v", tableDefinition.Name, report.String()) - rec.RecordError(err) + sdw.markAsWillFail(rec, err) sdw.wr.Logger().Warningf(err.Error()) } else { sdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS) @@ -549,3 +557,9 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { return rec.Error() } + +// markAsWillFail records the error and changes the state of the worker to reflect this +func (sdw *SplitDiffWorker) markAsWillFail(er concurrency.ErrorRecorder, err error) { + er.RecordError(err) + sdw.SetState(WorkerStateDiffWillFail) +} diff --git a/go/vt/worker/status_worker.go b/go/vt/worker/status_worker.go index 73bb49c1727..81c7e61feec 100644 --- a/go/vt/worker/status_worker.go +++ b/go/vt/worker/status_worker.go @@ -50,6 +50,9 @@ const ( // WorkerStateDiff is set when the worker compares the data. WorkerStateDiff StatusWorkerState = "running the diff" + // WorkerStateDiffWillFail is set when the worker is still comparing the data, but we have already found discrepancies. + WorkerStateDiffWillFail StatusWorkerState = "running the diff, already found differences" + // WorkerStateDebugRunning is set when an internal command (e.g. Block or Ping) is currently running. WorkerStateDebugRunning StatusWorkerState = "running an internal debug command" diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go index c79321312ab..2980db2fdbf 100644 --- a/go/vt/worker/vertical_split_diff.go +++ b/go/vt/worker/vertical_split_diff.go @@ -89,6 +89,8 @@ func (vsdw *VerticalSplitDiffWorker) StatusAsHTML() template.HTML { switch state { case WorkerStateDiff: result += "Running:
\n" + case WorkerStateDiffWillFail: + result += "Running - have already found differences...
\n" case WorkerStateDone: result += "Success:
\n" } @@ -105,6 +107,8 @@ func (vsdw *VerticalSplitDiffWorker) StatusAsText() string { switch state { case WorkerStateDiff: result += "Running...\n" + case WorkerStateDiffWillFail: + result += "Running - have already found differences...\n" case WorkerStateDone: result += "Success.\n" } @@ -365,7 +369,9 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema( shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) cancel() - rec.RecordError(err) + if err != nil { + vsdw.markAsWillFail(rec, err) + } vsdw.wr.Logger().Infof("Got schema from destination %v", topoproto.TabletAliasString(vsdw.destinationAlias)) wg.Done() }() @@ -376,7 +382,9 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema( shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) cancel() - rec.RecordError(err) + if err != nil { + vsdw.markAsWillFail(rec, err) + } vsdw.wr.Logger().Infof("Got schema from source %v", topoproto.TabletAliasString(vsdw.sourceAlias)) wg.Done() }() @@ -409,7 +417,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { sourceQueryResultReader, err := TableScan(ctx, vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.sourceAlias, tableDefinition) if err != nil { newErr := vterrors.Wrap(err, "TableScan(source) failed") - rec.RecordError(newErr) + vsdw.markAsWillFail(rec, newErr) vsdw.wr.Logger().Errorf("%v", newErr) return } @@ -418,7 +426,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { destinationQueryResultReader, err := TableScan(ctx, vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.destinationAlias, tableDefinition) if err != nil { newErr := vterrors.Wrap(err, "TableScan(destination) failed") - rec.RecordError(newErr) + vsdw.markAsWillFail(rec, newErr) vsdw.wr.Logger().Errorf("%v", newErr) return } @@ -427,7 +435,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { differ, err := NewRowDiffer(sourceQueryResultReader, destinationQueryResultReader, tableDefinition) if err != nil { newErr := vterrors.Wrap(err, "NewRowDiffer() failed") - rec.RecordError(newErr) + vsdw.markAsWillFail(rec, newErr) vsdw.wr.Logger().Errorf("%v", newErr) return } @@ -438,7 +446,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { } else { if report.HasDifferences() { err := fmt.Errorf("Table %v has differences: %v", tableDefinition.Name, report.String()) - rec.RecordError(err) + vsdw.markAsWillFail(rec, err) vsdw.wr.Logger().Errorf("%v", err) } else { vsdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS) @@ -450,3 +458,9 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { return rec.Error() } + +// markAsWillFail records the error and changes the state of the worker to reflect this +func (vsdw *VerticalSplitDiffWorker) markAsWillFail(er concurrency.ErrorRecorder, err error) { + er.RecordError(err) + vsdw.SetState(WorkerStateDiffWillFail) +}