Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 197 additions & 17 deletions go/vt/proto/workflow/workflow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/vtctld/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func initWorkflowManager(ts topo.Server) {

// Register the Horizontal Resharding workflow.
resharding.Register()

// Unregister the blacklisted workflows.
for _, name := range workflowManagerDisable {
workflow.Unregister(name)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/workflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Factory interface {
// variable filled it. This Init method should fill in the
// Name and Data attributes, based on the provided args.
// This is called during the Manager.Create phase.
// TODO(yipeiw): We should extend the interface to pass the topology server
// as well. The topology server is needed in the resarding workflow.
Init(w *workflowpb.Workflow, args []string) error

// Instantiate loads a workflow from the proto representation
Expand Down
25 changes: 25 additions & 0 deletions go/vt/workflow/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,30 @@ func (n *Node) deepCopyFrom(otherNode *Node, copyChildren bool) error {
return nil
}

// GetChildByPath returns the child node given the relative path to this node.
// The caller must ensure that the node tree is not modified during the call.
func (n *Node) GetChildByPath(subPath string) (*Node, error) {
// Find the subnode if needed.
parts := strings.Split(subPath, "/")

currentNode := n
for i := 0; i < len(parts); i++ {
childPathName := parts[i]
found := false
for _, child := range currentNode.Children {
if child.PathName == childPathName {
found = true
currentNode = child
break
}
}
if !found {
return nil, fmt.Errorf("node %v has no children named %v", currentNode.Path, childPathName)
}
}
return currentNode, nil
}

// ActionParameters describe an action initiated by the user.
type ActionParameters struct {
// Path is the path of the Node the action was performed on.
Expand Down Expand Up @@ -355,6 +379,7 @@ func (m *NodeManager) updateNodeAndBroadcastLocked(userNode *Node, updateChildre
if err != nil {
return err
}

userNode.LastChanged = time.Now().Unix()
if err := savedNode.deepCopyFrom(userNode, updateChildren); err != nil {
return err
Expand Down
56 changes: 56 additions & 0 deletions go/vt/workflow/resharding/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package resharding

import (
"context"
"sync"

"github.com/golang/protobuf/proto"
"github.com/youtube/vitess/go/vt/topo"

workflowpb "github.com/youtube/vitess/go/vt/proto/workflow"
)

// CheckpointWriter saves the checkpoint data into topology server.
type CheckpointWriter struct {
topoServer topo.Server

// checkpointMu is used for protecting data access during checkpointing.
mu sync.Mutex
checkpoint *workflowpb.WorkflowCheckpoint
wi *topo.WorkflowInfo
}

// NewCheckpointWriter creates a CheckpointWriter.
func NewCheckpointWriter(ts topo.Server, checkpoint *workflowpb.WorkflowCheckpoint, wi *topo.WorkflowInfo) *CheckpointWriter {
return &CheckpointWriter{
topoServer: ts,
checkpoint: checkpoint,
wi: wi,
}
}

// UpdateTask updates the task status in the checkpointing copy and
// saves the full checkpoint to the topology server.
func (c *CheckpointWriter) UpdateTask(taskID string, status workflowpb.TaskState, err error) error {
c.mu.Lock()
defer c.mu.Unlock()

errorMessage := ""
if err != nil {
errorMessage = err.Error()
}

t := c.checkpoint.Tasks[taskID]
t.State = status
t.Error = errorMessage
return c.saveLocked()
}

func (c *CheckpointWriter) saveLocked() error {
var err error
c.wi.Data, err = proto.Marshal(c.checkpoint)
if err != nil {
return err
}
return c.topoServer.SaveWorkflow(context.TODO(), c.wi)
}
Loading