Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/cmd/vtgateclienttest/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *echoClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session
return c.fallbackClient.ExecuteBatch(ctx, session, sqlList, bindVariablesList)
}

func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, callback func([]*binlogdatapb.VEvent) error) error {
func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, callback func([]*binlogdatapb.VEvent) error) error {
if strings.HasPrefix(vgtid.ShardGtids[0].Shard, EchoPrefix) {
_ = callback([]*binlogdatapb.VEvent{
{
Expand All @@ -170,5 +170,5 @@ func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletTy
return nil
}

return c.fallbackClient.VStream(ctx, tabletType, vgtid, filter, callback)
return c.fallbackClient.VStream(ctx, tabletType, vgtid, filter, flags, callback)
}
4 changes: 2 additions & 2 deletions go/cmd/vtgateclienttest/services/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (c fallbackClient) ResolveTransaction(ctx context.Context, dtid string) err
return c.fallback.ResolveTransaction(ctx, dtid)
}

func (c fallbackClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.fallback.VStream(ctx, tabletType, vgtid, filter, send)
func (c fallbackClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error {
return c.fallback.VStream(ctx, tabletType, vgtid, filter, flags, send)
}

func (c fallbackClient) HandlePanic(err *error) {
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtgateclienttest/services/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *terminalClient) ResolveTransaction(ctx context.Context, dtid string) er
return errTerminal
}

func (c *terminalClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (c *terminalClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error {
return errTerminal
}

Expand Down
419 changes: 328 additions & 91 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go/vt/vitessdriver/fakeserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string)
return nil
}

func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error {
return nil
}

Expand Down
11 changes: 8 additions & 3 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"testing"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -79,7 +81,8 @@ func TestVStream(t *testing.T) {
Match: "/.*/",
}},
}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter)
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -194,7 +197,8 @@ func TestVStreamCopyBasic(t *testing.T) {
Filter: "select * from t1",
}},
}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter)
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags)
_, _ = conn, mconn
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -247,7 +251,8 @@ func TestVStreamCurrent(t *testing.T) {
Filter: "select * from t1",
}},
}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter)
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags)
_, _ = conn, mconn
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/fakerpcvtgateconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ func (conn *FakeVTGateConn) ResolveTransaction(ctx context.Context, dtid string)
}

// VStream streams binlog events.
func (conn *FakeVTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (vtgateconn.VStreamReader, error) {
func (conn *FakeVTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid,
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (vtgateconn.VStreamReader, error) {

return nil, fmt.Errorf("NYI")
}

Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/grpcvtgateconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,15 @@ func (a *vstreamAdapter) Recv() ([]*binlogdatapb.VEvent, error) {
return r.Events, nil
}

func (conn *vtgateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (vtgateconn.VStreamReader, error) {
func (conn *vtgateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid,
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (vtgateconn.VStreamReader, error) {

req := &vtgatepb.VStreamRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
TabletType: tabletType,
Vgtid: vgtid,
Filter: filter,
Flags: flags,
}
stream, err := conn.c.VStream(ctx, req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/grpcvtgateconn/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string)
return nil
}

func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error {
panic("unimplemented")
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/grpcvtgateservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (vtg *VTGate) VStream(request *vtgatepb.VStreamRequest, stream vtgateservic
request.TabletType,
request.Vgtid,
request.Filter,
request.Flags,
func(events []*binlogdatapb.VEvent) error {
return stream.Send(&vtgatepb.VStreamResponse{
Events: events,
Expand Down
160 changes: 150 additions & 10 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package vtgate

import (
"context"
"fmt"
"io"
"sync"
"time"

"context"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -65,6 +67,26 @@ type vstream struct {

cancel context.CancelFunc
wg sync.WaitGroup

// this flag is set by the client, default false
// if true skew detection is enabled and we align the streams so that they receive events from
// about the same time as each other. Note that there is no exact ordering of events across shards
minimizeSkew bool

// mutex used to synchronize access to skew detection parameters
skewMu sync.Mutex
// channel is created whenever there is a skew detected. closing it implies the current skew has been fixed
skewCh chan bool
// if a skew lasts for this long, we timeout the vstream call. currently hardcoded
skewTimeoutSeconds int64
// the slow streamId which is causing the skew. streamId is of the form <keyspace>.<shard>
laggard string
// transaction timestamp of the slowest stream
lowestTS int64
// the timestamp of the most recent event, keyed by streamId. streamId is of the form <keyspace>.<shard>
timestamps map[string]int64

vsm *vstreamManager
}

type journalEvent struct {
Expand All @@ -81,8 +103,9 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
}
}

func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, send func(events []*binlogdatapb.VEvent) error) error {
vgtid, filter, err := vsm.resolveParams(ctx, tabletType, vgtid, filter)
func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid,
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func(events []*binlogdatapb.VEvent) error) error {
vgtid, filter, flags, err := vsm.resolveParams(ctx, tabletType, vgtid, filter, flags)
if err != nil {
return err
}
Expand All @@ -93,33 +116,44 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
send: send,
resolver: vsm.resolver,
journaler: make(map[int64]*journalEvent),

minimizeSkew: flags.MinimizeSkew,
skewTimeoutSeconds: 10 * 60,
timestamps: make(map[string]int64),
vsm: vsm,
}
return vs.stream(ctx)
}

// resolveParams provides defaults for the inputs if they're not specified.
func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) (*binlogdatapb.VGtid, *binlogdatapb.Filter, error) {
func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid,
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (*binlogdatapb.VGtid, *binlogdatapb.Filter, *vtgatepb.VStreamFlags, error) {

if filter == nil {
filter = &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*",
}},
}
}

if flags == nil {
flags = &vtgatepb.VStreamFlags{}
}
if vgtid == nil || len(vgtid.ShardGtids) == 0 {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position")
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position")
}
// To fetch from all keyspaces, the input must contain a single ShardGtid
// that has an empty keyspace, and the Gtid must be "current". In the
// future, we'll allow the Gtid to be empty which will also support
// copying of existing data.
if len(vgtid.ShardGtids) == 1 && vgtid.ShardGtids[0].Keyspace == "" {
if vgtid.ShardGtids[0].Gtid != "current" {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid)
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid)
}
keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
newvgtid := &binlogdatapb.VGtid{}
for _, keyspace := range keyspaces {
Expand All @@ -134,12 +168,12 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
for _, sgtid := range vgtid.ShardGtids {
if sgtid.Shard == "" {
if sgtid.Gtid != "current" {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current': %v", vgtid)
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current': %v", vgtid)
}
// TODO(sougou): this should work with the new Migrate workflow
_, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for _, shard := range allShards {
newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{
Expand All @@ -152,11 +186,19 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
newvgtid.ShardGtids = append(newvgtid.ShardGtids, sgtid)
}
}

//TODO add tablepk validations

return newvgtid, filter, nil
return newvgtid, filter, flags, nil
}

func (vsm *vstreamManager) RecordStreamDelay() {
vstreamSkewDelayCount.Add(1)
}

func (vsm *vstreamManager) GetTotalStreamDelay() int64 {
return vstreamSkewDelayCount.Get()
}
func (vs *vstream) stream(ctx context.Context) error {
ctx, vs.cancel = context.WithCancel(ctx)
defer vs.cancel()
Expand Down Expand Up @@ -187,6 +229,95 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
}()
}

// MaxSkew is the threshold for a skew to be detected. Since MySQL timestamps are in seconds we account for
// two round-offs: one for the actual event and another while accounting for the clock skew
const MaxSkew = int64(2)

// computeSkew sets the timestamp of the current event for the calling stream, accounts for a clock skew
// and declares that a skew has arisen if the streams are too far apart
func (vs *vstream) computeSkew(streamID string, event *binlogdatapb.VEvent) bool {
vs.skewMu.Lock()
defer vs.skewMu.Unlock()
// account for skew between this vtgate and the source mysql server
secondsInThePast := event.CurrentTime/1e9 - event.Timestamp
vs.timestamps[streamID] = time.Now().Unix() - secondsInThePast

var minTs, maxTs int64
var laggardStream string

if len(vs.timestamps) <= 1 {
return false
}
for k, ts := range vs.timestamps {
if ts < minTs || minTs == 0 {
minTs = ts
laggardStream = k
}
if ts > maxTs {
maxTs = ts
}
}
if vs.laggard != "" { // we are skewed, check if this event has fixed the skew
if (maxTs - minTs) <= MaxSkew {
vs.laggard = ""
close(vs.skewCh)
}
} else {
if (maxTs - minTs) > MaxSkew { // check if we are skewed due to this event
log.Infof("Skew found, laggard is %s, %+v", laggardStream, vs.timestamps)
vs.laggard = laggardStream
vs.skewCh = make(chan bool)
}
}
return vs.mustPause(streamID)
}

// mustPause returns true if a skew exists and the stream calling this is not the slowest one
func (vs *vstream) mustPause(streamID string) bool {
switch vs.laggard {
case "":
return false
case streamID:
// current stream is the laggard, not pausing
return false
}

if (vs.timestamps[streamID] - vs.lowestTS) <= MaxSkew {
// current stream is not the laggard, but the skew is still within the limit
return false
}
vs.vsm.RecordStreamDelay()
return true
}

// alignStreams is called by each individual shard's stream before an event is sent to the client or after each heartbeat.
// It checks for skew (if the minimizeSkew option is set). If skew is present this stream is delayed until the skew is fixed
// The faster stream detects the skew and waits. The slower stream resets the skew when it catches up.
func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent, keyspace, shard string) error {
if !vs.minimizeSkew || event.Timestamp == 0 {
return nil
}
streamID := fmt.Sprintf("%s/%s", keyspace, shard)
for {
mustPause := vs.computeSkew(streamID, event)
if event.Type == binlogdatapb.VEventType_HEARTBEAT {
return nil
}
if !mustPause {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Duration(vs.skewTimeoutSeconds) * time.Second):
log.Errorf("timed out while waiting for skew to reduce: %s", streamID)
return fmt.Errorf("timed out while waiting for skew to reduce: %s", streamID)
case <-vs.skewCh:
// once skew is fixed the channel is closed and all waiting streams "wake up"
}
}
}

// streamFromTablet streams from one shard. If transactions come in separate chunks, they are grouped and sent.
func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.ShardGtid) error {
// journalDone is assigned a channel when a journal event is encountered.
Expand Down Expand Up @@ -249,6 +380,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER:
sendevents = append(sendevents, event)
eventss = append(eventss, sendevents)

if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

if err := vs.sendAll(sgtid, eventss); err != nil {
return err
}
Expand All @@ -258,6 +394,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Remove all heartbeat events for now.
// Otherwise they can accumulate indefinitely if there are no real events.
// TODO(sougou): figure out a model for this.
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
// Journal events are not sent to clients.
Expand Down
Loading