Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
)

// vstreamManager manages vstream requests.
Expand Down Expand Up @@ -521,18 +521,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
go func() {
_ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
var err error
if ctx.Err() != nil {
switch {
case ctx.Err() != nil:
err = fmt.Errorf("context has ended")
} else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil {
err = fmt.Errorf("health check failed")
} else if vs.tabletType != shr.Target.TabletType {
err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream",
vs.tabletType, shr.Target.TabletType)
} else if shr.RealtimeStats.HealthError != "" {
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
case shr.RealtimeStats.HealthError != "":
err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream",
tablet.Alias, shr.RealtimeStats.HealthError)
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError)
case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()):
err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag())
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
errCh <- err
return err
}
Expand Down Expand Up @@ -563,7 +568,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case <-ctx.Done():
return ctx.Err()
case streamErr := <-errCh:
log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
case <-journalDone:
// Unreachable.
Expand Down
140 changes: 130 additions & 10 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,26 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/topo"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/srvtopo"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var mu sync.Mutex
Expand Down Expand Up @@ -1187,6 +1189,124 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
}
}

// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by
// the vstream manager to confirm that we are correctly restarting the vstream when we should.
func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
// Capture the vstream warning log. Otherwise we need to re-implement the vstream error
// handling in SandboxConn's implementation and then we're not actually testing the
// production code.
logger := logutil.NewMemoryLogger()
log.Warningf = logger.Warningf

cell := "aa"
ks := "TestVStream"
shard := "0"
tabletType := topodatapb.TabletType_REPLICA
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{shard})
vsm := newTestVStreamManager(hc, st, cell)
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
}},
}
source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil)
tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias)
addTabletToSandboxTopo(t, st, ks, shard, source.Tablet())
target := &querypb.Target{
Cell: cell,
Keyspace: ks,
Shard: shard,
TabletType: tabletType,
}
highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1

type testcase struct {
name string
hcRes *querypb.StreamHealthResponse
wantErr string
}
testcases := []testcase{
{
name: "all healthy", // Will hit the context timeout
},
{
name: "failure",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: nil, // This is seen as a healthcheck stream failure
},
wantErr: fmt.Sprintf("health check failed on %s", tabletAlias),
},
{
name: "tablet type changed",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: &querypb.Target{
Cell: cell,
Keyspace: ks,
Shard: shard,
TabletType: topodatapb.TabletType_PRIMARY,
},
RealtimeStats: &querypb.RealtimeStats{},
},
wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s",
tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()),
},
{
name: "unhealthy",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: target,
RealtimeStats: &querypb.RealtimeStats{
HealthError: "unhealthy",
},
},
wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias),
},
{
name: "replication lag too high",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: target,
RealtimeStats: &querypb.RealtimeStats{
ReplicationLagSeconds: highLag,
},
},
wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided",
tabletAlias, highLag),
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
done := make(chan struct{})
go func() {
sctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
defer close(done)
// SandboxConn's VStream implementation always waits for the context to timeout.
err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error {
require.Fail(t, "unexpected event", "Received unexpected events: %v", events)
return nil
})
if tc.wantErr != "" { // Otherwise we simply expect the context to timeout
if !strings.Contains(logger.String(), tc.wantErr) {
require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr)
}
}
}()
if tc.wantErr != "" {
source.SetStreamHealthResponse(tc.hcRes)
}
<-done
logger.Clear()
})
}
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
gw := NewTabletGateway(context.Background(), hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
Expand Down
19 changes: 17 additions & 2 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type SandboxConn struct {
NotServing bool

getSchemaResult []map[string]string

streamHealthResponse *querypb.StreamHealthResponse
}

var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check
Expand Down Expand Up @@ -415,9 +417,22 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target,
// SandboxSQRowCount is the default number of fake splits returned.
var SandboxSQRowCount = int64(10)

// StreamHealth is not implemented.
// SetStreamHealthResponse sets the StreamHealthResponse to be returned in StreamHealth.
func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) {
sbc.mapMu.Lock()
defer sbc.mapMu.Unlock()
sbc.streamHealthResponse = res
}

// StreamHealth always mocks a "healthy" result by default. If you want to override this behavior you
// can call SetStreamHealthResponse.
func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
return fmt.Errorf("not implemented in test")
sbc.mapMu.Lock()
defer sbc.mapMu.Unlock()
if sbc.streamHealthResponse != nil {
return callback(sbc.streamHealthResponse)
}
return nil
}

// ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos.
Expand Down