-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Add new force flag to DemotePrimary to force a demotion even when blocked on waiting for semi-sync acks
#18714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ba814b6
8a54593
0444df7
8adec1d
bde16be
26aa0b4
dda3b0a
429964e
9740147
7c5e650
88b06ad
cb19d48
1c91227
33f4cbe
691e337
731198d
76823d1
2294c2b
feb665c
b1722b9
f2622c4
a805c4a
6284f7b
568d148
4f6df49
e00923a
1f57d86
3a4c5a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,16 +19,20 @@ package emergencyreparent | |
| import ( | ||
| "context" | ||
| "os/exec" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "vitess.io/vitess/go/mysql" | ||
| "vitess.io/vitess/go/test/endtoend/cluster" | ||
| "vitess.io/vitess/go/test/endtoend/reparent/utils" | ||
| "vitess.io/vitess/go/vt/log" | ||
| "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" | ||
|
|
||
| topodatapb "vitess.io/vitess/go/vt/proto/topodata" | ||
| ) | ||
|
|
||
| func TestTrivialERS(t *testing.T) { | ||
|
|
@@ -131,6 +135,131 @@ func TestReparentDownPrimary(t *testing.T) { | |
| utils.ResurrectTablet(ctx, t, clusterInstance, tablets[0]) | ||
| } | ||
|
|
||
| func TestEmergencyReparentWithBlockedPrimary(t *testing.T) { | ||
| clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync) | ||
| defer utils.TeardownCluster(clusterInstance) | ||
|
|
||
| if clusterInstance.VtTabletMajorVersion < 24 { | ||
| t.Skip("Skipping test since `DemotePrimary` on earlier versions does not handle blocked primaries correctly") | ||
| } | ||
|
|
||
| // start vtgate w/disabled buffering | ||
| clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, | ||
| "--enable_buffer=false", | ||
| "--query-timeout", "3000") | ||
| err := clusterInstance.StartVtgate() | ||
| require.NoError(t, err) | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| defer cancel() | ||
|
|
||
| conn, err := mysql.Connect(ctx, &mysql.ConnParams{ | ||
| Host: clusterInstance.Hostname, | ||
| Port: clusterInstance.VtgateMySQLPort, | ||
| }) | ||
| require.NoError(t, err) | ||
| defer conn.Close() | ||
|
|
||
| _, err = conn.ExecuteFetch("CREATE TABLE test (id INT PRIMARY KEY, msg VARCHAR(64))", 0, false) | ||
| require.NoError(t, err) | ||
|
|
||
| tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets | ||
|
|
||
| // Simulate no semi-sync replicas being available by disabling semi-sync on all replicas | ||
| for _, tablet := range tablets[1:] { | ||
| utils.RunSQL(ctx, t, "STOP REPLICA IO_THREAD", tablet) | ||
|
|
||
| // Disable semi-sync on replicas to simulate blocking | ||
| semisyncType, err := utils.SemiSyncExtensionLoaded(context.Background(), tablet) | ||
| require.NoError(t, err) | ||
| switch semisyncType { | ||
| case mysql.SemiSyncTypeSource: | ||
| utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_replica_enabled = false", tablet) | ||
| case mysql.SemiSyncTypeMaster: | ||
| utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_slave_enabled = false", tablet) | ||
| } | ||
|
|
||
| utils.RunSQL(context.Background(), t, "START REPLICA IO_THREAD", tablet) | ||
| } | ||
|
|
||
| // Try performing a write and ensure that it blocks. | ||
| writeSQL := `insert into test(id, msg) values (1, 'test 1')` | ||
| wg := sync.WaitGroup{} | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
|
|
||
| // Attempt writing via vtgate against the primary. This should block (because there's no replicas to ack the semi-sync), | ||
| // and fail on the vtgate query timeout. Async replicas will still receive this write (probably), because it is written | ||
| // to the PRIMARY binlog even when no ackers exist. This means we need to disable the vtgate buffer (above), because it | ||
| // will attempt the write on the promoted, unblocked primary - and this will hit a dupe key error. | ||
| _, err := conn.ExecuteFetch(writeSQL, 0, false) | ||
| require.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100) during query: "+writeSQL) | ||
timvaillancourt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Verify vtgate really processed the insert in case something unrelated caused the deadline exceeded. | ||
| vtgateVars := clusterInstance.VtgateProcess.GetVars() | ||
| require.NotNil(t, vtgateVars) | ||
| require.NotNil(t, vtgateVars["QueryRoutes"]) | ||
| require.NotNil(t, vtgateVars["VtgateApiErrorCounts"]) | ||
| require.EqualValues(t, map[string]interface{}{ | ||
| "DDL.DirectDDL.PRIMARY": float64(1), | ||
| "INSERT.Passthrough.PRIMARY": float64(1), | ||
| }, vtgateVars["QueryRoutes"]) | ||
| require.EqualValues(t, map[string]interface{}{ | ||
| "Execute.ks.primary.DEADLINE_EXCEEDED": float64(1), | ||
| }, vtgateVars["VtgateApiErrorCounts"]) | ||
| }() | ||
|
|
||
| wg.Add(1) | ||
| waitReplicasTimeout := time.Second * 10 | ||
| go func() { | ||
| defer wg.Done() | ||
|
|
||
| // Ensure the write (other goroutine above) is blocked waiting on ACKs on the primary. | ||
| utils.WaitForQueryWithStateInProcesslist(context.Background(), t, tablets[0], writeSQL, "Waiting for semi-sync ACK from replica", time.Second*20) | ||
|
|
||
| // Send SIGSTOP to primary to simulate it being unresponsive. | ||
| tablets[0].VttabletProcess.Stop() | ||
timvaillancourt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Run forced reparent operation, this should now proceed unimpeded. | ||
| out, err := utils.Ers(clusterInstance, tablets[1], "15s", waitReplicasTimeout.String()) | ||
| require.NoError(t, err, out) | ||
|
Comment on lines
+224
to
+226
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we first test that the non-forced ERS fails?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The non-forced ERS won't necessarily fail - it will just leave the old primary in a semi sync blocked state. I can add a test case for this to highlight the difference in behavior between the two?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I don't think we need a new test if this PR doesn't affect that behaviour 🤷 |
||
| }() | ||
|
|
||
| wg.Wait() | ||
|
|
||
| // We need to wait at least 10 seconds here to ensure the wait-for-replicas-timeout has passed, | ||
| // before we resume the old primary - otherwise the old primary will receive a `SetReplicationSource` call. | ||
| time.Sleep(waitReplicasTimeout * 2) | ||
|
|
||
| // Bring back the demoted primary | ||
| tablets[0].VttabletProcess.Resume() | ||
|
|
||
| // Give the old primary some time to realize it's no longer the primary, | ||
| // and for a new primary to be promoted. | ||
| require.EventuallyWithT(t, func(c *assert.CollectT) { | ||
| // Ensure the old primary was demoted correctly | ||
| tabletInfo, err := clusterInstance.VtctldClientProcess.GetTablet(tablets[0].Alias) | ||
| require.NoError(c, err) | ||
|
|
||
| // The old primary should have noticed there's a new primary tablet now and should | ||
| // have demoted itself to REPLICA. | ||
| require.Equal(c, topodatapb.TabletType_REPLICA, tabletInfo.GetType()) | ||
|
|
||
| // The old primary should be in not serving mode because we should be unable to re-attach it | ||
| // as a replica due to the errant GTID caused by semi-sync writes that were never replicated out. | ||
| // | ||
| // Note: The writes that were not replicated were caused by the semi sync unblocker, which | ||
| // performed writes after ERS. | ||
| require.Equal(c, "NOT_SERVING", tablets[0].VttabletProcess.GetTabletStatus()) | ||
| require.Equal(c, "replica", tablets[0].VttabletProcess.GetTabletType()) | ||
|
|
||
| // Check the 2nd tablet becomes PRIMARY. | ||
| require.Equal(c, "SERVING", tablets[1].VttabletProcess.GetTabletStatus()) | ||
| require.Equal(c, "primary", tablets[1].VttabletProcess.GetTabletType()) | ||
| }, 30*time.Second, time.Second, "could not validate primary was demoted") | ||
| } | ||
timvaillancourt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| func TestReparentNoChoiceDownPrimary(t *testing.T) { | ||
| clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync) | ||
| defer utils.TeardownCluster(clusterInstance) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -819,3 +819,30 @@ func (mysqld *Mysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSy | |||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| return conn.Conn.SemiSyncExtensionLoaded() | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func (mysqld *Mysqld) IsSemiSyncBlocked(ctx context.Context) (bool, error) { | ||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't like that we have another method to check this when the monitor exposes methods for this as well. |
||||||||||||||||||||||||
| conn, err := getPoolReconnect(ctx, mysqld.dbaPool) | ||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would not expect that we need to use the DBA pool for this as we're just selecting from P_S. The DBA pools tend to be relatively small and should only be used when truly necessary or they can block other things that are critical and do require it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wondered this too, but I don't know what alternatives we have. I have seen the small DBA pool get exhausted before, which prevents reparent RPCs that also use that pool. The root cause was likely overloaded mysqld or semi-sync issues, but filling that pool is a risk
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I used this pool because that's what all the other operations (like vitess/go/vt/mysqlctl/replication.go Lines 496 to 506 in 429964e
Maybe it would be overall better to fetch a connection from the pool and then use that connection for all the operations that happen during commands like
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Yeah, I don't think we should include that kind of change in this PR, but it's a risk For some context: Slack has seen overloaded tablets cause this pool to fill with I think we'll just need to keep this in mind, and only 1 x RPC is being added to the pile here 👍/👎
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's fair. I don't think we should use DBA everywhere here, but it does line up with the other uses. And using a single connection for all of the work might be a good idea here too. Not related though so we can investigate this more separately. |
||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| return false, err | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| defer conn.Recycle() | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Execute the query to check if the primary is blocked on semi-sync. | ||||||||||||||||||||||||
| semiSyncWaitSessionsRead := "select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')" | ||||||||||||||||||||||||
| res, err := conn.Conn.ExecuteFetch(semiSyncWaitSessionsRead, 1, false) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| return false, err | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| // If we have no rows, then the primary doesn't have semi-sync enabled. | ||||||||||||||||||||||||
| // It then follows, that the primary isn't blocked :) | ||||||||||||||||||||||||
| if len(res.Rows) == 0 { | ||||||||||||||||||||||||
| return false, nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Read the status value and check if it is non-zero. | ||||||||||||||||||||||||
| if len(res.Rows) != 1 || len(res.Rows[0]) != 1 { | ||||||||||||||||||||||||
| return false, fmt.Errorf("unexpected number of rows received - %v", res.Rows) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| value, err := res.Rows[0][0].ToCastInt64() | ||||||||||||||||||||||||
| return value != 0, err | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.