diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index d7b3c3accf7..a98e6c3a724 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -55,7 +55,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar vte.explainTopo.TopoServer = memorytopo.NewServer(ctx, vtexplainCell) vte.healthCheck = discovery.NewFakeHealthCheck(nil) - resolver := vte.newFakeResolver(opts, vte.explainTopo, vtexplainCell) + resolver := vte.newFakeResolver(ctx, opts, vte.explainTopo, vtexplainCell) err := vte.buildTopology(ctx, opts, vSchemaStr, ksShardMapStr, opts.NumShards) if err != nil { @@ -80,10 +80,9 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar return nil } -func (vte *VTExplain) newFakeResolver(opts *Options, serv srvtopo.Server, cell string) *vtgate.Resolver { - ctx := context.Background() +func (vte *VTExplain) newFakeResolver(ctx context.Context, opts *Options, serv srvtopo.Server, cell string) *vtgate.Resolver { gw := vtgate.NewTabletGateway(ctx, vte.healthCheck, serv, cell) - _ = gw.WaitForTablets([]topodatapb.TabletType{topodatapb.TabletType_REPLICA}) + _ = gw.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) txMode := vtgatepb.TransactionMode_MULTI if opts.ExecutionMode == ModeTwoPC { diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index c4dcd3746fa..7ada1c3ac31 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -108,8 +108,7 @@ func TestLegacyExecuteFailOnAutocommit(t *testing.T) { } func TestScatterConnExecuteMulti(t *testing.T) { - testScatterConnGeneric(t, "TestScatterConnExecuteMultiShard", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { - ctx := utils.LeakCheckContext(t) + testScatterConnGeneric(t, "TestScatterConnExecuteMultiShard", func(ctx context.Context, sc *ScatterConn, shards []string) (*sqltypes.Result, error) { res := srvtopo.NewResolver(newSandboxForCells(ctx, []string{"aa"}), sc.gateway, "aa") rss, err := res.ResolveDestination(ctx, "TestScatterConnExecuteMultiShard", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) if err != nil { @@ -130,8 +129,7 @@ func TestScatterConnExecuteMulti(t *testing.T) { } func TestScatterConnStreamExecuteMulti(t *testing.T) { - testScatterConnGeneric(t, "TestScatterConnStreamExecuteMulti", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { - ctx := utils.LeakCheckContext(t) + testScatterConnGeneric(t, "TestScatterConnStreamExecuteMulti", func(ctx context.Context, sc *ScatterConn, shards []string) (*sqltypes.Result, error) { res := srvtopo.NewResolver(newSandboxForCells(ctx, []string{"aa"}), sc.gateway, "aa") rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecuteMulti", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) if err != nil { @@ -158,7 +156,7 @@ func verifyScatterConnError(t *testing.T, err error, wantErr string, wantCode vt assert.Equal(t, wantCode, vterrors.Code(err)) } -func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, shards []string) (*sqltypes.Result, error)) { +func testScatterConnGeneric(t *testing.T, name string, f func(ctx context.Context, sc *ScatterConn, shards []string) (*sqltypes.Result, error)) { ctx := utils.LeakCheckContext(t) hc := discovery.NewFakeHealthCheck(nil) @@ -166,7 +164,7 @@ func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, s // no shard s := createSandbox(name) sc := newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa") - qr, err := f(sc, nil) + qr, err := f(ctx, sc, nil) require.NoError(t, err) if qr.RowsAffected != 0 { t.Errorf("want 0, got %v", qr.RowsAffected) @@ -177,7 +175,7 @@ func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, s sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa") sbc := hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) sbc.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - _, err = f(sc, []string{"0"}) + _, err = f(ctx, sc, []string{"0"}) want := fmt.Sprintf("target: %v.0.replica: INVALID_ARGUMENT error", name) // Verify server error string. if err == nil || err.Error() != want { @@ -196,7 +194,7 @@ func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, s sbc1 := hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - _, err = f(sc, []string{"0", "1"}) + _, err = f(ctx, sc, []string{"0", "1"}) // Verify server errors are consolidated. want = fmt.Sprintf("target: %v.0.replica: INVALID_ARGUMENT error\ntarget: %v.1.replica: INVALID_ARGUMENT error", name, name) verifyScatterConnError(t, err, want, vtrpcpb.Code_INVALID_ARGUMENT) @@ -216,7 +214,7 @@ func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, s sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 sbc1.MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1 - _, err = f(sc, []string{"0", "1"}) + _, err = f(ctx, sc, []string{"0", "1"}) // Verify server errors are consolidated. want = fmt.Sprintf("target: %v.0.replica: INVALID_ARGUMENT error\ntarget: %v.1.replica: RESOURCE_EXHAUSTED error", name, name) // We should only surface the higher priority error code @@ -234,7 +232,7 @@ func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, s hc.Reset() sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa") sbc = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - _, _ = f(sc, []string{"0", "0"}) + _, _ = f(ctx, sc, []string{"0", "0"}) // Ensure that we executed only once. if execCount := sbc.ExecCount.Load(); execCount != 1 { t.Errorf("want 1, got %v", execCount) @@ -246,7 +244,7 @@ func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, s sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa") sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) - qr, err = f(sc, []string{"0", "1"}) + qr, err = f(ctx, sc, []string{"0", "1"}) if err != nil { t.Fatalf("want nil, got %v", err) } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index f7caf6819e2..de63da87907 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -166,9 +166,9 @@ func (gw *TabletGateway) RegisterStats() { } // WaitForTablets is part of the Gateway interface. -func (gw *TabletGateway) WaitForTablets(tabletTypesToWait []topodatapb.TabletType) (err error) { +func (gw *TabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) (err error) { log.Infof("Gateway waiting for serving tablets of types %v ...", tabletTypesToWait) - ctx, cancel := context.WithTimeout(context.Background(), initialTabletTimeout) + ctx, cancel := context.WithTimeout(ctx, initialTabletTimeout) defer cancel() defer func() { diff --git a/go/vt/vtgate/tabletgateway_test.go b/go/vt/vtgate/tabletgateway_test.go index e88f3cbf32b..32d18dcc9ab 100644 --- a/go/vt/vtgate/tabletgateway_test.go +++ b/go/vt/vtgate/tabletgateway_test.go @@ -37,19 +37,21 @@ import ( ) func TestTabletGatewayExecute(t *testing.T) { - testTabletGatewayGeneric(t, func(tg *TabletGateway, target *querypb.Target) error { - _, err := tg.Execute(context.Background(), target, "query", nil, 0, 0, nil) + ctx := utils.LeakCheckContext(t) + testTabletGatewayGeneric(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { + _, err := tg.Execute(ctx, target, "query", nil, 0, 0, nil) return err }) - testTabletGatewayTransact(t, func(tg *TabletGateway, target *querypb.Target) error { - _, err := tg.Execute(context.Background(), target, "query", nil, 1, 0, nil) + testTabletGatewayTransact(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { + _, err := tg.Execute(ctx, target, "query", nil, 1, 0, nil) return err }) } func TestTabletGatewayExecuteStream(t *testing.T) { - testTabletGatewayGeneric(t, func(tg *TabletGateway, target *querypb.Target) error { - err := tg.StreamExecute(context.Background(), target, "query", nil, 0, 0, nil, func(qr *sqltypes.Result) error { + ctx := utils.LeakCheckContext(t) + testTabletGatewayGeneric(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { + err := tg.StreamExecute(ctx, target, "query", nil, 0, 0, nil, func(qr *sqltypes.Result) error { return nil }) return err @@ -57,29 +59,33 @@ func TestTabletGatewayExecuteStream(t *testing.T) { } func TestTabletGatewayBegin(t *testing.T) { - testTabletGatewayGeneric(t, func(tg *TabletGateway, target *querypb.Target) error { - _, err := tg.Begin(context.Background(), target, nil) + ctx := utils.LeakCheckContext(t) + testTabletGatewayGeneric(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { + _, err := tg.Begin(ctx, target, nil) return err }) } func TestTabletGatewayCommit(t *testing.T) { - testTabletGatewayTransact(t, func(tg *TabletGateway, target *querypb.Target) error { - _, err := tg.Commit(context.Background(), target, 1) + ctx := utils.LeakCheckContext(t) + testTabletGatewayTransact(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { + _, err := tg.Commit(ctx, target, 1) return err }) } func TestTabletGatewayRollback(t *testing.T) { - testTabletGatewayTransact(t, func(tg *TabletGateway, target *querypb.Target) error { - _, err := tg.Rollback(context.Background(), target, 1) + ctx := utils.LeakCheckContext(t) + testTabletGatewayTransact(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { + _, err := tg.Rollback(ctx, target, 1) return err }) } func TestTabletGatewayBeginExecute(t *testing.T) { - testTabletGatewayGeneric(t, func(tg *TabletGateway, target *querypb.Target) error { - _, _, err := tg.BeginExecute(context.Background(), target, nil, "query", nil, 0, nil) + ctx := utils.LeakCheckContext(t) + testTabletGatewayGeneric(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { + _, _, err := tg.BeginExecute(ctx, target, nil, "query", nil, 0, nil) return err }) } @@ -167,14 +173,12 @@ func TestTabletGatewayReplicaTransactionError(t *testing.T) { defer tg.Close(ctx) _ = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) - _, err := tg.Execute(context.Background(), target, "query", nil, 1, 0, nil) + _, err := tg.Execute(ctx, target, "query", nil, 1, 0, nil) verifyContainsError(t, err, "query service can only be used for non-transactional queries on replicas", vtrpcpb.Code_INTERNAL) } -func testTabletGatewayGeneric(t *testing.T, f func(tg *TabletGateway, target *querypb.Target) error) { +func testTabletGatewayGeneric(t *testing.T, ctx context.Context, f func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error) { t.Helper() - ctx := utils.LeakCheckContext(t) - keyspace := "ks" shard := "0" tabletType := topodatapb.TabletType_REPLICA @@ -192,19 +196,19 @@ func testTabletGatewayGeneric(t *testing.T, f func(tg *TabletGateway, target *qu // no tablet want := []string{"target: ks.0.replica", `no healthy tablet available for 'keyspace:"ks" shard:"0" tablet_type:REPLICA`} - err := f(tg, target) + err := f(ctx, tg, target) verifyShardErrors(t, err, want, vtrpcpb.Code_UNAVAILABLE) // tablet with error hc.Reset() hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, false, 10, fmt.Errorf("no connection")) - err = f(tg, target) + err = f(ctx, tg, target) verifyShardErrors(t, err, want, vtrpcpb.Code_UNAVAILABLE) // tablet without connection hc.Reset() _ = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, false, 10, nil).Tablet() - err = f(tg, target) + err = f(ctx, tg, target) verifyShardErrors(t, err, want, vtrpcpb.Code_UNAVAILABLE) // retry error @@ -214,7 +218,7 @@ func testTabletGatewayGeneric(t *testing.T, f func(tg *TabletGateway, target *qu sc1.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 sc2.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 - err = f(tg, target) + err = f(ctx, tg, target) verifyContainsError(t, err, "target: ks.0.replica", vtrpcpb.Code_FAILED_PRECONDITION) // fatal error @@ -223,26 +227,25 @@ func testTabletGatewayGeneric(t *testing.T, f func(tg *TabletGateway, target *qu sc2 = hc.AddTestTablet("cell", host, port+1, keyspace, shard, tabletType, true, 10, nil) sc1.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 sc2.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 - err = f(tg, target) + err = f(ctx, tg, target) verifyContainsError(t, err, "target: ks.0.replica", vtrpcpb.Code_FAILED_PRECONDITION) // server error - no retry hc.Reset() sc1 = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) sc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - err = f(tg, target) + err = f(ctx, tg, target) assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err)) // no failure hc.Reset() hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) - err = f(tg, target) + err = f(ctx, tg, target) assert.NoError(t, err) } -func testTabletGatewayTransact(t *testing.T, f func(tg *TabletGateway, target *querypb.Target) error) { +func testTabletGatewayTransact(t *testing.T, ctx context.Context, f func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error) { t.Helper() - ctx := utils.LeakCheckContext(t) keyspace := "ks" shard := "0" @@ -267,14 +270,14 @@ func testTabletGatewayTransact(t *testing.T, f func(tg *TabletGateway, target *q sc1.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 sc2.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 - err := f(tg, target) + err := f(ctx, tg, target) verifyContainsError(t, err, "target: ks.0.primary", vtrpcpb.Code_FAILED_PRECONDITION) // server error - no retry hc.Reset() sc1 = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) sc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - err = f(tg, target) + err = f(ctx, tg, target) verifyContainsError(t, err, "target: ks.0.primary", vtrpcpb.Code_INVALID_ARGUMENT) } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 13aae235d44..70040a36653 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -89,7 +89,7 @@ func TestVStreamSkew(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(ctx, hc, st, cell) vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} want := int64(0) var sbc0, sbc1 *sandboxconn.SandboxConn @@ -135,7 +135,7 @@ func TestVStreamEvents(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(ctx, hc, st, cell) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) @@ -212,7 +212,7 @@ func TestVStreamChunks(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(ctx, hc, st, cell) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -280,7 +280,7 @@ func TestVStreamMulti(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(ctx, hc, st, "aa") sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -342,7 +342,7 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(ctx, hc, st, cell) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -397,7 +397,7 @@ func TestVStreamRetry(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(ctx, hc, st, "aa") sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ @@ -437,7 +437,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(ctx, hc, st, cell) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) @@ -487,7 +487,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(ctx, hc, st, "aa") sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -600,7 +600,7 @@ func TestVStreamJournalManyToOne(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(ctx, hc, st, cell) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -717,7 +717,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(ctx, hc, st, "aa") sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) @@ -846,7 +846,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(ctx, hc, st, "aa") sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-10", sbc1.Tablet()) sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -929,7 +929,7 @@ func TestResolveVStreamParams(t *testing.T) { name := "TestVStream" _ = createSandbox(name) hc := discovery.NewFakeHealthCheck(nil) - vsm := newTestVStreamManager(hc, newSandboxForCells(ctx, []string{"aa"}), "aa") + vsm := newTestVStreamManager(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa") testcases := []struct { input *binlogdatapb.VGtid output *binlogdatapb.VGtid @@ -1144,7 +1144,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(ctx, hc, st, cell) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) vgtid := &binlogdatapb.VGtid{ @@ -1170,7 +1170,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) { t.Run(tcase.name, func(t *testing.T) { var mu sync.Mutex var heartbeatCount int - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) go func() { vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{HeartbeatInterval: tcase.heartbeatInterval}, func(events []*binlogdatapb.VEvent) error { @@ -1193,8 +1193,8 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } -func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { - gw := NewTabletGateway(context.Background(), hc, serv, cell) +func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { + gw := NewTabletGateway(ctx, hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) return newVStreamManager(srvResolver, serv, cell) } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index eee0442fda7..4f8cffceb12 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -253,7 +253,7 @@ func Init( // TabletGateway can create it's own healthcheck gw := NewTabletGateway(ctx, hc, serv, cell) gw.RegisterStats() - if err := gw.WaitForTablets(tabletTypesToWait); err != nil { + if err := gw.WaitForTablets(ctx, tabletTypesToWait); err != nil { log.Fatalf("tabletGateway.WaitForTablets failed: %v", err) }