
Commit 8417e84

Merge branch 'slack-19.0' into rm-mariadb_to_mysql-CI.slack-19.0
2 parents: 9baaff3 + ee7acfa

12 files changed: +492, -23 lines

go/test/endtoend/reparent/newfeaturetest/reparent_test.go

+63
@@ -19,10 +19,13 @@ package newfeaturetest
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/test/endtoend/reparent/utils"
 )
@@ -156,3 +159,63 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
 	err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica")
 	require.NoError(t, err)
 }
+
+func TestBufferingWithMultipleDisruptions(t *testing.T) {
+	defer cluster.PanicHandler(t)
+	clusterInstance := utils.SetupShardedReparentCluster(t)
+	defer utils.TeardownCluster(clusterInstance)
+
+	// Stop all VTOrc instances so that they don't interfere with the test.
+	for _, vtorc := range clusterInstance.VTOrcProcesses {
+		err := vtorc.TearDown()
+		require.NoError(t, err)
+	}
+
+	// Start by reparenting all the shards to the first tablet.
+	keyspace := clusterInstance.Keyspaces[0]
+	shards := keyspace.Shards
+	for _, shard := range shards {
+		err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias)
+		require.NoError(t, err)
+	}
+
+	// We simulate the start of an external reparent, or a PRS where the healthcheck update from the tablet
+	// gets lost in transit to vtgate, by simply setting the primary read-only. This is also why we had to
+	// shut down all the VTOrc instances: so that they don't fix this.
+	utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0])
+	utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0])
+
+	wg := sync.WaitGroup{}
+	rowCount := 10
+	vtParams := clusterInstance.GetVTParams(keyspace.Name)
+	// We now spawn writes from a bunch of goroutines.
+	// The ones going to shard 1 and shard 2 should block, since
+	// they're in the midst of a reparenting operation (as seen by the buffering code).
+	for i := 1; i <= rowCount; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			conn, err := mysql.Connect(context.Background(), &vtParams)
+			if err != nil {
+				return
+			}
+			defer conn.Close()
+			_, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false)
+			require.NoError(t, err)
+		}(i)
+	}
+
+	// Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for
+	// shards 1 and 2, since the disruption on those two shards hasn't stopped.
+	err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias)
+	require.NoError(t, err)
+	// We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate.
+	time.Sleep(1 * time.Second)
+	// Finally, we make the two disrupted shards healthy again by running PRS.
+	err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias)
+	require.NoError(t, err)
+	// Wait for all the writes to have succeeded.
+	wg.Wait()
+}
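A natural extra assertion, once wg.Wait() returns, would be to read the rows back via the GetSelectionQuery helper added in utils.go below. This is only a sketch of an optional check, not part of the commit; it reuses vtParams, rowCount, and the helpers already in scope in the test above.

// Sketch of an optional follow-up assertion (not in this commit): confirm
// every buffered write was eventually applied after the reparents settled.
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
qr, err := conn.ExecuteFetch(utils.GetSelectionQuery(), rowCount+1, false)
require.NoError(t, err)
require.Len(t, qr.Rows, rowCount)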

go/test/endtoend/reparent/utils/utils.go

+46
@@ -75,6 +75,52 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro
 	return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync")
 }
 
+// SetupShardedReparentCluster is used to set up a sharded cluster for testing.
+func SetupShardedReparentCluster(t *testing.T) *cluster.LocalProcessCluster {
+	clusterInstance := cluster.NewCluster(cell1, Hostname)
+	// Start topo server
+	err := clusterInstance.StartTopo()
+	require.NoError(t, err)
+
+	clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
+		"--lock_tables_timeout", "5s",
+		// Fast health checks help find corner cases.
+		"--health_check_interval", "1s",
+		"--track_schema_versions=true",
+		"--queryserver_enable_online_ddl=false")
+	clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
+		"--enable_buffer",
+		// Long timeout in case failover is slow.
+		"--buffer_window", "10m",
+		"--buffer_max_failover_duration", "10m",
+		"--buffer_min_time_between_failovers", "20m",
+	)
+
+	// Start keyspace
+	keyspace := &cluster.Keyspace{
+		Name:      KeyspaceName,
+		SchemaSQL: sqlSchema,
+		VSchema:   `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
+	}
+	err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false)
+	require.NoError(t, err)
+
+	// Start Vtgate
+	err = clusterInstance.StartVtgate()
+	require.NoError(t, err)
+	return clusterInstance
+}
+
+// GetInsertQuery returns a built insert query to insert a row.
+func GetInsertQuery(idx int) string {
+	return fmt.Sprintf(insertSQL, idx, idx)
+}
+
+// GetSelectionQuery returns a built selection query to read the data.
+func GetSelectionQuery() string {
+	return `select * from vt_insert_test`
+}
+
 // TeardownCluster is used to teardown the reparent cluster. When
 // run in a CI environment -- which is considered true when the
 // "CI" env variable is set to "true" -- the teardown also removes

go/vt/discovery/fake_healthcheck.go

+15
@@ -172,6 +172,21 @@ func (fhc *FakeHealthCheck) SetTabletType(tablet *topodatapb.Tablet, tabletType
 	item.ts.Target.TabletType = tabletType
 }
 
+// SetPrimaryTimestamp sets the primary timestamp for the given tablet.
+func (fhc *FakeHealthCheck) SetPrimaryTimestamp(tablet *topodatapb.Tablet, timestamp int64) {
+	if fhc.ch == nil {
+		return
+	}
+	fhc.mu.Lock()
+	defer fhc.mu.Unlock()
+	key := TabletToMapKey(tablet)
+	item, isPresent := fhc.items[key]
+	if !isPresent {
+		return
+	}
+	item.ts.PrimaryTermStartTime = timestamp
+}
+
 // Unsubscribe is not implemented.
 func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
 }
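A hypothetical unit-test fragment showing how this setter pairs with the keyspace_events.go change below: the demoted primary reports serving with a stale term start time, then the promoted one reports a newer term. SetServing is assumed to be an existing FakeHealthCheck helper (only SetTabletType appears in this diff), and oldPrimary/newPrimary are placeholder tablets:

// Hypothetical fragment (not in this commit); oldPrimary and newPrimary
// are placeholder *topodatapb.Tablet values registered with the fake.
fhc.SetTabletType(oldPrimary, topodatapb.TabletType_PRIMARY)
fhc.SetPrimaryTimestamp(oldPrimary, 100) // stale term start: <= externallyReparented
fhc.SetServing(oldPrimary, true)         // watcher should keep buffering

fhc.SetPrimaryTimestamp(newPrimary, 200) // newer term start: > externallyReparented
fhc.SetServing(newPrimary, true)         // watcher may stop buffering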

go/vt/discovery/keyspace_events.go

+66 -3
@@ -173,8 +173,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
 }
 
 type shardState struct {
-	target               *querypb.Target
-	serving              bool
+	target  *querypb.Target
+	serving bool
+	// waitForReparent is used to tell the keyspace event watcher
+	// that this shard should be marked serving only after a reparent
+	// operation has succeeded.
+	waitForReparent      bool
 	externallyReparented int64
 	currentPrimary       *topodatapb.TabletAlias
 }
@@ -357,8 +361,34 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) {
 	// if the shard went from serving to not serving, or the other way around, the keyspace
 	// is undergoing an availability event
 	if sstate.serving != th.Serving {
-		sstate.serving = th.Serving
 		kss.consistent = false
+		switch {
+		case th.Serving && sstate.waitForReparent:
+			// While waiting for a reparent, if we receive a serving primary,
+			// we should check if the primary term start time is greater than the externally reparented time.
+			// We mark the shard serving only if it is. This is required so that we don't prematurely stop
+			// buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the
+			// same old primary tablet that has already been turned read-only.
+			if th.PrimaryTermStartTime > sstate.externallyReparented {
+				sstate.waitForReparent = false
+				sstate.serving = true
+			}
+		case th.Serving && !sstate.waitForReparent:
+			sstate.serving = true
+		case !th.Serving:
+			sstate.serving = false
+		}
+	}
+	if !th.Serving {
+		// Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait
+		// for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop
+		// buffering when we receive a serving healthcheck from the primary that is being demoted.
+		// However, if we receive a non-serving check, then we know that we won't receive any more serving
+		// health checks until reparent finishes. Specifically, this helps us when PRS fails, but
+		// stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote
+		// the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop
+		// buffering for that case.
+		sstate.waitForReparent = false
 	}
 
 	// if the primary for this shard has been externally reparented, we're undergoing a failover,
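The serving decision above distills into a small predicate. This is a paraphrase for illustration, not code from the commit: a serving healthcheck from a primary only counts once its term started after the reparent we began buffering for.

// mayMarkServing paraphrases the switch in the hunk above.
func mayMarkServing(serving, waitForReparent bool, primaryTermStartTime, externallyReparented int64) bool {
	if !serving {
		return false // a non-serving check never marks the shard serving
	}
	if !waitForReparent {
		return true // not waiting on a reparent: trust the healthcheck
	}
	// Only a primary whose term started after the reparent began counts.
	return primaryTermStartTime > externallyReparented
}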
@@ -653,3 +683,36 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
 	}
 	return servingKeyspaces
 }
+
+// MarkShardNotServing marks the given shard not serving.
+// We use this when we start buffering for a given shard. This helps
+// coordinate between the buffering logic and the keyspace event watcher.
+// We take in a boolean as well to tell us whether this error is because
+// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent.
+// The return argument is whether the shard was found and marked not serving successfully or not.
+func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool {
+	kss := kew.getKeyspaceStatus(ctx, keyspace)
+	if kss == nil {
+		// Only happens if the keyspace was deleted.
+		return false
+	}
+	kss.mu.Lock()
+	defer kss.mu.Unlock()
+	sstate := kss.shards[shard]
+	if sstate == nil {
+		// This only happens if the shard is deleted, or if
+		// the keyspace event watcher hasn't seen the shard at all.
+		return false
+	}
+	// Mark the keyspace inconsistent and the shard not serving.
+	kss.consistent = false
+	sstate.serving = false
+	if isReparentErr {
+		// If the error was triggered because a reparent operation has started,
+		// we mark the shard to wait for the reparent to finish before marking it
+		// serving. This is required to prevent premature stopping of buffering if
+		// we receive a serving healthcheck from a primary that is being demoted.
+		sstate.waitForReparent = true
+	}
+	return true
+}
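For orientation, a sketch of the caller side in vtgate's buffering path, which is not part of this diff; onBufferingStarted and isReparentError are hypothetical names standing in for the real wiring.

// Hypothetical caller-side sketch (not in this diff). When a failed query
// starts buffering, tell the keyspace event watcher so it holds the shard
// not-serving until the reparent completes.
func onBufferingStarted(ctx context.Context, kew *discovery.KeyspaceEventWatcher, keyspace, shard string, err error) {
	// isReparentError is a hypothetical classifier for errors such as
	// "primary is not serving, in transition" that indicate an ongoing reparent.
	isReparentErr := isReparentError(err)
	if !kew.MarkShardNotServing(ctx, keyspace, shard, isReparentErr) {
		// The watcher doesn't know this shard (deleted, or not yet seen);
		// buffering then falls back to its own failover timeout.
		return
	}
}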
