Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -75,7 +76,7 @@ var testState = &state{}

var positions map[string]string
var allEvents []*binlogdatapb.VEvent

var muAllEvents sync.Mutex
var callbacks map[string]func()

func TestVStreamCopyFilterValidations(t *testing.T) {
Expand Down Expand Up @@ -114,12 +115,12 @@ func TestVStreamCopyFilterValidations(t *testing.T) {
}
return uvs
}
var testFilter = func(rules []*binlogdatapb.Rule, tablePKs []*binlogdatapb.TableLastPK, expected []string, err string) {
var testFilter = func(rules []*binlogdatapb.Rule, tablePKs []*binlogdatapb.TableLastPK, expected []string, expectedError string) {
uvs := getUVStreamer(&binlogdatapb.Filter{Rules: rules}, tablePKs)
if err == "" {
if expectedError == "" {
require.NoError(t, uvs.init())
} else {
require.Error(t, uvs.init(), err)
require.Error(t, uvs.init(), expectedError)
return
}
require.Equal(t, len(expected), len(uvs.plans))
Expand All @@ -135,10 +136,10 @@ func TestVStreamCopyFilterValidations(t *testing.T) {
}

type TestCase struct {
rules []*binlogdatapb.Rule
tablePKs []*binlogdatapb.TableLastPK
expected []string
err string
rules []*binlogdatapb.Rule
tablePKs []*binlogdatapb.TableLastPK
expected []string
expectedError string
}

var testCases []*TestCase
Expand All @@ -159,7 +160,7 @@ func TestVStreamCopyFilterValidations(t *testing.T) {

for _, tc := range testCases {
log.Infof("Running %v", tc.rules)
testFilter(tc.rules, tc.tablePKs, tc.expected, tc.err)
testFilter(tc.rules, tc.tablePKs, tc.expected, tc.expectedError)
}
}

Expand Down Expand Up @@ -196,9 +197,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
// Test event called after t1 copy is complete
callbacks["OTHER.*Copy Start t2"] = func() {
conn, err := env.Mysqld.GetDbaConnection(ctx)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer conn.Close()

log.Info("Inserting row for fast forward to find, locking t2")
Expand All @@ -212,9 +211,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {

callbacks["OTHER.*Copy Start t3"] = func() {
conn, err := env.Mysqld.GetDbaConnection(ctx)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer conn.Close()

log.Info("Inserting row for fast forward to find, locking t3")
Expand Down Expand Up @@ -268,7 +265,8 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
case <-ctx.Done():
log.Infof("Received context.Done, ending test")
}

muAllEvents.Lock()
defer muAllEvents.Unlock()
printAllEvents("End of test")
if len(allEvents) != numExpectedEvents {
log.Errorf("Received %d events, expected %d", len(allEvents), numExpectedEvents)
Expand All @@ -280,7 +278,6 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
log.Infof("Successfully received %d events", numExpectedEvents)
}
validateReceivedEvents(t)

}

func validateReceivedEvents(t *testing.T) {
Expand Down Expand Up @@ -389,6 +386,8 @@ func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Fi
go func() {
err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error {
//t.Logf("Received events: %v", evs)
muAllEvents.Lock()
defer muAllEvents.Unlock()
for _, ev := range evs {
if ev.Type == binlogdatapb.VEventType_HEARTBEAT {
continue
Expand Down