Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 140 additions & 131 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"sort"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -174,6 +176,16 @@ func (si *ShardInfo) HasMaster() bool {
return !topoproto.TabletAliasIsZero(si.Shard.MasterAlias)
}

// GetMasterTermStartTime reads the MasterTermStartTime field of the shard
// record and returns it converted to a time.Time by logutil.ProtoToTime.
func (si *ShardInfo) GetMasterTermStartTime() time.Time {
	termStart := si.Shard.MasterTermStartTime
	return logutil.ProtoToTime(termStart)
}

// SetMasterTermStartTime converts t with logutil.TimeToProto and stores the
// result in the MasterTermStartTime field of the shard record.
func (si *ShardInfo) SetMasterTermStartTime(t time.Time) {
	termStart := logutil.TimeToProto(t)
	si.Shard.MasterTermStartTime = termStart
}

// GetShard is a high level function to read shard data.
// It generates trace spans.
func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardInfo, error) {
Expand Down
12 changes: 12 additions & 0 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"path"
"sync"
"time"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand All @@ -30,6 +31,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/events"
Expand Down Expand Up @@ -207,6 +209,16 @@ func (ti *TabletInfo) IsSlaveType() bool {
return IsSlaveType(ti.Type)
}

// GetMasterTermStartTime reads the MasterTermStartTime field of the tablet
// record and returns it converted to a time.Time by logutil.ProtoToTime.
func (ti *TabletInfo) GetMasterTermStartTime() time.Time {
	termStart := ti.Tablet.MasterTermStartTime
	return logutil.ProtoToTime(termStart)
}

// SetMasterTermStartTime converts t with logutil.TimeToProto and stores the
// result in the MasterTermStartTime field of the tablet record.
func (ti *TabletInfo) SetMasterTermStartTime(t time.Time) {
	termStart := logutil.TimeToProto(t)
	ti.Tablet.MasterTermStartTime = termStart
}

// NewTabletInfo returns a TabletInfo basing on tablet with the
// version set. This function should be only used by Server
// implementations.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func (itmc *internalTabletManagerClient) SlaveWasPromoted(ctx context.Context, t
return fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error {
func (itmc *internalTabletManagerClient) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) error {
return fmt.Errorf("not implemented in vtcombo")
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
addCommand("Shards", command{
"PlannedReparentShard",
commandPlannedReparentShard,
"-keyspace_shard=<keyspace/shard> [-new_master=<tablet alias>] [-avoid_master=<tablet alias>]",
"-keyspace_shard=<keyspace/shard> [-new_master=<tablet alias>] [-avoid_master=<tablet alias>] [-wait_slave_timeout=<duration>]",
"Reparents the shard to the new master, or away from old master. Both old and new master need to be up and running."})
addCommand("Shards", command{
"EmergencyReparentShard",
Expand Down Expand Up @@ -107,7 +107,7 @@ func commandPlannedReparentShard(ctx context.Context, wr *wrangler.Wrangler, sub
return fmt.Errorf("active reparent commands disabled (unset the -disable_active_reparents flag to enable)")
}

waitSlaveTimeout := subFlags.Duration("wait_slave_timeout", *topo.RemoteOperationTimeout, "time to wait for slaves to catch up in reparenting")
waitSlaveTimeout := subFlags.Duration("wait_slave_timeout", *topo.RemoteOperationTimeout, "time to wait for replicas to catch up on replication before and after reparenting")
keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented")
newMaster := subFlags.String("new_master", "", "alias of a tablet that should be the new master")
avoidMaster := subFlags.String("avoid_master", "", "alias of a tablet that should not be the master, i.e. reparent to any other tablet if this one is the master")
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vttablet/agentrpctest/test_agent_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,7 @@ func agentRPCTestInitMasterPanic(ctx context.Context, t *testing.T, client tmcli

var testPopulateReparentJournalCalled = false
var testTimeCreatedNS int64 = 4569900
// testWaitPosition is the waitPosition value the test client passes to
// SetMaster and the fake agent expects to receive.
var testWaitPosition = "test wait position"
var testActionName = "TestActionName"
var testMasterAlias = &topodatapb.TabletAlias{
Cell: "ce",
Expand Down Expand Up @@ -1071,24 +1072,25 @@ func agentRPCTestSlaveWasPromotedPanic(ctx context.Context, t *testing.T, client
var testSetMasterCalled = false
var testForceStartSlave = true

func (fra *fakeRPCAgent) SetMaster(ctx context.Context, parent *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error {
// SetMaster is the fake agent's handler for the SetMaster RPC.
// It panics when fra.panics is set; otherwise it checks each received
// argument against the corresponding package-level test value, records the
// call by setting testSetMasterCalled, and returns nil.
func (fra *fakeRPCAgent) SetMaster(ctx context.Context, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) error {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
// Each argument must match the value the test client sends.
compare(fra.t, "SetMaster parent", parent, testMasterAlias)
compare(fra.t, "SetMaster timeCreatedNS", timeCreatedNS, testTimeCreatedNS)
compare(fra.t, "SetMaster waitPosition", waitPosition, testWaitPosition)
compare(fra.t, "SetMaster forceStartSlave", forceStartSlave, testForceStartSlave)
// Mark the call so the client-side test can confirm the handler ran.
testSetMasterCalled = true
return nil
}

func agentRPCTestSetMaster(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
err := client.SetMaster(ctx, tablet, testMasterAlias, testTimeCreatedNS, testForceStartSlave)
err := client.SetMaster(ctx, tablet, testMasterAlias, testTimeCreatedNS, testWaitPosition, testForceStartSlave)
compareError(t, "SetMaster", err, true, testSetMasterCalled)
}

func agentRPCTestSetMasterPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
err := client.SetMaster(ctx, tablet, testMasterAlias, testTimeCreatedNS, testForceStartSlave)
err := client.SetMaster(ctx, tablet, testMasterAlias, testTimeCreatedNS, testWaitPosition, testForceStartSlave)
expectHandleRPCPanic(t, "SetMaster", true /*verbose*/, err)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/faketmclient/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (client *FakeTabletManagerClient) SlaveWasPromoted(ctx context.Context, tab
}

// SetMaster is part of the tmclient.TabletManagerClient interface.
func (client *FakeTabletManagerClient) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error {
func (client *FakeTabletManagerClient) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func (client *Client) SlaveWasPromoted(ctx context.Context, tablet *topodatapb.T
}

// SetMaster is part of the tmclient.TabletManagerClient interface.
func (client *Client) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error {
func (client *Client) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) error {
cc, c, err := client.dial(tablet)
if err != nil {
return err
Expand All @@ -711,6 +711,7 @@ func (client *Client) SetMaster(ctx context.Context, tablet *topodatapb.Tablet,
_, err = c.SetMaster(ctx, &tabletmanagerdatapb.SetMasterRequest{
Parent: parent,
TimeCreatedNs: timeCreatedNS,
WaitPosition: waitPosition,
ForceStartSlave: forceStartSlave,
})
return err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/grpctmserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (s *server) SetMaster(ctx context.Context, request *tabletmanagerdatapb.Set
defer s.agent.HandleRPCPanic(ctx, "SetMaster", request, response, true /*verbose*/, &err)
ctx = callinfo.GRPCCallInfo(ctx)
response = &tabletmanagerdatapb.SetMasterResponse{}
return response, s.agent.SetMaster(ctx, request.Parent, request.TimeCreatedNs, request.ForceStartSlave)
return response, s.agent.SetMaster(ctx, request.Parent, request.TimeCreatedNs, request.WaitPosition, request.ForceStartSlave)
}

func (s *server) SlaveWasRestarted(ctx context.Context, request *tabletmanagerdatapb.SlaveWasRestartedRequest) (response *tabletmanagerdatapb.SlaveWasRestartedResponse, err error) {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/init_tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// Read the master term start time from tablet.
// If it is nil, it might mean that we are upgrading, so use current time instead
if oldTablet.MasterTermStartTime != nil {
agent.setMasterTermStartTime(logutil.ProtoToTime(oldTablet.MasterTermStartTime))
agent.setMasterTermStartTime(oldTablet.GetMasterTermStartTime())
} else {
agent.setMasterTermStartTime(time.Now())
}
Expand All @@ -137,8 +137,8 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
if oldTablet.Type == topodatapb.TabletType_MASTER {
// Our existing tablet type is master, but the shard record does not agree.
// Only take over if our master_term_start_time is after what is in the shard record
oldMasterTermStartTime := logutil.ProtoToTime(oldTablet.MasterTermStartTime)
currentShardTime := logutil.ProtoToTime(si.MasterTermStartTime)
oldMasterTermStartTime := oldTablet.GetMasterTermStartTime()
currentShardTime := si.GetMasterTermStartTime()
if oldMasterTermStartTime.After(currentShardTime) {
tabletType = topodatapb.TabletType_MASTER
// read the master term start time from tablet
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/replication_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func repairReplication(ctx context.Context, agent *ActionAgent) error {
}
}

return agent.setMasterRepairReplication(ctx, si.MasterAlias, 0, true)
return agent.setMasterRepairReplication(ctx, si.MasterAlias, 0, "", true)
}

func registerReplicationReporter(agent *ActionAgent) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/rpc_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type RPCAgent interface {

SlaveWasPromoted(ctx context.Context) error

SetMaster(ctx context.Context, parent *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error
SetMaster(ctx context.Context, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) error

SlaveWasRestarted(ctx context.Context, parent *topodatapb.TabletAlias) error

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/rpc_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo

// Change our type back to the original value.
// Original type could be master so pass in a real value for masterTermStartTime
_, err = topotools.ChangeType(bgCtx, agent.TopoServer, tablet.Alias, originalType, tablet.MasterTermStartTime)
_, err = topotools.ChangeType(bgCtx, agent.TopoServer, tablet.Alias, originalType, tablet.Tablet.MasterTermStartTime)
if err != nil {
// failure in changing the topology type is probably worse,
// so returning that (we logged the snapshot error anyway)
Expand Down
33 changes: 23 additions & 10 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,13 @@ func (agent *ActionAgent) SlaveWasPromoted(ctx context.Context) error {

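// SetMaster sets replication master, and waits for the given replication
// position (if waitPosition is non-empty) and/or the reparent_journal table
// entry (if timeCreatedNS is non-zero), up to context timeout.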
func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error {
func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) error {
if err := agent.lock(ctx); err != nil {
return err
}
defer agent.unlock()

if err := agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, forceStartSlave); err != nil {
if err := agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, waitPosition, forceStartSlave); err != nil {
return err
}

Expand All @@ -543,7 +543,7 @@ func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb
return nil
}

func (agent *ActionAgent) setMasterRepairReplication(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) (err error) {
func (agent *ActionAgent) setMasterRepairReplication(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) (err error) {
parent, err := agent.TopoServer.GetTablet(ctx, parentAlias)
if err != nil {
return err
Expand All @@ -556,10 +556,10 @@ func (agent *ActionAgent) setMasterRepairReplication(ctx context.Context, parent

defer unlock(&err)

return agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, forceStartSlave)
return agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, waitPosition, forceStartSlave)
}

func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) (err error) {
func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) (err error) {
// End orchestrator maintenance at the end of fixing replication.
// This is a best effort operation, so it should happen in a goroutine
defer func() {
Expand Down Expand Up @@ -648,11 +648,24 @@ func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topo
}
}

// If needed, wait until we replicate the specified row,
// or our context times out.
if shouldbeReplicating && timeCreatedNS != 0 {
if err := agent.MysqlDaemon.WaitForReparentJournal(ctx, timeCreatedNS); err != nil {
return err
// If needed, wait until we replicate to the specified point, or our context
// times out. Callers can specify the point to wait for as either a
// GTID-based replication position or a Vitess reparent journal entry,
// or both.
if shouldbeReplicating {
if waitPosition != "" {
pos, err := mysql.DecodePosition(waitPosition)
if err != nil {
return err
}
if err := agent.MysqlDaemon.WaitMasterPos(ctx, pos); err != nil {
return err
}
}
if timeCreatedNS != 0 {
if err := agent.MysqlDaemon.WaitForReparentJournal(ctx, timeCreatedNS); err != nil {
return err
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/shard_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func syncShardMaster(ctx context.Context, ts *topo.Server, tablet *topodatapb.Ta

var shardInfo *topo.ShardInfo
_, err = ts.UpdateShardFields(ctx, tablet.Keyspace, tablet.Shard, func(si *topo.ShardInfo) error {
lastTerm := logutil.ProtoToTime(si.MasterTermStartTime)
lastTerm := si.GetMasterTermStartTime()

// Save the ShardInfo so we can check it afterward.
// We can't use the return value of UpdateShardFields because it might be nil.
Expand Down Expand Up @@ -219,7 +219,7 @@ func (agent *ActionAgent) abortMasterTerm(ctx context.Context, masterAlias *topo
setMasterCtx, cancelSetMaster := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancelSetMaster()
log.Infof("Attempting to reparent self to new master %v.", masterAliasStr)
if err := agent.SetMaster(setMasterCtx, masterAlias, 0, true); err != nil {
if err := agent.SetMaster(setMasterCtx, masterAlias, 0, "", true); err != nil {
return vterrors.Wrap(err, "failed to reparent self to new master")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tmclient/rpc_client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ type TabletManagerClient interface {
// SetMaster tells a tablet to make itself a slave to the
// passed in master tablet alias, and wait for the given replication
// position (if waitPosition is non-empty) and for the row in the
// reparent_journal table (if timeCreatedNS is non-zero).
SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error
SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't changing the interface here cause problems during upgrade?
Old vtctld's wrangler will call the old version of SetMaster, which won't work on an already upgraded vttablet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the call crosses process boundaries, it gets encoded as protobuf on the wire. The protobuf level is thus where we need to ensure compatibility when changing existing RPCs.

Adding a new, optional field in the Request struct like this should be safe. The old vtctld will not try to use the new field because it doesn't know about it. The new vttablet will simply receive a Request protobuf with the new field unset, so it will be left on the zero value.


// SlaveWasRestarted tells the remote tablet its master has changed
SlaveWasRestarted(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias) error
Expand Down
Loading