From 570414da3807b997eb1ae66ea2c0cb6ca0de71aa Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Wed, 26 Nov 2025 18:14:29 +0100 Subject: [PATCH 1/2] Cherry-pick 1f49de43adb033aeda0c5c40519bd55b84eca767 with conflicts --- go/test/endtoend/cluster/vttablet_process.go | 14 ++ .../reparent/emergencyreparent/ers_test.go | 129 ++++++++++++++++++ go/test/endtoend/reparent/utils/utils.go | 29 +++- go/vt/mysqlctl/fakemysqldaemon.go | 5 + go/vt/mysqlctl/mysql_daemon.go | 1 + go/vt/mysqlctl/replication.go | 27 ++++ .../tabletmanagerdata/tabletmanagerdata.pb.go | 13 +- .../tabletmanagerdata_vtproto.pb.go | 34 +++++ go/vt/vtcombo/tablet_map.go | 5 + .../testutil/test_tmclient.go | 2 +- .../reparentutil/emergency_reparenter_test.go | 6 +- .../vtctl/reparentutil/planned_reparenter.go | 4 +- go/vt/vtctl/reparentutil/policy/durability.go | 27 ++++ .../reparentutil/policy/durability_test.go | 7 + go/vt/vtctl/reparentutil/replication.go | 2 +- go/vt/vtctl/reparentutil/replication_test.go | 2 +- go/vt/vttablet/faketmclient/fake_client.go | 2 +- go/vt/vttablet/grpctmclient/client.go | 4 +- go/vt/vttablet/grpctmserver/server.go | 2 +- go/vt/vttablet/tabletmanager/rpc_agent.go | 2 +- .../vttablet/tabletmanager/rpc_replication.go | 68 +++++++-- .../tabletmanager/rpc_replication_test.go | 8 +- go/vt/vttablet/tabletmanager/shard_sync.go | 2 +- go/vt/vttablet/tmclient/rpc_client_api.go | 2 +- go/vt/vttablet/tmrpctest/test_tm_rpc.go | 6 +- proto/tabletmanagerdata.proto | 1 + web/vtadmin/src/proto/vtadmin.d.ts | 6 + web/vtadmin/src/proto/vtadmin.js | 34 ++++- 28 files changed, 406 insertions(+), 38 deletions(-) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index 71ecd16f587..6dae6a37148 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -419,6 +419,20 @@ func (vttablet *VttabletProcess) TearDown() error { return vttablet.TearDownWithTimeout(vttabletStateTimeout) } +func (vttablet *VttabletProcess) Stop() { + if vttablet.proc == nil || vttablet.exit == nil { + return + } + vttablet.proc.Process.Signal(syscall.SIGSTOP) +} + +func (vttablet *VttabletProcess) Resume() { + if vttablet.proc == nil || vttablet.exit == nil { + return + } + vttablet.proc.Process.Signal(syscall.SIGCONT) +} + // Kill shuts down the running vttablet service immediately. func (vttablet *VttabletProcess) Kill() error { if vttablet.proc == nil || vttablet.exit == nil { diff --git a/go/test/endtoend/reparent/emergencyreparent/ers_test.go b/go/test/endtoend/reparent/emergencyreparent/ers_test.go index 7473bf89c9e..cf0adaf507f 100644 --- a/go/test/endtoend/reparent/emergencyreparent/ers_test.go +++ b/go/test/endtoend/reparent/emergencyreparent/ers_test.go @@ -19,9 +19,11 @@ package emergencyreparent import ( "context" "os/exec" + "sync" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -29,6 +31,8 @@ import ( "vitess.io/vitess/go/test/endtoend/reparent/utils" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestTrivialERS(t *testing.T) { @@ -131,6 +135,131 @@ func TestReparentDownPrimary(t *testing.T) { utils.ResurrectTablet(ctx, t, clusterInstance, tablets[0]) } +func TestEmergencyReparentWithBlockedPrimary(t *testing.T) { + clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync) + defer utils.TeardownCluster(clusterInstance) + + if clusterInstance.VtTabletMajorVersion < 24 { + t.Skip("Skipping test since `DemotePrimary` on earlier versions does not handle blocked primaries correctly") + } + + // start vtgate w/disabled buffering + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--enable_buffer=false", + "--query-timeout", "3000") + err := clusterInstance.StartVtgate() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conn, err := mysql.Connect(ctx, &mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + }) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("CREATE TABLE test (id INT PRIMARY KEY, msg VARCHAR(64))", 0, false) + require.NoError(t, err) + + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + + // Simulate no semi-sync replicas being available by disabling semi-sync on all replicas + for _, tablet := range tablets[1:] { + utils.RunSQL(ctx, t, "STOP REPLICA IO_THREAD", tablet) + + // Disable semi-sync on replicas to simulate blocking + semisyncType, err := utils.SemiSyncExtensionLoaded(context.Background(), tablet) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_replica_enabled = false", tablet) + case mysql.SemiSyncTypeMaster: + utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_slave_enabled = false", tablet) + } + + utils.RunSQL(context.Background(), t, "START REPLICA IO_THREAD", tablet) + } + + // Try performing a write and ensure that it blocks. + writeSQL := `insert into test(id, msg) values (1, 'test 1')` + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + // Attempt writing via vtgate against the primary. This should block (because there's no replicas to ack the semi-sync), + // and fail on the vtgate query timeout. Async replicas will still receive this write (probably), because it is written + // to the PRIMARY binlog even when no ackers exist. This means we need to disable the vtgate buffer (above), because it + // will attempt the write on the promoted, unblocked primary - and this will hit a dupe key error. + _, err := conn.ExecuteFetch(writeSQL, 0, false) + require.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100) during query: "+writeSQL) + + // Verify vtgate really processed the insert in case something unrelated caused the deadline exceeded. + vtgateVars := clusterInstance.VtgateProcess.GetVars() + require.NotNil(t, vtgateVars) + require.NotNil(t, vtgateVars["QueryRoutes"]) + require.NotNil(t, vtgateVars["VtgateApiErrorCounts"]) + require.EqualValues(t, map[string]interface{}{ + "DDL.DirectDDL.PRIMARY": float64(1), + "INSERT.Passthrough.PRIMARY": float64(1), + }, vtgateVars["QueryRoutes"]) + require.EqualValues(t, map[string]interface{}{ + "Execute.ks.primary.DEADLINE_EXCEEDED": float64(1), + }, vtgateVars["VtgateApiErrorCounts"]) + }() + + wg.Add(1) + waitReplicasTimeout := time.Second * 10 + go func() { + defer wg.Done() + + // Ensure the write (other goroutine above) is blocked waiting on ACKs on the primary. + utils.WaitForQueryWithStateInProcesslist(context.Background(), t, tablets[0], writeSQL, "Waiting for semi-sync ACK from replica", time.Second*20) + + // Send SIGSTOP to primary to simulate it being unresponsive. + tablets[0].VttabletProcess.Stop() + + // Run forced reparent operation, this should now proceed unimpeded. + out, err := utils.Ers(clusterInstance, tablets[1], "15s", waitReplicasTimeout.String()) + require.NoError(t, err, out) + }() + + wg.Wait() + + // We need to wait at least 10 seconds here to ensure the wait-for-replicas-timeout has passed, + // before we resume the old primary - otherwise the old primary will receive a `SetReplicationSource` call. + time.Sleep(waitReplicasTimeout * 2) + + // Bring back the demoted primary + tablets[0].VttabletProcess.Resume() + + // Give the old primary some time to realize it's no longer the primary, + // and for a new primary to be promoted. + require.EventuallyWithT(t, func(c *assert.CollectT) { + // Ensure the old primary was demoted correctly + tabletInfo, err := clusterInstance.VtctldClientProcess.GetTablet(tablets[0].Alias) + require.NoError(c, err) + + // The old primary should have noticed there's a new primary tablet now and should + // have demoted itself to REPLICA. + require.Equal(c, topodatapb.TabletType_REPLICA, tabletInfo.GetType()) + + // The old primary should be in not serving mode because we should be unable to re-attach it + // as a replica due to the errant GTID caused by semi-sync writes that were never replicated out. + // + // Note: The writes that were not replicated were caused by the semi sync unblocker, which + // performed writes after ERS. + require.Equal(c, "NOT_SERVING", tablets[0].VttabletProcess.GetTabletStatus()) + require.Equal(c, "replica", tablets[0].VttabletProcess.GetTabletType()) + + // Check the 2nd tablet becomes PRIMARY. + require.Equal(c, "SERVING", tablets[1].VttabletProcess.GetTabletStatus()) + require.Equal(c, "primary", tablets[1].VttabletProcess.GetTabletType()) + }, 30*time.Second, time.Second, "could not validate primary was demoted") +} + func TestReparentNoChoiceDownPrimary(t *testing.T) { clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync) defer utils.TeardownCluster(clusterInstance) diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index b597af0cdcd..1ab4ce543b6 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -31,16 +31,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/utils" - "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" - "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/utils" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" + "vitess.io/vitess/go/vt/vttablet/tabletconn" ) var ( @@ -842,6 +841,7 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V } } +// WaitForTabletToBeServing waits for a tablet to reach a serving state. func WaitForTabletToBeServing(ctx context.Context, t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, timeout time.Duration) { vTablet, err := clusterInstance.VtctldClientProcess.GetTablet(tablet.Alias) require.NoError(t, err) @@ -862,3 +862,22 @@ func WaitForTabletToBeServing(ctx context.Context, t *testing.T, clusterInstance t.Fatal(err.Error()) } } + +// WaitForQueryWithStateInProcesslist waits for a query to be present in the processlist with a specific state. +func WaitForQueryWithStateInProcesslist(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sql, state string, timeout time.Duration) { + require.Eventually(t, func() bool { + qr := RunSQL(ctx, t, "select Command, State, Info from information_schema.processlist", tablet) + for _, row := range qr.Rows { + if len(row) != 3 { + continue + } + if strings.EqualFold(row[0].ToString(), "Query") { + continue + } + if strings.EqualFold(row[1].ToString(), state) && strings.EqualFold(row[2].ToString(), sql) { + return true + } + } + return false + }, timeout, time.Second, "query with state not in processlist") +} diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index e61d2e12a0a..536ffaeb2ad 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -768,6 +768,11 @@ func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus(ctx context.Context) (bool return fmd.SemiSyncReplicaEnabled, nil } +// IsSemiSyncBlocked is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) IsSemiSyncBlocked(ctx context.Context) (bool, error) { + return false, nil +} + // GetVersionString is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) GetVersionString(ctx context.Context) (string, error) { return fmd.Version, nil diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index 5851a980664..41896abb785 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -74,6 +74,7 @@ type MysqlDaemon interface { SemiSyncClients(ctx context.Context) (count uint32) SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32) SemiSyncReplicationStatus(ctx context.Context) (bool, error) + IsSemiSyncBlocked(ctx context.Context) (bool, error) ResetReplicationParameters(ctx context.Context) error GetBinlogInformation(ctx context.Context) (binlogFormat string, logEnabled bool, logReplicaUpdate bool, binlogRowImage string, err error) GetGTIDMode(ctx context.Context) (gtidMode string, err error) diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index bd244d4f5c7..1aabafe85f1 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -819,3 +819,30 @@ func (mysqld *Mysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSy return conn.Conn.SemiSyncExtensionLoaded() } + +func (mysqld *Mysqld) IsSemiSyncBlocked(ctx context.Context) (bool, error) { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) + if err != nil { + return false, err + } + defer conn.Recycle() + + // Execute the query to check if the primary is blocked on semi-sync. + semiSyncWaitSessionsRead := "select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')" + res, err := conn.Conn.ExecuteFetch(semiSyncWaitSessionsRead, 1, false) + if err != nil { + return false, err + } + // If we have no rows, then the primary doesn't have semi-sync enabled. + // It then follows, that the primary isn't blocked :) + if len(res.Rows) == 0 { + return false, nil + } + + // Read the status value and check if it is non-zero. + if len(res.Rows) != 1 || len(res.Rows[0]) != 1 { + return false, fmt.Errorf("unexpected number of rows received - %v", res.Rows) + } + value, err := res.Rows[0][0].ToCastInt64() + return value != 0, err +} diff --git a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go index d7047b925d0..6cad59cd6fe 100644 --- a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go +++ b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go @@ -4460,6 +4460,7 @@ func (*InitReplicaResponse) Descriptor() ([]byte, []int) { type DemotePrimaryRequest struct { state protoimpl.MessageState `protogen:"open.v1"` + Force bool `protobuf:"varint,1,opt,name=force,proto3" json:"force,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -4494,6 +4495,13 @@ func (*DemotePrimaryRequest) Descriptor() ([]byte, []int) { return file_tabletmanagerdata_proto_rawDescGZIP(), []int{92} } +func (x *DemotePrimaryRequest) GetForce() bool { + if x != nil { + return x.Force + } + return false +} + type DemotePrimaryResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // PrimaryStatus represents the response from calling `SHOW BINARY LOG STATUS` on a primary that has been demoted. @@ -8684,8 +8692,9 @@ const file_tabletmanagerdata_proto_rawDesc = "" + "\x14replication_position\x18\x02 \x01(\tR\x13replicationPosition\x12&\n" + "\x0ftime_created_ns\x18\x03 \x01(\x03R\rtimeCreatedNs\x12\x1a\n" + "\bsemiSync\x18\x04 \x01(\bR\bsemiSync\"\x15\n" + - "\x13InitReplicaResponse\"\x16\n" + - "\x14DemotePrimaryRequest\"d\n" + + "\x13InitReplicaResponse\",\n" + + "\x14DemotePrimaryRequest\x12\x14\n" + + "\x05force\x18\x01 \x01(\bR\x05force\"d\n" + "\x15DemotePrimaryResponse\x12E\n" + "\x0eprimary_status\x18\x02 \x01(\v2\x1e.replicationdata.PrimaryStatusR\rprimaryStatusJ\x04\b\x01\x10\x02\"6\n" + "\x18UndoDemotePrimaryRequest\x12\x1a\n" + diff --git a/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go b/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go index 0ef58ee633e..baf50f623e6 100644 --- a/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go +++ b/go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go @@ -1748,6 +1748,7 @@ func (m *DemotePrimaryRequest) CloneVT() *DemotePrimaryRequest { return (*DemotePrimaryRequest)(nil) } r := new(DemotePrimaryRequest) + r.Force = m.Force if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -7270,6 +7271,16 @@ func (m *DemotePrimaryRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Force { + i-- + if m.Force { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } return len(dAtA) - i, nil } @@ -12566,6 +12577,9 @@ func (m *DemotePrimaryRequest) SizeVT() (n int) { } var l int _ = l + if m.Force { + n += 2 + } n += len(m.unknownFields) return n } @@ -22601,6 +22615,26 @@ func (m *DemotePrimaryRequest) UnmarshalVT(dAtA []byte) error { return fmt.Errorf("proto: DemotePrimaryRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Force", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Force = bool(v != 0) default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index a77de599800..807d8fe197f 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -1017,8 +1017,13 @@ func (itmc *internalTabletManagerClient) ReadReparentJournalInfo(ctx context.Con return 0, fmt.Errorf("not implemented in vtcombo") } +<<<<<<< HEAD func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) { return nil, fmt.Errorf("not implemented in vtcombo") +======= +func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet, bool) (*replicationdatapb.PrimaryStatus, error) { + return nil, errors.New("not implemented in vtcombo") +>>>>>>> 1f49de43ad (Add new `force` flag to `DemotePrimary` to force a demotion even when blocked on waiting for semi-sync acks (#18714)) } func (itmc *internalTabletManagerClient) UndoDemotePrimary(context.Context, *topodatapb.Tablet, bool) error { diff --git a/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go b/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go index cbdb59a9e03..e5c63e9b804 100644 --- a/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go +++ b/go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go @@ -553,7 +553,7 @@ func (fake *TabletManagerClient) ChangeType(ctx context.Context, tablet *topodat } // DemotePrimary is part of the tmclient.TabletManagerClient interface. -func (fake *TabletManagerClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) { +func (fake *TabletManagerClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, force bool) (*replicationdatapb.PrimaryStatus, error) { if fake.DemotePrimaryResults == nil { return nil, assert.AnError } diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter_test.go b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go index fdc080156c5..87bca61218d 100644 --- a/go/vt/vtctl/reparentutil/emergency_reparenter_test.go +++ b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go @@ -18,7 +18,11 @@ package reparentutil import ( "context" +<<<<<<< HEAD "fmt" +======= + "errors" +>>>>>>> 1f49de43ad (Add new `force` flag to `DemotePrimary` to force a demotion even when blocked on waiting for semi-sync acks (#18714)) "slices" "testing" "time" @@ -4673,7 +4677,7 @@ func getRelayLogPosition(gtidSets ...string) string { res += "," } first = false - res += fmt.Sprintf("%s:%s", uuids[idx], set) + res += uuids[idx] + ":" + set } return res } diff --git a/go/vt/vtctl/reparentutil/planned_reparenter.go b/go/vt/vtctl/reparentutil/planned_reparenter.go index dcd6dc7c590..52b51b52b8f 100644 --- a/go/vt/vtctl/reparentutil/planned_reparenter.go +++ b/go/vt/vtctl/reparentutil/planned_reparenter.go @@ -275,7 +275,7 @@ func (pr *PlannedReparenter) performGracefulPromotion( demoteCtx, demoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer demoteCancel() - primaryStatus, err := pr.tmc.DemotePrimary(demoteCtx, currentPrimary.Tablet) + primaryStatus, err := pr.tmc.DemotePrimary(demoteCtx, currentPrimary.Tablet, false) if err != nil { return vterrors.Wrapf(err, "failed to DemotePrimary on current primary %v: %v", currentPrimary.AliasString(), err) } @@ -426,7 +426,7 @@ func (pr *PlannedReparenter) performPotentialPromotion( // tablet type), that's already in read-only. pr.logger.Infof("demoting tablet %v", alias) - primaryStatus, err := pr.tmc.DemotePrimary(stopAllCtx, tablet) + primaryStatus, err := pr.tmc.DemotePrimary(stopAllCtx, tablet, false) if err != nil { rec.RecordError(vterrors.Wrapf(err, "DemotePrimary(%v) failed on contested primary", alias)) diff --git a/go/vt/vtctl/reparentutil/policy/durability.go b/go/vt/vtctl/reparentutil/policy/durability.go index bad6846ef29..1ff88267fbf 100644 --- a/go/vt/vtctl/reparentutil/policy/durability.go +++ b/go/vt/vtctl/reparentutil/policy/durability.go @@ -91,6 +91,8 @@ type Durabler interface { SemiSyncAckers(*topodatapb.Tablet) int // IsReplicaSemiSync returns whether the "replica" should send semi-sync acks if "primary" were to become the PRIMARY instance IsReplicaSemiSync(primary, replica *topodatapb.Tablet) bool + // HasSemiSync returns whether the durability policy uses semi-sync. + HasSemiSync() bool } func RegisterDurability(name string, newDurablerFunc NewDurabler) { @@ -142,6 +144,11 @@ func IsReplicaSemiSync(durability Durabler, primary, replica *topodatapb.Tablet) return durability.IsReplicaSemiSync(primary, replica) } +// HasSemiSync returns true if the durability policy uses semi-sync. +func HasSemiSync(durability Durabler) bool { + return durability.HasSemiSync() +} + //======================================================================= // durabilityNone has no semi-sync and returns NeutralPromoteRule for Primary and Replica tablet types, MustNotPromoteRule for everything else @@ -166,6 +173,11 @@ func (d *durabilityNone) IsReplicaSemiSync(primary, replica *topodatapb.Tablet) return false } +// HasSemiSync implements the Durabler interface +func (d *durabilityNone) HasSemiSync() bool { + return false +} + //======================================================================= // durabilitySemiSync has 1 semi-sync setup. It only allows Primary and Replica type servers to acknowledge semi sync @@ -199,6 +211,11 @@ func (d *durabilitySemiSync) IsReplicaSemiSync(primary, replica *topodatapb.Tabl return false } +// HasSemiSync implements the Durabler interface +func (d *durabilitySemiSync) HasSemiSync() bool { + return true +} + //======================================================================= // durabilityCrossCell has 1 semi-sync setup. It only allows Primary and Replica type servers from a different cell to acknowledge semi sync. @@ -233,6 +250,11 @@ func (d *durabilityCrossCell) IsReplicaSemiSync(primary, replica *topodatapb.Tab return false } +// HasSemiSync implements the Durabler interface +func (d *durabilityCrossCell) HasSemiSync() bool { + return true +} + //======================================================================= // durabilityTest is like durabilityNone. It overrides the type for a specific tablet to prefer. It is only meant to be used for testing purposes! @@ -260,3 +282,8 @@ func (d *durabilityTest) SemiSyncAckers(tablet *topodatapb.Tablet) int { func (d *durabilityTest) IsReplicaSemiSync(primary, replica *topodatapb.Tablet) bool { return false } + +// HasSemiSync implements the Durabler interface +func (d *durabilityTest) HasSemiSync() bool { + return false +} diff --git a/go/vt/vtctl/reparentutil/policy/durability_test.go b/go/vt/vtctl/reparentutil/policy/durability_test.go index 441275f29bf..e3a4710b2d1 100644 --- a/go/vt/vtctl/reparentutil/policy/durability_test.go +++ b/go/vt/vtctl/reparentutil/policy/durability_test.go @@ -331,3 +331,10 @@ func TestDurabilityTest(t *testing.T) { }) } } + +func TestHasSemiSync(t *testing.T) { + require.False(t, HasSemiSync(&durabilityNone{})) + require.False(t, HasSemiSync(&durabilityTest{})) + require.True(t, HasSemiSync(&durabilitySemiSync{})) + require.True(t, HasSemiSync(&durabilityCrossCell{})) +} diff --git a/go/vt/vtctl/reparentutil/replication.go b/go/vt/vtctl/reparentutil/replication.go index 096cb7166ee..e13a3e0dd44 100644 --- a/go/vt/vtctl/reparentutil/replication.go +++ b/go/vt/vtctl/reparentutil/replication.go @@ -255,7 +255,7 @@ func stopReplicationAndBuildStatusMaps( if isSQLErr && sqlErr != nil && sqlErr.Number() == sqlerror.ERNotReplica { var primaryStatus *replicationdatapb.PrimaryStatus - primaryStatus, err = tmc.DemotePrimary(groupCtx, tabletInfo.Tablet) + primaryStatus, err = tmc.DemotePrimary(groupCtx, tabletInfo.Tablet, true /* force */) if err != nil { msg := "replica %v thinks it's primary but we failed to demote it: %v" err = vterrors.Wrapf(err, msg, alias, err) diff --git a/go/vt/vtctl/reparentutil/replication_test.go b/go/vt/vtctl/reparentutil/replication_test.go index be0b47e7246..d0fedde0f8a 100644 --- a/go/vt/vtctl/reparentutil/replication_test.go +++ b/go/vt/vtctl/reparentutil/replication_test.go @@ -224,7 +224,7 @@ type stopReplicationAndBuildStatusMapsTestTMClient struct { stopReplicationAndGetStatusDelays map[string]time.Duration } -func (fake *stopReplicationAndBuildStatusMapsTestTMClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) { +func (fake *stopReplicationAndBuildStatusMapsTestTMClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, force bool) (*replicationdatapb.PrimaryStatus, error) { if tablet.Alias == nil { return nil, assert.AnError } diff --git a/go/vt/vttablet/faketmclient/fake_client.go b/go/vt/vttablet/faketmclient/fake_client.go index d1c9da62210..2378c4acb54 100644 --- a/go/vt/vttablet/faketmclient/fake_client.go +++ b/go/vt/vttablet/faketmclient/fake_client.go @@ -378,7 +378,7 @@ func (client *FakeTabletManagerClient) ReadReparentJournalInfo(ctx context.Conte } // DemotePrimary is part of the tmclient.TabletManagerClient interface. -func (client *FakeTabletManagerClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) { +func (client *FakeTabletManagerClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, force bool) (*replicationdatapb.PrimaryStatus, error) { return nil, nil } diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 289930d3c5c..0cc00c6d6fb 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -1191,13 +1191,13 @@ func (client *Client) InitReplica(ctx context.Context, tablet *topodatapb.Tablet } // DemotePrimary is part of the tmclient.TabletManagerClient interface. -func (client *Client) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) { +func (client *Client) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, force bool) (*replicationdatapb.PrimaryStatus, error) { c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } defer closer.Close() - response, err := c.DemotePrimary(ctx, &tabletmanagerdatapb.DemotePrimaryRequest{}) + response, err := c.DemotePrimary(ctx, &tabletmanagerdatapb.DemotePrimaryRequest{Force: force}) if err != nil { return nil, vterrors.FromGRPC(err) } diff --git a/go/vt/vttablet/grpctmserver/server.go b/go/vt/vttablet/grpctmserver/server.go index 23a6f88794f..3f81062dc60 100644 --- a/go/vt/vttablet/grpctmserver/server.go +++ b/go/vt/vttablet/grpctmserver/server.go @@ -607,7 +607,7 @@ func (s *server) DemotePrimary(ctx context.Context, request *tabletmanagerdatapb defer s.tm.HandleRPCPanic(ctx, "DemotePrimary", request, response, true /*verbose*/, &err) ctx = callinfo.GRPCCallInfo(ctx) response = &tabletmanagerdatapb.DemotePrimaryResponse{} - status, err := s.tm.DemotePrimary(ctx) + status, err := s.tm.DemotePrimary(ctx, request.Force) if err == nil { response.PrimaryStatus = status } diff --git a/go/vt/vttablet/tabletmanager/rpc_agent.go b/go/vt/vttablet/tabletmanager/rpc_agent.go index 5a5b2cb99c3..9e341662ae4 100644 --- a/go/vt/vttablet/tabletmanager/rpc_agent.go +++ b/go/vt/vttablet/tabletmanager/rpc_agent.go @@ -148,7 +148,7 @@ type RPCTM interface { InitReplica(ctx context.Context, parent *topodatapb.TabletAlias, replicationPosition string, timeCreatedNS int64, semiSync bool) error - DemotePrimary(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) + DemotePrimary(ctx context.Context, force bool) (*replicationdatapb.PrimaryStatus, error) UndoDemotePrimary(ctx context.Context, semiSync bool) error diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index b0a4ff2ccd6..a1f3f7c2b11 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -548,20 +548,20 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab // or on a tablet that already transitioned to REPLICA. // // If a step fails in the middle, it will try to undo any changes it made. -func (tm *TabletManager) DemotePrimary(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) { +func (tm *TabletManager) DemotePrimary(ctx context.Context, force bool) (*replicationdatapb.PrimaryStatus, error) { log.Infof("DemotePrimary") if err := tm.waitForGrantsToHaveApplied(ctx); err != nil { return nil, err } // The public version always reverts on partial failure. - return tm.demotePrimary(ctx, true /* revertPartialFailure */) + return tm.demotePrimary(ctx, true /* revertPartialFailure */, force) } // demotePrimary implements DemotePrimary with an additional, private option. // // If revertPartialFailure is true, and a step fails in the middle, it will try // to undo any changes it made. -func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure bool) (primaryStatus *replicationdatapb.PrimaryStatus, finalErr error) { +func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure bool, force bool) (primaryStatus *replicationdatapb.PrimaryStatus, finalErr error) { if err := tm.lock(ctx); err != nil { return nil, err } @@ -623,13 +623,66 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure }() } - // Now we know no writes are in-flight and no new writes can occur. - // We just need to wait for no write being blocked on semi-sync ACKs. - err = tm.SemiSyncMonitor.WaitUntilSemiSyncUnblocked(ctx) + isSemiSyncBlocked, err := tm.MysqlDaemon.IsSemiSyncBlocked(ctx) if err != nil { return nil, err } + // `force` is true when `DemotePrimary` is called for `EmergencyReparentShard` or when a primary notices + // that a different tablet has been promoted to primary and demotes itself. + // + // In both cases, the reason for semi sync being blocked is very likely that there's no replica + // connected that can send semi-sync ACKs, so we need to disable semi-sync to enable read-only mode. + // And in either of these cases, it's almost guaranteed that no semi-sync enabled replica will connect + // to this tablet again. + // + // The only way for us to finish the demotion in this scenario is to disable semi-sync - otherwise + // enabling ``super_read_only` will end up waiting indefinitely for in-flight transactions + // to complete, which won't happen as they are waiting for semi-sync ACKs. + // + // By disabling semi-sync, we allow the blocking in-flight transactions to complete. Note that at this point, + // the query service is already disabled, so the original sessions that issued those writes + // will never have seen their transactions commit - they will already have received an error. + // + // The demoted primary will end up with errant GTIDs, but that's unavoidable in this scenario. + if force && isSemiSyncBlocked { + if tm.isPrimarySideSemiSyncEnabled(ctx) { + // Disable the primary side semi-sync to unblock the writes. + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_REPLICA, SemiSyncActionSet); err != nil { + return nil, err + } + defer func() { + if finalErr != nil && revertPartialFailure && wasPrimary { + // enable primary-side semi-sync again + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, SemiSyncActionSet); err != nil { + log.Warningf("fixSemiSync(PRIMARY) failed during revert: %v", err) + } + } + }() + } + } else { + // If `force` is false, we're demoting this primary as part of a `PlannedReparentShard` operation, + // but we might be blocked on semi-sync ACKs. + // + // If there's any in-flight transactions waiting for semi-sync ACKs, + // we won't be able to change the MySQL `super_read_only` because turning on + // read only mode requires all in-flight transactions to complete. + // + // So we're doing a last-ditch effort here trying to wait for in-flight transactions to complete. + // This will only be successful if at least one semi-sync enabled replica connects back to this primary + // and a new transaction commit unblocks the semi-sync wait. + // + // The scenario where this could happen is some sort of network hiccup during a + // `PlannedReparentShard` call, where the primary temporarily loses connectivity to + // all semi-sync enabled replicas. + // + // If we can't unblock within the context timeout, the `PlannedReparentShard` operation will fail. + err = tm.SemiSyncMonitor.WaitUntilSemiSyncUnblocked(ctx) + if err != nil { + return nil, err + } + } + // We can now set MySQL to super_read_only mode. If we are already super_read_only because of a // previous demotion, or because we are not primary anyway, this should be // idempotent. @@ -651,8 +704,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure } }() - // Here, we check if the primary side semi sync is enabled or not. If it isn't enabled then we do not need to take any action. - // If it is enabled then we should turn it off and revert in case of failure. + // If we haven't disabled the primary side semi-sync so far, do it now. if tm.isPrimarySideSemiSyncEnabled(ctx) { // If using semi-sync, we need to disable primary-side. if err := tm.fixSemiSync(ctx, topodatapb.TabletType_REPLICA, SemiSyncActionSet); err != nil { diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index 7ba8fc6729c..31da3abb732 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -98,7 +98,7 @@ func TestDemotePrimaryStalled(t *testing.T) { } go func() { - tm.demotePrimary(context.Background(), false) + tm.demotePrimary(context.Background(), false /* revertPartialFailure */, false /* force */) }() // We make IsServing stall by making it wait on a channel. // This should cause the demote primary operation to be stalled. @@ -149,7 +149,7 @@ func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) { // Start the demote primary operation in a go routine. var demotePrimaryFinished atomic.Bool go func() { - _, err := tm.demotePrimary(ctx, false) + _, err := tm.demotePrimary(ctx, false /* revertPartialFailure */, false /* force */) require.NoError(t, err) demotePrimaryFinished.Store(true) }() @@ -220,7 +220,7 @@ func TestDemotePrimaryWithSemiSyncProgressDetection(t *testing.T) { // Start the demote primary operation in a go routine. var demotePrimaryFinished atomic.Bool go func() { - _, err := tm.demotePrimary(ctx, false) + _, err := tm.demotePrimary(ctx, false /* revertPartialFailure */, false /* force */) require.NoError(t, err) demotePrimaryFinished.Store(true) }() @@ -278,7 +278,7 @@ func TestDemotePrimaryWhenSemiSyncBecomesUnblockedBetweenChecks(t *testing.T) { // Start the demote primary operation in a go routine. var demotePrimaryFinished atomic.Bool go func() { - _, err := tm.demotePrimary(ctx, false) + _, err := tm.demotePrimary(ctx, false /* revertPartialFailure */, false /* force */) require.NoError(t, err) demotePrimaryFinished.Store(true) }() diff --git a/go/vt/vttablet/tabletmanager/shard_sync.go b/go/vt/vttablet/tabletmanager/shard_sync.go index 97397ef2a34..546f60c96fe 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync.go +++ b/go/vt/vttablet/tabletmanager/shard_sync.go @@ -242,7 +242,7 @@ func (tm *TabletManager) endPrimaryTerm(ctx context.Context, primaryAlias *topod log.Infof("Active reparents are enabled; converting MySQL to replica.") demotePrimaryCtx, cancelDemotePrimary := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancelDemotePrimary() - if _, err := tm.demotePrimary(demotePrimaryCtx, false /* revertPartialFailure */); err != nil { + if _, err := tm.demotePrimary(demotePrimaryCtx, false /* revertPartialFailure */, true /* force */); err != nil { return vterrors.Wrap(err, "failed to demote primary") } setPrimaryCtx, cancelSetPrimary := context.WithTimeout(ctx, topo.RemoteOperationTimeout) diff --git a/go/vt/vttablet/tmclient/rpc_client_api.go b/go/vt/vttablet/tmclient/rpc_client_api.go index f1037822c74..5a61f5d3d1d 100644 --- a/go/vt/vttablet/tmclient/rpc_client_api.go +++ b/go/vt/vttablet/tmclient/rpc_client_api.go @@ -254,7 +254,7 @@ type TabletManagerClient interface { // DemotePrimary tells the soon-to-be-former primary it's going to change, // and it should go read-only and return its current position. - DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) + DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, force bool) (*replicationdatapb.PrimaryStatus, error) // UndoDemotePrimary reverts all changes made by DemotePrimary // To be used if we are unable to promote the chosen new primary diff --git a/go/vt/vttablet/tmrpctest/test_tm_rpc.go b/go/vt/vttablet/tmrpctest/test_tm_rpc.go index 2e6bf98ae6b..a6bac299e05 100644 --- a/go/vt/vttablet/tmrpctest/test_tm_rpc.go +++ b/go/vt/vttablet/tmrpctest/test_tm_rpc.go @@ -1266,7 +1266,7 @@ func tmRPCTestInitReplicaPanic(ctx context.Context, t *testing.T, client tmclien expectHandleRPCPanic(t, "InitReplica", true /*verbose*/, err) } -func (fra *fakeRPCTM) DemotePrimary(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) { +func (fra *fakeRPCTM) DemotePrimary(ctx context.Context, force bool) (*replicationdatapb.PrimaryStatus, error) { if fra.panics { panic(fmt.Errorf("test-triggered panic")) } @@ -1274,12 +1274,12 @@ func (fra *fakeRPCTM) DemotePrimary(ctx context.Context) (*replicationdatapb.Pri } func tmRPCTestDemotePrimary(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { - PrimaryStatus, err := client.DemotePrimary(ctx, tablet) + PrimaryStatus, err := client.DemotePrimary(ctx, tablet, false) compareError(t, "DemotePrimary", err, PrimaryStatus.Position, testPrimaryStatus.Position) } func tmRPCTestDemotePrimaryPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { - _, err := client.DemotePrimary(ctx, tablet) + _, err := client.DemotePrimary(ctx, tablet, false) expectHandleRPCPanic(t, "DemotePrimary", true /*verbose*/, err) } diff --git a/proto/tabletmanagerdata.proto b/proto/tabletmanagerdata.proto index fc8acdca8a5..e46bc63dbd4 100644 --- a/proto/tabletmanagerdata.proto +++ b/proto/tabletmanagerdata.proto @@ -485,6 +485,7 @@ message InitReplicaResponse { } message DemotePrimaryRequest { + bool force = 1; } message DemotePrimaryResponse { diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index b4de009404b..bb3ae11e8b8 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -30189,6 +30189,9 @@ export namespace tabletmanagerdata { /** Properties of a DemotePrimaryRequest. */ interface IDemotePrimaryRequest { + + /** DemotePrimaryRequest force */ + force?: (boolean|null); } /** Represents a DemotePrimaryRequest. */ @@ -30200,6 +30203,9 @@ export namespace tabletmanagerdata { */ constructor(properties?: tabletmanagerdata.IDemotePrimaryRequest); + /** DemotePrimaryRequest force. */ + public force: boolean; + /** * Creates a new DemotePrimaryRequest instance using the specified properties. * @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index a853d19df26..4b71eee1511 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -68939,6 +68939,7 @@ export const tabletmanagerdata = $root.tabletmanagerdata = (() => { * Properties of a DemotePrimaryRequest. * @memberof tabletmanagerdata * @interface IDemotePrimaryRequest + * @property {boolean|null} [force] DemotePrimaryRequest force */ /** @@ -68956,6 +68957,14 @@ export const tabletmanagerdata = $root.tabletmanagerdata = (() => { this[keys[i]] = properties[keys[i]]; } + /** + * DemotePrimaryRequest force. + * @member {boolean} force + * @memberof tabletmanagerdata.DemotePrimaryRequest + * @instance + */ + DemotePrimaryRequest.prototype.force = false; + /** * Creates a new DemotePrimaryRequest instance using the specified properties. * @function create @@ -68980,6 +68989,8 @@ export const tabletmanagerdata = $root.tabletmanagerdata = (() => { DemotePrimaryRequest.encode = function encode(message, writer) { if (!writer) writer = $Writer.create(); + if (message.force != null && Object.hasOwnProperty.call(message, "force")) + writer.uint32(/* id 1, wireType 0 =*/8).bool(message.force); return writer; }; @@ -69014,6 +69025,10 @@ export const tabletmanagerdata = $root.tabletmanagerdata = (() => { while (reader.pos < end) { let tag = reader.uint32(); switch (tag >>> 3) { + case 1: { + message.force = reader.bool(); + break; + } default: reader.skipType(tag & 7); break; @@ -69049,6 +69064,9 @@ export const tabletmanagerdata = $root.tabletmanagerdata = (() => { DemotePrimaryRequest.verify = function verify(message) { if (typeof message !== "object" || message === null) return "object expected"; + if (message.force != null && message.hasOwnProperty("force")) + if (typeof message.force !== "boolean") + return "force: boolean expected"; return null; }; @@ -69063,7 +69081,10 @@ export const tabletmanagerdata = $root.tabletmanagerdata = (() => { DemotePrimaryRequest.fromObject = function fromObject(object) { if (object instanceof $root.tabletmanagerdata.DemotePrimaryRequest) return object; - return new $root.tabletmanagerdata.DemotePrimaryRequest(); + let message = new $root.tabletmanagerdata.DemotePrimaryRequest(); + if (object.force != null) + message.force = Boolean(object.force); + return message; }; /** @@ -69075,8 +69096,15 @@ export const tabletmanagerdata = $root.tabletmanagerdata = (() => { * @param {$protobuf.IConversionOptions} [options] Conversion options * @returns {Object.} Plain object */ - DemotePrimaryRequest.toObject = function toObject() { - return {}; + DemotePrimaryRequest.toObject = function toObject(message, options) { + if (!options) + options = {}; + let object = {}; + if (options.defaults) + object.force = false; + if (message.force != null && message.hasOwnProperty("force")) + object.force = message.force; + return object; }; /** From c1a8c99f9477e6726ab35b47a7c31974cd734cd9 Mon Sep 17 00:00:00 2001 From: Mohamed Hamza Date: Tue, 27 Jan 2026 16:42:07 -0500 Subject: [PATCH 2/2] fix conflicts Signed-off-by: Mohamed Hamza --- go/vt/vtcombo/tablet_map.go | 7 ++--- .../reparentutil/emergency_reparenter_test.go | 31 ++++++++++--------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index 807d8fe197f..97d9047b4cf 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -18,6 +18,7 @@ package vtcombo import ( "context" + "errors" "fmt" "os" "path" @@ -1017,13 +1018,8 @@ func (itmc *internalTabletManagerClient) ReadReparentJournalInfo(ctx context.Con return 0, fmt.Errorf("not implemented in vtcombo") } -<<<<<<< HEAD -func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) { - return nil, fmt.Errorf("not implemented in vtcombo") -======= func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet, bool) (*replicationdatapb.PrimaryStatus, error) { return nil, errors.New("not implemented in vtcombo") ->>>>>>> 1f49de43ad (Add new `force` flag to `DemotePrimary` to force a demotion even when blocked on waiting for semi-sync acks (#18714)) } func (itmc *internalTabletManagerClient) UndoDemotePrimary(context.Context, *topodatapb.Tablet, bool) error { @@ -1108,6 +1104,7 @@ func (itmc *internalTabletManagerClient) ResetReplicationParameters(context.Cont func (itmc *internalTabletManagerClient) ReplicaWasRestarted(context.Context, *topodatapb.Tablet, *topodatapb.TabletAlias) error { return fmt.Errorf("not implemented in vtcombo") } + func (itmc *internalTabletManagerClient) ResetSequences(ctx context.Context, tablet *topodatapb.Tablet, tables []string) error { return fmt.Errorf("not implemented in vtcombo") } diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter_test.go b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go index 87bca61218d..4444f63e1f7 100644 --- a/go/vt/vtctl/reparentutil/emergency_reparenter_test.go +++ b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go @@ -18,11 +18,7 @@ package reparentutil import ( "context" -<<<<<<< HEAD - "fmt" -======= "errors" ->>>>>>> 1f49de43ad (Add new `force` flag to `DemotePrimary` to force a demotion even when blocked on waiting for semi-sync acks (#18714)) "slices" "testing" "time" @@ -1871,7 +1867,6 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2021,7 +2016,7 @@ func TestEmergencyReparenter_promotionOfNewPrimary(t *testing.T) { Error error }{ "zone1-0000000100": { - Error: fmt.Errorf("primary position error"), + Error: errors.New("primary position error"), }, }, }, @@ -3205,7 +3200,7 @@ func TestEmergencyReparenter_reparentReplicas(t *testing.T) { Error error }{ "zone1-0000000100": { - Error: fmt.Errorf("primary position error"), + Error: errors.New("primary position error"), }, }, }, @@ -3439,7 +3434,8 @@ func TestEmergencyReparenter_reparentReplicas(t *testing.T) { keyspace: "testkeyspace", shard: "-", shouldErr: false, - }, { + }, + { name: "single replica failing to SetReplicationSource does not fail the promotion", emergencyReparentOps: EmergencyReparentOptions{}, tmc: &testutil.TabletManagerClient{ @@ -3582,7 +3578,7 @@ func TestEmergencyReparenter_reparentReplicas(t *testing.T) { Shard: &topodatapb.Shard{ PrimaryAlias: &topodatapb.TabletAlias{ Cell: "zone1", - Uid: 000, + Uid: 0o00, }, }, }, @@ -3723,7 +3719,8 @@ func TestEmergencyReparenter_promoteIntermediateSource(t *testing.T) { Uid: 100, }, Hostname: "primary-elect", - }, { + }, + { Alias: &topodatapb.TabletAlias{ Cell: "zone1", Uid: 101, @@ -3744,7 +3741,8 @@ func TestEmergencyReparenter_promoteIntermediateSource(t *testing.T) { Uid: 100, }, Hostname: "primary-elect", - }, { + }, + { Alias: &topodatapb.TabletAlias{ Cell: "zone1", Uid: 101, @@ -3853,7 +3851,8 @@ func TestEmergencyReparenter_promoteIntermediateSource(t *testing.T) { }, }, }, - }, { + }, + { name: "success - only 2 tablets and they error", emergencyReparentOps: EmergencyReparentOptions{}, tmc: &testutil.TabletManagerClient{ @@ -3861,7 +3860,7 @@ func TestEmergencyReparenter_promoteIntermediateSource(t *testing.T) { "zone1-0000000100": nil, }, SetReplicationSourceResults: map[string]error{ - "zone1-0000000101": fmt.Errorf("An error"), + "zone1-0000000101": errors.New("An error"), }, }, newSourceTabletAlias: "zone1-0000000100", @@ -3968,7 +3967,8 @@ func TestEmergencyReparenter_promoteIntermediateSource(t *testing.T) { Uid: 100, }, Hostname: "primary-elect", - }, { + }, + { Alias: &topodatapb.TabletAlias{ Cell: "zone1", Uid: 101, @@ -4038,7 +4038,8 @@ func TestEmergencyReparenter_promoteIntermediateSource(t *testing.T) { Uid: 100, }, Hostname: "primary-elect", - }, { + }, + { Alias: &topodatapb.TabletAlias{ Cell: "zone1", Uid: 101,