Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
778ae1c
Add config map to workflow options json document. Allow updating with…
rohit-nayak-ps Aug 7, 2024
f8b1c3b
Add config type with all vreplication/vstreamer related flags. Define…
rohit-nayak-ps Aug 7, 2024
e83c7ff
Move config/flags to vttablet/common
rohit-nayak-ps Aug 11, 2024
8b745ea
Move vreplication/flags into vttablet/common/flags
rohit-nayak-ps Aug 11, 2024
3e9ea6d
Switch from initing default config param struct from init() to a sepa…
rohit-nayak-ps Aug 11, 2024
c0656e0
Move VStreamer flags to common flags, update vreplicator to use Workf…
rohit-nayak-ps Aug 11, 2024
880ab1a
Fix compilation error
rohit-nayak-ps Aug 11, 2024
8b2cce7
Use ExperimentalFlags in vreplication by passing WorkflowConfig around
rohit-nayak-ps Aug 11, 2024
9ee8ccd
Set dbclient params in vreplicator init
rohit-nayak-ps Aug 11, 2024
3c45097
workflow config is initialized now in the controller
rohit-nayak-ps Aug 11, 2024
91b60d0
Use RelayLogMaxSize and CopyPhaseDuration from config
rohit-nayak-ps Aug 12, 2024
bdec36e
Use RelayLogMaxSize from config
rohit-nayak-ps Aug 12, 2024
9366484
Use more parameters from config
rohit-nayak-ps Aug 12, 2024
38922ff
Make workflow config flag values private to surface incorrect or dire…
rohit-nayak-ps Aug 12, 2024
58578cf
Make tablet types string flag variable private
rohit-nayak-ps Aug 12, 2024
e48f31e
Fix unit test usage of workflow config to get TestControllerKeyRange …
rohit-nayak-ps Aug 12, 2024
95e7c42
Fix controller unit tests
rohit-nayak-ps Aug 12, 2024
975fca3
Fix more unit tests
rohit-nayak-ps Aug 12, 2024
950be0b
Minor refactor
rohit-nayak-ps Aug 12, 2024
d3cbe82
Fix vdiff unit tests and static code check
rohit-nayak-ps Aug 12, 2024
9f6ce0c
Init vreplication config defaults in vstreamers
rohit-nayak-ps Aug 12, 2024
4ff9c33
Add logging. Add the ability to reset expected queries in mockdb
rohit-nayak-ps Aug 15, 2024
f8731ff
Update vdiff test expectations
rohit-nayak-ps Aug 15, 2024
c4eb394
Update more expectations for options
rohit-nayak-ps Aug 15, 2024
e14d6f5
Minor refactor
rohit-nayak-ps Aug 15, 2024
e0fd18b
Fix TestNewVReplicationConfig
rohit-nayak-ps Aug 15, 2024
4464e2b
Fix TestFailedMoveTablesCreateCleanup
rohit-nayak-ps Aug 15, 2024
1a80af4
Copy TestMoveTables as TestMoveTablesUnsharded to figure out dbmock e…
rohit-nayak-ps Aug 15, 2024
59bd946
More fixes for TestFailedMoveTablesCreateCleanup
rohit-nayak-ps Aug 15, 2024
94eb296
Fix failing tests in tabletmanager
rohit-nayak-ps Aug 15, 2024
e3f748a
Fix help output test
rohit-nayak-ps Aug 15, 2024
f4f35c4
Add config to stats. Some refactor
rohit-nayak-ps Aug 16, 2024
b0b6dee
Add e2e test for updating config overrides. Validate keys in CLI to e…
rohit-nayak-ps Aug 16, 2024
84f9d42
Address review comments
rohit-nayak-ps Aug 19, 2024
dfa0aac
Add config-overrides flag to MoveTables
rohit-nayak-ps Aug 29, 2024
014f172
Fix failing unit tests
rohit-nayak-ps Aug 29, 2024
484e258
Add overrides to reshard
rohit-nayak-ps Aug 29, 2024
78a04f9
Add log lines for confirming in prod
rohit-nayak-ps Aug 29, 2024
017a6dc
Add VStreamOptions to (Row/Table)Streamer
rohit-nayak-ps Aug 30, 2024
37b7025
Use config passed from the target in packet sizer
rohit-nayak-ps Aug 31, 2024
117dee3
Use values from passed-in config in all places in the source streamers
rohit-nayak-ps Aug 31, 2024
a97e0db
Use values from passed-in config in all places in the source streamers
rohit-nayak-ps Sep 1, 2024
28d044b
Fix unit tests for passed-in config
rohit-nayak-ps Sep 1, 2024
d7f0ad5
Update config unit tests to account for overrides
rohit-nayak-ps Sep 1, 2024
3784be9
Self-review
rohit-nayak-ps Sep 1, 2024
356d48e
Self-review
rohit-nayak-ps Sep 1, 2024
b157d3d
Comment sidecardb logs temporarily for ease of debugging. Improve moc…
rohit-nayak-ps Sep 1, 2024
66a7948
Self-review
rohit-nayak-ps Sep 1, 2024
49f282f
Self-review
rohit-nayak-ps Sep 2, 2024
aa353b4
Revert local changes to local_example.sh
rohit-nayak-ps Sep 7, 2024
a2a994f
Revert changes to comments and remove extra comments
rohit-nayak-ps Sep 7, 2024
dd5fea0
Address my own nits
mattlord Sep 10, 2024
e4f2107
WIP
rohit-nayak-ps Sep 10, 2024
2f46e71
Use getter instead of global default config variable
rohit-nayak-ps Sep 11, 2024
754f569
Address review comments
rohit-nayak-ps Sep 18, 2024
8dee3a2
Fix merge issues. Add summary section
rohit-nayak-ps Sep 18, 2024
465b4ae
Experiment with changes to Unit Test because it is only failing in CI…
rohit-nayak-ps Sep 18, 2024
1dcbaec
Cleanup test fixes
rohit-nayak-ps Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/21.0/21.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- **[Support for recursive CTEs](#recursive-cte)**
- **[VTGate Tablet Balancer](#tablet-balancer)**
- **[Query Timeout Override](#query-timeout)**
- **[Dynamic VReplication Configuration](#dynamic-vreplication-configuration)**

## <a id="major-changes"/>Major Changes

Expand Down Expand Up @@ -130,3 +131,9 @@ A query can also be set to have no timeout by using the `QUERY_TIMEOUT_MS` comme

Example usage:
`select /*vt+ QUERY_TIMEOUT_MS=30 */ col from tbl`

### <a id="dynamic-vreplication-configuration"/>Dynamic VReplication Configuration
Currently many of the configuration options for VReplication Workflows are vttablet flags. This means that any change
requires restarts of vttablets. We now allow these to be overridden while creating a workflow or dynamically once
the workflow is in progress. See https://github.com/vitessio/vitess/pull/16583 for details.

24 changes: 24 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

var (
Expand Down Expand Up @@ -67,6 +68,7 @@ var (
MySQLServerVersion string
TruncateUILen int
TruncateErrLen int
ConfigOverrides []string
}{}
)

Expand Down Expand Up @@ -147,6 +149,27 @@ func validateOnDDL(cmd *cobra.Command) error {
return nil
}

// ParseConfigOverrides converts a slice of key=value strings into a map of config overrides. The slice is passed
// as a flag to the command, and the key=value pairs are used to override the default vreplication config values.
func ParseConfigOverrides(overrides []string) (map[string]string, error) {
configOverrides := make(map[string]string, len(overrides))
defaultConfig, err := vttablet.NewVReplicationConfig(nil)
if err != nil {
return nil, err
}
for _, kv := range overrides {
key, value, ok := strings.Cut(kv, "=")
if !ok {
return nil, fmt.Errorf("invalid config override format (var=value expected): %s", kv)
}
if _, ok := defaultConfig.Map()[key]; !ok {
return nil, fmt.Errorf("unknown vreplication config flag: %s", key)
}
configOverrides[key] = value
}
return configOverrides, nil
}

// ValidateShards checks if the provided shard names are valid key ranges.
func ValidateShards(shards []string) error {
for _, shard := range shards {
Expand Down Expand Up @@ -232,6 +255,7 @@ func AddCommonCreateFlags(cmd *cobra.Command) {
cmd.Flags().BoolVar(&CreateOptions.DeferSecondaryKeys, "defer-secondary-keys", false, "Defer secondary index creation for a table until after it has been copied.")
cmd.Flags().BoolVar(&CreateOptions.AutoStart, "auto-start", true, "Start the workflow after creating it.")
cmd.Flags().BoolVar(&CreateOptions.StopAfterCopy, "stop-after-copy", false, "Stop the workflow after it's finished copying the existing rows and before it starts replicating changes.")
cmd.Flags().StringSliceVar(&CreateOptions.ConfigOverrides, "config-overrides", []string{}, "Specify one or more VReplication config flags to override as a comma-separated list of key=value pairs.")
}

var MirrorTrafficOptions = struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func commandCreate(cmd *cobra.Command, args []string) error {
tsp := common.GetTabletSelectionPreference(cmd)
cli.FinishedParsing(cmd)

configOverrides, err := common.ParseConfigOverrides(common.CreateOptions.ConfigOverrides)
if err != nil {
return err
}
workflowOptions := &vtctldatapb.WorkflowOptions{
Config: configOverrides,
}

ms := &vtctldatapb.MaterializeSettings{
Workflow: common.BaseOptions.Workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_CUSTOM,
Expand All @@ -101,6 +109,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {
Cell: strings.Join(common.CreateOptions.Cells, ","),
TabletTypes: topoproto.MakeStringTypeCSV(common.CreateOptions.TabletTypes),
TabletSelectionPreference: tsp,
WorkflowOptions: workflowOptions,
}

createOptions.TableSettings.parser, err = sqlparser.New(sqlparser.Options{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func commandCreate(cmd *cobra.Command, args []string) error {
tsp := common.GetTabletSelectionPreference(cmd)
cli.FinishedParsing(cmd)

configOverrides, err := common.ParseConfigOverrides(common.CreateOptions.ConfigOverrides)
if err != nil {
return err
}
createOptions.WorkflowOptions.Config = configOverrides

req := &vtctldatapb.MoveTablesCreateRequest{
Workflow: common.BaseOptions.Workflow,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
Expand Down
9 changes: 9 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/reshard/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error {
tsp := common.GetTabletSelectionPreference(cmd)
cli.FinishedParsing(cmd)

configOverrides, err := common.ParseConfigOverrides(common.CreateOptions.ConfigOverrides)
if err != nil {
return err
}
workflowOptions := &vtctldatapb.WorkflowOptions{
Config: configOverrides,
}

req := &vtctldatapb.ReshardCreateRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
Expand All @@ -72,6 +80,7 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error {
SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
WorkflowOptions: workflowOptions,
}
resp, err := common.GetClient().ReshardCreate(common.GetCommandCtx(), req)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
TabletTypes []topodatapb.TabletType
TabletTypesInPreferenceOrder bool
OnDDL string
ConfigOverrides []string
}{}

// update makes a WorkflowUpdate gRPC call to a vtctld.
Expand Down Expand Up @@ -74,6 +75,9 @@ var (
return fmt.Errorf("invalid on-ddl value: %s", updateOptions.OnDDL)
}
}
if len(updateOptions.ConfigOverrides) > 0 {
changes = true
}
if !changes {
return fmt.Errorf("no configuration options specified to update")
}
Expand All @@ -95,13 +99,19 @@ func commandUpdate(cmd *cobra.Command, args []string) error {
}
}

configOverrides, err := common.ParseConfigOverrides(updateOptions.ConfigOverrides)
if err != nil {
return err
}

req := &vtctldatapb.WorkflowUpdateRequest{
Keyspace: baseOptions.Keyspace,
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: baseOptions.Workflow,
Cells: updateOptions.Cells,
TabletTypes: updateOptions.TabletTypes,
TabletSelectionPreference: &tsp,
ConfigOverrides: configOverrides,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func registerCommands(root *cobra.Command) {
update.Flags().VarP((*topoproto.TabletTypeListFlag)(&updateOptions.TabletTypes), "tablet-types", "t", "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY).")
update.Flags().BoolVar(&updateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
update.Flags().StringVar(&updateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE.")
update.Flags().StringSliceVar(&updateOptions.ConfigOverrides, "config-overrides", nil, "Specify one or more VReplication config flags to override as a comma-separated list of key=value pairs.")

common.AddShardSubsetFlag(update, &baseOptions.Shards)
base.AddCommand(update)
}
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ Flags:
--queryserver-enable-views Enable views support in vttablet.
--queryserver_enable_online_ddl Enable online DDL. (default true)
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--relay_log_max_items int Maximum number of rows for VReplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--relay_log_max_items int Maximum number of rows for vreplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for vreplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ Flags:
--queryserver-enable-views Enable views support in vttablet.
--queryserver_enable_online_ddl Enable online DDL. (default true)
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--relay_log_max_items int Maximum number of rows for VReplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--relay_log_max_items int Maximum number of rows for vreplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for vreplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type WriteMetrics struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type testcase struct {
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down
46 changes: 46 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"testing"
"time"

"golang.org/x/exp/maps"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1022,3 +1024,47 @@ func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, tab
require.Equalf(t, routedKeyspace, plan.Keyspace.Name, "for database %s, keyspace %v, tabletType %s", database, keyspace, tt)
}
}

// getVReplicationConfig returns the vreplication config for one random workflow for a given tablet. Currently, this is
// used when there is only one workflow, so we are using this simple method to get the config.
func getVReplicationConfig(t *testing.T, tab *cluster.VttabletProcess) map[string]string {
configJson, err := getDebugVar(t, tab.Port, []string{"VReplicationConfig"})
require.NoError(t, err)

var config map[string]string
err = json2.Unmarshal([]byte(configJson), &config)
require.NoError(t, err)
require.Equal(t, 1, len(config))

configJson = config[maps.Keys(config)[0]]
config = nil
err = json2.Unmarshal([]byte(configJson), &config)
require.NoError(t, err)

return config
}

// mapToCSV converts a golang map to a CSV string for use in defining the config overrides in vrep CLI commands.
func mapToCSV(m map[string]string) string {
csv := ""
if len(m) == 0 {
return csv
}
for k, v := range m {
csv += fmt.Sprintf("%s=%s,", k, v)
}
if len(csv) == 0 {
return csv
}
return csv[:len(csv)-1]
}

// validateOverrides validates that the given vttablets have the expected config overrides.
func validateOverrides(t *testing.T, tabs map[string]*cluster.VttabletProcess, want map[string]string) {
for _, tab := range tabs {
config := getVReplicationConfig(t, tab)
for k, v := range want {
require.EqualValues(t, v, config[k])
}
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down
Loading