Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions go/vt/worker/split_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func (sdw *SplitDiffWorker) StatusAsHTML() template.HTML {
switch state {
case WorkerStateDiff:
result += "<b>Running...</b></br>\n"
case WorkerStateDiffWillFail:
result += "<b>Running - have already found differences...</b></br>\n"
case WorkerStateDone:
result += "<b>Success.</b></br>\n"
}
Expand All @@ -112,6 +114,8 @@ func (sdw *SplitDiffWorker) StatusAsText() string {
switch state {
case WorkerStateDiff:
result += "Running...\n"
case WorkerStateDiffWillFail:
result += "Running - have already found differences...\n"
case WorkerStateDone:
result += "Success.\n"
}
Expand Down Expand Up @@ -400,7 +404,9 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
rec.RecordError(err)
if err != nil {
sdw.markAsWillFail(rec, err)
}
sdw.wr.Logger().Infof("Got schema from destination %v", sdw.destinationAlias)
wg.Done()
}()
Expand All @@ -411,7 +417,9 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
rec.RecordError(err)
if err != nil {
sdw.markAsWillFail(rec, err)
}
sdw.wr.Logger().Infof("Got schema from source %v", sdw.sourceAlias)
wg.Done()
}()
Expand Down Expand Up @@ -495,7 +503,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
}
if err != nil {
newErr := vterrors.Wrap(err, "TableScan(ByKeyRange?)(source) failed")
rec.RecordError(newErr)
sdw.markAsWillFail(rec, newErr)
sdw.wr.Logger().Errorf("%v", newErr)
return
}
Expand All @@ -511,7 +519,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
}
if err != nil {
newErr := vterrors.Wrap(err, "TableScan(ByKeyRange?)(destination) failed")
rec.RecordError(newErr)
sdw.markAsWillFail(rec, newErr)
sdw.wr.Logger().Errorf("%v", newErr)
return
}
Expand All @@ -521,7 +529,7 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
differ, err := NewRowDiffer(sourceQueryResultReader, destinationQueryResultReader, tableDefinition)
if err != nil {
newErr := vterrors.Wrap(err, "NewRowDiffer() failed")
rec.RecordError(newErr)
sdw.markAsWillFail(rec, newErr)
sdw.wr.Logger().Errorf("%v", newErr)
return
}
Expand All @@ -530,12 +538,12 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
report, err := differ.Go(sdw.wr.Logger())
if err != nil {
newErr := fmt.Errorf("Differ.Go failed: %v", err.Error())
rec.RecordError(newErr)
sdw.markAsWillFail(rec, newErr)
sdw.wr.Logger().Errorf("%v", newErr)
} else {
if report.HasDifferences() {
err := fmt.Errorf("Table %v has differences: %v", tableDefinition.Name, report.String())
rec.RecordError(err)
sdw.markAsWillFail(rec, err)
sdw.wr.Logger().Warningf(err.Error())
} else {
sdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS)
Expand All @@ -549,3 +557,9 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {

return rec.Error()
}

// markAsWillFail records the error and changes the state of the worker to reflect this
func (sdw *SplitDiffWorker) markAsWillFail(er concurrency.ErrorRecorder, err error) {
er.RecordError(err)
sdw.SetState(WorkerStateDiffWillFail)
}
3 changes: 3 additions & 0 deletions go/vt/worker/status_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const (
// WorkerStateDiff is set when the worker compares the data.
WorkerStateDiff StatusWorkerState = "running the diff"

// WorkerStateDiffWillFail is set when the worker is still comparing the data, but we have already found discrepancies.
WorkerStateDiffWillFail StatusWorkerState = "running the diff, already found differences"

// WorkerStateDebugRunning is set when an internal command (e.g. Block or Ping) is currently running.
WorkerStateDebugRunning StatusWorkerState = "running an internal debug command"

Expand Down
26 changes: 20 additions & 6 deletions go/vt/worker/vertical_split_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (vsdw *VerticalSplitDiffWorker) StatusAsHTML() template.HTML {
switch state {
case WorkerStateDiff:
result += "<b>Running</b>:</br>\n"
case WorkerStateDiffWillFail:
result += "<b>Running - have already found differences...</b></br>\n"
case WorkerStateDone:
result += "<b>Success</b>:</br>\n"
}
Expand All @@ -105,6 +107,8 @@ func (vsdw *VerticalSplitDiffWorker) StatusAsText() string {
switch state {
case WorkerStateDiff:
result += "Running...\n"
case WorkerStateDiffWillFail:
result += "Running - have already found differences...\n"
case WorkerStateDone:
result += "Success.\n"
}
Expand Down Expand Up @@ -365,7 +369,9 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
rec.RecordError(err)
if err != nil {
vsdw.markAsWillFail(rec, err)
}
vsdw.wr.Logger().Infof("Got schema from destination %v", topoproto.TabletAliasString(vsdw.destinationAlias))
wg.Done()
}()
Expand All @@ -376,7 +382,9 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema(
shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */)
cancel()
rec.RecordError(err)
if err != nil {
vsdw.markAsWillFail(rec, err)
}
vsdw.wr.Logger().Infof("Got schema from source %v", topoproto.TabletAliasString(vsdw.sourceAlias))
wg.Done()
}()
Expand Down Expand Up @@ -409,7 +417,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
sourceQueryResultReader, err := TableScan(ctx, vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.sourceAlias, tableDefinition)
if err != nil {
newErr := vterrors.Wrap(err, "TableScan(source) failed")
rec.RecordError(newErr)
vsdw.markAsWillFail(rec, newErr)
vsdw.wr.Logger().Errorf("%v", newErr)
return
}
Expand All @@ -418,7 +426,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
destinationQueryResultReader, err := TableScan(ctx, vsdw.wr.Logger(), vsdw.wr.TopoServer(), vsdw.destinationAlias, tableDefinition)
if err != nil {
newErr := vterrors.Wrap(err, "TableScan(destination) failed")
rec.RecordError(newErr)
vsdw.markAsWillFail(rec, newErr)
vsdw.wr.Logger().Errorf("%v", newErr)
return
}
Expand All @@ -427,7 +435,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
differ, err := NewRowDiffer(sourceQueryResultReader, destinationQueryResultReader, tableDefinition)
if err != nil {
newErr := vterrors.Wrap(err, "NewRowDiffer() failed")
rec.RecordError(newErr)
vsdw.markAsWillFail(rec, newErr)
vsdw.wr.Logger().Errorf("%v", newErr)
return
}
Expand All @@ -438,7 +446,7 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {
} else {
if report.HasDifferences() {
err := fmt.Errorf("Table %v has differences: %v", tableDefinition.Name, report.String())
rec.RecordError(err)
vsdw.markAsWillFail(rec, err)
vsdw.wr.Logger().Errorf("%v", err)
} else {
vsdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS)
Expand All @@ -450,3 +458,9 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error {

return rec.Error()
}

// markAsWillFail records the error and changes the state of the worker to reflect this
func (vsdw *VerticalSplitDiffWorker) markAsWillFail(er concurrency.ErrorRecorder, err error) {
er.RecordError(err)
vsdw.SetState(WorkerStateDiffWillFail)
}