Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions go/vt/vttablet/tabletmanager/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package tabletmanager

import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -128,6 +129,35 @@ func (orc *orcClient) EndMaintenance(tablet *topodatapb.Tablet) error {
return err
}

// InActiveShardRecovery returns true if Orchestrator is currently running an
// active recovery for the shard this tablet belongs to, based on the most
// recent audit-recovery entry for the keyspace.shard alias.
func (orc *orcClient) InActiveShardRecovery(tablet *topodatapb.Tablet) (bool, error) {
	alias := fmt.Sprintf("%v.%v", tablet.GetKeyspace(), tablet.GetShard())

	// TODO(zmagg): Replace this with simpler call to active-cluster-recovery
	// when call with alias parameter is supported.
	resp, err := orc.apiGet("audit-recovery", "alias", alias)
	if err != nil {
		return false, err
	}

	// audit-recovery returns a JSON array of recovery entries, most recent first.
	var r []map[string]interface{}
	if err := json.Unmarshal(resp, &r); err != nil {
		return false, err
	}

	if len(r) == 0 {
		return false, fmt.Errorf("orchestrator returned an empty audit-recovery response for alias %v", alias)
	}

	// Only the most recent recovery entry matters; its IsActive field tells us
	// whether a recovery is still in progress.
	active, ok := r[0]["IsActive"].(bool)
	if !ok {
		return false, fmt.Errorf("orchestrator audit-recovery response for alias %v is missing a boolean IsActive field", alias)
	}
	return active, nil
}

func mysqlHostPort(tablet *topodatapb.Tablet) (host, port string, err error) {
mysqlPort := int(topoproto.MysqlPort(tablet))
if mysqlPort == 0 {
Expand Down
22 changes: 21 additions & 1 deletion go/vt/vttablet/tabletmanager/replication_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,34 @@ func repairReplication(ctx context.Context, agent *ActionAgent) error {

ts := agent.TopoServer
tablet := agent.Tablet()

si, err := ts.GetShard(ctx, tablet.Keyspace, tablet.Shard)
if err != nil {
return err
}
if !si.HasMaster() {
return fmt.Errorf("no master tablet for shard %v/%v", tablet.Keyspace, tablet.Shard)
}
return agent.setMasterLocked(ctx, si.MasterAlias, 0, true)

// If Orchestrator is configured and if Orchestrator is actively reparenting, we should not repairReplication
if agent.orc != nil {
re, err := agent.orc.InActiveShardRecovery(tablet)
if err != nil {
return err
}
if re {
return fmt.Errorf("Orchestrator actively reparenting shard %v, skipping repairReplication", si)
}

// Before repairing replication, tell Orchestrator to enter maintenance mode for this tablet and to
// lock any other actions on this tablet by Orchestrator.
if err := agent.orc.BeginMaintenance(agent.Tablet(), "vttablet has been told to StopSlave"); err != nil {
log.Warningf("Orchestrator BeginMaintenance failed: %v", err)
return fmt.Errorf("Orchestrator BeginMaintenance failed :%v, skipping repairReplication", err)
}
}

return agent.setMasterRepairReplication(ctx, si.MasterAlias, 0, true)
}

func registerReplicationReporter(agent *ActionAgent) {
Expand Down
40 changes: 27 additions & 13 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,26 +404,40 @@ func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb
return agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, forceStartSlave)
}

func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) error {
// setMasterRepairReplication is the entry point used by the replication
// repair path, where no shard lock is held yet: it acquires the shard lock
// for the parent's shard and then delegates to setMasterLocked to repoint
// this tablet's replication at parentAlias.
func (agent *ActionAgent) setMasterRepairReplication(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) (err error) {
	parent, err := agent.TopoServer.GetTablet(ctx, parentAlias)
	if err != nil {
		return err
	}

	// Take the shard lock so a concurrent reparent cannot interfere while we
	// repair replication.
	ctx, unlock, lockErr := agent.TopoServer.LockShard(ctx, parent.Tablet.GetKeyspace(), parent.Tablet.GetShard(), fmt.Sprintf("repairReplication to %v as parent", topoproto.TabletAliasString(parentAlias)))
	if lockErr != nil {
		return lockErr
	}

	// unlock writes any unlock failure into the named return value err.
	defer unlock(&err)

	return agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, forceStartSlave)
}

func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, forceStartSlave bool) (err error) {
parent, err := agent.TopoServer.GetTablet(ctx, parentAlias)
if err != nil {
return err
}

// End orchestrator maintenance at the end of fixing replication.
// This is a best effort operation, so it should happen in a goroutine
if agent.Tablet().Type == topodatapb.TabletType_MASTER {
defer func() {
go func() {
if agent.orc == nil {
return
}
if err := agent.orc.EndMaintenance(agent.Tablet()); err != nil {
log.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
defer func() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is perhaps not super clear. I'm removing the conditional if agent.Tablet().Type == topodatapb.TabletType_MASTER here as, in the case where the tablet was a REPLICA, and we're calling into this function from repairReplication we also want to end Orchestrator maintenance at the end of this function. It's still best effort to end orchestrator maintenance even in that case, as Orchestrator maintenance windows are time bounded.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok for now. Ideally, whoever calls begins maintenance should be the one ending it (in a defer).
So, in the case of repairReplication, the block that calls the beginMaintenance should defer endMaintenance.
But then, we should also move other call sites to the vtctld workflow (wrangler), where begin and end should be called as part of the reparent commands. This probably means that vtctld needs to know about Orc. So, this may be a more involved change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, that sounds a lot better. I'll do that refactoring in a quick followup to this PR.

go func() {
if agent.orc == nil {
return
}
if err := agent.orc.EndMaintenance(agent.Tablet()); err != nil {
log.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
}
}()

// See if we were replicating at all, and should be replicating
wasReplicating := false
Expand Down