-
Notifications
You must be signed in to change notification settings - Fork 2.3k
VReplication: Improve Error/Status Reporting #12052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a7c5080
ec67dcc
7e1d6f8
7e820dd
da59866
fc0895c
c487d1c
be50843
d808374
3a9d882
ddf0c8a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2180,22 +2180,28 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl | |
| if err != nil { | ||
| return err | ||
| } | ||
| s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflowName) | ||
| s += fmt.Sprintf("The following vreplication streams exist for workflow %s.%s:\n\n", target, workflowName) | ||
| for ksShard := range res.ShardStatuses { | ||
| statuses := res.ShardStatuses[ksShard].PrimaryReplicationStatuses | ||
| for _, st := range statuses { | ||
| now := time.Now().Nanosecond() | ||
| msg := "" | ||
| updateLag := int64(now) - st.TimeUpdated | ||
| if updateLag > 0*1e9 { | ||
| msg += " Vstream may not be running." | ||
| } | ||
| txLag := int64(now) - st.TransactionTimestamp | ||
| msg += fmt.Sprintf(" VStream Lag: %ds.", txLag/1e9) | ||
| if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0 | ||
| msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC)) | ||
| if st.State == "Error" { | ||
| msg += fmt.Sprintf(": %s.", st.Message) | ||
| } else if st.Pos == "" { | ||
| msg += ". VStream has not started." | ||
| } else { | ||
| now := time.Now().Nanosecond() | ||
| updateLag := int64(now) - st.TimeUpdated | ||
| if updateLag > 0*1e9 { | ||
| msg += ". VStream may not be running" | ||
| } | ||
| txLag := int64(now) - st.TransactionTimestamp | ||
| msg += fmt.Sprintf(". VStream Lag: %ds.", txLag/1e9) | ||
| if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0 | ||
| msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC)) | ||
| } | ||
| } | ||
| s += fmt.Sprintf("id=%d on %s: Status: %s.%s\n", st.ID, ksShard, st.State, msg) | ||
| s += fmt.Sprintf("id=%d on %s: Status: %s%s\n", st.ID, ksShard, st.State, msg) | ||
| } | ||
| } | ||
| wr.Logger().Printf("\n%s\n", s) | ||
|
|
@@ -2319,9 +2325,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl | |
| if err != nil { | ||
| return err | ||
| } | ||
| if copyProgress == nil { | ||
| wr.Logger().Printf("\nCopy Completed.\n") | ||
| } else { | ||
| if copyProgress != nil { | ||
| wr.Logger().Printf("\nCopy Progress (approx):\n") | ||
| var tables []string | ||
| for table := range *copyProgress { | ||
|
|
@@ -2346,7 +2350,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl | |
| wr.Logger().Printf("\n%s\n", s) | ||
| } | ||
| return printDetails() | ||
|
|
||
| } | ||
|
|
||
| if *dryRun { | ||
|
|
@@ -2390,7 +2393,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl | |
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| errCh <- fmt.Errorf("workflow did not start within %s", (*timeout).String()) | ||
| return | ||
| case <-ticker.C: | ||
| totalStreams, startedStreams, workflowErrors, err := wf.GetStreamCount() | ||
|
|
@@ -2419,9 +2421,13 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl | |
| return nil | ||
| } | ||
| wr.Logger().Printf("%d%% ... ", 100*progress.started/progress.total) | ||
| case <-timedCtx.Done(): | ||
| wr.Logger().Printf("\nThe workflow did not start within %s. The workflow may simply be slow to start or there may be an issue.\n", | ||
| (*timeout).String()) | ||
| wr.Logger().Printf("Check the status using the 'Workflow %s show' client command for details.\n", ksWorkflow) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if it possible to check the status at this point and print the output? Save the user an extra step?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO the output is already a little noisy and at this point (at least sub-second before) it had not yet started. I also like that we're informing/reminding them of the way to monitor the status and see the workflow details. |
||
| return fmt.Errorf("timed out waiting for workflow to start") | ||
| case err := <-errCh: | ||
| wr.Logger().Error(err) | ||
| cancelTimedCtx() | ||
| return err | ||
| case wfErrs := <-wfErrCh: | ||
| wr.Logger().Printf("Found problems with the streams created for this workflow:\n") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes in this file were just me ordering the subgroups by error code as it makes it easier to walk through all of them as I did to see what ones should be in the
vreplication.unRecoverableError()list.