Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;

create table t1_copy_basic(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
Expand Down Expand Up @@ -134,6 +140,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_basic": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_sharded": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
165 changes: 160 additions & 5 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"regexp"
"sync"
"testing"

Expand Down Expand Up @@ -98,18 +99,29 @@ func TestVStream(t *testing.T) {
// In a real world scenario where every mysql instance hosts only one
// keyspace/shard, we should expect only a single event.
// The events could come in any order as the scatter insert runs in parallel.
emptyEventSkipped := false
for i := 0; i < 2; i++ {
events, err := reader.Recv()
if err != nil {
t.Fatal(err)
}
fmt.Printf("events: %v\n", events)
// An empty transaction has three events: begin, gtid and commit.
if len(events) == 3 && !emptyEventSkipped {
emptyEventSkipped = true
continue

// An empty transaction has either:
// - three events: begin, vgtid and commit.
// - two events: vgtid and other
if len(events) == 3 {
if events[0].Type == binlogdatapb.VEventType_BEGIN &&
events[1].Type == binlogdatapb.VEventType_VGTID &&
events[2].Type == binlogdatapb.VEventType_COMMIT {
continue
}
} else if len(events) == 2 {
if events[0].Type == binlogdatapb.VEventType_VGTID &&
events[1].Type == binlogdatapb.VEventType_OTHER {
continue
}
}

if len(events) != 5 {
t.Errorf("Unexpected event length: %v", events)
continue
Expand Down Expand Up @@ -381,6 +393,149 @@ func TestVStreamSharded(t *testing.T) {

}

// TestVStreamCopyTransactions tests that we are properly wrapping
// ROW events in the stream with BEGIN and COMMIT events.
func TestVStreamCopyTransactions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keyspace := "ks"
shards := []string{"-80", "80-"}
table := "t1_copy_basic"
beginEventSeen, commitEventSeen := false, false
numResultInTrx := 0
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: keyspace,
Shard: shards[0],
Gtid: "", // Start a vstream copy
},
{
Keyspace: keyspace,
Shard: shards[1],
Gtid: "", // Start a vstream copy
},
},
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: table,
Filter: fmt.Sprintf("select * from %s", table),
}},
}

gconn, conn, _, closeConnections := initialize(ctx, t)
defer closeConnections()

// Clear any existing data.
q := fmt.Sprintf("delete from %s", table)
_, err := conn.ExecuteFetch(q, -1, false)
require.NoError(t, err, "error clearing data: %v", err)

// Generate some test data. Enough to cross the default
// vstream_packet_size threshold.
for i := 1; i <= 100000; i++ {
values := fmt.Sprintf("(%d, %d)", i, i)
q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values)
_, err := conn.ExecuteFetch(q, 1, false)
require.NoError(t, err, "error inserting data: %v", err)
}

// Start a vstream.
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil)
require.NoError(t, err, "error starting vstream: %v", err)

recvLoop:
for {
finished := true

vevents, err := reader.Recv()
numResultInTrx++
eventCount := len(vevents)
t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n",
eventCount, numResultInTrx)
switch err {
case nil:

for _, event := range vevents {
switch event.Type {
case binlogdatapb.VEventType_BEGIN:
require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n",
numResultInTrx)
beginEventSeen = true
t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx)
require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n",
numResultInTrx)
case binlogdatapb.VEventType_VGTID:
t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)

finished = true
for _, shardGtid := range event.Vgtid.ShardGtids {
finished = finished && len(shardGtid.TablePKs) == 0
}

case binlogdatapb.VEventType_FIELD:
t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_ROW:
// Uncomment if you need to do more debugging.
// t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
// beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)

case binlogdatapb.VEventType_COMMIT:
commitEventSeen = true
t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n",
numResultInTrx)

if finished {
t.Logf("Finished vstream copy\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
break recvLoop
}

default:
t.Logf("Found extraneous event: %+v\n", event)
}
if beginEventSeen && commitEventSeen {
t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n")
beginEventSeen = false
commitEventSeen = false
numResultInTrx = 0
}
}
case io.EOF:
t.Logf("vstream ended\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
return
default:
require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err)
return
}
}
// The last response, when the vstream copy completes, does not
// typically contain ROW events.
if beginEventSeen || commitEventSeen {
require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set")
}
}

func removeAnyDeprecatedDisplayWidths(orig string) string {
var adjusted string
baseIntType := "int"
intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`)
adjusted = intRE.ReplaceAllString(orig, baseIntType)
baseYearType := "year"
yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`)
adjusted = yearRE.ReplaceAllString(adjusted, baseYearType)
return adjusted
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendFieldEvent returned error %v", err)
return err
}
// sendFieldEvent() sends a BEGIN event first.
uvs.inTransaction = true
}

if len(rows.Rows) == 0 {
log.V(2).Infof("0 rows returned for table %s", tableName)
return nil
}

// We are about to send ROW events, so we need to ensure
// that we do so within a transaction. The COMMIT event
// will be sent in sendEventsForRows() below.
if !uvs.inTransaction {
evs := []*binlogdatapb.VEvent{{
Type: binlogdatapb.VEventType_BEGIN,
}}
uvs.send(evs)
uvs.inTransaction = true
}

newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{
Fields: rows.Fields,
Rows: []*querypb.Row{rows.Lastpk},
Expand All @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendEventsForRows returned error %v", err)
return err
}
// sendEventsForRows() sends a COMMIT event last.
uvs.inTransaction = false

uvs.setCopyState(tableName, qrLastPK)
log.V(2).Infof("NewLastPK: %v", qrLastPK)
Expand Down
18 changes: 11 additions & 7 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ type uvstreamer struct {
cancel func()

// input parameters
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
// Are we currently in an explicit transaction?
// If we are not, and we're about to send ROW
// events, then we need to send a BEGIN event first.
inTransaction bool
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK

vschema *localVSchema

Expand Down