Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type TabletVStreamerClient struct {
// mu protects isOpen, streamers, streamIdx and kschema.
mu sync.Mutex

isOpen bool
isOpen bool
openConnection int32

tablet *topodatapb.Tablet
target *querypb.Target
Expand All @@ -74,7 +75,8 @@ type MySQLVStreamerClient struct {
// mu protects isOpen, streamers, streamIdx and kschema.
mu sync.Mutex

isOpen bool
isOpen bool
openConnection int32

sourceConnParams dbconfigs.Connector
sourceSe *schema.Engine
Expand All @@ -96,11 +98,11 @@ func NewTabletVStreamerClient(tablet *topodatapb.Tablet) *TabletVStreamerClient
func (vsClient *TabletVStreamerClient) Open(ctx context.Context) (err error) {
vsClient.mu.Lock()
defer vsClient.mu.Unlock()
if vsClient.isOpen {
vsClient.openConnection++
if vsClient.openConnection > 1 {
return nil
}
vsClient.isOpen = true

vsClient.tsQueryService, err = tabletconn.GetDialer()(vsClient.tablet, grpcclient.FailFast(true))
return err
}
Expand All @@ -109,7 +111,10 @@ func (vsClient *TabletVStreamerClient) Open(ctx context.Context) (err error) {
func (vsClient *TabletVStreamerClient) Close(ctx context.Context) (err error) {
vsClient.mu.Lock()
defer vsClient.mu.Unlock()
if !vsClient.isOpen {
if vsClient.openConnection > 0 {
vsClient.openConnection--
}
if vsClient.openConnection > 0 {
return nil
}
vsClient.isOpen = false
Expand Down Expand Up @@ -150,10 +155,10 @@ func NewMySQLVStreamerClient() *MySQLVStreamerClient {
func (vsClient *MySQLVStreamerClient) Open(ctx context.Context) (err error) {
vsClient.mu.Lock()
defer vsClient.mu.Unlock()
if vsClient.isOpen {
vsClient.openConnection++
if vsClient.openConnection > 1 {
return nil
}
vsClient.isOpen = true

// Let's create all the required components by vstreamer

Expand All @@ -171,10 +176,12 @@ func (vsClient *MySQLVStreamerClient) Open(ctx context.Context) (err error) {
func (vsClient *MySQLVStreamerClient) Close(ctx context.Context) (err error) {
vsClient.mu.Lock()
defer vsClient.mu.Unlock()
if !vsClient.isOpen {
if vsClient.openConnection > 0 {
vsClient.openConnection--
}
if vsClient.openConnection > 0 {
return nil
}

vsClient.isOpen = false
vsClient.sourceSe.Close()
return nil
Expand Down
146 changes: 146 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,75 @@ func TestTabletVStreamerClientClose(t *testing.T) {
}
}

func TestTabletVStreamerClientCloseTwice(t *testing.T) {
tablet := addTablet(100)
defer deleteTablet(tablet)

type fields struct {
tablet *topodatapb.Tablet
}
type args struct {
ctx context.Context
}
tests := []struct {
name string
fields fields
args args
err string
}{
{
name: "closes engine correctly",
fields: fields{
tablet: tablet,
},
args: args{
ctx: context.Background(),
},
},
}

for _, tcase := range tests {
t.Run(tcase.name, func(t *testing.T) {
vsClient := &TabletVStreamerClient{
tablet: tcase.fields.tablet,
}

err := vsClient.Open(tcase.args.ctx)
if err != nil {
t.Errorf("Failed to Open vsClient")
return
}

// open again
err = vsClient.Open(tcase.args.ctx)
if err != nil {
t.Errorf("Failed to Open vsClient")
return
}

err = vsClient.Close(tcase.args.ctx)

if tcase.err != "" {
t.Errorf("MySQLVStreamerClient.Close() error:\n%v, want\n%v", err, tcase.err)
}

if !vsClient.isOpen {
t.Errorf("MySQLVStreamerClient.Close() should not close the connection opened by other")
}

err = vsClient.Close(tcase.args.ctx)

if tcase.err != "" {
t.Errorf("MySQLVStreamerClient.Close() error:\n%v, want\n%v", err, tcase.err)
}

if vsClient.isOpen {
t.Errorf("MySQLVStreamerClient.Close() isOpen set to true, expected false")
}
})
}
}

func TestTabletVStreamerClientVStream(t *testing.T) {
tablet := addTablet(100)
defer deleteTablet(tablet)
Expand Down Expand Up @@ -410,6 +479,83 @@ func TestMySQLVStreamerClientClose(t *testing.T) {
}
}

func TestMySQLVStreamerClientCloseTwice(t *testing.T) {
type fields struct {
isOpen bool
sourceConnParams dbconfigs.Connector
}
type args struct {
ctx context.Context
}

tests := []struct {
name string
fields fields
args args
err string
}{
{
name: "closes engine correctly",
fields: fields{
sourceConnParams: dbcfgs.ExternalReplWithDB(),
},
args: args{
ctx: context.Background(),
},
},
}

for _, tcase := range tests {
t.Run(tcase.name, func(t *testing.T) {
vsClient := &MySQLVStreamerClient{
isOpen: tcase.fields.isOpen,
sourceConnParams: tcase.fields.sourceConnParams,
}

err := vsClient.Open(tcase.args.ctx)
if err != nil {
t.Errorf("Failed to Open vsClient")
return
}

// open again
err = vsClient.Open(tcase.args.ctx)
if err != nil {
t.Errorf("Failed to Open vsClient")
return
}

err = vsClient.Close(tcase.args.ctx)

if tcase.err != "" {
t.Errorf("MySQLVStreamerClient.Close() error:\n%v, want\n%v", err, tcase.err)
}

if vsClient.isOpen {
t.Errorf("MySQLVStreamerClient.Close() should not close the connection opened by other")
}

if !vsClient.sourceSe.IsOpen() {
t.Errorf("MySQLVStreamerClient.Close() expected sourceSe not to be closed")
}

err = vsClient.Close(tcase.args.ctx)

if tcase.err != "" {
t.Errorf("MySQLVStreamerClient.Close() error:\n%v, want\n%v", err, tcase.err)
}

if vsClient.isOpen {
t.Errorf("MySQLVStreamerClient.Close() isOpen set to true, expected false")
}

if vsClient.sourceSe.IsOpen() {
t.Errorf("MySQLVStreamerClient.Close() expected sourceSe to be closed")
}
})
}
}

func TestMySQLVStreamerClientVStream(t *testing.T) {
vsClient := &MySQLVStreamerClient{
sourceConnParams: dbcfgs.ExternalReplWithDB(),
Expand Down