Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions go/vt/proto/workflow/workflow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/vt/vtctld/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/vtctl"
"vitess.io/vitess/go/vt/workflow"
"vitess.io/vitess/go/vt/workflow/resharding"
"vitess.io/vitess/go/vt/workflow/reshardingworkflowgen"
"vitess.io/vitess/go/vt/workflow/topovalidator"
)

Expand Down Expand Up @@ -59,6 +60,9 @@ func initWorkflowManager(ts *topo.Server) {
// Register the Horizontal Resharding workflow.
resharding.Register()

// Register workflow that generates Horizontal Resharding workflows.
reshardingworkflowgen.Register()

// Unregister the blacklisted workflows.
for _, name := range workflowManagerDisable {
workflow.Unregister(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package resharding
package workflow

import (
"sync"
Expand Down
2 changes: 1 addition & 1 deletion go/vt/workflow/long_polling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLongPolling(t *testing.T) {
go http.Serve(listener, nil)

// Run the manager in the background.
wg, cancel := startManager(t, m)
wg, _, cancel := StartManager(m)

// Get the original tree with a 'create'.
u := url.URL{Scheme: "http", Host: listener.Addr().String(), Path: "/workflow/create"}
Expand Down
19 changes: 18 additions & 1 deletion go/vt/workflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (m *Manager) loadAndStartJobsLocked() {
}

// Create creates a workflow from the given factory name with the
// provided args. Returns the unique UUID of the workflow. The
// provided args. Returns the unique UUID of the workflow. The
// workflowpb.Workflow object is saved in the topo server after
// creation.
func (m *Manager) Create(ctx context.Context, factoryName string, args []string) (string, error) {
Expand All @@ -260,6 +260,7 @@ func (m *Manager) Create(ctx context.Context, factoryName string, args []string)
// Create the initial workflowpb.Workflow object.
w := &workflowpb.Workflow{
Uuid: gouuid.NewUUID().String(),
CreateTime: time.Now().UnixNano(),
FactoryName: factoryName,
State: workflowpb.WorkflowState_NotStarted,
}
Expand Down Expand Up @@ -295,6 +296,7 @@ func (m *Manager) instantiateWorkflow(w *workflowpb.Workflow) (*runningWorkflow,
}
rw.rootNode.Name = w.Name
rw.rootNode.PathName = w.Uuid
rw.rootNode.CreateTime = w.CreateTime
rw.rootNode.Path = "/" + rw.rootNode.PathName
rw.rootNode.State = w.State

Expand Down Expand Up @@ -588,3 +590,18 @@ func AvailableFactories() map[string]bool {
}
return result
}

// StartManager starts a manager. This function should only be used for tests purposes.
func StartManager(m *Manager) (*sync.WaitGroup, context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
m.Run(ctx)
wg.Done()
}()

m.WaitUntilRunning()

return wg, ctx, cancel
}
23 changes: 4 additions & 19 deletions go/vt/workflow/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package workflow

import (
"strings"
"sync"
"testing"
"time"

Expand All @@ -29,20 +28,6 @@ import (
workflowpb "vitess.io/vitess/go/vt/proto/workflow"
)

func startManager(t *testing.T, m *Manager) (*sync.WaitGroup, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
m.Run(ctx)
wg.Done()
}()

m.WaitUntilRunning()

return wg, cancel
}

// TestWaitUntilRunning verifies that WaitUntilRunning() works as expected
// (blocking until Run() has advanced far enough), even across multiple Manager
// starts and stops.
Expand All @@ -53,7 +38,7 @@ func TestWaitUntilRunning(t *testing.T) {
// Start it 3 times i.e. restart it 2 times.
for i := 1; i <= 3; i++ {
// Run the manager in the background.
wg, cancel := startManager(t, m)
wg, _, cancel := StartManager(m)

// Shut it down and wait for the shutdown to complete.
cancel()
Expand All @@ -67,7 +52,7 @@ func TestManagerSimpleRun(t *testing.T) {
m := NewManager(ts)

// Run the manager in the background.
wg, cancel := startManager(t, m)
wg, _, cancel := StartManager(m)

// Create a Sleep job.
uuid, err := m.Create(context.Background(), sleepFactoryName, []string{"-duration", "60"})
Expand Down Expand Up @@ -96,7 +81,7 @@ func TestManagerRestart(t *testing.T) {
m := NewManager(ts)

// Run the manager in the background.
wg, cancel := startManager(t, m)
wg, _, cancel := StartManager(m)

// Create a Sleep job.
uuid, err := m.Create(context.Background(), sleepFactoryName, []string{"-duration", "60"})
Expand Down Expand Up @@ -127,7 +112,7 @@ func TestManagerRestart(t *testing.T) {
}

// Restart the manager.
wg, cancel = startManager(t, m)
wg, _, cancel = StartManager(m)

// Make sure the job is in there shortly.
timeout := 0
Expand Down
5 changes: 5 additions & 0 deletions go/vt/workflow/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"path"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -139,6 +140,7 @@ type Node struct {
Path string `json:"path"`
Children []*Node `json:"children,omitempty"`
LastChanged int64 `json:"lastChanged"`
CreateTime int64 `json:"createTime"`
Progress int `json:"progress"`
ProgressMessage string `json:"progressMsg"`
State workflowpb.WorkflowState `json:"state"`
Expand Down Expand Up @@ -333,6 +335,9 @@ func (m *NodeManager) toJSON(index int) ([]byte, error) {
for _, n := range m.roots {
u.Nodes = append(u.Nodes, n)
}
sort.Slice(u.Nodes, func(i, j int) bool {
return u.Nodes[i].CreateTime < u.Nodes[j].CreateTime
})
return json.Marshal(u)
}

Expand Down
Loading