
workflow: resharding workflow: Implement checkpointing. #2495

Merged
michael-berlin merged 7 commits into vitessio:master from wangyipei01:workflow-control
Feb 23, 2017

Conversation

@wangyipei01
Contributor

@wangyipei01 wangyipei01 commented Jan 23, 2017

Defined a structure to track per-task status in the workflowstatus proto
and demonstrated its usage in the function Run. In each step, the workflow
first checks the status, then generates the task parameters for
unfinished tasks. After each task has executed, it updates the
status to Done/Failed. This status update is verified in the unit test
through a happy-path test.



@wangyipei01 wangyipei01 force-pushed the workflow-control branch 2 times, most recently from 49d6d5f to 9a8eea5 Compare January 25, 2017 23:23
@mberlin-bot

I've only reviewed the updated proto definitions.

Looks very good. I've left a couple minor comments which are easy to fix. In particular, I like that you documented each message and field :)


Reviewed 1 of 1 files at r1, 1 of 4 files at r3.
Review status: 2 of 6 files reviewed at latest revision, 8 unresolved discussions, some commit checks broke.


proto/workflow.proto, line 64 at r3 (raw file):

// TaskArribute includes the parameters the task needs.
// Not all fields are necessary for all tasks.
message TaskAttribute {

This can be removed, see below.


proto/workflow.proto, line 73 at r3 (raw file):

// Task is the data structure that stores the execution status and the attributes
// of a task.
message Task {

Please move this message after the first use of it i.e. below "WorkflowCheckpoint". This will make it easier to read the file from top to bottom and a reader does not have to "cache" message definitions in his brain which aren't used yet.


proto/workflow.proto, line 74 at r3 (raw file):

WorkflowState

Reusing the message for the task state sounds great. But in that case you should rename it to "State".

For now I suggest not to reuse it. Instead, please duplicate the "WorkflowState" message as a new message "TaskState".

Once you've finished the implementation, you'll know if you can actually reuse the same message :)


proto/workflow.proto, line 75 at r3 (raw file):

TaskAttribute attribute

Please change this to a map of strings. This way it's more generic and not resharding specific.

Then also rename the field to "attributes" because it will store multiple attributes.


proto/workflow.proto, line 78 at r3 (raw file):

WorkflowCheckpoints

nit: WorkflowCheckpoint (without the s)


proto/workflow.proto, line 79 at r3 (raw file):

is to

nit: This is missing a word between "is" and "to". Change it e.g. to "is used to".

I would add stronger language here e.g.:

// code_version is used to detect incompatibilities between the version of the workflow which is currently run and the one which was used to write the checkpoint. If they do not match, the workflow must not continue. Workflow authors must increase the version in their code (but not in the checkpoint) when incompatibilities are introduced.

proto/workflow.proto, line 81 at r3 (raw file):

string

Let's use an int for this. It should be a simple constant in your code which will only be incremented when you're changing the code and it becomes out of sync with older state versions.
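A minimal sketch of how such an integer version check might look in Go. The constant value, the struct (a hand-written stand-in for the generated proto type), and the helper name checkCodeVersion are all assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// codeVersion is a hypothetical constant. It would be incremented in the
// code (never in a stored checkpoint) whenever the workflow becomes
// incompatible with checkpoints written by older code.
const codeVersion = 2

// WorkflowCheckpoint is a stand-in for the generated proto message.
type WorkflowCheckpoint struct {
	CodeVersion int32
	Settings    map[string]string
}

var errVersionMismatch = errors.New("checkpoint was written by an incompatible workflow version")

// checkCodeVersion refuses to continue when the stored version differs from
// the version of the running code.
func checkCodeVersion(cp *WorkflowCheckpoint) error {
	if cp.CodeVersion != codeVersion {
		return fmt.Errorf("%w: checkpoint=%d code=%d", errVersionMismatch, cp.CodeVersion, codeVersion)
	}
	return nil
}

func main() {
	fmt.Println(checkCodeVersion(&WorkflowCheckpoint{CodeVersion: 2}))
	fmt.Println(checkCodeVersion(&WorkflowCheckpoint{CodeVersion: 1}))
}
```

An integer keeps the comparison trivial and makes "increment on incompatible change" the only rule a workflow author has to remember.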


proto/workflow.proto, line 86 at r3 (raw file):

// settings includes information like source shards, destination shards for the
// workflow

Keep the comment slightly more generic: "settings contains workflow specific data e.g. the resharding workflow would store the source and destination shards".

(It currently sounds like this message is specific to the resharding workflow, but it doesn't have to be.)


Comments from Reviewable

@wangyipei01
Contributor Author

Review status: 2 of 6 files reviewed at latest revision, 8 unresolved discussions, some commit checks broke.


proto/workflow.proto, line 64 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

This can be removed, see below.

Done.


proto/workflow.proto, line 73 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please move this message after the first use of it i.e. below "WorkflowCheckpoint". This will make it easier to read the file from top to bottom and a reader does not have to "cache" message definitions in his brain which aren't used yet.

Done.


proto/workflow.proto, line 74 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

WorkflowState

Reusing the message for the task state sounds great. But in that case you should rename it to "State".

For now I suggest not to reuse it. Instead, please duplicate the "WorkflowState" message as a new message "TaskState".

Once you've finished the implementation, you'll know if you can actually reuse the same message :)

Done.


proto/workflow.proto, line 75 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

TaskAttribute attribute

Please change this to a map of strings. This way it's more generic and not resharding specific.

Then also rename the field to "attributes" because it will store multiple attributes.

Done.


proto/workflow.proto, line 78 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

WorkflowCheckpoints

nit: WorkflowCheckpoint (without the s)

Done.


proto/workflow.proto, line 79 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

is to

nit: This is missing a word between "is" and "to". Change it e.g. to "is used to".

I would add stronger language here e.g.:

// code_version is used to detect incompatibilities between the version of the workflow which is currently run and the one which was used to write the checkpoint. If they do not match, the workflow must not continue. Workflow authors must increase the version in their code (but not in the checkpoint) when incompatibilities are introduced.

Done.


proto/workflow.proto, line 81 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

string

Let's use an int for this. It should be a simple constant in your code which will only be incremented when you're changing the code and it becomes out of sync with older state versions.

Done.


proto/workflow.proto, line 86 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

// settings includes information like source shards, destination shards for the
// workflow

Keep the comment slightly more generic: "settings contains workflow specific data e.g. the resharding workflow would store the source and destination shards".

(It currently sounds like this message is specific to the resharding workflow, but it doesn't have to be.)

Done.


Comments from Reviewable

@mberlin-bot

Reviewed 3 of 5 files at r2, 5 of 5 files at r4.
Review status: all files reviewed at latest revision, 17 unresolved discussions, some commit checks failed.


go/vt/workflow/resharding/checkpoint.go, line 15 at r4 (raw file):

// Checkpoint defines the interface for checkpointer.
type Checkpoint interface {

For now you don't need this interface. Instead, let's have only the one implementation and use it everywhere.

(Instead of using CheckpointFile, you can use the in-memory topology in unit tests.)


go/vt/workflow/resharding/checkpoint.go, line 16 at r4 (raw file):

// Checkpoint defines the interface for checkpointer.
type Checkpoint interface {
	CheckpointFunc(*workflowpb.WorkflowCheckpoint) error

Please remove the suffix Func because it's redundant.


go/vt/workflow/resharding/parallel_runner.go, line 16 at r4 (raw file):

	Checkpoints *workflowpb.WorkflowCheckpoint
	// checkpointMutex is used for protecting data access during checkpointing.
	checkpointMutex sync.Mutex

nit: Please use Mu instead of Mutex as suffix in the variable name to be consistent with the rest of the code base.


go/vt/workflow/resharding/parallel_runner.go, line 16 at r4 (raw file):

	Checkpoints *workflowpb.WorkflowCheckpoint
	// checkpointMutex is used for protecting data access during checkpointing.
	checkpointMutex sync.Mutex

nit: Please move this above the fields which are guarded by the mutex.


go/vt/workflow/resharding/parallel_runner.go, line 20 at r4 (raw file):

// Run is entry point for controling task executions.
func (p ParallelRunner) Run(stepName string,

nit: As agreed during the last team meeting, please put the complete function signature on a single line.


go/vt/workflow/resharding/parallel_runner.go, line 20 at r4 (raw file):

stepName string

See my comment below in "extractTasks". This function must have the list of tasks as input and not just the step/phase name. Generating the list of tasks is outside the scope of this function.


go/vt/workflow/resharding/parallel_runner.go, line 28 at r4 (raw file):

	// sem is a channel used to control the level of concurrency.
	var ec concurrency.AllErrorRecorder
	sem := make(chan bool, concurrencyLevel)

Please use our Semaphore implementation from the go/sync2 package.


go/vt/workflow/resharding/parallel_runner.go, line 39 at r4 (raw file):

			}

			p.checkpointMutex.Lock()

Please move this into its own class (struct). That class should have a method with which you can update the state of a particular task.

This way your ParallelRunner does not need to know the WorkflowCheckpoint protobuf message at all and you can also remove the mutex from ParallelRunner. Instead, that mutex should be part of the new class.
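The separation described above could be sketched roughly as follows. This is an in-memory illustration only: the type name CheckpointWriter is borrowed from a later round of this review, while TaskState, its constants, and the State accessor are hypothetical stand-ins rather than the PR's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// TaskState is a hypothetical stand-in for the generated proto enum.
type TaskState int

const (
	TaskNotStarted TaskState = iota
	TaskRunning
	TaskDone
)

// CheckpointWriter owns the checkpoint data together with the mutex that
// guards it, so callers never touch either directly.
type CheckpointWriter struct {
	// mu guards states.
	mu     sync.Mutex
	states map[string]TaskState
}

// NewCheckpointWriter registers every task ID as not started.
func NewCheckpointWriter(taskIDs []string) *CheckpointWriter {
	states := make(map[string]TaskState, len(taskIDs))
	for _, id := range taskIDs {
		states[id] = TaskNotStarted
	}
	return &CheckpointWriter{states: states}
}

// UpdateTask sets the state of one task. A real implementation would also
// persist the checkpoint here; this sketch only updates memory.
func (c *CheckpointWriter) UpdateTask(taskID string, state TaskState) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, ok := c.states[taskID]; !ok {
		return fmt.Errorf("unknown task: %v", taskID)
	}
	c.states[taskID] = state
	return nil
}

// State returns the current state of a task (TaskNotStarted if unknown).
func (c *CheckpointWriter) State(taskID string) TaskState {
	c.mu.Lock()
	defer c.mu.Unlock()

	return c.states[taskID]
}

func main() {
	ids := []string{"a", "b"}
	cw := NewCheckpointWriter(ids)
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			// The caller only sees UpdateTask, never the lock or the data.
			if err := cw.UpdateTask(id, TaskDone); err != nil {
				fmt.Println("update failed:", err)
			}
		}(id)
	}
	wg.Wait()
	fmt.Println(cw.State("a") == TaskDone, cw.State("b") == TaskDone)
}
```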


go/vt/workflow/resharding/parallel_runner.go, line 56 at r4 (raw file):

func (p ParallelRunner) extractTasks(tasks map[string]*workflowpb.Task, settings map[string]string, stepName string) []*workflowpb.Task {
	var executeTasks []*workflowpb.Task
	for _, taskID := range strings.Split(settings[stepName], ",") {

settings should store workflow specific settings e.g. source_shards would have the list of the source shards. But it should not store the actual task names. You'll have to generate these when you execute the workflow.


go/vt/workflow/resharding/parallel_runner_test.go, line 24 at r4 (raw file):

		return err
	}
	time.Sleep(time.Duration(sleepDuration) * time.Millisecond)

Let's not use time.Sleep() in tests because it prolongs the execution of this test and therefore the overall execution of tests.

Instead, unit tests should be as fast as possible.


go/vt/workflow/resharding/parallel_runner_test.go, line 66 at r4 (raw file):

	if err := p.Run(sleepStepName, executeSleep, cp, taskNum); err != nil {
		t.Errorf("%s: Parallel Runner should not fail", err)
	}

This test is missing checks e.g. you could check in the checkpoint that all tasks are changed to "Done".


proto/workflow.proto, line 73 at r3 (raw file):

Previously, wangyipei01 wrote…

Done.

This is not done yet.


proto/workflow.proto, line 79 at r3 (raw file):

Previously, wangyipei01 wrote…

Done.

Please add the comment I wrote or a similar one. It's necessary to document the API here.


proto/workflow.proto, line 17 at r4 (raw file):

  Running = 1;
  Done = 2;
  Failed = 3;

Please remove this again.


proto/workflow.proto, line 63 at r4 (raw file):

TaskAttribute

No need for this indirection. Please embed it in the task message instead.


proto/workflow.proto, line 67 at r4 (raw file):

}

// Prefix "Task" is used here for naming since protoc dosen't allow defining same

No need for this explanation. This can be removed.

Instead, please make sure that all your comments follow the style:

// <message name> ... .

i.e. it must start with // TaskState ...


proto/workflow.proto, line 95 at r4 (raw file):

sotre

typo: store


Comments from Reviewable

@wangyipei01-bot

Review status: all files reviewed at latest revision, 17 unresolved discussions, some commit checks failed.


go/vt/workflow/resharding/checkpoint.go, line 15 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

For now you don't need this interface. Instead, let's have only the one implementation and use it everywhere.

(Instead of using CheckpointFile, you can use the in-memory topology in unit tests.)

Done.


go/vt/workflow/resharding/checkpoint.go, line 16 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please remove the suffix Func because it's redundant.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 16 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Please move this above the fields which are guarded by the mutex.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 16 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Please use Mu instead of Mutex as suffix in the variable name to be consistent with the rest of the code base.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 20 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: As agreed during the last team meeting, please put the complete function signature on a single line.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 24 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Let's not use time.Sleep() in tests because it prolongs the execution of this test and therefore the overall execution of tests.

Instead, unit tests should be as fast as possible.

I've removed the sleep and kept only the print function for now.


go/vt/workflow/resharding/parallel_runner_test.go, line 66 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

This test is missing checks e.g. you could check in the checkpoint that all tasks are changed to "Done".

Done.


proto/workflow.proto, line 73 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

This is not done yet.

Done.


proto/workflow.proto, line 79 at r3 (raw file):

// code_version is used to detect incompatibilities between the version of the workflow which is currently run and the one which was used to write the checkpoint. If they do not match, the workflow must not continue. Workflow authors must increase the version in their code (but not in the checkpoint) when incompatibilities are introduced.

Done.


proto/workflow.proto, line 17 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please remove this again.

Done.


proto/workflow.proto, line 63 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

TaskAttribute

No need for this indirection. Please embed it in the task message instead.

Done.


proto/workflow.proto, line 67 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

No need for this explanation. This can be removed.

Instead, please make sure that all your comments follow the style:

// <message name> ... .

i.e. it must start with // TaskState ...

Done.


proto/workflow.proto, line 95 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

sotre

typo: store

Done.


Comments from Reviewable

@mberlin-bot

Changes look good to me. I've left many small comments which are mostly nits and should be easy to address.

Can you please use it in the workflow itself as well and publish that code?


Reviewed 5 of 5 files at r5.
Review status: all files reviewed at latest revision, 33 unresolved discussions, some commit checks failed.


go/vt/workflow/resharding/checkpoint.go, line 14 at r5 (raw file):

Checkpoint

I suggest to call this CheckpointWriter because it's more than just a checkpoint.


go/vt/workflow/resharding/checkpoint.go, line 16 at r5 (raw file):

type Checkpoint struct {
	topoServer topo.Server
	// checkpointMu is used for protecting data access during checkpointing.

Please add a newline before this.

That's because all fields guarded by one mutex should be in one group with the mutex definition at the top. See also: https://talks.golang.org/2014/readability.slide#21


go/vt/workflow/resharding/checkpoint.go, line 18 at r5 (raw file):

wcp

Let's not use abbreviations. Instead, this field can be called checkpoint.


go/vt/workflow/resharding/checkpoint.go, line 23 at r5 (raw file):

Update

Please rename this to UpdateTask because that better describes what this does.


go/vt/workflow/resharding/checkpoint.go, line 26 at r5 (raw file):

	c.checkpointMu.Lock()
	defer c.checkpointMu.Unlock()
	c.wcp.Tasks[taskID].State = status

Please add a newline here.

That makes it easier to see what's the mutex boilerplate code and what's the actual core of the function.


go/vt/workflow/resharding/checkpoint.go, line 31 at r5 (raw file):

Store

Since it's called SaveWorkflow, let's call this method Save to be more consistent.


go/vt/workflow/resharding/checkpoint.go, line 34 at r5 (raw file):

data, err = json.Marshal(c.wcp)

You can simplify this. See checkpointLocked in sleep_workflow.go which assigns the data directly to wi.Data.


go/vt/workflow/resharding/checkpoint.go, line 42 at r5 (raw file):

}

// CheckpointFile checkpoints the data into local files. This is used for debugging.

Please remove deleted code. If you use new commits, you can get it back from the history instead.


go/vt/workflow/resharding/parallel_runner.go, line 23 at r5 (raw file):

// Each phase has its own ParallelRunner object.
type ParallelRunner struct {
	// TODO(yipeiw) : ParallelRunner should fields for per-task controllable actions.

nit: Missing verb: probably "have"?


go/vt/workflow/resharding/parallel_runner.go, line 30 at r5 (raw file):

runTasks

Just tasks.


go/vt/workflow/resharding/parallel_runner.go, line 31 at r5 (raw file):

parallelNum

Please name this concurrency.


go/vt/workflow/resharding/parallel_runner.go, line 31 at r5 (raw file):

The task will not run in this case.

Having the code deadlock won't be nice.

Instead, you could add a default clause below as follows:

default:
  panic(fmt.Sprintf("BUG: Invalid concurrency level: %v", concurrencyLevel))

go/vt/workflow/resharding/parallel_runner.go, line 55 at r5 (raw file):

			if err := executeFunc(t.Attributes); err != nil {
				status = workflowpb.TaskState_TaskNotStarted
				t.Error = fmt.Sprintf("%v", err)

Error() will return the string representation as well. That's simpler:

t.Error = err.Error()
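A small sketch showing that both forms produce the same string for a non-nil error. The helper name formatBoth is hypothetical and exists only to make the comparison checkable.

```go
package main

import (
	"errors"
	"fmt"
)

// formatBoth renders the same error once through fmt.Sprintf and once
// through the Error method, so the two results can be compared.
func formatBoth(msg string) (viaSprintf, viaMethod string) {
	err := errors.New(msg)
	viaSprintf = fmt.Sprintf("%v", err) // %v calls err.Error() internally
	viaMethod = err.Error()
	return viaSprintf, viaMethod
}

func main() {
	a, b := formatBoth("copy schema failed")
	fmt.Println(a == b)
}
```

One difference worth keeping in mind: err.Error() panics on a nil error value, so it belongs inside the `err != nil` branch, as in the reviewed code.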


go/vt/workflow/resharding/parallel_runner.go, line 69 at r5 (raw file):

	}
	// Wait until all running jobs are done.
	for i := 0; i < parallelNum; i++ {

optional: While it's neat that you can reuse the semaphore for this, I think it is more straightforward to use a WaitGroup here.


go/vt/workflow/resharding/parallel_runner_test.go, line 22 at r5 (raw file):

fmt.Printf

By default, tests should not output anything. Sometimes, this is unavoidable e.g. when testing an error case which will also log.

But here it's easy to avoid. Please use "t.Logf" instead where "t" is testing.T. The only downside is that you get "t" only from within the test and not as global variable.


go/vt/workflow/resharding/parallel_runner_test.go, line 26 at r5 (raw file):

taskNameOfPrint

Please avoid Of here and instead write it as multiple words e.g. printTaskName.


go/vt/workflow/resharding/parallel_runner_test.go, line 26 at r5 (raw file):

num string

Make this an int please.


go/vt/workflow/resharding/parallel_runner_test.go, line 38 at r5 (raw file):

Number

Let's use lower case for all task attributes to be consistent.


go/vt/workflow/resharding/parallel_runner_test.go, line 52 at r5 (raw file):

wcp.Settings["numbers"]

In this example, the settings entry should be called "count" and it should only have the number of tasks in it.

You could make this task count a parameter (type "int") of this function and then you can reuse it for the initialization and running the tasks.


go/vt/workflow/resharding/parallel_runner_test.go, line 59 at r5 (raw file):

}

func TestParallelRunner(t *testing.T) {

Same comment as for the protobuf file: Please organize the elements from top to bottom where it makes sense.


go/vt/workflow/resharding/parallel_runner_test.go, line 68 at r5 (raw file):

=

Use := instead and do not declare the two variables explicitly.


go/vt/workflow/resharding/parallel_runner_test.go, line 76 at r5 (raw file):

&Checkpoint{

Please define a constructor instead i.e. NewCheckpointWriter and use it here.


go/vt/workflow/resharding/parallel_runner_test.go, line 83 at r5 (raw file):

	cp.Store()

	var p *ParallelRunner

This is going to be nil. Instead, I suggest the following:

p := &ParallelRunner{}
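To illustrate the difference, here is a minimal sketch with a stripped-down, hypothetical ParallelRunner (the field and the addTask method are invented for this example). A declared but unassigned pointer is nil, and any method that reads or writes a field through it would panic.

```go
package main

import "fmt"

// ParallelRunner is a stripped-down stand-in with a single field.
type ParallelRunner struct {
	tasks []string
}

// addTask writes through the receiver, so calling it on a nil
// *ParallelRunner would panic with a nil pointer dereference.
func (p *ParallelRunner) addTask(id string) {
	p.tasks = append(p.tasks, id)
}

func main() {
	var declared *ParallelRunner    // zero value of a pointer type: nil
	allocated := &ParallelRunner{} // points at a usable zero-valued struct

	fmt.Println(declared == nil, allocated == nil)

	allocated.addTask("t1")
	fmt.Println(len(allocated.tasks))
}
```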


go/vt/workflow/resharding/parallel_runner_test.go, line 86 at r5 (raw file):

t.Errorf

This should be a t.Fatalf. Unlike Errorf, it doesn't resume the test execution.


go/vt/workflow/resharding/parallel_runner_test.go, line 89 at r5 (raw file):

	}

	//Check whether all tasks are in finished status.

nit: Missing space after //.


proto/workflow.proto, line 73 at r3 (raw file):

Previously, wangyipei01-bot wrote…

Done.

Please see my first comment. I want you to move this message further down the file.


proto/workflow.proto, line 69 at r5 (raw file):

task_id

Just id because task_ is already part of the message name.


Comments from Reviewable

@wangyipei01-bot

go/vt/workflow/resharding/parallel_runner.go, line 31 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

parallelNum

Please name this concurrency.

This name is the same as that of the concurrency package, go/vt/concurrency, which I use in parallel_runner. If I change the parameter to this name, it will lead to an error.


Comments from Reviewable

@wangyipei01-bot

go/vt/workflow/resharding/parallel_runner.go, line 69 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

optional: While it's neat that you can reuse the semaphore for this, I think it is more straightforward to use a WaitGroup here.

Is there a neat way to support both parallel and sequential calls using a WaitGroup? I only know that a WaitGroup is easy to use if we want all tasks to run in parallel.
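One possible way to get both behaviours, sketched here under assumptions and not taken from the PR: keep a buffered channel as the concurrency limiter and use a WaitGroup only for the final wait. With a limit of 1 the tasks run strictly one after another; with a larger limit they overlap. The function name runAll and its signature are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll executes every task with at most `limit` of them running at once
// and returns one error slot per task. A limit of 1 yields sequential
// execution in input order.
func runAll(tasks []string, limit int, execute func(string) error) []error {
	if limit < 1 {
		panic(fmt.Sprintf("BUG: invalid concurrency limit: %v", limit))
	}
	sem := make(chan struct{}, limit)
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` tasks are still running
		go func(i int, task string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the task returns
			errs[i] = execute(task)  // each goroutine writes its own index
		}(i, task)
	}
	wg.Wait() // replaces the loop that drains the semaphore
	return errs
}

func main() {
	order := make(chan string, 3)
	errs := runAll([]string{"a", "b", "c"}, 1, func(id string) error {
		order <- id
		return nil
	})
	fmt.Println(len(errs), <-order, <-order, <-order)
}
```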


Comments from Reviewable

@wangyipei01-bot

go/vt/workflow/resharding/parallel_runner_test.go, line 86 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

t.Errorf

This should be a t.Fatalf. Unlike Errorf, it doesn't resume the test execution.

Done. Should I also change it to Fatalf when p.Run generates an error? It seems that the test should not continue in that case either.


Comments from Reviewable

@wangyipei01-bot

Review status: 9 of 10 files reviewed at latest revision, 33 unresolved discussions, some commit checks failed.


go/vt/workflow/resharding/checkpoint.go, line 14 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Checkpoint

I suggest to call this CheckpointWriter because it's more than just a checkpoint.

Done.


go/vt/workflow/resharding/checkpoint.go, line 16 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please add a newline before this.

That's because all fields guarded by one mutex should be in one group with the mutex definition at the top. See also: https://talks.golang.org/2014/readability.slide#21

Done.


go/vt/workflow/resharding/checkpoint.go, line 18 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

wcp

Let's not use abbreviations. Instead, this field can be called checkpoint.

Done.


go/vt/workflow/resharding/checkpoint.go, line 23 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Update

Please rename this to UpdateTask because that better describes what this does.

Done.


go/vt/workflow/resharding/checkpoint.go, line 26 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please add a newline here.

That makes it easier to see what's the mutex boilerplate code and what's the actual core of the function.

Done.


go/vt/workflow/resharding/checkpoint.go, line 31 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Store

Since it's called SaveWorkflow, let's call this method Save to be more consistent.

Done.


go/vt/workflow/resharding/checkpoint.go, line 34 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

data, err = json.Marshal(c.wcp)

You can simplify this. See checkpointLocked in sleep_workflow.go which assigns the data directly to wi.Data.

Done.


go/vt/workflow/resharding/checkpoint.go, line 42 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please remove deleted code. If you use new commits, you can get it back from the history instead.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 20 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

stepName string

See my comment below in "extractTasks". This function must have the list of tasks as input and not just the step/phase name. Generating the list of tasks is outside the scope of this function.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 28 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please use our Semaphore implementation from the go/sync2 package.

This is delayed. (The current API made my implementation more complex without any benefit.)


go/vt/workflow/resharding/parallel_runner.go, line 39 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please move this into its own class (struct). That class should have a method with which you can update the state of a particular task.

This way your ParallelRunner does not need to know the WorkflowCheckpoint protobuf message at all and you can also remove the mutex from ParallelRunner. Instead, that mutex should be part of the new class.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 56 at r4 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

settings should store workflow specific settings e.g. source_shards would have the list of the source shards. But it should not store the actual task names. You'll have to generate these when you execute the workflow.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 23 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Missing verb: probably "have"?

Done.


go/vt/workflow/resharding/parallel_runner.go, line 30 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

runTasks

Just tasks.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 31 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

The task will not run in this case.

Having the code deadlock won't be nice.

Instead, you could add a default clause below as follows:

default:
  panic(fmt.Sprintf("BUG: Invalid concurrency level: %v", concurrencyLevel))

Done.


go/vt/workflow/resharding/parallel_runner.go, line 55 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Error() will return the string representation as well. That's simpler:

t.Error = err.Error()

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 22 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

fmt.Printf

By default, tests should not output anything. Sometimes, this is unavoidable e.g. when testing an error case which will also log.

But here it's easy to avoid. Please use "t.Logf" instead where "t" is testing.T. The only downside is that you get "t" only from within the test and not as global variable.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 26 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

taskNameOfPrint

Please avoid Of here and instead write it as multiple words e.g. printTaskName.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 26 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

num string

Make this an int please.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 38 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Number

Let's use lower case for all task attributes to be consistent.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 68 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

=

Use := instead and do not declare the two variables explicitly.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 76 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

&Checkpoint{

Please define a constructor instead i.e. NewCheckpointWriter and use it here.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 83 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

This is going to be nil. Instead, I suggest the following:

p := &ParallelRunner{}

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 89 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Missing space after //.

Done.


proto/workflow.proto, line 73 at r3 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please see my first comment. I want you to move this message further down the file.

Done. I forgot to check in this file.


proto/workflow.proto, line 69 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

task_id

Just id because task_ is already part of the message name.

Done.


Comments from Reviewable

@mberlin-bot

First round of comments.

Overall, the structure looks good. I would like to see some changes to how the tasks are stored and how the migrate tasks are executed.


Reviewed 1 of 2 files at r6, 3 of 5 files at r8, 5 of 8 files at r9.
Review status: 8 of 11 files reviewed at latest revision, 49 unresolved discussions, some commit checks broke.


go/vt/topo/workflow.go, line 30 at r6 (raw file):

}

func (w *WorkflowInfo) SetVersion(v Version) {

Please remove this again.

Note that this version cannot be used for the version which you have to store in the proto. It's only used for interactions with the topology.


go/vt/workflow/node.go, line 105 at r9 (raw file):

type Node struct {
	// nodeManager is the NodeManager handling this Node.
	// It is set by AddRootNode, and propagated by AddChildren.

Please undo this change.

I'll fix the documentation in a separate PR.


go/vt/workflow/resharding/checkpoint.go, line 13 at r9 (raw file):

)

// CheckpointWriter save the checkpoint data into topology server.

nit: saves


go/vt/workflow/resharding/checkpoint.go, line 32 at r9 (raw file):

checkpointing the update

This doesn't reflect that you're writing the complete checkpoint. I suggest to change it as follows:

// UpdateTask updates the task status in the checkpoint copy and writes the full checkpoint to the topology.


go/vt/workflow/resharding/checkpoint.go, line 43 at r9 (raw file):

Save()

You assume that the lock on the mutex is already held when this is called.

We signal this in the code by adding the suffix "Locked".

Given that you want to export this method as well, I suggest the following:

  • rename this method to "saveLocked"
  • add an additional method "Save" which obtains the lock and then calls "saveLocked".
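The two bullet points above could be sketched like this. The struct fields are assumptions: `saves` simply counts writes and stands in for the real write to the topology, which is not shown here.

```go
package main

import (
	"fmt"
	"sync"
)

// CheckpointWriter is a simplified, hypothetical version of the reviewed type.
type CheckpointWriter struct {
	// mu guards state and saves.
	mu    sync.Mutex
	state map[string]string
	saves int // stand-in for the topology write
}

// saveLocked writes the checkpoint. The "Locked" suffix signals that the
// caller must already hold c.mu.
func (c *CheckpointWriter) saveLocked() error {
	c.saves++
	return nil
}

// Save is the exported entry point: it obtains the lock and delegates.
func (c *CheckpointWriter) Save() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	return c.saveLocked()
}

// UpdateTask already holds the lock, so it must call saveLocked. Calling
// Save here would deadlock because sync.Mutex is not reentrant.
func (c *CheckpointWriter) UpdateTask(id, state string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.state[id] = state
	return c.saveLocked()
}

func main() {
	cw := &CheckpointWriter{state: map[string]string{}}
	_ = cw.UpdateTask("copy_schema/-80", "done")
	_ = cw.Save()
	fmt.Println(cw.saves, cw.state["copy_schema/-80"])
}
```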

go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 33 at r9 (raw file):

	horizontalReshardingFactoryName = "horizontal_resharding"

	CopySchemaName              = "copy_schema"

Please create an enum type for these phases.

I like that you're already using a common suffix ("Name"). But let's use a prefix instead and name it "Phase" to make it clearer what this thing is. Use this prefix for the enum type name as well.

Example:

type PhaseType string

// Different phases the resharding workflow goes through.
const (
	PhaseCopySchema PhaseType = "copy_schema"
	PhaseClone PhaseType = "clone"
...
)

go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 35 at r9 (raw file):

WaitFilteredReplication

nit: Let's not shorten this one because there's a difference between the MySQL replication and the Vitess filtered replication.

Therefore, please call it "PhaseWaitForFilteredReplication" and "wait_for_filtered_replication".


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 61 at r9 (raw file):

splitCloneUINode

Please rename this to "cloneUINode" to be consistent with the phase name.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 63 at r9 (raw file):

splitDiffUINode

Please rename this to "diffUINode" to be consistent with the phase name.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 94 at r9 (raw file):

func (hw *HorizontalReshardingWorkflow) runWorkflow() error {
	copyTasks := hw.GenerateTasks(hw.checkpoint, CopySchemaName)

There is a better way to generalize this:

Pass in the name of the phase and the list of shards.

This way you don't need any type switch in the method itself and it's clear in the calling code over which list you're iterating.

Please also rename the method to "getTasks" for example because the task objects are already generated.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 96 at r9 (raw file):

w.runCopySchema, PARALLEL

Please move these two parameters into the constructor of ParallelRunner as well. In the end, Run() shouldn't take any arguments.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 140 at r9 (raw file):

HorizontalReshard

HorizontalResharding


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 144 at r9 (raw file):

workflowProto

Please use the same name as in the interface i.e. "w".


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 159 at r9 (raw file):

	workflowProto.Name = fmt.Sprintf("Horizontal resharding on keyspace %s", *keyspace)

	ts := topo.Open()

Please move these two lines into "initCheckpoint". You don't need the "ts" object here and this way you avoid having to pass it.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 161 at r9 (raw file):

workflowCheckpoint

That's too verbose/redundant. Let's go with "checkpoint".


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 174 at r9 (raw file):

workflowProto

Please use the same name as in the interface i.e. "w".


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 177 at r9 (raw file):

workflowCheckpoint

checkpoint


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 191 at r9 (raw file):

"clone"

Please be consistent and use the constants everywhere.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 238 at r9 (raw file):

}

// createSubWorkflows creates a per source shard horizontal resharding workflow for each source shard in the keyspace.

Please update the comment.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 263 at r9 (raw file):

		sourceShardList = append(sourceShardList, s)
		worker := vtworkers[i]
		for _, ds := range destinationShards {

You're mixing two different things in one method here.

Instead, please:

  • write a separate function which gets you the list of source and destination shards
  • then go over these two lists and create the tasks.

go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 267 at r9 (raw file):

			destinationShardList = append(destinationShardList, d)

			updatePerDestinationTask(keyspace, s, d, worker, CopySchemaName, taskMap)

a) Please change this to the same structure as the original workflow i.e. create tasks in the same order.
b) Don't pass arguments when they are not needed e.g. the migration doesn't need the vtworker address.

If you want to generalize things, I suggest the following:

Write an "addTasks" function which takes the phase name and the list of shards as arguments. Additionally, it should take a "func (i int, name string) map[string]string" argument which returns the attributes depending on the index in the list.

With that, you can write the task creation like this:

addTasks(tasks, PhaseClone, sourceShards, func (i int, name string) map[string]string {
  return map[string]string{
    "vtworker": vtworkers[i],
    "keyspace": keyspace,
    "source_shard": name,
  }
})
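
For illustration, one possible shape of such an "addTasks" helper. This is a sketch under assumptions: the task struct is a hypothetical stand-in for workflowpb.Task, the task ID format "phase/shard" is made up for this example, and the vtworker addresses and keyspace name are placeholders.

```go
package main

import "fmt"

// PhaseType names a phase of the resharding workflow.
type PhaseType string

// PhaseClone is one of the phases from the example above.
const PhaseClone PhaseType = "clone"

// task is a hypothetical stand-in for workflowpb.Task.
type task struct {
	ID         string
	Attributes map[string]string
}

// addTasks creates one task per shard for the given phase. The callback returns
// the attributes for the shard at index i, so every phase passes only what it needs.
func addTasks(tasks map[string]*task, phase PhaseType, shards []string, getAttributes func(i int, shard string) map[string]string) {
	for i, shard := range shards {
		id := fmt.Sprintf("%s/%s", phase, shard) // ID format is an assumption
		tasks[id] = &task{ID: id, Attributes: getAttributes(i, shard)}
	}
}

func main() {
	keyspace := "test_keyspace"                           // placeholder
	vtworkers := []string{"worker0:15033", "worker1:15033"} // placeholders
	sourceShards := []string{"-80", "80-"}

	tasks := make(map[string]*task)
	addTasks(tasks, PhaseClone, sourceShards, func(i int, name string) map[string]string {
		return map[string]string{
			"keyspace":     keyspace,
			"source_shard": name,
			"vtworker":     vtworkers[i],
		}
	})
	fmt.Println(len(tasks), tasks["clone/-80"].Attributes["vtworker"])
}
```

A phase that does not need a vtworker, such as the migration, would simply leave that key out of its callback.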

go/vt/workflow/resharding/parallel_runner.go, line 30 at r8 (raw file):

actionRegistery

typo: actionRegistry


go/vt/workflow/resharding/parallel_runner.go, line 37 at r8 (raw file):

	node *workflow.Node
	// mu is used to protect access to retryChannel.
	mu sync.Mutex

As discussed offline, the mutex can be removed here.


go/vt/workflow/resharding/parallel_runner.go, line 91 at r8 (raw file):

n

The scope of this variable is longer than a couple of lines.

Please call it "node" instead. That will make it easier to read.


go/vt/workflow/resharding/parallel_runner.go, line 116 at r8 (raw file):

					continue
				case <-p.ctx.Done():
					p.closeRetryChannel(p.actionRegistery[taskId])

Instead of closing the retry channel here, please delete the control object from the registry.


go/vt/workflow/resharding/parallel_runner.go, line 134 at r8 (raw file):

	log.Errorf("action function is called pathName:%v, %v, name: %v\n", pathName, getTaskId(pathName), name)

	c, ok := p.actionRegistery[getTaskId(pathName)]

Access to actionRegistry requires a mutex because two different goroutines (the execution goroutine and the HTTP handler which calls Action()) can access it.


go/vt/workflow/resharding/parallel_runner.go, line 142 at r8 (raw file):

	case "Retry":
		p.closeRetryChannel(c)
		c.node.BroadcastChanges(false /* updateChildren */)

This should be part of the "closeRetryChannel" method.


go/vt/workflow/resharding/parallel_runner.go, line 155 at r8 (raw file):

// closeRetryChannel closes the retryChannel and empties the Actions list in the rootUINode
// to indicate that the channel is closed and Retry action is not waited for anymore.
func (p *ParallelRunner) closeRetryChannel(c *Control) {

I suggest a more generic name, e.g. "triggerRetry", because the implementation details are not relevant in the method name.


go/vt/workflow/resharding/parallel_runner.go, line 155 at r8 (raw file):

(p *ParallelRunner) closeRetryChannel(c *Control)

This method should be part of "Control" and not "ParallelRunner".


go/vt/workflow/resharding/task_helper.go, line 1 at r9 (raw file):

package resharding

Please rename this file to tasks.go because it contains the core logic for the different tasks.


go/vt/workflow/resharding/task_helper.go, line 16 at r9 (raw file):

%s_%s_%s

shardType is not needed. Let's use / as name instead.


go/vt/workflow/resharding/task_helper.go, line 19 at r9 (raw file):

copy of tasks

This is not a copy. Instead, you're just returning the list of selected tasks.


go/vt/workflow/resharding/task_helper.go, line 37 at r9 (raw file):

}

// runCopySchemaPerShard runs CopySchema for a destination shard.

Fix comment (wrong method name).


go/vt/workflow/resharding/task_helper.go, line 39 at r9 (raw file):

attr

attributes (no abbreviations please)


go/vt/workflow/resharding/task_helper.go, line 46 at r9 (raw file):

CopySchemaShardFromShard

Please change this to CopySchemaShard because that's also the name of the respective vtctl command. "FromShard" is an implementation detail which we don't want to expose to the user :)


go/vt/workflow/resharding/task_helper.go, line 118 at r9 (raw file):

for _, servedType := range servedTypeParams {

This is going to change the execution order. Your code would migrate all types of a shard, one shard at a time.

But we want to migrate all RDONLY types first, then REPLICA and then MASTER.

Given that, please change the code as follows:

Split the "migrate" phase into three phases i.e. "migrate_rdonly", "migrate_replica" and "migrate_master". Each phase is going to need its own UI nodes and its own ParallelRunner for the execution.
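
A rough sketch of the intended ordering: all shards finish one served type before the next one starts. The function names runPhase and runMigration are hypothetical, and the migrate callback stands in for the real per-shard migration call; in the actual change each phase would get its own ParallelRunner and UI nodes.

```go
package main

import (
	"fmt"
	"sync"
)

// PhaseType names a phase of the resharding workflow.
type PhaseType string

// The three migration phases, in the order they must run.
const (
	PhaseMigrateRdonly  PhaseType = "migrate_rdonly"
	PhaseMigrateReplica PhaseType = "migrate_replica"
	PhaseMigrateMaster  PhaseType = "migrate_master"
)

// runPhase runs migrate for every shard in parallel and returns once all are done.
func runPhase(phase PhaseType, shards []string, migrate func(PhaseType, string)) {
	wg := sync.WaitGroup{}
	for _, shard := range shards {
		wg.Add(1)
		go func(shard string) {
			defer wg.Done()
			migrate(phase, shard)
		}(shard)
	}
	wg.Wait()
}

// runMigration runs the phases strictly one after another and returns the
// phases in the order in which they completed.
func runMigration(shards []string, migrate func(PhaseType, string)) []PhaseType {
	var completed []PhaseType
	for _, phase := range []PhaseType{PhaseMigrateRdonly, PhaseMigrateReplica, PhaseMigrateMaster} {
		runPhase(phase, shards, migrate)
		completed = append(completed, phase)
	}
	return completed
}

func main() {
	order := runMigration([]string{"-80", "80-"}, func(phase PhaseType, shard string) {
		fmt.Printf("%s: migrating shard %s\n", phase, shard)
	})
	fmt.Println(order)
}
```

With a single loop over served types inside each shard's task, shard A could reach MASTER while shard B is still on RDONLY, which is the ordering problem described above.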


Comments from Reviewable

@mberlin-bot

Comments for the changes to ParallelRunner. The structure of the test for the retry looks very good.

Please don't forget to add the file which has the code for the retry controller. (Note: It may not be worth having that in a separate file.)


Reviewed 1 of 8 files at r9.
Review status: 9 of 11 files reviewed at latest revision, 57 unresolved discussions, some commit checks broke.


go/vt/workflow/resharding/parallel_runner.go, line 30 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

actionRegistery

typo: actionRegistry

Typo is still there.


go/vt/workflow/resharding/parallel_runner.go, line 155 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

I suggest to name this more generic e.g. "triggerRetry" because the implementation details are not relevant in the method name.

Please rename the method as suggested.


go/vt/workflow/resharding/parallel_runner.go, line 36 at r9 (raw file):

control

controller


go/vt/workflow/resharding/parallel_runner.go, line 47 at r9 (raw file):

p.retryActionRegistery = make(map[string]*RetryController)

Please add this to the call above and return it directly without the intermediate variable p.


go/vt/workflow/resharding/parallel_runner.go, line 97 at r9 (raw file):

				}

				// If task fails, the retry action is enabled.

Before you start the retry, you need to check if the context is still valid.

That's because the error could have been caused by an expired context.

In that case you should not retry.

You can check for that with a non blocking read on the channel returned by ctx.Done(). Here's an example in our code base: https://github.com/youtube/vitess/blob/master/go/vt/wrangler/split.go#L125
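
For reference, a non-blocking read on ctx.Done() typically looks like the sketch below. The helper name shouldOfferRetry is hypothetical and not taken from the linked file; it only illustrates the select/default pattern.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// shouldOfferRetry reports whether a failed task should wait for a "Retry" action.
// It returns false when the context is already done, because the failure was
// probably caused by the cancellation or timeout itself.
func shouldOfferRetry(ctx context.Context, taskErr error) bool {
	if taskErr == nil {
		return false // nothing failed, nothing to retry
	}
	select {
	case <-ctx.Done():
		return false
	default:
		// ctx.Done() is not readable yet: the context is still valid.
		return true
	}
}

// demo evaluates the same failure before and after canceling the context.
func demo() (before, after bool) {
	ctx, cancel := context.WithCancel(context.Background())
	err := errors.New("task failed")
	before = shouldOfferRetry(ctx, err)
	cancel()
	after = shouldOfferRetry(ctx, err)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // prints "true false"
}
```

The default case is what makes the read non-blocking: without it, the select would wait until the context is done.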


go/vt/workflow/resharding/parallel_runner.go, line 104 at r9 (raw file):

				}

				retryAction := &workflow.Action{

Please move this code into a separate method e.g. "addRetryButton". Then, that method can return your retry channel.


go/vt/workflow/resharding/parallel_runner.go, line 122 at r9 (raw file):

				// Block the task execution until the retry action is triggered or the job is canceled.
				select {
				case <-p.retryActionRegistery[taskID].retryChannel:

Unguarded access to retryActionRegistry. Instead of getting the retry channel from the registry, please store it in a local variable when you add it. See also my comment above about having a method return it.


go/vt/workflow/resharding/parallel_runner.go, line 125 at r9 (raw file):

					continue
				case <-p.ctx.Done():
					p.retryActionRegistery = nil

You need to obtain the lock here.

Besides that, unassigning the map could go wrong when another goroutine tries to add a controller object to the map after this.

Instead of deleting the map, please delete only the controller object for this particular retry.
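
A sketch of what this could look like, assuming a registry keyed by task ID. The type and method names (parallelRunner, addRetryController, removeRetryController, pendingRetries) are hypothetical and only mirror the names used in this review.

```go
package main

import (
	"fmt"
	"sync"
)

// retryController holds the channel a failed task blocks on.
type retryController struct {
	retryChannel chan struct{}
}

// parallelRunner is a simplified stand-in for the ParallelRunner under review.
type parallelRunner struct {
	// mu protects retryActionRegistry.
	mu                  sync.Mutex
	retryActionRegistry map[string]*retryController
}

func newParallelRunner() *parallelRunner {
	return &parallelRunner{retryActionRegistry: make(map[string]*retryController)}
}

// addRetryController registers a controller for taskID and returns its channel,
// so the task goroutine can keep it in a local variable.
func (p *parallelRunner) addRetryController(taskID string) <-chan struct{} {
	c := &retryController{retryChannel: make(chan struct{})}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.retryActionRegistry[taskID] = c
	return c.retryChannel
}

// removeRetryController deletes only the entry for taskID; the map itself stays
// usable for other goroutines.
func (p *parallelRunner) removeRetryController(taskID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.retryActionRegistry, taskID)
}

// pendingRetries returns the number of tasks currently waiting for a retry.
func (p *parallelRunner) pendingRetries() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.retryActionRegistry)
}

func main() {
	p := newParallelRunner()
	retry := p.addRetryController("clone/-80")
	p.addRetryController("clone/80-")
	_ = retry // the task goroutine would select on this channel and on ctx.Done()
	p.removeRetryController("clone/-80")
	fmt.Println(p.pendingRetries())
}
```

Setting the map to nil instead would make a later write from another task goroutine panic, because Go does not allow assignment to an entry in a nil map.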


go/vt/workflow/resharding/parallel_runner.go, line 141 at r9 (raw file):

// Action handles the retry action. It implements the interface ActionListener.
func (p *ParallelRunner) Action(ctx context.Context, pathName, name string) error {
	p.mu.Lock()

Right now the actionRegistry is only relevant for the retry and not other actions.

Given that, please change the structure of this method:

First have the switch block which checks the "name". In case of a retry, call a new method which actually triggers the retry. The end result should be that this Action() doesn't access p.retryActionRegistry at all and instead your new method does that.
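
The suggested structure could look roughly like this. The runner type and its registry field are simplified, hypothetical stand-ins; only the split between Action() and triggerRetry() is what the comment asks for.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// runner is a simplified stand-in for ParallelRunner.
type runner struct {
	mu       sync.Mutex
	registry map[string]chan struct{} // node path -> retry channel, protected by mu
}

// Action dispatches on the action name only. It never touches the registry.
func (r *runner) Action(ctx context.Context, pathName, name string) error {
	switch name {
	case "Retry":
		return r.triggerRetry(pathName)
	default:
		return fmt.Errorf("unknown action %q for node %q", name, pathName)
	}
}

// triggerRetry wakes up the task blocked on the node's retry channel and
// removes its entry so that a second click cannot close the channel twice.
func (r *runner) triggerRetry(pathName string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.registry[pathName]
	if !ok {
		return fmt.Errorf("no retry pending for node %q", pathName)
	}
	close(ch)
	delete(r.registry, pathName)
	return nil
}

// demo triggers a retry, repeats it, and sends an unknown action.
func demo() (retryErr, repeatErr, unknownErr error, woken bool) {
	ch := make(chan struct{})
	r := &runner{registry: map[string]chan struct{}{"/root/task1": ch}}
	ctx := context.Background()
	retryErr = r.Action(ctx, "/root/task1", "Retry")
	select {
	case <-ch:
		woken = true // the channel was closed by triggerRetry
	default:
	}
	repeatErr = r.Action(ctx, "/root/task1", "Retry")
	unknownErr = r.Action(ctx, "/root/task1", "Stop")
	return retryErr, repeatErr, unknownErr, woken
}

func main() {
	fmt.Println(demo())
}
```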


go/vt/workflow/resharding/parallel_runner.go, line 144 at r9 (raw file):

actionID

nit: Please stay consistent. In the other function you're naming this taskID and not actionID.


go/vt/workflow/resharding/parallel_runner_test.go, line 57 at r9 (raw file):

	// Create UI nodes. Each task has a node. These task nodes are the children of a root node.
	notifications := make(chan []byte, 10)
	nodeManager := workflow.NewNodeManager()

As discussed offline, it would be better if you re-use the workflow library and don't create parts of it (like the NodeManager) yourself. "TestManagerSimpleRun" is an example for such a test. You should write a second, minimal retry workflow which implements the Factory interface. That workflow can fail on the first attempt, expect a "Retry" action and then finally succeed.


go/vt/workflow/resharding/parallel_runner_test.go, line 88 at r9 (raw file):

	if !ok ||
		strings.Contains(string(result), `"children":[]`) ||
		!strings.Contains(string(result), `"name":"task_Sleep_0"`) ||

I like this test and what it's testing.

I suggest the following two changes:

  • reduce the number of tasks e.g. from 5 to 2
  • test that one task can be retried while another one is still blocked on the retry

i.e.:

  1. start workflow with the two tasks
  2. let both tasks fail
  3. retry task 2
  4. wait for task 2 to finish after the retry (you can use the notifications to wait for that)
  5. retry task 1
  6. wait for task 1 to finish after the retry
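
The six steps can be modelled without the workflow library to show why the ordering is deterministic. This is only a sketch: plain channels stand in for the UI notifications and the "Retry" action, and the names runTask and retryInOrder are made up for this example. It assumes the task IDs are unique.

```go
package main

import "fmt"

// runTask models one task: the first attempt fails, then it blocks until its
// retry channel is closed, and the second attempt succeeds.
func runTask(id string, failed chan<- string, retry <-chan struct{}, done chan<- string) {
	failed <- id // step 2: report the failure
	<-retry      // blocked until the retry is triggered
	done <- id   // the retried attempt finishes
}

// retryInOrder starts one goroutine per task, waits for all of them to fail,
// and then retries them one at a time in the given order. It returns the order
// in which the tasks finished.
func retryInOrder(order []string) []string {
	failed := make(chan string)
	done := make(chan string)
	retry := make(map[string]chan struct{})
	for _, id := range order {
		retry[id] = make(chan struct{})
		go runTask(id, failed, retry[id], done)
	}
	for range order {
		<-failed // wait until every task has failed once
	}
	var finished []string
	for _, id := range order {
		close(retry[id])                    // steps 3 and 5: trigger the retry
		finished = append(finished, <-done) // steps 4 and 6: wait for completion
	}
	return finished
}

func main() {
	// Task 2 is retried first while task 1 is still blocked on its retry.
	fmt.Println(retryInOrder([]string{"task2", "task1"})) // prints "[task2 task1]"
}
```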

go/vt/workflow/resharding/parallel_runner_test.go, line 124 at r9 (raw file):

mornitor

typo: monitor


go/vt/workflow/resharding/parallel_runner_test.go, line 159 at r9 (raw file):

var waitGroup sync.WaitGroup

better:

wg := sync.WaitGroup{}

(This is how our code does it everywhere else.)


Comments from Reviewable

…e.proto.

Implement ParallelRunner and Checkpointer. Create a simple test for
ParallelRunner.
workflow. Complete the unit test and E2E test for happy path.
unit test to verify this function. Implemented Horizontal Resharding
workflow and tested in unit test and e2e test for the happy path.
for change on node.go in workflow folder.
@wangyipei01-bot

Review status: 9 of 11 files reviewed at latest revision, 57 unresolved discussions, some commit checks broke.


go/vt/topo/workflow.go, line 30 at r6 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please remove this again.

Note that this version cannot be used for the version which you have to store in the proto. It's only used for interactions with the topology.

Done.


go/vt/workflow/node.go, line 105 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please undo this change.

I'll fix the documentation in a separate PR.

Done.


go/vt/workflow/resharding/checkpoint.go, line 13 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: saves

Done.


go/vt/workflow/resharding/checkpoint.go, line 32 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

checkpointing the update

This doesn't reflect that you're writing the complete checkpoint. I suggest changing it as follows:

// UpdateTask updates the task status in the checkpoint copy and writes the full checkpoint to the topology.

Done.


go/vt/workflow/resharding/checkpoint.go, line 43 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Save()

You assume that the lock on the mutex is already held when this is called.

We signal this in the code by adding the suffix "Locked".

Given that you want to export this method as well, I suggest the following:

  • rename this method to "saveLocked"
  • add an additional method "Save" which obtains the lock and then calls "saveLocked".

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 33 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please create an enum type for these phases.

I like that you're already using a common suffix ("Name"). But let's use a prefix instead, and let's name it "Phase" to make it clearer what this thing is. Use this prefix for the enum type name as well.

Example:

type PhaseType string

// Different phases the resharding workflow goes through.
const (
	PhaseCopySchema PhaseType = "copy_schema"
	PhaseClone PhaseType = "clone"
...
)

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 35 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

WaitFilteredReplication

nit: Let's not shorten this one because there's a difference between the MySQL replication and the Vitess filtered replication.

Therefore, please call it "PhaseWaitForFilteredReplication" and "wait_for_filtered_replication".

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 61 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

splitCloneUINode

Please rename this to "cloneUINode" to be consistent with the phase name.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 63 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

splitDiffUINode

Please rename this to "diffUINode" to be consistent with the phase name.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 94 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

There is a better way to generalize this:

Pass in the name of the phase and the list of shards.

This way you don't need any type switch in the method itself and it's clear in the calling code over which list you're iterating.

Please also rename the method to "getTasks" for example because the task objects are already generated.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 96 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

w.runCopySchema, PARALLEL

Please move these two parameters into the constructor of ParallelRunner as well. At the end, Run() shouldn't have any argument.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 140 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

HorizontalReshard

HorizontalResharding

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 144 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

workflowProto

Please use the same name as in the interface i.e. "w".

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 159 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please move these two lines into "initCheckpoint". You don't need the "ts" object here and this way you avoid having to pass it.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 161 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

workflowCheckpoint

That's too verbose/redundant. Let's go with "checkpoint".

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 174 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

workflowProto

Please use the same name as in the interface i.e. "w".

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 177 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

workflowCheckpoint

checkpoint

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 191 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

"clone"

Please be consistent and use the constants everywhere.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 238 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please update the comment.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 263 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

You're mixing here two different things in one method.

Instead, please:

  • write a separate function which gets you the list of source and destination shards
  • then go over these two lists and create the tasks.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 267 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

a) Please change this to the same structure as the original workflow i.e. create tasks in the same order.
b) Don't pass arguments when they are not needed e.g. the migration doesn't need the vtworker address.

If you want to generalize things, I suggest the following:

Write an "addTasks" function which takes the phase name and the list of shards as arguments. Additionally, it should take a "func (i int, name string) map[string]string" argument which returns the attributes depending on the index in the list.

With that, you can write the task creation like this:

addTasks(tasks, PhaseClone, sourceShards, func (i int, name string) map[string]string {
  return map[string]string{
    "vtworker": vtworkers[i],
    "keyspace": keyspace,
    "source_shard": name,
  }
})

Done.


go/vt/workflow/resharding/parallel_runner.go, line 30 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

actionRegistery

typo: actionRegistry

Done.


go/vt/workflow/resharding/parallel_runner.go, line 37 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

As discussed offline, the mutex can be removed here.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 91 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

n

The scope of this variable is longer than a couple of lines.

Please call it "node" instead. That will make it easier to read.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 116 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Instead of closing the retry channel here, please delete the control object from the registry.

I clean up the entire registry here.


go/vt/workflow/resharding/parallel_runner.go, line 134 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Access to actionRegistry requires a mutex because two different threads (the execution Go routine and the http handler which calls Action()) can access it.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 142 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

This should be part of the "closeRetryChannel" method.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 155 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please rename the method as suggested.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 155 at r8 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

(p *ParallelRunner) closeRetryChannel(c *Control)

This method should be part of "Control" and not "ParallelRunner".

Done.


go/vt/workflow/resharding/parallel_runner.go, line 36 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

control

controller

Done.


go/vt/workflow/resharding/parallel_runner.go, line 47 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

p.retryActionRegistery = make(map[string]*RetryController)

Please add this to the call above and return it directly without the intermediate variable p.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 97 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Before you start the retry, you need to check if the context is still valid.

That's because the error could have been caused by an expired context.

In that case you should not retry.

You can check for that with a non blocking read on the channel returned by ctx.Done(). Here's an example in our code base: https://github.com/youtube/vitess/blob/master/go/vt/wrangler/split.go#L125

Done.


go/vt/workflow/resharding/parallel_runner.go, line 104 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please move this code into a separate method e.g. "addRetryButton". Then, that method can return your retry channel.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 122 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Unguarded access to retryActionRegistry. Instead of getting the retry channel from the registry, please store it in a local variable when you add it. See also my comment above about having a method return it.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 125 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

You need to obtain the lock here.

Besides that, unassigning the map could go wrong when another goroutine tries to add a controller object to the map after this.

Instead of deleting the map, please delete only the controller object for this particular retry.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 141 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Right now the actionRegistry is only relevant for the retry and not other actions.

Given that, please change the structure of this method:

First have the switch block which checks the "name". In case of a retry, call a new method which actually triggers the retry. The end result should be that this Action() doesn't access p.retryActionRegistry at all and instead your new method does that.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 144 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

actionID

nit: Please stay consistent. In the other function you're naming this taskID and not actionID.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 52 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

wcp.Settings["numbers"]

In this example, the settings entry should be called "count" and it should only have the number of tasks in it.

You could make this task count a parameter (type "int") of this function and then you can reuse it for the initialization and running the tasks.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 59 at r5 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Same comment as for the protobuf file: Please organize the elements from top to bottom where it makes sense.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 57 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

As discussed offline, it would be better if you re-use the workflow library and don't create parts of it (like the NodeManager) yourself. "TestManagerSimpleRun" is an example for such a test. You should write a second, minimal retry workflow which implements the Factory interface. That workflow can fail on the first attempt, expect a "Retry" action and then finally succeed.

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 88 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

I like this test and what it's testing.

I suggest the following two changes:

  • reduce the number of tasks e.g. from 5 to 2
  • test that one task can be retried while another one is still blocked on the retry

i.e.:

  1. start workflow with the two tasks
  2. let both tasks fail
  3. retry task 2
  4. wait for task 2 to finish after the retry (you can use the notifications to wait for that)
  5. retry task 1
  6. wait for task 1 to finish after the retry

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 124 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

mornitor

typo: monitor

Done.


go/vt/workflow/resharding/parallel_runner_test.go, line 159 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

var waitGroup sync.WaitGroup

better:

wg := sync.WaitGroup{}

(This is how our code does it everywhere else.)

Done.


go/vt/workflow/resharding/task_helper.go, line 1 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please rename this file to tasks.go because it contains the core logic for the different tasks.

Done.


go/vt/workflow/resharding/task_helper.go, line 16 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

%s_%s_%s

shardType is not needed. Let's use / as name instead.

Done.


go/vt/workflow/resharding/task_helper.go, line 19 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

copy of tasks

This is not a copy. Instead, you're just returning the list of selected tasks.

Done.


go/vt/workflow/resharding/task_helper.go, line 37 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Fix comment (wrong method name).

Done.


go/vt/workflow/resharding/task_helper.go, line 39 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

attr

attributes (no abbreviations please)

Done.


go/vt/workflow/resharding/task_helper.go, line 46 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

CopySchemaShardFromShard

Please change this to CopySchemaShard because that's also the name of the respective vtctl command. "FromShard" is an implementation detail which we don't want to expose to the user :)

Done.


go/vt/workflow/resharding/task_helper.go, line 118 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

for _, servedType := range servedTypeParams {

This is going to change the execution order. Your code would migrate all types of a shard, one shard at a time.

But we want to migrate all RDONLY types first, then REPLICA and then MASTER.

Given that, please change the code as follows:

Split the "migrate" phase into three phases i.e. "migrate_rdonly", "migrate_replica" and "migrate_master". Each phase is going to need its own UI nodes and its own ParallelRunner for the execution.

Done.


Comments from Reviewable

@mberlin-bot

First round of comments. Everything is looking good so far and I think it's coming together nicely :)


Reviewed 3 of 10 files at r12, 4 of 8 files at r15.
Review status: 7 of 12 files reviewed at latest revision, 52 unresolved discussions, some commit checks broke.


go/vt/workflow/node.go, line 223 at r15 (raw file):

}

// GetChildByPath returns the child node using its path in the sub-tree

Two things: I would use the term "relative path" in here and rewrite the fact that it's not thread safe.

With that, I suggest:

// GetChildByPath returns the child node which is addressed by the path "subPath" that must be relative to this node.
// The caller must ensure that the node tree is not modified during the call.
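
With the renames suggested in the following comments applied (currentNode, child), the traversal might read as in the sketch below. The node struct here is a reduced, hypothetical version of workflow.Node with only the fields the lookup needs, and the error wording is made up.

```go
package main

import (
	"fmt"
	"strings"
)

// node is a reduced stand-in for workflow.Node.
type node struct {
	PathName string
	Children []*node
}

// GetChildByPath returns the child node which is addressed by the path "subPath"
// that must be relative to this node.
// The caller must ensure that the node tree is not modified during the call.
func (n *node) GetChildByPath(subPath string) (*node, error) {
	parts := strings.Split(subPath, "/")

	currentNode := n
	for i, part := range parts {
		var child *node
		for _, c := range currentNode.Children {
			if c.PathName == part {
				child = c
				break
			}
		}
		if child == nil {
			return nil, fmt.Errorf("node %q has no descendant at relative path %q",
				n.PathName, strings.Join(parts[:i+1], "/"))
		}
		currentNode = child
	}
	return currentNode, nil
}

func main() {
	leaf := &node{PathName: "-80"}
	root := &node{PathName: "root", Children: []*node{{PathName: "clone", Children: []*node{leaf}}}}

	found, err := root.GetChildByPath("clone/-80")
	fmt.Println(found == leaf, err)

	_, err = root.GetChildByPath("clone/80-")
	fmt.Println(err)
}
```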

go/vt/workflow/node.go, line 228 at r15 (raw file):

func (n *Node) GetChildByPath(subPath string) (*Node, error) {
	parts := strings.Split(subPath, "/")
	s := n

Move this line right before the for statement to make it clearer that it's actually a loop variable in some sense.


go/vt/workflow/node.go, line 228 at r15 (raw file):

s

This name is not very meaningful. How about currentNode? I know it's long but it would be easier to read.


go/vt/workflow/node.go, line 234 at r15 (raw file):

sn

When you rename s above, please rename this one as well. I suggest child.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 159 at r9 (raw file):

Previously, wangyipei01-bot wrote…

Done.

This is not done yet.

Opening and closing the topology is only needed in findSourceAndDestinationShards. Therefore, please do the work there and not here.

When you move it, please also add a TODO comment that we should extend the factory interface to pass in the topo as well.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 231 at r15 (raw file):

	}

	destinationShards := strings.Split(hw.checkpoint.Settings["destination_shards"], ",")

nit: Please swap these two lines because I find it more intuitive that the source comes first and then the destination.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 277 at r15 (raw file):

sourceShardList, destinationShardList

nit: Please remove the suffix "List" because it's redundant. From the type we already know that it's a string slice :)

Please change this throughout the file.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 312 at r15 (raw file):

taskMap

nit: Similar comment as above. Just call it "tasks" and do not include the data type in the variable name.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 312 at r15 (raw file):

func initCheckpointFromShards(keyspace string, vtworkers, sourceShardList, destinationShardList []string) (*workflowpb.WorkflowCheckpoint, error) {
	taskMap := make(map[string]*workflowpb.Task)

Please add a check here that the number of source shards is equal to the number of vtworkers. If not, this function must return an error.
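
Such a check could be as small as the following sketch. The function name checkWorkerCount and the error wording are placeholders; in the real code the check would sit at the top of initCheckpointFromShards.

```go
package main

import "fmt"

// checkWorkerCount returns an error unless there is exactly one vtworker per
// source shard. The tasks index vtworkers by source shard position, so a
// mismatch would otherwise lead to an out-of-range access or an unused worker.
func checkWorkerCount(vtworkers, sourceShards []string) error {
	if len(vtworkers) != len(sourceShards) {
		return fmt.Errorf("got %d vtworkers for %d source shards, need exactly one vtworker per source shard",
			len(vtworkers), len(sourceShards))
	}
	return nil
}

func main() {
	// The addresses are placeholders.
	fmt.Println(checkWorkerCount([]string{"worker0:15033"}, []string{"-80", "80-"}))
	fmt.Println(checkWorkerCount([]string{"worker0:15033", "worker1:15033"}, []string{"-80", "80-"}))
}
```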


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 317 at r15 (raw file):

			"source_shard":      sourceShardList[0],
			"destination_shard": shard,
			"keyspace":          keyspace,

nit: Please move the line with the keyspace to the beginning because it's the coarsest dimension when addressing shards.

Please make this change for the other occurrences as well.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 376 at r15 (raw file):

taskMap map[string]*workflowpb.Task

Please make this the first parameter. It's common across all invocations. This will make it easier to read the method above because the reader sees that "tasks" is the same for each call.


go/vt/workflow/resharding/parallel_runner.go, line 19 at r15 (raw file):

SEQUENTIAL

nit: Let's use CamelCase here and not ALLCAPS, i.e. it should be "Sequential" and "Parallel".


go/vt/workflow/resharding/parallel_runner.go, line 34 at r15 (raw file):

	concurrencyLevel level
	executeFunc      func(context.Context, *workflowpb.Task) error
	// mu is used to protect the retryActionRegistery.

Please add a newline before this line.

That's because all fields protected by a mutex must be in a separate group. By following this convention, the code becomes easier to read.
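
As an illustration of the grouping convention, here is a sketch with a simplified, hypothetical runner type (it is not the ParallelRunner from the PR). The blank line before mu visually separates the fields it guards from the unguarded ones:

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a simplified, hypothetical stand-in for a struct that mixes
// immutable fields with fields guarded by a mutex.
type runner struct {
	// name is set at construction and never changes; no locking needed.
	name string

	// mu protects the fields in this group.
	mu            sync.Mutex
	retryChannels map[string]chan struct{}
}

func newRunner(name string) *runner {
	return &runner{name: name, retryChannels: make(map[string]chan struct{})}
}

// addRetry registers a retry channel for nodePath while holding mu.
func (r *runner) addRetry(nodePath string) chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan struct{})
	r.retryChannels[nodePath] = ch
	return ch
}

// pending returns the number of registered retry channels.
func (r *runner) pending() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.retryChannels)
}

func main() {
	r := newRunner("clone")
	r.addRetry("clone/-80")
	fmt.Println(r.name, r.pending())
}
```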


go/vt/workflow/resharding/parallel_runner.go, line 39 at r15 (raw file):

worklfow

typo: workflow


go/vt/workflow/resharding/parallel_runner.go, line 41 at r15 (raw file):

synchroizing

typo: synchronizing


go/vt/workflow/resharding/parallel_runner.go, line 87 at r15 (raw file):

			taskID := t.Id
			for {
				err := p.executeFunc(p.ctx, t)

Before executing the task, please set its state in the checkpoint to TaskRunning.
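
One way to picture the requested ordering, using hypothetical local types instead of the workflowpb messages. The save callback stands in for whatever persists the checkpoint:

```go
package main

import "fmt"

// taskState and task are hypothetical local stand-ins for the proto types.
type taskState int

const (
	taskNotStarted taskState = iota
	taskRunning
	taskDone
)

type task struct {
	id    string
	state taskState
	err   string
}

// runTask marks the task as running and saves it before executing, then
// records the outcome and saves it again afterwards.
func runTask(t *task, execute func(*task) error, save func(*task)) error {
	t.state = taskRunning
	save(t)

	err := execute(t)

	t.state = taskDone
	if err != nil {
		t.err = err.Error()
	}
	save(t)
	return err
}

func main() {
	t := &task{id: "copy_schema/-80"}
	save := func(t *task) { fmt.Printf("checkpoint: task %v state=%d\n", t.id, t.state) }
	_ = runTask(t, func(*task) error { return nil }, save)
}
```

Saving the running state first means a workflow restarted from the checkpoint can tell an interrupted task apart from one that never started.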


go/vt/workflow/resharding/parallel_runner.go, line 100 at r15 (raw file):

cancelled

nit: Our code base uses the spelling "canceled" with a single l because Go changed to that as well. Please change it as well.


go/vt/workflow/resharding/parallel_runner.go, line 108 at r15 (raw file):

				}

				fmt.Printf("enabling retry action for task: %v", taskID)

Please remove debug statements.


go/vt/workflow/resharding/parallel_runner.go, line 160 at r15 (raw file):

}

func (p *ParallelRunner) triggerRetry(nodePath string) error {

nit: Please move this right after the Action() method.


go/vt/workflow/resharding/task.go, line 1 at r15 (raw file):

package resharding

Please rename this file to tasks.go because it contains the logic for multiple tasks.


go/vt/workflow/resharding/task.go, line 23 at r15 (raw file):

checkpoint *workflowpb.WorkflowCheckpoint

This is a method of "hw", which has the checkpoint as a field. You don't need to pass it in as well.


go/vt/workflow/resharding/task.go, line 30 at r15 (raw file):

	case phaseClone, phaseMigrateRdonly, phaseMigrateReplica, phaseMigrateMaster:
		shards = strings.Split(checkpoint.Settings["source_shards"], ",")
	}

Please add a default case here.

In it, you can add an assertion. For that, we use panic with a string that starts with "BUG: ".

Example:

panic(fmt.Sprintf("BUG: unknown phase type: %v", phase))

go/vt/workflow/resharding/task.go, line 41 at r15 (raw file):

func (hw *HorizontalReshardingWorkflow) runCopySchema(ctx context.Context, t *workflowpb.Task) error {
	s := t.Attributes["source_shard"]

nit: Using short variables is a good idea when the scope is short.

But here you're mixing long ("keyspace") and short names ("s"). To be consistent, I suggest renaming the two shard variables to sourceShard and destShard. These are common names which we use in other places as well.


go/vt/workflow/resharding/task.go, line 43 at r15 (raw file):

	s := t.Attributes["source_shard"]
	d := t.Attributes["destination_shard"]
	keyspace := t.Attributes["keyspace"]

nit: Same comment as above. Please always list keyspace first and then the shards.


go/vt/workflow/resharding/task.go, line 46 at r15 (raw file):

	err := hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true, /*includeViews*/
		keyspace, s, keyspace, d, wrangler.DefaultWaitSlaveTimeout)
	if err != nil {

Please move the logging into the parallel runner. This way, these functions become shorter.

e.g. here you can just write return hw.wr.CopySchemaShardFromShard(...).

Please change this throughout the file.


Comments from Reviewable

@wangyipei01-bot

Review status: 7 of 12 files reviewed at latest revision, 52 unresolved discussions, some commit checks broke.


go/vt/workflow/node.go, line 223 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Two things: I would use the term "relative path" in here and rewrite the fact that it's not thread safe.

With that, I suggest:

// GetChildByPath returns the child node which is addressed by the path "subPath" that must be relative to this node.
// The caller must ensure that the node tree is not modified during the call.

Done.


go/vt/workflow/node.go, line 228 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Move this line right before the for statement to make it clearer that it's actually a loop variable in some sense.

Done.


go/vt/workflow/node.go, line 228 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

s

This name is not very meaningful. How about currentNode? I know it's long but it would be easier to read.

Done.


go/vt/workflow/node.go, line 234 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

sn

When you rename s above, please rename this one as well. I suggest child.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 159 at r9 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

This is not done yet.

Opening and closing the topology is only needed in findSourceAndDestinationShards. Therefore, please do the work there and not here.

When you move it, please also add a TODO comment that we should extend the factory interface to pass in the topo as well.

Done. TODO comment is added to the interface definition.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 231 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Please swap these two lines because I find it more intuitive that the source comes first and then the destination.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 277 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

sourceShardList, destinationShardList

nit: Please remove the suffix "List" because it's redundant. From the type we already know that it's a string slice :)

Please change this throughout the file.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 312 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

taskMap

nit: Similar comment as above. Just call it "tasks" and do not include the data type in the variable name.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 312 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please add a check here that the number of source shards is equal to the number of vtworkers. If not, this function must return an error.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 317 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Please move the line with the keyspace to the beginning because it's the coarsest dimension when addressing shards.

Please make this change for the other occurrences as well.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 376 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

taskMap map[string]*workflowpb.Task

Please make this the first parameter. It's common across all invocations. This will make it easier to read the method above because the reader sees that "tasks" is the same for each call.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 19 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

SEQUENTIAL

nit: Let's use CamelCase here and not ALLCAPS, i.e. it should be "Sequential" and "Parallel".

Done.


go/vt/workflow/resharding/parallel_runner.go, line 34 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please add a newline before this line.

That's because all fields protected by a mutex must be in a separate group. By following this convention, the code becomes easier to read.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 39 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

worklfow

typo: workflow

Done.


go/vt/workflow/resharding/parallel_runner.go, line 41 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

synchroizing

typo: synchronizing

Done.


go/vt/workflow/resharding/parallel_runner.go, line 87 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Before executing the task, please set its state in the checkpoint to TaskRunning.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 100 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

cancelled

nit: Our code base uses the spelling "canceled" with a single l because Go changed to that as well. Please change it as well.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 108 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please remove debug statements.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 160 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Please move this right after the Action() method.

Done.


go/vt/workflow/resharding/task.go, line 1 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please rename this file to tasks.go because it contains the logic for multiple tasks.

Done.


go/vt/workflow/resharding/task.go, line 30 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please add a default case here.

In it, you can add an assertion. For that, we use panic with a string that starts with "BUG: ".

Example:

panic(fmt.Sprintf("BUG: unknown phase type: %v", phase))

Done.


go/vt/workflow/resharding/task.go, line 41 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Using short variables is a good idea when the scope is short.

But here you're mixing long ("keyspace") and short names ("s"). To be consistent, I suggest renaming the two shard variables to sourceShard and destShard. These are common names which we use in other places as well.

Done.


go/vt/workflow/resharding/task.go, line 43 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Same comment as above. Please always list keyspace first and then the shards.

Done.


go/vt/workflow/resharding/task.go, line 46 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please move the logging into the parallel runner. This way, these functions become shorter.

e.g. here you can just write return hw.wr.CopySchemaShardFromShard(...).

Please change this throughout the file.

Done.


Comments from Reviewable

@mberlin-bot

Last round of small comments.

Once they're addressed, this is LGTM.

As discussed offline, I'll review the test code separately when you switch it to the new mocking library.

Please also re-run "make proto". Travis is currently failing because of that: https://travis-ci.org/youtube/vitess/jobs/204284333


Reviewed 4 of 6 files at r16.
Review status: 8 of 13 files reviewed at latest revision, 25 unresolved discussions, some commit checks broke.


go/vt/workflow/node.go, line 223 at r15 (raw file):

Previously, wangyipei01-bot wrote…

Done.

Sorry to nitpick here, but I think the changed comment is not clear enough e.g. it doesn't properly express that "subPath" is a relative path which is relative to this node. Another minor issue is that it should be "thread safe" and not "concurrency safe" and that this comment is not detailed enough: You could actually call this method concurrently, but the node tree must not change during each call.

Given that, why not just use the comment I wrote?


go/vt/workflow/node.go, line 228 at r15 (raw file):

Previously, wangyipei01-bot wrote…

Done.

I meant the declaration of the node should be right before the for loop because it's basically a loop variable:

// Find the subnode if needed.
parts := strings.Split(subPath, "/")

currentNode := n
for i := 0; i < len(parts); i++ {
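
For context, a self-contained sketch of how such a relative path lookup could look with the declaration placed right before the loop. The node type and the lower-case method name here are hypothetical simplifications, not the Node type from go/vt/workflow/node.go:

```go
package main

import (
	"fmt"
	"strings"
)

// node is a hypothetical, simplified tree node.
type node struct {
	pathName string
	children []*node
}

// getChildByPath returns the child node addressed by subPath, which must be
// relative to n. The caller must ensure that the node tree is not modified
// during the call.
func (n *node) getChildByPath(subPath string) (*node, error) {
	parts := strings.Split(subPath, "/")

	currentNode := n
	for i := 0; i < len(parts); i++ {
		found := false
		for _, child := range currentNode.children {
			if child.pathName == parts[i] {
				found = true
				currentNode = child
				break
			}
		}
		if !found {
			return nil, fmt.Errorf("node %v has no child named %v", currentNode.pathName, parts[i])
		}
	}
	return currentNode, nil
}

func main() {
	root := &node{pathName: "root", children: []*node{{pathName: "clone"}}}
	child, err := root.getChildByPath("clone")
	fmt.Println(child.pathName, err)
}
```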

go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 6 at r16 (raw file):

// The workflow assumes that there are as many vtworker processes running as source shards.
// Plus, these vtworker processes must be reachable via RPC.
// TO DO: it can be used to save checkpointer

Please remove this TODO comment.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 35 at r16 (raw file):

worklow

typo: workflow


go/vt/workflow/resharding/parallel_runner.go, line 35 at r16 (raw file):

Registery

typo: Registry


go/vt/workflow/resharding/parallel_runner.go, line 89 at r16 (raw file):

of

nit: This should be "to": Change something to something.

Same below ("to done" instead of "of done").


go/vt/workflow/resharding/parallel_runner.go, line 97 at r16 (raw file):

				}
				err := p.executeFunc(p.ctx, t)
				// Update the task status of done in the checkpoint

nit: Missing period.


go/vt/workflow/resharding/parallel_runner.go, line 104 at r16 (raw file):

is finished

nit: has finished


go/vt/workflow/resharding/task.go, line 1 at r15 (raw file):

Previously, wangyipei01-bot wrote…

Done.

File is not renamed yet.


Comments from Reviewable

@wangyipei01-bot

Review status: 8 of 13 files reviewed at latest revision, 25 unresolved discussions, some commit checks broke.


go/vt/workflow/node.go, line 223 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Sorry to nitpick here, but I think the changed comment is not clear enough e.g. it doesn't properly express that "subPath" is a relative path which is relative to this node. Another minor issue is that it should be "thread safe" and not "concurrency safe" and that this comment is not detailed enough: You could actually call this method concurrently, but the node tree must not change during each call.

Given that, why not just use the comment I wrote?

Done.


go/vt/workflow/node.go, line 228 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

I meant the declaration of the node should be right before the for loop because it's basically a loop variable:

// Find the subnode if needed.
parts := strings.Split(subPath, "/")

currentNode := n
for i := 0; i < len(parts); i++ {

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 6 at r16 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Please remove this TODO comment.

Done.


go/vt/workflow/resharding/horizontal_resharding_workflow.go, line 35 at r16 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

worklow

typo: workflow

Done.


go/vt/workflow/resharding/parallel_runner.go, line 35 at r16 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

Registery

typo: Registry

Done.


go/vt/workflow/resharding/parallel_runner.go, line 89 at r16 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

of

nit: This should be "to": Change something to something.

Same below ("to done" instead of "of done").

Done.


go/vt/workflow/resharding/parallel_runner.go, line 97 at r16 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

nit: Missing period.

Done.


go/vt/workflow/resharding/parallel_runner.go, line 104 at r16 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

is finished

nit: has finished

Done.


go/vt/workflow/resharding/task.go, line 1 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

File is not renamed yet.

Done.


go/vt/workflow/resharding/task.go, line 23 at r15 (raw file):

Previously, mberlin-bot (Michael Berlin) wrote…

checkpoint *workflowpb.WorkflowCheckpoint

This is a method of "hw", which has the checkpoint as a field. You don't need to pass it in as well.

Done.


Comments from Reviewable

@mberlin-bot

mberlin-bot commented Feb 23, 2017

:lgtm:


Reviewed 5 of 5 files at r17.
Review status: 9 of 13 files reviewed at latest revision, 14 unresolved discussions.


Comments from Reviewable

Approved with PullApprove

horizontal resharding workflow. First round comments resolved.
(addressing race condition warnings)
@michael-berlin michael-berlin changed the title Workflow control workflow: resharding workflow: Implement checkpointing. Feb 23, 2017
@michael-berlin michael-berlin merged commit 342cdf5 into vitessio:master Feb 23, 2017
michael-berlin added a commit to michael-berlin/vitess that referenced this pull request Feb 23, 2017
This is a follow-up fix for vitessio#2495.

We were not able to fix Yipei's workstation such that it would use the
same protobuf generator as Travis and our setups do. Therefore, I'm
re-generating the files separately on my machine.
@mberlin-bot

With the coarser-grained locking, I think we can further simplify the code for the retry controller.

Since this PR is merged, please address my comments in a new PR. Thanks! :)


Reviewed 1 of 2 files at r19.
Review status: 9 of 13 files reviewed at latest revision, 21 unresolved discussions, some commit checks failed.


go/vt/workflow/resharding/parallel_runner.go, line 35 at r19 (raw file):

	executeFunc      func(context.Context, *workflowpb.Task) error

	// mu is used to protect the retryActionRegistry.

Please add a comment here that it's also used to serialize the UI node changes.


go/vt/workflow/resharding/parallel_runner.go, line 154 at r19 (raw file):

p.unregisterRetryControllerLocked(nodePath)

I think this can be kept simpler now:

You can remove the unregister function completely and replace this call with a delete of the controller.

Since you're still holding the lock, you know for sure that the controller is still in the map and you can just delete it.

The other usage of unregister, when a context is done, is most likely overkill and you can just remove that. When the context is done, all goroutines should end and we don't care about the state of the action registry.
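
A rough sketch of the simplification, assuming the registry stores only a channel per node path. The retryRegistry type and its method names are hypothetical; the point is that the lookup and the delete happen under the same lock, so no separate unregister helper is needed:

```go
package main

import (
	"fmt"
	"sync"
)

// retryRegistry is a hypothetical, simplified registry of retry channels.
type retryRegistry struct {
	// mu protects retryChannels.
	mu            sync.Mutex
	retryChannels map[string]chan struct{}
}

func newRetryRegistry() *retryRegistry {
	return &retryRegistry{retryChannels: make(map[string]chan struct{})}
}

// addRetryAction registers a retry channel for nodePath and returns it.
func (r *retryRegistry) addRetryAction(nodePath string) chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan struct{})
	r.retryChannels[nodePath] = ch
	return ch
}

// triggerRetry deletes the entry and closes its channel while still holding
// the lock, so the entry cannot disappear between lookup and delete.
func (r *retryRegistry) triggerRetry(nodePath string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.retryChannels[nodePath]
	if !ok {
		return fmt.Errorf("no retry action registered for node path %v", nodePath)
	}
	delete(r.retryChannels, nodePath)
	close(ch)
	return nil
}

func main() {
	reg := newRetryRegistry()
	ch := reg.addRetryAction("clone/-80")
	fmt.Println(reg.triggerRetry("clone/-80"))
	<-ch // Returns immediately because the channel was closed.
}
```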


go/vt/workflow/resharding/parallel_runner.go, line 162 at r19 (raw file):

	node, err := p.rootUINode.GetChildByPath(taskID)
	if err != nil {
		panic(fmt.Errorf("node on child path %v not found", taskID))

Note that we do not use panic for error handling and instead always have error as return value.

However, I can see that this is more an assertion than an error.

In that case, please change it as follows:

  • use fmt.Sprintf to make your intent clear that this is not about error handling
  • prefix the message with BUG: to express that this is not an expected error
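
A small sketch of what the two points amount to. The map-based lookup and the name mustGetChild are hypothetical stand-ins for the GetChildByPath call:

```go
package main

import "fmt"

// mustGetChild is a hypothetical helper. A missing child is treated as a
// programming error, so it panics with a "BUG: " message built with
// fmt.Sprintf instead of returning an error value.
func mustGetChild(children map[string]string, path string) string {
	child, ok := children[path]
	if !ok {
		panic(fmt.Sprintf("BUG: node on child path %v not found", path))
	}
	return child
}

func main() {
	children := map[string]string{"clone/-80": "Clone -80"}
	fmt.Println(mustGetChild(children, "clone/-80"))
}
```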

go/vt/workflow/resharding/parallel_runner.go, line 173 at r19 (raw file):

}

func (p *ParallelRunner) registerRetryControllerLocked(nodePath string, c *RetryController) {

Similar comment as above: With the locking gone, this can be folded into addRetryAction because that's the only caller.


go/vt/workflow/resharding/retry_controller.go, line 6 at r19 (raw file):

// RetryController stores the data for controlling the retry action.
type RetryController struct {

Let's remove this structure and store only the channel in the registry.

The only other field here is "node". But you can also get that by using the path value which is provided by the Action() callback.

The path which Action() returns unfortunately also includes the root node. Since no other workflow is using the path so far, please change the NodeManager.Action() function in go/vt/workflow/node.go such that the first element for the root node is not included in the path.

This way you can use the returned path to look up the node and you don't need to store it in the registry.

The only thing left then is adding and removing the Actions to the node object. You can add this code to your ParallelRunner object.


go/vt/workflow/resharding/retry_controller.go, line 15 at r19 (raw file):

CreateRetryController

This should be: NewRetryController()


go/vt/workflow/resharding/retry_controller.go, line 32 at r19 (raw file):

// in the UI Node. This disables the retry action.
func (c *RetryController) triggerRetry() {
	if len(c.node.Actions) != 0 {

This should never happen, should it? If this happens, this is a bug with your registry mechanism?


Comments from Reviewable

@wangyipei01 wangyipei01 deleted the workflow-control branch February 23, 2017 21:53

5 participants