-
Notifications
You must be signed in to change notification settings - Fork 2.3k
vtexplain: Ensure memory topo is set up for throttler #15279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,26 +22,23 @@ package vtexplain | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "path" | ||
| "sort" | ||
| "strings" | ||
|
|
||
| "vitess.io/vitess/go/cache/theine" | ||
| "vitess.io/vitess/go/vt/vtgate/logstats" | ||
| "vitess.io/vitess/go/vt/vtgate/vindexes" | ||
|
|
||
| "vitess.io/vitess/go/vt/topo" | ||
| "vitess.io/vitess/go/vt/topo/memorytopo" | ||
|
|
||
| "vitess.io/vitess/go/vt/vterrors" | ||
|
|
||
| "vitess.io/vitess/go/json2" | ||
| "vitess.io/vitess/go/streamlog" | ||
| "vitess.io/vitess/go/vt/discovery" | ||
| "vitess.io/vitess/go/vt/key" | ||
| "vitess.io/vitess/go/vt/log" | ||
| "vitess.io/vitess/go/vt/srvtopo" | ||
| "vitess.io/vitess/go/vt/topo" | ||
| "vitess.io/vitess/go/vt/vterrors" | ||
| "vitess.io/vitess/go/vt/vtgate" | ||
| "vitess.io/vitess/go/vt/vtgate/engine" | ||
| "vitess.io/vitess/go/vt/vtgate/logstats" | ||
| "vitess.io/vitess/go/vt/vtgate/vindexes" | ||
| "vitess.io/vitess/go/vt/vttablet/queryservice" | ||
|
|
||
| querypb "vitess.io/vitess/go/vt/proto/query" | ||
|
|
@@ -50,14 +47,14 @@ import ( | |
| vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" | ||
| ) | ||
|
|
||
| func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShardMapStr string, opts *Options) error { | ||
| func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, vSchemaStr, ksShardMapStr string, opts *Options) error { | ||
| vte.explainTopo = &ExplainTopo{NumShards: opts.NumShards} | ||
| vte.explainTopo.TopoServer = memorytopo.NewServer(ctx, vtexplainCell) | ||
| vte.explainTopo.TopoServer = ts | ||
| vte.healthCheck = discovery.NewFakeHealthCheck(nil) | ||
|
|
||
| resolver := vte.newFakeResolver(ctx, opts, vte.explainTopo, vtexplainCell) | ||
| resolver := vte.newFakeResolver(ctx, opts, vte.explainTopo, Cell) | ||
|
|
||
| err := vte.buildTopology(ctx, opts, vSchemaStr, ksShardMapStr, opts.NumShards) | ||
| err := vte.buildTopology(ctx, ts, opts, vSchemaStr, ksShardMapStr, opts.NumShards) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -75,7 +72,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar | |
| var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests | ||
| queryLogBufferSize := 10 | ||
| plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) | ||
| vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) | ||
| vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) | ||
| vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) | ||
|
|
||
| return nil | ||
|
|
@@ -95,7 +92,7 @@ func (vte *VTExplain) newFakeResolver(ctx context.Context, opts *Options, serv s | |
| return vtgate.NewResolver(srvResolver, serv, cell, sc) | ||
| } | ||
|
|
||
| func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error { | ||
| func (vte *VTExplain) buildTopology(ctx context.Context, ts *topo.Server, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error { | ||
| vte.explainTopo.Lock.Lock() | ||
| defer vte.explainTopo.Lock.Unlock() | ||
|
|
||
|
|
@@ -120,6 +117,10 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS | |
| return err | ||
| } | ||
|
|
||
| conn, err := ts.ConnForCell(ctx, Cell) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| vte.explainTopo.TabletConns = make(map[string]*explainTablet) | ||
| vte.explainTopo.KeyspaceShards = make(map[string]map[string]*topodatapb.ShardReference) | ||
| for ks, vschema := range vte.explainTopo.Keyspaces { | ||
|
|
@@ -130,6 +131,32 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS | |
|
|
||
| vte.explainTopo.KeyspaceShards[ks] = make(map[string]*topodatapb.ShardReference) | ||
|
|
||
| srvPath := path.Join(topo.KeyspacesPath, ks, topo.SrvKeyspaceFile) | ||
| srvKeyspace := &topodatapb.SrvKeyspace{ | ||
| Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ | ||
| { | ||
| ServedType: topodatapb.TabletType_PRIMARY, | ||
| ShardReferences: shards, | ||
| }, | ||
| { | ||
| ServedType: topodatapb.TabletType_REPLICA, | ||
| ShardReferences: shards, | ||
| }, | ||
| { | ||
| ServedType: topodatapb.TabletType_RDONLY, | ||
| ShardReferences: shards, | ||
| }, | ||
| }, | ||
| } | ||
| data, err := srvKeyspace.MarshalVT() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| _, err = conn.Update(ctx, srvPath, data, nil) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
||
|
|
||
| for _, shard := range shards { | ||
| // If the topology is in the middle of a reshard, there can be two shards covering the same key range (e.g. | ||
| // both source shard 80- and target shard 80-c0 cover the keyrange 80-c0). For the purposes of explain, we | ||
|
|
@@ -142,14 +169,13 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS | |
| hostname := fmt.Sprintf("%s/%s", ks, shard.Name) | ||
| log.Infof("registering test tablet %s for keyspace %s shard %s", hostname, ks, shard.Name) | ||
|
|
||
| tablet := vte.healthCheck.AddFakeTablet(vtexplainCell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService { | ||
| return vte.newTablet(ctx, vte.env, opts, t) | ||
| tablet := vte.healthCheck.AddFakeTablet(Cell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService { | ||
| return vte.newTablet(ctx, vte.env, opts, t, ts) | ||
| }) | ||
| vte.explainTopo.TabletConns[hostname] = tablet.(*explainTablet) | ||
| vte.explainTopo.KeyspaceShards[ks][shard.Name] = shard | ||
| } | ||
| } | ||
|
|
||
| return err | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was incorrectly using
context.Background(). I also audited the paths into this function and from things like the CLI invocation, we do pass in the background context correctly.See also
watch_srvvschemawhere we do have the logic correct.