diff --git a/go/vt/proto/workflow/workflow.pb.go b/go/vt/proto/workflow/workflow.pb.go index ef73eb3cb8d..cb3cf7a64d6 100644 --- a/go/vt/proto/workflow/workflow.pb.go +++ b/go/vt/proto/workflow/workflow.pb.go @@ -10,6 +10,8 @@ It is generated from these files: It has these top-level messages: Workflow + WorkflowCheckpoint + Task */ package workflow @@ -56,6 +58,30 @@ func (x WorkflowState) String() string { } func (WorkflowState) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +type TaskState int32 + +const ( + TaskState_TaskNotStarted TaskState = 0 + TaskState_TaskRunning TaskState = 1 + TaskState_TaskDone TaskState = 2 +) + +var TaskState_name = map[int32]string{ + 0: "TaskNotStarted", + 1: "TaskRunning", + 2: "TaskDone", +} +var TaskState_value = map[string]int32{ + "TaskNotStarted": 0, + "TaskRunning": 1, + "TaskDone": 2, +} + +func (x TaskState) String() string { + return proto.EnumName(TaskState_name, int32(x)) +} +func (TaskState) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + // Workflow is the persisted state of a long-running workflow. 
type Workflow struct { // uuid is set when the workflow is created, and immutable after @@ -97,29 +123,183 @@ func (m *Workflow) String() string { return proto.CompactTextString(m func (*Workflow) ProtoMessage() {} func (*Workflow) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *Workflow) GetUuid() string { + if m != nil { + return m.Uuid + } + return "" +} + +func (m *Workflow) GetFactoryName() string { + if m != nil { + return m.FactoryName + } + return "" +} + +func (m *Workflow) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Workflow) GetState() WorkflowState { + if m != nil { + return m.State + } + return WorkflowState_NotStarted +} + +func (m *Workflow) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *Workflow) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +func (m *Workflow) GetStartTime() int64 { + if m != nil { + return m.StartTime + } + return 0 +} + +func (m *Workflow) GetEndTime() int64 { + if m != nil { + return m.EndTime + } + return 0 +} + +type WorkflowCheckpoint struct { + // code_version is used to detect incompabilities between the version of the + // running workflow and the one which wrote the checkpoint. If they don't + // match, the workflow must not continue. The author of workflow must update + // this variable in their implementation when incompabilities are introduced. + CodeVersion int32 `protobuf:"varint,1,opt,name=code_version,json=codeVersion" json:"code_version,omitempty"` + // Task is the data structure that stores the execution status and the + // attributes of a task. + Tasks map[string]*Task `protobuf:"bytes,2,rep,name=tasks" json:"tasks,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // settings includes workflow specific data, e.g. the resharding workflow + // would store the source shards and destination shards. 
+ Settings map[string]string `protobuf:"bytes,3,rep,name=settings" json:"settings,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *WorkflowCheckpoint) Reset() { *m = WorkflowCheckpoint{} } +func (m *WorkflowCheckpoint) String() string { return proto.CompactTextString(m) } +func (*WorkflowCheckpoint) ProtoMessage() {} +func (*WorkflowCheckpoint) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *WorkflowCheckpoint) GetCodeVersion() int32 { + if m != nil { + return m.CodeVersion + } + return 0 +} + +func (m *WorkflowCheckpoint) GetTasks() map[string]*Task { + if m != nil { + return m.Tasks + } + return nil +} + +func (m *WorkflowCheckpoint) GetSettings() map[string]string { + if m != nil { + return m.Settings + } + return nil +} + +type Task struct { + Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + State TaskState `protobuf:"varint,2,opt,name=state,enum=workflow.TaskState" json:"state,omitempty"` + // attributes includes the parameters the task needs. 
+ Attributes map[string]string `protobuf:"bytes,3,rep,name=attributes" json:"attributes,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Error string `protobuf:"bytes,4,opt,name=error" json:"error,omitempty"` +} + +func (m *Task) Reset() { *m = Task{} } +func (m *Task) String() string { return proto.CompactTextString(m) } +func (*Task) ProtoMessage() {} +func (*Task) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *Task) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *Task) GetState() TaskState { + if m != nil { + return m.State + } + return TaskState_TaskNotStarted +} + +func (m *Task) GetAttributes() map[string]string { + if m != nil { + return m.Attributes + } + return nil +} + +func (m *Task) GetError() string { + if m != nil { + return m.Error + } + return "" +} + func init() { proto.RegisterType((*Workflow)(nil), "workflow.Workflow") + proto.RegisterType((*WorkflowCheckpoint)(nil), "workflow.WorkflowCheckpoint") + proto.RegisterType((*Task)(nil), "workflow.Task") proto.RegisterEnum("workflow.WorkflowState", WorkflowState_name, WorkflowState_value) + proto.RegisterEnum("workflow.TaskState", TaskState_name, TaskState_value) } func init() { proto.RegisterFile("workflow.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 246 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0x03, 0x31, - 0x10, 0x85, 0x4d, 0xbb, 0xdb, 0x4d, 0xa7, 0x75, 0x59, 0x06, 0xc1, 0x78, 0x10, 0x56, 0x4f, 0x8b, - 0x60, 0x0f, 0x0a, 0xfe, 0x02, 0xcf, 0x3d, 0xa4, 0x82, 0xc7, 0x12, 0xcd, 0x54, 0x16, 0xdd, 0x44, - 0xd2, 0x59, 0x8a, 0xff, 0xd8, 0x9f, 0x21, 0xc9, 0x76, 0x85, 0xde, 0xde, 0x9b, 0x2f, 0x6f, 0x5e, - 0x18, 0x28, 0x0f, 0x3e, 0x7c, 0xee, 0xbe, 0xfc, 0x61, 0xf5, 0x1d, 0x3c, 0x7b, 0x94, 0xa3, 0xbf, - 0xfd, 0x15, 0x20, 0x5f, 0x8f, 0x06, 0x11, 0xb2, 0xbe, 0x6f, 0xad, 0x12, 0xb5, 0x68, 0xe6, 0x3a, - 0x69, 0xbc, 
0x81, 0xe5, 0xce, 0xbc, 0xb3, 0x0f, 0x3f, 0x5b, 0x67, 0x3a, 0x52, 0x93, 0xc4, 0x16, - 0xc7, 0xd9, 0xda, 0x74, 0x14, 0x63, 0x09, 0x4d, 0x87, 0x58, 0xd4, 0x78, 0x0f, 0xf9, 0x9e, 0x0d, - 0x93, 0xca, 0x6a, 0xd1, 0x94, 0x0f, 0x97, 0xab, 0xff, 0x1f, 0x8c, 0x6d, 0x9b, 0x88, 0xf5, 0xf0, - 0x2a, 0xae, 0xb0, 0x86, 0x8d, 0xca, 0x6b, 0xd1, 0x2c, 0x75, 0xd2, 0x78, 0x01, 0x39, 0x85, 0xe0, - 0x83, 0x9a, 0xa5, 0xbd, 0x83, 0xc1, 0x6b, 0x80, 0x3d, 0x9b, 0xc0, 0x5b, 0x6e, 0x3b, 0x52, 0x45, - 0x2d, 0x9a, 0xa9, 0x9e, 0xa7, 0xc9, 0x4b, 0xdb, 0x11, 0x5e, 0x81, 0x24, 0x67, 0x07, 0x28, 0x13, - 0x2c, 0xc8, 0xd9, 0x88, 0xee, 0x9e, 0xe0, 0xfc, 0xa4, 0x1b, 0x4b, 0x80, 0xb5, 0xe7, 0x4d, 0xcc, - 0x92, 0xad, 0xce, 0x70, 0x01, 0x85, 0xee, 0x9d, 0x6b, 0xdd, 0x47, 0x25, 0x50, 0x42, 0xf6, 0xec, - 0x1d, 0x55, 0x93, 0xb7, 0x59, 0xba, 0xd9, 0xe3, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe2, 0x1f, - 0x18, 0x22, 0x45, 0x01, 0x00, 0x00, + // 477 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x53, 0xdb, 0x6e, 0xd3, 0x40, + 0x10, 0x65, 0x7d, 0x69, 0x9c, 0x71, 0xea, 0x46, 0x43, 0x25, 0x4c, 0x24, 0x90, 0x89, 0x90, 0x30, + 0x91, 0xc8, 0x43, 0x90, 0x10, 0x02, 0xb5, 0x12, 0xe2, 0x22, 0x9e, 0xfa, 0xe0, 0x54, 0xf0, 0x18, + 0x6d, 0xe3, 0x6d, 0x59, 0xa5, 0xd9, 0xad, 0xd6, 0xeb, 0x56, 0xf9, 0x40, 0x7e, 0x81, 0x6f, 0xe0, + 0x33, 0xd0, 0xee, 0xc6, 0x4e, 0x0c, 0x08, 0x89, 0xb7, 0x99, 0x39, 0x73, 0xce, 0x78, 0xf6, 0x8c, + 0x21, 0xb9, 0x93, 0x6a, 0x75, 0x79, 0x2d, 0xef, 0xa6, 0x37, 0x4a, 0x6a, 0x89, 0x51, 0x93, 0x8f, + 0x7f, 0x12, 0x88, 0xbe, 0x6e, 0x13, 0x44, 0x08, 0xea, 0x9a, 0x97, 0x29, 0xc9, 0x48, 0xde, 0x2f, + 0x6c, 0x8c, 0x4f, 0x60, 0x70, 0x49, 0x97, 0x5a, 0xaa, 0xcd, 0x42, 0xd0, 0x35, 0x4b, 0x3d, 0x8b, + 0xc5, 0xdb, 0xda, 0x19, 0x5d, 0x33, 0x43, 0xb3, 0x90, 0xef, 0x68, 0x26, 0xc6, 0x17, 0x10, 0x56, + 0x9a, 0x6a, 0x96, 0x06, 0x19, 0xc9, 0x93, 0xd9, 0x83, 0x69, 0xfb, 0x05, 0xcd, 0xb4, 0xb9, 0x81, + 0x0b, 0xd7, 0x65, 0x24, 0x4a, 0xaa, 0x69, 0x1a, 0x66, 0x24, 
0x1f, 0x14, 0x36, 0xc6, 0x63, 0x08, + 0x99, 0x52, 0x52, 0xa5, 0x07, 0x56, 0xd7, 0x25, 0xf8, 0x08, 0xa0, 0xd2, 0x54, 0xe9, 0x85, 0xe6, + 0x6b, 0x96, 0xf6, 0x32, 0x92, 0xfb, 0x45, 0xdf, 0x56, 0xce, 0xf9, 0x9a, 0xe1, 0x43, 0x88, 0x98, + 0x28, 0x1d, 0x18, 0x59, 0xb0, 0xc7, 0x44, 0x69, 0xa0, 0xf1, 0x77, 0x0f, 0xb0, 0x19, 0xfe, 0xfe, + 0x1b, 0x5b, 0xae, 0x6e, 0x24, 0x17, 0xda, 0x2c, 0xb8, 0x94, 0x25, 0x5b, 0xdc, 0x32, 0x55, 0x71, + 0x29, 0xec, 0xf2, 0x61, 0x11, 0x9b, 0xda, 0x17, 0x57, 0xc2, 0x13, 0x08, 0x35, 0xad, 0x56, 0x55, + 0xea, 0x65, 0x7e, 0x1e, 0xcf, 0x9e, 0xfd, 0xb9, 0xcc, 0x4e, 0x6f, 0x7a, 0x6e, 0x3a, 0x3f, 0x0a, + 0xad, 0x36, 0x85, 0x63, 0xe1, 0x27, 0x88, 0x2a, 0xa6, 0x35, 0x17, 0x57, 0x55, 0xea, 0x5b, 0x85, + 0xc9, 0x3f, 0x15, 0xe6, 0xdb, 0x66, 0x27, 0xd2, 0x72, 0x47, 0x9f, 0x01, 0x76, 0xe2, 0x38, 0x04, + 0x7f, 0xc5, 0x36, 0x5b, 0xaf, 0x4c, 0x88, 0x4f, 0x21, 0xbc, 0xa5, 0xd7, 0xb5, 0xf3, 0x28, 0x9e, + 0x25, 0xbb, 0x21, 0x86, 0x56, 0x38, 0xf0, 0x8d, 0xf7, 0x9a, 0x8c, 0xde, 0xc2, 0x61, 0x67, 0xc8, + 0x5f, 0xc4, 0x8e, 0xf7, 0xc5, 0xfa, 0x7b, 0xe4, 0xf1, 0x0f, 0x02, 0x81, 0x11, 0xc4, 0x04, 0xbc, + 0xf6, 0x58, 0x3c, 0x5e, 0xe2, 0xf3, 0xc6, 0x73, 0xcf, 0x7a, 0x7e, 0xbf, 0x3b, 0xbf, 0xe3, 0xf7, + 0x29, 0x00, 0xd5, 0x5a, 0xf1, 0x8b, 0x5a, 0xb3, 0xe6, 0x51, 0x1e, 0x77, 0xfb, 0xa7, 0xef, 0xda, + 0x06, 0xf7, 0x10, 0x7b, 0x8c, 0xdd, 0x6d, 0x04, 0x7b, 0xb7, 0x31, 0x3a, 0x81, 0xa3, 0xdf, 0x48, + 0xff, 0xb3, 0xd8, 0xe4, 0x15, 0x1c, 0x76, 0x8e, 0x13, 0x13, 0x80, 0x33, 0xa9, 0xe7, 0xe6, 0xb8, + 0x58, 0x39, 0xbc, 0x87, 0x31, 0xf4, 0x8a, 0x5a, 0x08, 0x2e, 0xae, 0x86, 0x04, 0x23, 0x08, 0x3e, + 0x48, 0xc1, 0x86, 0xde, 0xe4, 0x14, 0xfa, 0xed, 0x82, 0x88, 0x90, 0x98, 0xa4, 0xc3, 0x3b, 0x82, + 0xd8, 0x3a, 0xd0, 0x72, 0x07, 0x10, 0x99, 0x82, 0xe3, 0x5f, 0x1c, 0xd8, 0x9f, 0xf2, 0xe5, 0xaf, + 0x00, 0x00, 0x00, 0xff, 0xff, 0x72, 0x5c, 0x6d, 0x7f, 0xa6, 0x03, 0x00, 0x00, } diff --git a/go/vt/vtctld/workflow.go b/go/vt/vtctld/workflow.go index 3e8af44b4f2..d74921b1626 100644 --- 
a/go/vt/vtctld/workflow.go +++ b/go/vt/vtctld/workflow.go @@ -42,6 +42,7 @@ func initWorkflowManager(ts topo.Server) { // Register the Horizontal Resharding workflow. resharding.Register() + // Unregister the blacklisted workflows. for _, name := range workflowManagerDisable { workflow.Unregister(name) diff --git a/go/vt/workflow/manager.go b/go/vt/workflow/manager.go index 1d48516912c..0910894192b 100644 --- a/go/vt/workflow/manager.go +++ b/go/vt/workflow/manager.go @@ -39,6 +39,8 @@ type Factory interface { // variable filled it. This Init method should fill in the // Name and Data attributes, based on the provided args. // This is called during the Manager.Create phase. + // TODO(yipeiw): We should extend the interface to pass the topology server + // as well. The topology server is needed in the resarding workflow. Init(w *workflowpb.Workflow, args []string) error // Instantiate loads a workflow from the proto representation diff --git a/go/vt/workflow/node.go b/go/vt/workflow/node.go index 7f06fee31af..11aaa3bf951 100644 --- a/go/vt/workflow/node.go +++ b/go/vt/workflow/node.go @@ -220,6 +220,30 @@ func (n *Node) deepCopyFrom(otherNode *Node, copyChildren bool) error { return nil } +// GetChildByPath returns the child node given the relative path to this node. +// The caller must ensure that the node tree is not modified during the call. +func (n *Node) GetChildByPath(subPath string) (*Node, error) { + // Find the subnode if needed. + parts := strings.Split(subPath, "/") + + currentNode := n + for i := 0; i < len(parts); i++ { + childPathName := parts[i] + found := false + for _, child := range currentNode.Children { + if child.PathName == childPathName { + found = true + currentNode = child + break + } + } + if !found { + return nil, fmt.Errorf("node %v has no children named %v", currentNode.Path, childPathName) + } + } + return currentNode, nil +} + // ActionParameters describe an action initiated by the user. 
type ActionParameters struct { // Path is the path of the Node the action was performed on. @@ -355,6 +379,7 @@ func (m *NodeManager) updateNodeAndBroadcastLocked(userNode *Node, updateChildre if err != nil { return err } + userNode.LastChanged = time.Now().Unix() if err := savedNode.deepCopyFrom(userNode, updateChildren); err != nil { return err diff --git a/go/vt/workflow/resharding/checkpoint.go b/go/vt/workflow/resharding/checkpoint.go new file mode 100644 index 00000000000..7392675cb82 --- /dev/null +++ b/go/vt/workflow/resharding/checkpoint.go @@ -0,0 +1,56 @@ +package resharding + +import ( + "context" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/youtube/vitess/go/vt/topo" + + workflowpb "github.com/youtube/vitess/go/vt/proto/workflow" +) + +// CheckpointWriter saves the checkpoint data into topology server. +type CheckpointWriter struct { + topoServer topo.Server + + // checkpointMu is used for protecting data access during checkpointing. + mu sync.Mutex + checkpoint *workflowpb.WorkflowCheckpoint + wi *topo.WorkflowInfo +} + +// NewCheckpointWriter creates a CheckpointWriter. +func NewCheckpointWriter(ts topo.Server, checkpoint *workflowpb.WorkflowCheckpoint, wi *topo.WorkflowInfo) *CheckpointWriter { + return &CheckpointWriter{ + topoServer: ts, + checkpoint: checkpoint, + wi: wi, + } +} + +// UpdateTask updates the task status in the checkpointing copy and +// saves the full checkpoint to the topology server. 
+func (c *CheckpointWriter) UpdateTask(taskID string, status workflowpb.TaskState, err error) error { + c.mu.Lock() + defer c.mu.Unlock() + + errorMessage := "" + if err != nil { + errorMessage = err.Error() + } + + t := c.checkpoint.Tasks[taskID] + t.State = status + t.Error = errorMessage + return c.saveLocked() +} + +func (c *CheckpointWriter) saveLocked() error { + var err error + c.wi.Data, err = proto.Marshal(c.checkpoint) + if err != nil { + return err + } + return c.topoServer.SaveWorkflow(context.TODO(), c.wi) +} diff --git a/go/vt/workflow/resharding/horizontal_resharding_workflow.go b/go/vt/workflow/resharding/horizontal_resharding_workflow.go index 0103bd1eddb..61eb6ce18b6 100644 --- a/go/vt/workflow/resharding/horizontal_resharding_workflow.go +++ b/go/vt/workflow/resharding/horizontal_resharding_workflow.go @@ -3,24 +3,19 @@ package resharding // Package resharding contains a workflow for automatic horizontal resharding. // The workflow assumes that there are as many vtworker processes running as source shards. // Plus, these vtworker processes must be reachable via RPC. -// TO DO: it can be used to save checkpointer import ( - "encoding/json" "flag" "fmt" "strings" - "sync" log "github.com/golang/glog" + "github.com/golang/protobuf/proto" "golang.org/x/net/context" - "github.com/youtube/vitess/go/vt/automation" - "github.com/youtube/vitess/go/vt/concurrency" "github.com/youtube/vitess/go/vt/logutil" "github.com/youtube/vitess/go/vt/tabletmanager/tmclient" "github.com/youtube/vitess/go/vt/topo" - "github.com/youtube/vitess/go/vt/topo/topoproto" "github.com/youtube/vitess/go/vt/topotools" "github.com/youtube/vitess/go/vt/workflow" @@ -31,350 +26,361 @@ import ( ) const ( + codeVersion = 1 + horizontalReshardingFactoryName = "horizontal_resharding" ) -// HorizontalReshardingData is the data structure to store resharding arguments. 
-type HorizontalReshardingData struct { - Keyspace string - Vtworkers []string -} - -// HorizontalReshardingWorkflow contains meta-information and methods to control horizontal resharding workflow. -type HorizontalReshardingWorkflow struct { - // ctx is the context of the whole horizontal resharding process. Once this context is canceled, - // the horizontal resharding process stops. - ctx context.Context - wr ReshardingWrangler - manager *workflow.Manager - topoServer topo.Server - - // logger is the logger we export UI logs from. - logger *logutil.MemoryLogger - - // rootUINode is the root node representing the workflow in the UI. - rootUINode *workflow.Node - copySchemaUINode *workflow.Node - splitCloneUINode *workflow.Node - splitDiffUINode *workflow.Node - migrateUINode *workflow.Node - - keyspace string - vtworkers []string +// PhaseType is used to store the phase name in a workflow. +type PhaseType string - subWorkflows []*PerShardHorizontalResharding -} - -// PerShardHorizontalReshardingData is the data structure to store the resharding arguments for each shard. -type PerShardHorizontalReshardingData struct { - Keyspace string - SourceShard string - DestinationShards []string - Vtworker string -} - -// PerShardHorizontalResharding contains the data and method for horizontal resharding from a single source shard. 
-type PerShardHorizontalResharding struct { - PerShardHorizontalReshardingData - parent *HorizontalReshardingWorkflow - - copySchemaShardUINode *workflow.Node - splitCloneShardUINode *workflow.Node - splitDiffShardUINode *workflow.Node - migrateShardUINode *workflow.Node +const ( + phaseCopySchema PhaseType = "copy_schema" + phaseClone PhaseType = "clone" + phaseWaitForFilteredReplication PhaseType = "wait_for_filtered_replication" + phaseDiff PhaseType = "diff" + phaseMigrateRdonly PhaseType = "migrate_rdonly" + phaseMigrateReplica PhaseType = "migrate_replica" + phaseMigrateMaster PhaseType = "migrate_master" +) - shardUILogger *logutil.MemoryLogger +// Register registers the HorizontalReshardingWorkflowFactory as a factory +// in the workflow framework. +func Register() { + workflow.Register(horizontalReshardingFactoryName, &HorizontalReshardingWorkflowFactory{}) } -// Run executes the horizontal resharding process and updates the UI message. -// It implements the workflow.Workflow interface. -func (hw *HorizontalReshardingWorkflow) Run(ctx context.Context, manager *workflow.Manager, wi *topo.WorkflowInfo) error { - hw.ctx = ctx - hw.topoServer = manager.TopoServer() - hw.wr = wrangler.New(logutil.NewConsoleLogger(), manager.TopoServer(), tmclient.NewTabletManagerClient()) - - hw.createSubWorkflows() +// HorizontalReshardingWorkflowFactory is the factory to create +// a horizontal resharding workflow. +type HorizontalReshardingWorkflowFactory struct{} - hw.setUIMessage("Horizontal resharding: workflow created successfully.") - - hw.rootUINode.Display = workflow.NodeDisplayDeterminate - hw.rootUINode.BroadcastChanges(true /* updateChildren */) +// Init is part of the workflow.Factory interface. 
+func (*HorizontalReshardingWorkflowFactory) Init(w *workflowpb.Workflow, args []string) error { + subFlags := flag.NewFlagSet(horizontalReshardingFactoryName, flag.ContinueOnError) + keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding") + vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses") - // TODO(yipeiw): Support action button to allow retry, stop, restart. - if err := hw.executeWorkflow(); err != nil { + if err := subFlags.Parse(args); err != nil { return err } + if *keyspace == "" || *vtworkersStr == "" { + return fmt.Errorf("Keyspace name, vtworkers information must be provided for horizontal resharding") + } - hw.setUIMessage(fmt.Sprintf("Horizontal Resharding on %v: finished sucessfully.", hw.keyspace)) - - return nil -} + vtworkers := strings.Split(*vtworkersStr, ",") + w.Name = fmt.Sprintf("Horizontal resharding on keyspace %s", *keyspace) -// createSubWorkflows creates a per source shard horizontal resharding workflow for each source shard in the keyspace. -func (hw *HorizontalReshardingWorkflow) createSubWorkflows() error { - overlappingShards, err := topotools.FindOverlappingShards(hw.ctx, hw.topoServer, hw.keyspace) + checkpoint, err := initCheckpoint(*keyspace, vtworkers) if err != nil { - hw.logger.Infof("Horizontal Resharding: createSubWorkflows error in finding overlapping shards: %v.", err) return err } - for i, os := range overlappingShards { - var sourceShard *topo.ShardInfo - var destinationShards []*topo.ShardInfo - // Judge which side is source shard by checking the number of servedTypes. 
- if len(os.Left[0].ServedTypes) > 0 { - sourceShard = os.Left[0] - destinationShards = os.Right - } else { - sourceShard = os.Right[0] - destinationShards = os.Left - } - - if err := hw.createWorkflowPerShard(sourceShard, destinationShards, hw.vtworkers[i]); err != nil { - return err - } + w.Data, err = proto.Marshal(checkpoint) + if err != nil { + return err } return nil } -func (hw *HorizontalReshardingWorkflow) createWorkflowPerShard(sourceShard *topo.ShardInfo, destinationShards []*topo.ShardInfo, vtworker string) error { - sourceShardName := sourceShard.ShardName() - var destShardNames []string - for _, s := range destinationShards { - destShardNames = append(destShardNames, s.ShardName()) +// Instantiate is part the workflow.Factory interface. +func (*HorizontalReshardingWorkflowFactory) Instantiate(w *workflowpb.Workflow, rootNode *workflow.Node) (workflow.Workflow, error) { + rootNode.Message = "This is a workflow to execute horizontal resharding automatically." + + checkpoint := &workflowpb.WorkflowCheckpoint{} + if err := proto.Unmarshal(w.Data, checkpoint); err != nil { + return nil, err } - perShard := &PerShardHorizontalResharding{ - PerShardHorizontalReshardingData: PerShardHorizontalReshardingData{ - Keyspace: hw.keyspace, - SourceShard: sourceShardName, - DestinationShards: destShardNames, - Vtworker: vtworker, - }, - copySchemaShardUINode: &workflow.Node{ - Name: "Shard " + sourceShardName, - PathName: "shard_" + sourceShardName, - }, - splitCloneShardUINode: &workflow.Node{ - Name: "Shard " + sourceShardName, - PathName: "shard_" + sourceShardName, - }, - splitDiffShardUINode: &workflow.Node{ - Name: "Shard " + sourceShardName, - PathName: "shard_" + sourceShardName, - }, - migrateShardUINode: &workflow.Node{ - Name: "Shard " + sourceShardName, - PathName: "shard_" + sourceShardName, - }, - shardUILogger: logutil.NewMemoryLogger(), + hw := &HorizontalReshardingWorkflow{ + checkpoint: checkpoint, + rootUINode: rootNode, + logger: 
logutil.NewMemoryLogger(), + } + copySchemaUINode := &workflow.Node{ + Name: "CopySchemaShard", + PathName: string(phaseCopySchema), + } + cloneUINode := &workflow.Node{ + Name: "SplitClone", + PathName: string(phaseClone), + } + waitForFilteredReplicationUINode := &workflow.Node{ + Name: "WaitForFilteredReplication", + PathName: string(phaseWaitForFilteredReplication), + } + diffUINode := &workflow.Node{ + Name: "SplitDiff", + PathName: string(phaseDiff), + } + migrateRdonlyUINode := &workflow.Node{ + Name: "MigrateServedTypeRDONLY", + PathName: string(phaseMigrateRdonly), + } + migrateReplicaUINode := &workflow.Node{ + Name: "MigrateServedTypeREPLICA", + PathName: string(phaseMigrateReplica), + } + migrateMasterUINode := &workflow.Node{ + Name: "MigrateServedTypeMASTER", + PathName: string(phaseMigrateMaster), } - perShard.parent = hw - hw.copySchemaUINode.Children = append(hw.copySchemaUINode.Children, perShard.copySchemaShardUINode) - hw.splitCloneUINode.Children = append(hw.splitCloneUINode.Children, perShard.splitCloneShardUINode) - hw.splitDiffUINode.Children = append(hw.splitDiffUINode.Children, perShard.splitDiffShardUINode) - hw.migrateUINode.Children = append(hw.migrateUINode.Children, perShard.migrateShardUINode) + hw.rootUINode.Children = []*workflow.Node{ + copySchemaUINode, + cloneUINode, + waitForFilteredReplicationUINode, + diffUINode, + migrateRdonlyUINode, + migrateReplicaUINode, + migrateMasterUINode, + } - hw.subWorkflows = append(hw.subWorkflows, perShard) - return nil -} + sourceShards := strings.Split(hw.checkpoint.Settings["source_shards"], ",") + destinationShards := strings.Split(hw.checkpoint.Settings["destination_shards"], ",") -func (hw *HorizontalReshardingWorkflow) executeWorkflow() error { - if err := hw.runAllSubWorkflows(hw.executeCopySchemaPerShard); err != nil { - hw.logger.Infof("Horizontal Resharding: error in CopySchemaShard: %v.", err) - return err + if err := createUINodes(hw.rootUINode, phaseCopySchema, destinationShards); 
err != nil { + return hw, err } - if err := hw.runAllSubWorkflows(hw.executeSplitClonePerShard); err != nil { - hw.logger.Infof("Horizontal Resharding: error in SplitClone: %v.", err) - return err + if err := createUINodes(hw.rootUINode, phaseClone, sourceShards); err != nil { + return hw, err } - if err := hw.runAllSubWorkflows(hw.executeSplitDiffPerShard); err != nil { - hw.logger.Infof("Horizontal Resharding: error in SplitDiff: %v.", err) - return err + if err := createUINodes(hw.rootUINode, phaseWaitForFilteredReplication, destinationShards); err != nil { + return hw, err } - if err := hw.runAllSubWorkflows(hw.executeMigratePerShard); err != nil { - hw.logger.Infof("Horizontal Resharding: error in MigratedServedType: %v.", err) - return err + if err := createUINodes(hw.rootUINode, phaseDiff, destinationShards); err != nil { + return hw, err + } + if err := createUINodes(hw.rootUINode, phaseMigrateRdonly, sourceShards); err != nil { + return hw, err + } + if err := createUINodes(hw.rootUINode, phaseMigrateReplica, sourceShards); err != nil { + return hw, err + } + if err := createUINodes(hw.rootUINode, phaseMigrateMaster, sourceShards); err != nil { + return hw, err } - return nil -} -// runAllSubWorkflows runs jobs in parallel. -func (hw *HorizontalReshardingWorkflow) runAllSubWorkflows(executeFunc func(subWorkflow *PerShardHorizontalResharding) error) error { - ec := concurrency.AllErrorRecorder{} - wg := sync.WaitGroup{} - for _, sw := range hw.subWorkflows { - wg.Add(1) - go func(s *PerShardHorizontalResharding) { - defer wg.Done() - ec.RecordError(executeFunc(s)) - }(sw) - } - wg.Wait() - return ec.Error() + return hw, nil } -// executeCopySchemaPerShard runs CopySchemaShard to copy the schema of a source shard to all its destination shards. -// TODO(yipeiw): excludeTable information can be added to UI input parameters, s.t the user can customize excluded tables during resharding. 
-func (hw *HorizontalReshardingWorkflow) executeCopySchemaPerShard(perhw *PerShardHorizontalResharding) error { - sourceKeyspaceShard := topoproto.KeyspaceShardString(perhw.Keyspace, perhw.SourceShard) - for _, d := range perhw.DestinationShards { - err := hw.wr.CopySchemaShardFromShard(hw.ctx, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, perhw.Keyspace, perhw.SourceShard, perhw.Keyspace, d, wrangler.DefaultWaitSlaveTimeout) - if err != nil { - hw.logger.Infof("Horizontal Resharding: error in CopySchemaShardFromShard from %s to %s: %v.", sourceKeyspaceShard, d, err) - return err +func createUINodes(rootNode *workflow.Node, phaseName PhaseType, shards []string) error { + phaseNode, err := rootNode.GetChildByPath(string(phaseName)) + if err != nil { + return fmt.Errorf("fails to find phase node for: %v", phaseName) + } + + for _, shard := range shards { + taskUINode := &workflow.Node{ + Name: "Shard " + shard, + PathName: shard, } - hw.logger.Infof("Horizontal Resharding: CopySchemaShardFromShard from %s to %s is finished.", sourceKeyspaceShard, d) + phaseNode.Children = append(phaseNode.Children, taskUINode) } return nil } -// executeSplitClonePerShard runs SplitClone to clone the data within a keyspace from a source shard to its destination shards. -func (hw *HorizontalReshardingWorkflow) executeSplitClonePerShard(perhw *PerShardHorizontalResharding) error { - sourceKeyspaceShard := topoproto.KeyspaceShardString(perhw.Keyspace, perhw.SourceShard) - var destinationKeyspaceShards []string - for _, destShard := range perhw.DestinationShards { - destinationKeyspaceShards = append(destinationKeyspaceShards, topoproto.KeyspaceShardString(perhw.Keyspace, destShard)) - } - - // Reset the vtworker to avoid error if vtworker command has been called elsewhere. - // This is because vtworker class doesn't cleanup the environment after execution. 
- automation.ExecuteVtworker(hw.ctx, perhw.Vtworker, []string{"Reset"}) - // The flag min_healthy_rdonly_tablets is set to 1 (default value is 2). - // Therefore, we can reuse the normal end to end test setting, which has only 1 rdonly tablet. - // TODO(yipeiw): Add min_healthy_rdonly_tablets as an input argument in UI. - args := []string{"SplitClone", "--min_healthy_rdonly_tablets=1", sourceKeyspaceShard} - if _, err := automation.ExecuteVtworker(hw.ctx, perhw.Vtworker, args); err != nil { - hw.logger.Infof("Horizontal resharding: error in SplitClone in keyspace %s: %v.", perhw.Keyspace, err) - return err +// initCheckpoint initialize the checkpoint for the horizontal workflow. +func initCheckpoint(keyspace string, vtworkers []string) (*workflowpb.WorkflowCheckpoint, error) { + sourceShards, destinationShards, err := findSourceAndDestinationShards(keyspace) + if err != nil { + return nil, err + } + return initCheckpointFromShards(keyspace, vtworkers, sourceShards, destinationShards) +} + +func findSourceAndDestinationShards(keyspace string) ([]string, []string, error) { + ts := topo.Open() + defer ts.Close() + + overlappingShards, err := topotools.FindOverlappingShards(context.Background(), ts, keyspace) + if err != nil { + return nil, nil, err } - hw.logger.Infof("Horizontal resharding: SplitClone is finished.") - // Wait for filtered replication task. - for _, d := range perhw.DestinationShards { - if err := hw.wr.WaitForFilteredReplication(hw.ctx, perhw.Keyspace, d, wrangler.DefaultWaitForFilteredReplicationMaxDelay); err != nil { - hw.logger.Infof("Horizontal Resharding: error in WaitForFilteredReplication: %v.", err) - return err + + var sourceShards, destinationShards []string + + for _, os := range overlappingShards { + var sourceShardInfo *topo.ShardInfo + var destinationShardInfos []*topo.ShardInfo + // Judge which side is source shard by checking the number of servedTypes. 
+ if len(os.Left[0].ServedTypes) > 0 { + sourceShardInfo = os.Left[0] + destinationShardInfos = os.Right + } else { + sourceShardInfo = os.Right[0] + destinationShardInfos = os.Left + } + sourceShards = append(sourceShards, sourceShardInfo.ShardName()) + for _, d := range destinationShardInfos { + destinationShards = append(destinationShards, d.ShardName()) } - hw.logger.Infof("Horizontal Resharding:WaitForFilteredReplication is finished on " + d) } - return nil + return sourceShards, destinationShards, nil } -// executeSplitDiffPerShard runs SplitDiff for every destination shard to the source and destination -// to ensure all the data is present and correct. -func (hw *HorizontalReshardingWorkflow) executeSplitDiffPerShard(perhw *PerShardHorizontalResharding) error { - var destinationKeyspaceShards []string - for _, destShard := range perhw.DestinationShards { - destinationKeyspaceShards = append(destinationKeyspaceShards, topoproto.KeyspaceShardString(perhw.Keyspace, destShard)) +func initCheckpointFromShards(keyspace string, vtworkers, sourceShards, destinationShards []string) (*workflowpb.WorkflowCheckpoint, error) { + if len(vtworkers) != len(sourceShards) { + return nil, fmt.Errorf("there are %v vtworkers, %v source shards: the number should be same", len(vtworkers), len(sourceShards)) } - for _, d := range destinationKeyspaceShards { - automation.ExecuteVtworker(hw.ctx, perhw.Vtworker, []string{"Reset"}) - args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", d} - _, err := automation.ExecuteVtworker(hw.ctx, perhw.Vtworker, args) - if err != nil { - return err + tasks := make(map[string]*workflowpb.Task) + initTasks(tasks, phaseCopySchema, destinationShards, func(i int, shard string) map[string]string { + return map[string]string{ + "keyspace": keyspace, + "source_shard": sourceShards[0], + "destination_shard": shard, } - } - hw.logger.Infof("Horizontal resharding: SplitDiff is finished.") - return nil + }) + initTasks(tasks, phaseClone, 
sourceShards, func(i int, shard string) map[string]string { + return map[string]string{ + "keyspace": keyspace, + "source_shard": shard, + "vtworker": vtworkers[i], + } + }) + initTasks(tasks, phaseWaitForFilteredReplication, destinationShards, func(i int, shard string) map[string]string { + return map[string]string{ + "keyspace": keyspace, + "destination_shard": shard, + } + }) + initTasks(tasks, phaseDiff, destinationShards, func(i int, shard string) map[string]string { + return map[string]string{ + "keyspace": keyspace, + "destination_shard": shard, + "vtworker": vtworkers[0], + } + }) + initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string { + return map[string]string{ + "keyspace": keyspace, + "source_shard": shard, + "served_type": topodatapb.TabletType_RDONLY.String(), + } + }) + initTasks(tasks, phaseMigrateReplica, sourceShards, func(i int, shard string) map[string]string { + return map[string]string{ + "keyspace": keyspace, + "source_shard": shard, + "served_type": topodatapb.TabletType_REPLICA.String(), + } + }) + initTasks(tasks, phaseMigrateMaster, sourceShards, func(i int, shard string) map[string]string { + return map[string]string{ + "keyspace": keyspace, + "source_shard": shard, + "served_type": topodatapb.TabletType_MASTER.String(), + } + }) + + return &workflowpb.WorkflowCheckpoint{ + CodeVersion: codeVersion, + Tasks: tasks, + Settings: map[string]string{ + "source_shards": strings.Join(sourceShards, ","), + "destination_shards": strings.Join(destinationShards, ","), + }, + }, nil } -// executeMigratePerShard runs MigrateServedTypes to switch over to serving from the new shards. 
-func (hw *HorizontalReshardingWorkflow) executeMigratePerShard(perhw *PerShardHorizontalResharding) error { - sourceKeyspaceShard := topoproto.KeyspaceShardString(perhw.Keyspace, perhw.SourceShard) - servedTypeParams := []topodatapb.TabletType{topodatapb.TabletType_RDONLY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_MASTER} - for _, servedType := range servedTypeParams { - err := hw.wr.MigrateServedTypes(hw.ctx, perhw.Keyspace, perhw.SourceShard, nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime) - if err != nil { - hw.logger.Infof("Horizontal Resharding: error in MigrateServedTypes on servedType %s: %v.", servedType, err) - return err +func initTasks(tasks map[string]*workflowpb.Task, phase PhaseType, shards []string, getAttributes func(int, string) map[string]string) { + for i, shard := range shards { + taskID := createTaskID(phase, shard) + tasks[taskID] = &workflowpb.Task{ + Id: taskID, + State: workflowpb.TaskState_TaskNotStarted, + Attributes: getAttributes(i, shard), } - hw.logger.Infof("Horizontal Resharding: MigrateServedTypes is finished on tablet %s serve type %s.", sourceKeyspaceShard, servedType) } - return nil } -func (hw *HorizontalReshardingWorkflow) setUIMessage(message string) { - log.Infof("Horizontal resharding on keyspace %v: %v.", hw.keyspace, message) - hw.rootUINode.Log = hw.logger.String() - hw.rootUINode.Message = message - hw.rootUINode.BroadcastChanges(false /* updateChildren */) -} +// HorizontalReshardingWorkflow contains meta-information and methods to +// control the horizontal resharding workflow. +type HorizontalReshardingWorkflow struct { + ctx context.Context + wr ReshardingWrangler + manager *workflow.Manager + topoServer topo.Server + wi *topo.WorkflowInfo + // logger is the logger we export UI logs from. + logger *logutil.MemoryLogger -// WorkflowFactory is the factory to register the HorizontalReshard Workflow. 
-type WorkflowFactory struct{} + // rootUINode is the root node representing the workflow in the UI. + rootUINode *workflow.Node -// Register registers horizontal_resharding as a valid factory in the workflow framework. -func Register() { - workflow.Register(horizontalReshardingFactoryName, &WorkflowFactory{}) + checkpoint *workflowpb.WorkflowCheckpoint + checkpointWriter *CheckpointWriter } -// Init is part of the workflow.Factory interface. -func (*WorkflowFactory) Init(workflowProto *workflowpb.Workflow, args []string) error { - subFlags := flag.NewFlagSet(horizontalReshardingFactoryName, flag.ContinueOnError) - keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding") - vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses") +// Run executes the horizontal resharding process. +// It implements the workflow.Workflow interface. +func (hw *HorizontalReshardingWorkflow) Run(ctx context.Context, manager *workflow.Manager, wi *topo.WorkflowInfo) error { + hw.ctx = ctx + hw.topoServer = manager.TopoServer() + hw.manager = manager + hw.wr = wrangler.New(logutil.NewConsoleLogger(), manager.TopoServer(), tmclient.NewTabletManagerClient()) + hw.wi = wi + hw.checkpointWriter = NewCheckpointWriter(hw.topoServer, hw.checkpoint, hw.wi) - if err := subFlags.Parse(args); err != nil { + hw.rootUINode.Display = workflow.NodeDisplayDeterminate + hw.rootUINode.BroadcastChanges(true /* updateChildren */) + + if err := hw.runWorkflow(); err != nil { return err } - if *keyspace == "" || *vtworkersStr == "" { - return fmt.Errorf("Keyspace name, vtworkers information must be provided for horizontal resharding") + hw.setUIMessage(fmt.Sprintf("Horizontal Resharding is finished sucessfully.")) + return nil +} + +func (hw *HorizontalReshardingWorkflow) runWorkflow() error { + copySchemaTasks := hw.GetTasks(phaseCopySchema) + copySchemaRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, 
copySchemaTasks, hw.runCopySchema, Parallel) + if err := copySchemaRunner.Run(); err != nil { + return err } - vtworkers := strings.Split(*vtworkersStr, ",") - workflowProto.Name = fmt.Sprintf("Horizontal resharding on keyspace %s", *keyspace) - data := &HorizontalReshardingData{ - Keyspace: *keyspace, - Vtworkers: vtworkers, + cloneTasks := hw.GetTasks(phaseClone) + cloneRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, cloneTasks, hw.runSplitClone, Parallel) + if err := cloneRunner.Run(); err != nil { + return err } - var err error - workflowProto.Data, err = json.Marshal(data) - if err != nil { + + waitForFilteredReplicationTasks := hw.GetTasks(phaseWaitForFilteredReplication) + waitForFilteredReplicationRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, waitForFilteredReplicationTasks, hw.runWaitForFilteredReplication, Parallel) + if err := waitForFilteredReplicationRunner.Run(); err != nil { return err } - return nil -} -// Instantiate is part of the workflow.Factory interface. -func (*WorkflowFactory) Instantiate(workflowProto *workflowpb.Workflow, rootNode *workflow.Node) (workflow.Workflow, error) { - rootNode.Message = "This is a workflow to execute horizontal resharding automatically." 
- data := &HorizontalReshardingData{} - if err := json.Unmarshal(workflowProto.Data, data); err != nil { - return nil, err + diffTasks := hw.GetTasks(phaseDiff) + diffRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, diffTasks, hw.runSplitDiff, Sequential) + if err := diffRunner.Run(); err != nil { + return err } - hw := &HorizontalReshardingWorkflow{ - keyspace: data.Keyspace, - vtworkers: data.Vtworkers, - rootUINode: rootNode, - copySchemaUINode: &workflow.Node{ - Name: "CopySchemaShard", - PathName: "copy_schema", - }, - splitCloneUINode: &workflow.Node{ - Name: "SplitClone", - PathName: "clone", - }, - splitDiffUINode: &workflow.Node{ - Name: "SplitDiff", - PathName: "diff", - }, - migrateUINode: &workflow.Node{ - Name: "MigrateServedType", - PathName: "migrate", - }, - logger: logutil.NewMemoryLogger(), + migrateRdonlyTasks := hw.GetTasks(phaseMigrateRdonly) + migrateRdonlyRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateRdonlyTasks, hw.runMigrate, Sequential) + if err := migrateRdonlyRunner.Run(); err != nil { + return err } - hw.rootUINode.Children = []*workflow.Node{ - hw.copySchemaUINode, - hw.splitCloneUINode, - hw.splitDiffUINode, - hw.migrateUINode, + + migrateReplicaTasks := hw.GetTasks(phaseMigrateReplica) + migrateReplicaRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateReplicaTasks, hw.runMigrate, Sequential) + if err := migrateReplicaRunner.Run(); err != nil { + return err } - return hw, nil + + migrateMasterTasks := hw.GetTasks(phaseMigrateMaster) + migrateMasterRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, migrateMasterTasks, hw.runMigrate, Sequential) + if err := migrateMasterRunner.Run(); err != nil { + return err + } + + return nil +} + +func (hw *HorizontalReshardingWorkflow) setUIMessage(message string) { + log.Infof("Horizontal resharding : %v.", message) + hw.rootUINode.Log = hw.logger.String() + hw.rootUINode.Message = message + 
hw.rootUINode.BroadcastChanges(false /* updateChildren */) } diff --git a/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go b/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go index efc445e5e8f..fe5096e737d 100644 --- a/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go +++ b/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go @@ -1,46 +1,177 @@ package resharding import ( + "context" "flag" "testing" "github.com/golang/mock/gomock" "github.com/youtube/vitess/go/vt/logutil" + "github.com/youtube/vitess/go/vt/topo/memorytopo" "github.com/youtube/vitess/go/vt/worker/fakevtworkerclient" "github.com/youtube/vitess/go/vt/worker/vtworkerclient" + "github.com/youtube/vitess/go/vt/workflow" "github.com/youtube/vitess/go/vt/wrangler" topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" + workflowpb "github.com/youtube/vitess/go/vt/proto/workflow" ) +// TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow. func TestHorizontalResharding(t *testing.T) { - // Create fake wrangler using mock interface, which is used for the unit test in steps CopySchema and MigratedServedType. + // Set up the mock wrangler. It is used for the CopySchema and Migrate phase. ctrl := gomock.NewController(t) defer ctrl.Finish() - mockWranglerInterface := NewMockReshardingWrangler(ctrl) + ctx := context.Background() + mockWranglerInterface := setupMockWrangler(ctx, ctrl) + + // Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase. + fakeVtworkerClient := setupFakeVtworker() + vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory) + defer vtworkerclient.UnregisterFactoryForTest("fake") + + // Create a checkpoint with initialized tasks. 
+ sourceShards := []string{"0"} + destinationShards := []string{"-80", "80-"} + vtworkers := []string{"localhost:15032"} + checkpoint, err := initCheckpointFromShards("test_keyspace", vtworkers, sourceShards, destinationShards) + if err != nil { + t.Errorf("initialize checkpoint fails: %v", err) + } + + hw, err := createWorkflow(ctx, mockWranglerInterface, checkpoint) + if err != nil { + t.Errorf("initialize Workflow fails: %v", err) + } + if err := hw.runWorkflow(); err != nil { + t.Errorf("%s: Horizontal resharding workflow should not fail", err) + } + + verifySuccess(t, hw.checkpoint) +} + +// TestHorizontalReshardingRetry retries a stopped workflow, +// which the tasks are partially finished. +func TestHorizontalReshardingRetry(t *testing.T) { + // Set up mock wrangler. It is used for the CopySchema and Migrate phase. + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + mockWranglerInterface := setupMockWranglerForRetry(ctx, ctrl) + + // Set up fakeworkerclient. It is used at SplitClone and SplitDiff phase. + fakeVtworkerClient := setupFakeVtworker() + vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory) + defer vtworkerclient.UnregisterFactoryForTest("fake") + + // Create a checkpoint for the stopped workflow. For the stopped workflow, + // the task of copying schema to shard 80- succeed while the task of copying + // schema to shard -80 failed. The rest of tasks haven't been executed. 
+ sourceShards := []string{"0"} + destinationShards := []string{"-80", "80-"} + vtworkers := []string{"localhost:15032"} + checkpoint, err := initCheckpointFromShards("test_keyspace", vtworkers, sourceShards, destinationShards) + if err != nil { + t.Errorf("initialize checkpoint fails: %v", err) + } + setTaskSuccessOrFailure(checkpoint, createTaskID(phaseCopySchema, "80-"), true /* isSuccess*/) + setTaskSuccessOrFailure(checkpoint, createTaskID(phaseCopySchema, "-80"), false /* isSuccess*/) + + hw, err := createWorkflow(ctx, mockWranglerInterface, checkpoint) + if err != nil { + t.Errorf("initialize Workflow fails: %v", err) + } + // Rerunning the workflow. + if err := hw.runWorkflow(); err != nil { + t.Errorf("%s: Horizontal resharding workflow should not fail", err) + } + + verifySuccess(t, hw.checkpoint) +} + +func setTaskSuccessOrFailure(checkpoint *workflowpb.WorkflowCheckpoint, taskID string, isSuccess bool) { + t := checkpoint.Tasks[taskID] + t.State = workflowpb.TaskState_TaskDone + if !isSuccess { + t.Error = "failed" + } else { + t.Error = "" + } +} - // Create the workflow (ignore the node construction since we don't test the front-end part in this unit test). 
+func createWorkflow(ctx context.Context, mockWranglerInterface *MockReshardingWrangler, checkpoint *workflowpb.WorkflowCheckpoint) (*HorizontalReshardingWorkflow, error) { + ts := memorytopo.NewServer("cell") + w := &workflowpb.Workflow{ + Uuid: "test_hw", + FactoryName: horizontalReshardingFactoryName, + State: workflowpb.WorkflowState_NotStarted, + } + wi, err := ts.CreateWorkflow(ctx, w) + if err != nil { + return nil, err + } hw := &HorizontalReshardingWorkflow{ - keyspace: "test_keyspace", - vtworkers: []string{"localhost:15032"}, - wr: mockWranglerInterface, - logger: logutil.NewMemoryLogger(), + ctx: ctx, + wr: mockWranglerInterface, + manager: workflow.NewManager(ts), + wi: wi, + topoServer: ts, + logger: logutil.NewMemoryLogger(), + checkpoint: checkpoint, + checkpointWriter: NewCheckpointWriter(ts, checkpoint, wi), } + return hw, nil +} - perShard := &PerShardHorizontalResharding{ - PerShardHorizontalReshardingData: PerShardHorizontalReshardingData{ - Keyspace: "test_keyspace", - SourceShard: "0", - DestinationShards: []string{"-80", "80-"}, - Vtworker: "localhost:15032", - }, +func setupFakeVtworker() *fakevtworkerclient.FakeVtworkerClient { + flag.Set("vtworker_client_protocol", "fake") + fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient() + fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitClone", "--min_healthy_rdonly_tablets=1", "test_keyspace/0"}, "", nil) + fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/-80"}, "", nil) + fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/80-"}, "", nil) + return fakeVtworkerClient +} + +func setupMockWranglerForRetry(ctx context.Context, ctrl *gomock.Controller) *MockReshardingWrangler { + mockWranglerInterface := NewMockReshardingWrangler(ctrl) + // Set the expected behaviors for mock wrangler. 
copy schema to shard 80- + // should not be called. + mockWranglerInterface.EXPECT().CopySchemaShardFromShard( + ctx, + nil, /* tableArray*/ + nil, /* excludeTableArray */ + true, /*includeViews*/ + "test_keyspace", + "0", + "test_keyspace", + "-80", + wrangler.DefaultWaitSlaveTimeout).Return(nil) + + mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) + mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) + + servedTypeParams := []topodatapb.TabletType{topodatapb.TabletType_RDONLY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_MASTER} + for _, servedType := range servedTypeParams { + mockWranglerInterface.EXPECT().MigrateServedTypes( + ctx, + "test_keyspace", + "0", + nil, /* cells */ + servedType, + false, /* reverse */ + false, /* skipReFreshState */ + wrangler.DefaultFilteredReplicationWaitTime).Return(nil) } - perShard.parent = hw - hw.subWorkflows = append(hw.subWorkflows, perShard) + return mockWranglerInterface +} +func setupMockWrangler(ctx context.Context, ctrl *gomock.Controller) *MockReshardingWrangler { + mockWranglerInterface := NewMockReshardingWrangler(ctrl) // Set the expected behaviors for mock wrangler. 
mockWranglerInterface.EXPECT().CopySchemaShardFromShard( - hw.ctx, + ctx, nil, /* tableArray*/ nil, /* excludeTableArray */ true, /*includeViews*/ @@ -51,7 +182,7 @@ func TestHorizontalResharding(t *testing.T) { wrangler.DefaultWaitSlaveTimeout).Return(nil) mockWranglerInterface.EXPECT().CopySchemaShardFromShard( - hw.ctx, + ctx, nil, /* tableArray*/ nil, /* excludeTableArray */ true, /*includeViews*/ @@ -61,15 +192,15 @@ func TestHorizontalResharding(t *testing.T) { "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil) - mockWranglerInterface.EXPECT().WaitForFilteredReplication(hw.ctx, "test_keyspace", "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) - mockWranglerInterface.EXPECT().WaitForFilteredReplication(hw.ctx, "test_keyspace", "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) + mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) + mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) servedTypeParams := []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER} for _, servedType := range servedTypeParams { mockWranglerInterface.EXPECT().MigrateServedTypes( - hw.ctx, + ctx, "test_keyspace", "0", nil, /* cells */ @@ -78,18 +209,13 @@ func TestHorizontalResharding(t *testing.T) { false, /* skipReFreshState */ wrangler.DefaultFilteredReplicationWaitTime).Return(nil) } + return mockWranglerInterface +} - // Create fakeworkerclient, which is used for the unit test in steps SplitClone and SplitDiff. 
- fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient() - vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory) - defer vtworkerclient.UnregisterFactoryForTest("fake") - flag.Set("vtworker_client_protocol", "fake") - fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitClone", "--min_healthy_rdonly_tablets=1", "test_keyspace/0"}, "", nil) - fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/-80"}, "", nil) - fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/80-"}, "", nil) - - // Test the execution of horizontal resharding. - if err := hw.executeWorkflow(); err != nil { - t.Errorf("%s: Horizontal resharding workflow should not fail", err) +func verifySuccess(t *testing.T, checkpoint *workflowpb.WorkflowCheckpoint) { + for _, task := range checkpoint.Tasks { + if task.State != workflowpb.TaskState_TaskDone || task.Error != "" { + t.Fatalf("task: %v should succeed: task status: %v, %v", task.Id, task.State, task.Error) + } } } diff --git a/go/vt/workflow/resharding/parallel_runner.go b/go/vt/workflow/resharding/parallel_runner.go new file mode 100644 index 00000000000..ac7e504d375 --- /dev/null +++ b/go/vt/workflow/resharding/parallel_runner.go @@ -0,0 +1,206 @@ +package resharding + +import ( + "fmt" + "sync" + + log "github.com/golang/glog" + "golang.org/x/net/context" + + "github.com/youtube/vitess/go/vt/workflow" + + workflowpb "github.com/youtube/vitess/go/vt/proto/workflow" +) + +type level int + +const ( + // Sequential means that the tasks will run sequentially. + Sequential level = iota + //Parallel means that the tasks will run in parallel. + Parallel +) + +// ParallelRunner is used to control executing tasks concurrently. +// Each phase has its own ParallelRunner object. 
+type ParallelRunner struct { + ctx context.Context + rootUINode *workflow.Node + checkpointWriter *CheckpointWriter + // tasks stores selected tasks for the phase with expected execution order. + tasks []*workflowpb.Task + concurrencyLevel level + executeFunc func(context.Context, *workflowpb.Task) error + + // mu is used to protect the retryActionRegistry. + mu sync.Mutex + // retryAtionRegistry stores the data for retry actions. + // Each task can retrieve its RetryController through its UI node path. + retryActionRegistry map[string]*RetryController + + // reportTaskStatus gives the worklflow debug option to output the task + // status through UI. + // TODO(yipeiw): We will remove this option and make it always report task + // status, once we can unit test resharding workflow through manager + // (we ignore creating UI nodes when manually creating the workflow now). + reportTaskStatus bool +} + +// NewParallelRunner returns a new ParallelRunner. +func NewParallelRunner(ctx context.Context, rootUINode *workflow.Node, cp *CheckpointWriter, tasks []*workflowpb.Task, executeFunc func(context.Context, *workflowpb.Task) error, concurrencyLevel level) *ParallelRunner { + return &ParallelRunner{ + ctx: ctx, + rootUINode: rootUINode, + checkpointWriter: cp, + tasks: tasks, + executeFunc: executeFunc, + concurrencyLevel: concurrencyLevel, + retryActionRegistry: make(map[string]*RetryController), + reportTaskStatus: false, + } +} + +// Run is the entry point for controling task executions. +func (p *ParallelRunner) Run() error { + var parallelNum int // default value is 0. The task will not run in this case. + switch p.concurrencyLevel { + case Sequential: + parallelNum = 1 + case Parallel: + parallelNum = len(p.tasks) + default: + panic(fmt.Sprintf("BUG: Invalid concurrency level: %v", p.concurrencyLevel)) + } + + // sem is a channel used to control the level of concurrency. 
+ sem := make(chan bool, parallelNum) + for _, task := range p.tasks { + if task.State == workflowpb.TaskState_TaskDone && task.Error == "" { + continue + } + + sem <- true + go func(t *workflowpb.Task) { + defer func() { <-sem }() + defer p.setFinishUIMessage(t.Id) + + taskID := t.Id + for { + // Update the task status to running in the checkpoint. + if updateErr := p.checkpointWriter.UpdateTask(taskID, workflowpb.TaskState_TaskRunning, nil); updateErr != nil { + // Only logging the error rather then passing it to ErrorRecorder. + // Errors in ErrorRecorder will lead to the stop of a workflow. We + // don't want to stop the workflow if only checkpointing fails. + log.Errorf("%v", updateErr) + } + err := p.executeFunc(p.ctx, t) + // Update the task status to done in the checkpoint. + if updateErr := p.checkpointWriter.UpdateTask(taskID, workflowpb.TaskState_TaskDone, err); updateErr != nil { + log.Errorf("%v", updateErr) + } + + // The function returns if the task is executed successfully. + if err == nil { + log.Infof("task %v has finished.", taskID) + return + } + // When task fails, first check whether the context is canceled. + // If so, return right away. If not, enable the retry action. + select { + case <-p.ctx.Done(): + return + default: + } + retryChannel, nodePath := p.addRetryAction(taskID) + + // Block the task execution until the retry action is triggered + // or the context is canceled. + select { + case <-retryChannel: + continue + case <-p.ctx.Done(): + p.unregisterRetryController(nodePath) + return + } + } + }(task) + } + + // Wait until all running jobs are done. + for i := 0; i < parallelNum; i++ { + sem <- true + } + // TODO: collect error message from tasks.Error instead, s.t. if the task is retried, we can update the error + return nil +} + +// Action handles the retry action. It implements the interface ActionListener. 
+func (p *ParallelRunner) Action(ctx context.Context, path, name string) error { + switch name { + case "Retry": + return p.triggerRetry(path) + default: + return fmt.Errorf("Unknown action: %v", name) + } +} + +func (p *ParallelRunner) triggerRetry(nodePath string) error { + p.mu.Lock() + defer p.mu.Unlock() + c, ok := p.retryActionRegistry[nodePath] + if !ok { + return fmt.Errorf("Unregistered action for node: %v", nodePath) + } + p.unregisterRetryControllerLocked(nodePath) + c.triggerRetry() + return nil +} + +func (p *ParallelRunner) addRetryAction(taskID string) (chan struct{}, string) { + node, err := p.rootUINode.GetChildByPath(taskID) + if err != nil { + panic(fmt.Errorf("node on child path %v not found", taskID)) + } + + p.mu.Lock() + defer p.mu.Unlock() + retryController := CreateRetryController(node, p /* actionListener */) + p.registerRetryControllerLocked(node.Path, retryController) + node.BroadcastChanges(false /* updateChildren */) + return retryController.retryChannel, node.Path +} + +func (p *ParallelRunner) registerRetryControllerLocked(nodePath string, c *RetryController) { + if _, ok := p.retryActionRegistry[nodePath]; ok { + panic(fmt.Errorf("duplicate retry action for node: %v", nodePath)) + } + p.retryActionRegistry[nodePath] = c +} + +func (p *ParallelRunner) unregisterRetryController(nodePath string) { + p.mu.Lock() + p.mu.Unlock() + p.unregisterRetryControllerLocked(nodePath) +} + +func (p *ParallelRunner) unregisterRetryControllerLocked(nodePath string) { + if _, ok := p.retryActionRegistry[nodePath]; !ok { + log.Warningf("retry action for node: %v doesn't exist, cannot unregister it", nodePath) + } else { + delete(p.retryActionRegistry, nodePath) + } +} + +func (p *ParallelRunner) setFinishUIMessage(taskID string) { + if p.reportTaskStatus { + taskNode, err := p.rootUINode.GetChildByPath(taskID) + if err != nil { + panic(fmt.Errorf("nodepath %v not found", taskID)) + } + + p.mu.Lock() + defer p.mu.Unlock() + taskNode.Message = 
fmt.Sprintf("task %v finished", taskID) + taskNode.BroadcastChanges(false /* updateChildren */) + } +} diff --git a/go/vt/workflow/resharding/parallel_runner_test.go b/go/vt/workflow/resharding/parallel_runner_test.go new file mode 100644 index 00000000000..7c83c51fae2 --- /dev/null +++ b/go/vt/workflow/resharding/parallel_runner_test.go @@ -0,0 +1,213 @@ +package resharding + +import ( + "context" + "fmt" + "path" + "strings" + "sync" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/youtube/vitess/go/vt/topo" + "github.com/youtube/vitess/go/vt/topo/memorytopo" + "github.com/youtube/vitess/go/vt/workflow" + + workflowpb "github.com/youtube/vitess/go/vt/proto/workflow" +) + +func TestParallelRunner(t *testing.T) { + ts := memorytopo.NewServer("cell") + m := workflow.NewManager(ts) + + // Run the manager in the background. + wg, cancel, _ := startManager(t, m) + + // Create a testworkflow. + uuid, err := m.Create(context.Background(), testWorkflowFactoryName, []string{"-retry=false", "-count=2"}) + if err != nil { + t.Fatalf("cannot create testworkflow: %v", err) + } + + // Start the job + if err := m.Start(context.Background(), uuid); err != nil { + t.Fatalf("cannot start testworkflow: %v", err) + } + + // Wait for the workflow to end. + m.Wait(context.Background(), uuid) + + verifyWorkflowSuccess(context.Background(), t, ts, uuid) + + // Stop the manager. + if err := m.Stop(context.Background(), uuid); err != nil { + t.Fatalf("cannot stop testworkflow: %v", err) + } + cancel() + wg.Wait() +} + +func TestParallelRunnerRetryAction(t *testing.T) { + // Tasks in the workflow are forced to fail at the first attempt. Then we + // retry task1, after it is finished successfully, we retry task2. + ts := memorytopo.NewServer("cell") + m := workflow.NewManager(ts) + + // Run the manager in the background. + wg, cancel, ctx := startManager(t, m) + + // Create a testworkflow. 
+ uuid, err := m.Create(context.Background(), testWorkflowFactoryName, []string{"-retry=true", "-count=2"}) + if err != nil { + t.Fatalf("cannot create testworkflow: %v", err) + } + + // We use notifications channel to monitor the update of UI. + notifications := make(chan []byte, 10) + _, index, err := m.NodeManager().GetAndWatchFullTree(notifications) + if err != nil { + t.Errorf("GetAndWatchTree Failed: %v", err) + } + defer m.NodeManager().CloseWatcher(index) + go func() { + // This goroutine is used to detect and trigger the retry actions. + task1ID := createTestTaskID(phaseSimple, 0) + task2ID := createTestTaskID(phaseSimple, 1) + + retry1 := false + retry2 := false + for { + select { + case monitor, ok := <-notifications: + monitorStr := string(monitor) + if !ok { + t.Errorf("notifications channel is closed unexpectedly: %v, %v", ok, monitorStr) + } + if strings.Contains(monitorStr, "Retry") { + if strings.Contains(monitorStr, task1ID) { + verifyTaskSuccessOrFailure(context.Background(), t, ts, uuid, task1ID, false /* isSuccess*/) + retry1 = true + } + if strings.Contains(monitorStr, task2ID) { + verifyTaskSuccessOrFailure(context.Background(), t, ts, uuid, task2ID, false /* isSuccess*/) + retry2 = true + } + } + // After detecting both tasks have enabled retry actions after failure, + // retry task1, check its success, then retry task2, check its success. 
+ if retry1 && retry2 { + clickRetry(ctx, t, m, path.Join("/"+uuid, task1ID)) + waitForFinished(ctx, t, notifications, task1ID) + verifyTaskSuccessOrFailure(context.Background(), t, ts, uuid, task1ID, true /* isSuccess*/) + + clickRetry(ctx, t, m, path.Join("/"+uuid, task2ID)) + waitForFinished(ctx, t, notifications, task2ID) + verifyTaskSuccessOrFailure(context.Background(), t, ts, uuid, task2ID, true /* isSuccess*/) + return + } + case <-ctx.Done(): + t.Errorf("context is canceled") + return + } + } + }() + + // Start the job + if err := m.Start(context.Background(), uuid); err != nil { + t.Fatalf("cannot start testworkflow: %v", err) + } + // Wait for the workflow to end. + m.Wait(context.Background(), uuid) + + verifyWorkflowSuccess(context.Background(), t, ts, uuid) + // Stop the manager. + if err := m.Stop(context.Background(), uuid); err != nil { + t.Fatalf("cannot stop testworkflow: %v", err) + } + cancel() + wg.Wait() +} + +func startManager(t *testing.T, m *workflow.Manager) (*sync.WaitGroup, context.CancelFunc, context.Context) { + // Run the manager in the background. 
+ ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + m.Run(ctx) + wg.Done() + }() + + m.WaitUntilRunning() + return wg, cancel, ctx +} + +func clickRetry(ctx context.Context, t *testing.T, m *workflow.Manager, nodePath string) { + t.Logf("Click retry action on node: %v.", nodePath) + if err := m.NodeManager().Action(ctx, &workflow.ActionParameters{ + Path: nodePath, + Name: "Retry", + }); err != nil { + t.Errorf("unexpected action error: %v", err) + } +} + +func waitForFinished(ctx context.Context, t *testing.T, notifications chan []byte, taskID string) { + for { + select { + case monitor, ok := <-notifications: + monitorStr := string(monitor) + if !ok { + t.Errorf("unexpected notification: %v, %v", ok, monitorStr) + } + + finishMessage := fmt.Sprintf(`"message":"task %v finished"`, taskID) + if strings.Contains(monitorStr, finishMessage) { + if strings.Contains(monitorStr, `"actions":[{"name:`) { + t.Fatalf("the node actions should be empty after triggering retry: %v", monitorStr) + } + return + } + case <-ctx.Done(): + return + } + } +} + +func verifyWorkflowSuccess(ctx context.Context, t *testing.T, ts topo.Server, uuid string) { + wi, err := ts.GetWorkflow(ctx, uuid) + if err != nil { + t.Errorf("fail to get workflow for: %v", uuid) + } + checkpoint := &workflowpb.WorkflowCheckpoint{} + if err := proto.Unmarshal(wi.Workflow.Data, checkpoint); err != nil { + t.Errorf("fails to get checkpoint for the workflow: %v", err) + } + + for _, task := range checkpoint.Tasks { + if task.State != workflowpb.TaskState_TaskDone || task.Error != "" { + t.Fatalf("task: %v should succeed: task status: %v, %v", task.Id, task.State, task.Attributes) + } + } +} + +func verifyTaskSuccessOrFailure(ctx context.Context, t *testing.T, ts topo.Server, uuid, taskID string, isSuccess bool) { + wi, err := ts.GetWorkflow(ctx, uuid) + if err != nil { + t.Errorf("fail to get workflow for: %v", uuid) + } + + checkpoint := 
&workflowpb.WorkflowCheckpoint{} + if err := proto.Unmarshal(wi.Workflow.Data, checkpoint); err != nil { + t.Errorf("fails to get checkpoint for the workflow: %v", err) + } + task := checkpoint.Tasks[taskID] + + taskError := "" + if !isSuccess { + taskError = errMessage + } + if task.State != workflowpb.TaskState_TaskDone || task.Error != taskError { + t.Errorf("task: %v should succeed. Task status: %v, %v", task.Id, task.State, task.Error) + } +} diff --git a/go/vt/workflow/resharding/retry_controller.go b/go/vt/workflow/resharding/retry_controller.go new file mode 100644 index 00000000000..6ae0482d7d6 --- /dev/null +++ b/go/vt/workflow/resharding/retry_controller.go @@ -0,0 +1,37 @@ +package resharding + +import "github.com/youtube/vitess/go/vt/workflow" + +// RetryController stores the data for controlling the retry action. +type RetryController struct { + node *workflow.Node + // retryChannel is used to trigger the retrying of task + // when pressing the button. + retryChannel chan struct{} +} + +// CreateRetryController create a RetryController for a specific node and +// enable the retry action on the node. +func CreateRetryController(node *workflow.Node, actionListener workflow.ActionListener) *RetryController { + retryAction := &workflow.Action{ + Name: "Retry", + State: workflow.ActionStateEnabled, + Style: workflow.ActionStyleWaiting, + } + node.Actions = []*workflow.Action{retryAction} + node.Listener = actionListener + return &RetryController{ + node: node, + retryChannel: make(chan struct{}), + } +} + +// triggerRetry closes the retryChannel and empties the Actions list +// in the UI Node. This disables the retry action. 
+func (c *RetryController) triggerRetry() { + if len(c.node.Actions) != 0 { + c.node.Actions = []*workflow.Action{} + close(c.retryChannel) + } + c.node.BroadcastChanges(false /* updateChildren */) +} diff --git a/go/vt/workflow/resharding/tasks.go b/go/vt/workflow/resharding/tasks.go new file mode 100644 index 00000000000..7c624dfe70c --- /dev/null +++ b/go/vt/workflow/resharding/tasks.go @@ -0,0 +1,101 @@ +package resharding + +import ( + "fmt" + "strings" + + "golang.org/x/net/context" + + "github.com/youtube/vitess/go/vt/automation" + "github.com/youtube/vitess/go/vt/topo/topoproto" + "github.com/youtube/vitess/go/vt/wrangler" + + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" + workflowpb "github.com/youtube/vitess/go/vt/proto/workflow" +) + +func createTaskID(phase PhaseType, shardName string) string { + return fmt.Sprintf("%s/%s", phase, shardName) +} + +// GetTasks returns selected tasks for a phase from the checkpoint +// with expected execution order. +func (hw *HorizontalReshardingWorkflow) GetTasks(phase PhaseType) []*workflowpb.Task { + var shards []string + switch phase { + case phaseCopySchema, phaseWaitForFilteredReplication, phaseDiff: + shards = strings.Split(hw.checkpoint.Settings["destination_shards"], ",") + case phaseClone, phaseMigrateRdonly, phaseMigrateReplica, phaseMigrateMaster: + shards = strings.Split(hw.checkpoint.Settings["source_shards"], ",") + default: + panic(fmt.Sprintf("BUG: unknown phase type: %v", phase)) + } + + var tasks []*workflowpb.Task + for _, s := range shards { + taskID := createTaskID(phase, s) + tasks = append(tasks, hw.checkpoint.Tasks[taskID]) + } + return tasks +} + +func (hw *HorizontalReshardingWorkflow) runCopySchema(ctx context.Context, t *workflowpb.Task) error { + keyspace := t.Attributes["keyspace"] + sourceShard := t.Attributes["source_shard"] + destShard := t.Attributes["destination_shard"] + return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true, 
/*includeViews*/
+		keyspace, sourceShard, keyspace, destShard, wrangler.DefaultWaitSlaveTimeout)
+}
+
+func (hw *HorizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *workflowpb.Task) error {
+	keyspace := t.Attributes["keyspace"]
+	sourceShard := t.Attributes["source_shard"]
+	worker := t.Attributes["vtworker"]
+
+	sourceKeyspaceShard := topoproto.KeyspaceShardString(keyspace, sourceShard)
+	// Reset the vtworker to avoid error if vtworker command has been called elsewhere.
+	// This is because vtworker class doesn't cleanup the environment after execution.
+	automation.ExecuteVtworker(ctx, worker, []string{"Reset"})
+	// The flag min_healthy_rdonly_tablets is set to 1 (default value is 2).
+	// Therefore, we can reuse the normal end to end test setting, which has only 1 rdonly tablet.
+	// TODO(yipeiw): Add min_healthy_rdonly_tablets as an input argument in UI.
+	args := []string{"SplitClone", "--min_healthy_rdonly_tablets=1", sourceKeyspaceShard}
+	_, err := automation.ExecuteVtworker(ctx, worker, args)
+	return err
+}
+
+func (hw *HorizontalReshardingWorkflow) runWaitForFilteredReplication(ctx context.Context, t *workflowpb.Task) error {
+	keyspace := t.Attributes["keyspace"]
+	destShard := t.Attributes["destination_shard"]
+	return hw.wr.WaitForFilteredReplication(ctx, keyspace, destShard, wrangler.DefaultWaitForFilteredReplicationMaxDelay)
+}
+
+func (hw *HorizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *workflowpb.Task) error {
+	keyspace := t.Attributes["keyspace"]
+	destShard := t.Attributes["destination_shard"]
+	worker := t.Attributes["vtworker"]
+
+	automation.ExecuteVtworker(ctx, worker, []string{"Reset"})
+	args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", topoproto.KeyspaceShardString(keyspace, destShard)}
+	_, err := automation.ExecuteVtworker(ctx, worker, args)
+	return err
+}
+
+func (hw *HorizontalReshardingWorkflow) runMigrate(ctx context.Context, t *workflowpb.Task) error {
+	keyspace :=
t.Attributes["keyspace"] + sourceShard := t.Attributes["source_shard"] + servedTypeStr := t.Attributes["served_type"] + + servedType, err := topoproto.ParseTabletType(servedTypeStr) + if err != nil { + return fmt.Errorf("unknown tablet type: %v", servedTypeStr) + } + + if servedType != topodatapb.TabletType_RDONLY && + servedType != topodatapb.TabletType_REPLICA && + servedType != topodatapb.TabletType_MASTER { + return fmt.Errorf("wrong served type to be migrated: %v", servedTypeStr) + } + + return hw.wr.MigrateServedTypes(ctx, keyspace, sourceShard, nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime) +} diff --git a/go/vt/workflow/resharding/test_workflow.go b/go/vt/workflow/resharding/test_workflow.go new file mode 100644 index 00000000000..0a332e36d13 --- /dev/null +++ b/go/vt/workflow/resharding/test_workflow.go @@ -0,0 +1,181 @@ +package resharding + +import ( + "errors" + "flag" + "fmt" + "strconv" + "sync" + + log "github.com/golang/glog" + "golang.org/x/net/context" + + "github.com/golang/protobuf/proto" + "github.com/youtube/vitess/go/vt/logutil" + "github.com/youtube/vitess/go/vt/topo" + "github.com/youtube/vitess/go/vt/workflow" + + workflowpb "github.com/youtube/vitess/go/vt/proto/workflow" +) + +const ( + testWorkflowFactoryName = "test_workflow" + phaseSimple PhaseType = "simple" + errMessage = "fake error for testing retry" +) + +func createTestTaskID(phase PhaseType, count int) string { + return fmt.Sprintf("%s/%v", phase, count) +} + +func init() { + workflow.Register(testWorkflowFactoryName, &TestWorkflowFactory{}) +} + +// TestWorkflow is created to simplfy the unit test of ParallelRunner. +type TestWorkflow struct { + ctx context.Context + manager *workflow.Manager + topoServer topo.Server + wi *topo.WorkflowInfo + logger *logutil.MemoryLogger + + retryMu sync.Mutex + // retryFlags stores the retry flag for all the tasks. 
+	retryFlags map[string]bool
+
+	rootUINode *workflow.Node
+
+	checkpoint       *workflowpb.WorkflowCheckpoint
+	checkpointWriter *CheckpointWriter
+}
+
+// Run implements the workflow.Workflow interface.
+func (tw *TestWorkflow) Run(ctx context.Context, manager *workflow.Manager, wi *topo.WorkflowInfo) error {
+	tw.ctx = ctx
+	tw.topoServer = manager.TopoServer()
+	tw.manager = manager
+	tw.wi = wi
+	tw.checkpointWriter = NewCheckpointWriter(tw.topoServer, tw.checkpoint, tw.wi)
+	tw.rootUINode.Display = workflow.NodeDisplayDeterminate
+	tw.rootUINode.BroadcastChanges(true /* updateChildren */)
+
+	simpleTasks := tw.getTasks(phaseSimple)
+	simpleRunner := NewParallelRunner(tw.ctx, tw.rootUINode, tw.checkpointWriter, simpleTasks, tw.runSimple, Parallel)
+	simpleRunner.reportTaskStatus = true
+	if err := simpleRunner.Run(); err != nil {
+		return err
+	}
+
+	log.Info("Test workflow is finished successfully.")
+	return nil
+}
+
+func (tw *TestWorkflow) getTasks(phaseName PhaseType) []*workflowpb.Task {
+	count, err := strconv.Atoi(tw.checkpoint.Settings["count"])
+	if err != nil {
+		log.Infof("converting count in checkpoint.Settings to int fails: %v \n", tw.checkpoint.Settings["count"])
+		return nil
+	}
+	var tasks []*workflowpb.Task
+	for i := 0; i < count; i++ {
+		taskID := createTestTaskID(phaseName, i)
+		tasks = append(tasks, tw.checkpoint.Tasks[taskID])
+	}
+	return tasks
+}
+
+func (tw *TestWorkflow) runSimple(ctx context.Context, t *workflowpb.Task) error {
+	log.Infof("The number passed to me is %v \n", t.Attributes["number"])
+
+	tw.retryMu.Lock()
+	defer tw.retryMu.Unlock()
+	if tw.retryFlags[t.Id] {
+		log.Info("I will fail at this time since retry flag is true.")
+		tw.retryFlags[t.Id] = false
+		return errors.New(errMessage)
+	}
+	return nil
+}
+
+// TestWorkflowFactory is the factory to create a test workflow.
+type TestWorkflowFactory struct{}
+
+// Init is part of the workflow.Factory interface.
+func (*TestWorkflowFactory) Init(w *workflowpb.Workflow, args []string) error { + subFlags := flag.NewFlagSet(testWorkflowFactoryName, flag.ContinueOnError) + retryFlag := subFlags.Bool("retry", false, "The retry flag should be true if the retry action need to be tested") + count := subFlags.Int("count", 0, "The number of simple tasks") + if err := subFlags.Parse(args); err != nil { + return err + } + + // Initialize the checkpoint. + taskMap := make(map[string]*workflowpb.Task) + for i := 0; i < *count; i++ { + taskID := createTestTaskID(phaseSimple, i) + taskMap[taskID] = &workflowpb.Task{ + Id: taskID, + State: workflowpb.TaskState_TaskNotStarted, + Attributes: map[string]string{"number": fmt.Sprintf("%v", i)}, + } + } + checkpoint := &workflowpb.WorkflowCheckpoint{ + CodeVersion: 0, + Tasks: taskMap, + Settings: map[string]string{"count": fmt.Sprintf("%v", *count), "retry": fmt.Sprintf("%v", *retryFlag)}, + } + var err error + w.Data, err = proto.Marshal(checkpoint) + if err != nil { + return err + } + return nil +} + +// Instantiate is part the workflow.Factory interface. +func (*TestWorkflowFactory) Instantiate(w *workflowpb.Workflow, rootNode *workflow.Node) (workflow.Workflow, error) { + checkpoint := &workflowpb.WorkflowCheckpoint{} + if err := proto.Unmarshal(w.Data, checkpoint); err != nil { + return nil, err + } + // Get the retry flags for all tasks from the checkpoint. 
+	retry, err := strconv.ParseBool(checkpoint.Settings["retry"])
+	if err != nil {
+		log.Errorf("converting retry in checkpoint.Settings to bool fails: %v \n", checkpoint.Settings["retry"])
+		return nil, err
+	}
+	retryFlags := make(map[string]bool)
+	for _, task := range checkpoint.Tasks {
+		retryFlags[task.Id] = retry
+	}
+
+	tw := &TestWorkflow{
+		checkpoint: checkpoint,
+		rootUINode: rootNode,
+		logger:     logutil.NewMemoryLogger(),
+		retryFlags: retryFlags,
+	}
+
+	count, err := strconv.Atoi(checkpoint.Settings["count"])
+	if err != nil {
+		log.Errorf("converting count in checkpoint.Settings to int fails: %v \n", checkpoint.Settings["count"])
+		return nil, err
+	}
+
+	phaseNode := &workflow.Node{
+		Name:     string(phaseSimple),
+		PathName: string(phaseSimple),
+	}
+	tw.rootUINode.Children = append(tw.rootUINode.Children, phaseNode)
+
+	for i := 0; i < count; i++ {
+		taskName := fmt.Sprintf("%v", i)
+		taskUINode := &workflow.Node{
+			Name:     taskName,
+			PathName: taskName,
+		}
+		phaseNode.Children = append(phaseNode.Children, taskUINode)
+	}
+	return tw, nil
+}
diff --git a/proto/workflow.proto b/proto/workflow.proto
index b107c9682cf..3c7bbb50bb2 100644
--- a/proto/workflow.proto
+++ b/proto/workflow.proto
@@ -58,3 +58,35 @@ message Workflow {
   // This field only makes sense if 'state' is Done.
   int64 end_time = 8;
 }
+
+message WorkflowCheckpoint {
+  // code_version is used to detect incompatibilities between the version of the
+  // running workflow and the one which wrote the checkpoint. If they don't
+  // match, the workflow must not continue. The author of workflow must update
+  // this variable in their implementation when incompatibilities are introduced.
+  int32 code_version = 1;
+  // tasks stores all tasks of the workflow in a map. The key is a unique name
+  // to identify the task, e.g. clone/-80.
+
+  // Task is the data structure that stores the execution status and the
+  // attributes of a task.
+  map<string, Task> tasks = 2;
+  // settings includes workflow specific data, e.g.
the resharding workflow
+  // would store the source shards and destination shards.
+  map<string, string> settings = 3;
+}
+
+enum TaskState {
+  TaskNotStarted = 0;
+  TaskRunning = 1;
+  TaskDone = 2;
+}
+
+message Task {
+  string id = 1;
+  TaskState state = 2;
+  // attributes includes the parameters the task needs.
+  map<string, string> attributes = 3;
+  string error = 4;
+}