Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions go/vt/topotools/keyspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"
"sync"
"time"

"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// RefreshTabletsByShard issues a RefreshState RPC, concurrently, to every
// tablet found in the tablet map of the given shard for the given cells, and
// blocks until all of those RPCs have returned.
//
// The only errors it returns are those from fetching the tablet map from the
// topology. A partial-result error from that lookup is logged as a warning and
// the tablets that were returned are still refreshed. Failures of individual
// RefreshState RPCs are logged as warnings and otherwise ignored. Tablets
// whose topology record has an empty Hostname are skipped.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) error {
	logger.Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())

	tablets, lookupErr := ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
	if lookupErr != nil {
		if !topo.IsErrType(lookupErr, topo.PartialResult) {
			return lookupErr
		}

		// A partial result is tolerated: refresh whatever we did get back.
		logger.Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName())
	}

	// From here on, nothing is reported back to the caller as an error.
	var pending sync.WaitGroup

	refresh := func(tablet *topo.TabletInfo) {
		defer pending.Done()
		logger.Infof("Calling RefreshState on tablet %v", tablet.AliasString())

		// Upper-bound the RPC so that a stuck tablet fails faster. 60 seconds
		// is used because RefreshState should not take more than 30 seconds
		// (most of that time is spent shutting down the tablet's QueryService,
		// waiting on in-flight transactions).
		rpcCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
		defer cancel()

		rpcErr := tmc.RefreshState(rpcCtx, tablet.Tablet)
		if rpcErr == nil {
			return
		}

		logger.Warningf("RefreshTabletsByShard: failed to refresh %v: %v", tablet.AliasString(), rpcErr)
	}

	for _, tablet := range tablets {
		if tablet.Hostname != "" {
			pending.Add(1)
			go refresh(tablet)

			continue
		}

		// No hostname means the tablet is not running and there is nothing
		// to connect to.
		logger.Infof("Tablet %v has no hostname, skipping its RefreshState", tablet.AliasString())
	}

	pending.Wait()

	return nil
}

// UpdateShardRecords updates the topology records of the given shards for
// either the 'from' or the 'to' side of a migration.
//
// It first calls UpdateDisableQueryService for all shards, passing isFrom as
// the disable flag. Then, for each shard in turn, it rewrites the shard record
// (clearing SourceShards when clearSourceShards is set) and stores the updated
// record back into the shards slice. For 'to' shards (isFrom == false) it also
// refreshes the shard's tablets; a failure of that refresh is logged and does
// not abort the loop.
//
// The first error from UpdateDisableQueryService or UpdateShardFields is
// returned immediately.
func UpdateShardRecords(
	ctx context.Context,
	ts *topo.Server,
	tmc tmclient.TabletManagerClient,
	keyspace string,
	shards []*topo.ShardInfo,
	cells []string,
	servedType topodatapb.TabletType,
	isFrom bool,
	clearSourceShards bool,
	logger logutil.Logger,
) error {
	// The query service is disabled exactly when these are 'from' shards.
	if err := ts.UpdateDisableQueryService(ctx, keyspace, shards, servedType, cells, isFrom); err != nil {
		return err
	}

	// Applied to each shard record; never fails.
	applyUpdate := func(record *topo.ShardInfo) error {
		if clearSourceShards {
			record.SourceShards = nil
		}

		return nil
	}

	for idx := range shards {
		current := shards[idx]

		updated, err := ts.UpdateShardFields(ctx, current.Keyspace(), current.ShardName(), applyUpdate)
		if err != nil {
			return err
		}

		shards[idx] = updated

		// The 'from' shards will be refreshed after traffic has migrated, so
		// only 'to' shards are refreshed here, to make them serve.
		if isFrom {
			continue
		}

		if refreshErr := RefreshTabletsByShard(ctx, ts, tmc, current, cells, logger); refreshErr != nil {
			logger.Warningf("RefreshTabletsByShard(%v/%v, cells=%v) failed with %v; continuing ...", current.Keyspace(), current.ShardName(), cells, refreshErr)
		}
	}

	return nil
}
58 changes: 58 additions & 0 deletions go/vt/topotools/routing_rules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

// GetRoutingRules reads the routing rules stored in the topology server and
// flattens them into a map keyed by each rule's FromTable, whose value is that
// rule's ToTables slice. If the topology lookup fails, the error is returned
// and the map is nil.
func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, error) {
	stored, err := ts.GetRoutingRules(ctx)
	if err != nil {
		return nil, err
	}

	byFromTable := make(map[string][]string, len(stored.Rules))
	for i := range stored.Rules {
		rule := stored.Rules[i]
		byFromTable[rule.FromTable] = rule.ToTables
	}

	return byFromTable, nil
}

// SaveRoutingRules builds a vschemapb.RoutingRules message from a
// fromTable=>[]toTables mapping, one RoutingRule per map entry, and writes it
// to the topology server. The rules are logged before they are saved. The
// error from the topology write, if any, is returned.
func SaveRoutingRules(ctx context.Context, ts *topo.Server, rules map[string][]string) error {
	log.Infof("Saving routing rules %v\n", rules)

	converted := make([]*vschemapb.RoutingRule, 0, len(rules))
	for fromTable, toTables := range rules {
		rule := &vschemapb.RoutingRule{
			FromTable: fromTable,
			ToTables:  toTables,
		}
		converted = append(converted, rule)
	}

	return ts.SaveRoutingRules(ctx, &vschemapb.RoutingRules{Rules: converted})
}
68 changes: 68 additions & 0 deletions go/vt/topotools/routing_rules_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo/memorytopo"
)

// TestRoutingRulesRoundTrip saves a set of routing rules to an in-memory topo
// and checks that reading them back yields an equal mapping.
func TestRoutingRulesRoundTrip(t *testing.T) {
	var (
		ctx = context.Background()
		ts  = memorytopo.NewServer("zone1")
	)

	want := map[string][]string{
		"t1": {"t2", "t3"},
		"t4": {"t5"},
	}

	saveErr := SaveRoutingRules(ctx, ts, want)
	require.NoError(t, saveErr, "could not save routing rules to topo %v", want)

	got, getErr := GetRoutingRules(ctx, ts)
	require.NoError(t, getErr, "could not fetch routing rules from topo")

	assert.Equal(t, want, got)
}

// TestRoutingRulesErrors checks that GetRoutingRules and SaveRoutingRules each
// return an error when the memorytopo factory has had an error injected via
// SetError.
func TestRoutingRulesErrors(t *testing.T) {
	ctx := context.Background()
	ts, factory := memorytopo.NewServerAndFactory("zone1")
	// Inject a failure into the factory; both subtests below expect it to
	// surface from the helper under test.
	factory.SetError(errors.New("topo failure for testing"))

	t.Run("GetRoutingRules error", func(t *testing.T) {
		rules, err := GetRoutingRules(ctx, ts)
		assert.Error(t, err, "expected error from GetRoutingRules, got rules=%v", rules)
	})

	t.Run("SaveRoutingRules error", func(t *testing.T) {
		rules := map[string][]string{
			"t1": {"t2", "t3"},
			"t4": {"t5"},
		}

		err := SaveRoutingRules(ctx, ts, rules)
		// The failure message names the function actually under test, so a
		// failing run is not misattributed to GetRoutingRules.
		assert.Error(t, err, "expected error from SaveRoutingRules, rules=%v", rules)
	})
}
66 changes: 2 additions & 64 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,32 +918,7 @@ func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards []

// updateShardRecords updates the shard records based on 'from' or 'to' direction.
//
// It is a thin wrapper that delegates to topotools.UpdateShardRecords, passing
// the wrangler's topo server, tablet manager client and logger along with the
// caller's arguments, and returns whatever error that function returns.
func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool, clearSourceShards bool) (err error) {
	return topotools.UpdateShardRecords(ctx, wr.ts, wr.tmc, keyspace, shards, cells, servedType, isFrom, clearSourceShards, wr.Logger())
}

// updateFrozenFlag sets or unsets the Frozen flag for master migration. This is performed
Expand Down Expand Up @@ -1384,44 +1359,7 @@ func (wr *Wrangler) SetKeyspaceServedFrom(ctx context.Context, keyspace string,

// RefreshTabletsByShard calls RefreshState on all the tablets in a given shard.
//
// It is a thin wrapper that delegates to topotools.RefreshTabletsByShard,
// passing the wrangler's topo server, tablet manager client and logger, and
// returns whatever error that function returns.
func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInfo, cells []string) error {
	return topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
}

// DeleteKeyspace will do all the necessary changes in the topology server
Expand Down
20 changes: 2 additions & 18 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1788,27 +1788,11 @@ func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error {
}

// getRoutingRules returns the routing rules stored in the wrangler's topo
// server as a fromTable=>[]toTables mapping, by delegating to
// topotools.GetRoutingRules.
func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) {
	return topotools.GetRoutingRules(ctx, wr.ts)
}

// saveRoutingRules writes a fromTable=>[]toTables mapping to the wrangler's
// topo server by delegating to topotools.SaveRoutingRules.
func (wr *Wrangler) saveRoutingRules(ctx context.Context, rules map[string][]string) error {
	return topotools.SaveRoutingRules(ctx, wr.ts, rules)
}

func reverseName(workflow string) string {
Expand Down