Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b09ab1d
WIP: first pass and adding sizes to protobufs
Mar 9, 2021
1a576f4
WIP: first stab at an aggregator version of `GetSchema`
Mar 13, 2021
de087ac
WIP: refactor out FilterTablets
Mar 13, 2021
8fd5d51
WIP: change protos to uint64
Mar 13, 2021
2693c11
WIP: update code for uint64
Mar 13, 2021
330d603
WIP: add Dial
Mar 13, 2021
1d92c60
WIP: more typed errors
Mar 13, 2021
af751d4
WIP: refactor to use GetSchemaForKeyspace in api (I'll back-name this…
Mar 13, 2021
1d45afb
WIP: remove last instance of old cluster.GetSchema, some minor touches
Mar 13, 2021
f47f149
WIP: add spans, add error wraps
Mar 14, 2021
46a0ec5
Minor changes to `GetSchemaForKeyspace`, update test code
Mar 22, 2021
6a1c5cf
Remove old method, rename new method to match, and docs docs docs
Mar 23, 2021
dd6c1ab
Add cluster-level GetSchema tests for size aggregation
Mar 24, 2021
65d8153
Update existing api tests for GetSchema(s)/FindSchema, remove dead code
Mar 24, 2021
13908c0
Thread sizeopts through all vtadmin schema endpoints
Mar 25, 2021
d7498e2
Update http adapters to grab sizeopts out of the query params
Mar 25, 2021
4d6bbf9
Add test for multi-cluster GetSchema size-aggr
Mar 26, 2021
f06582f
Add size aggr test case for FindSchema
Mar 26, 2021
30edd7d
Add sizeaggr test case for GetSchemas
Mar 26, 2021
be136d8
nits: whitespace, replace stub log lines
Mar 26, 2021
6de47eb
Refactor `FindAllShardsInKeyspace` to a cluster-level method
Mar 27, 2021
39a7eae
Extract helper function for collecting tabletsToQuery for GetSchema
Mar 28, 2021
9777f0d
Extract helper function for orchestrating the per-tablet GetSchema rpcs
Mar 28, 2021
9d5a725
Remove IncludeNonServingShards from TableSizeOptions
Mar 28, 2021
db26e63
PR feedback around naming, logging, and error handling
Mar 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,489 changes: 1,310 additions & 179 deletions go/vt/proto/vtadmin/vtadmin.pb.go

Large diffs are not rendered by default.

123 changes: 43 additions & 80 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vtadmin
import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"net/http"
"strings"
Expand All @@ -39,6 +40,7 @@ import (
vtadminhttp "vitess.io/vitess/go/vt/vtadmin/http"
vthandlers "vitess.io/vitess/go/vt/vtadmin/http/handlers"
"vitess.io/vitess/go/vt/vtadmin/sort"
"vitess.io/vitess/go/vt/vtadmin/vtadminproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtexplain"

Expand Down Expand Up @@ -176,7 +178,10 @@ func (api *API) FindSchema(ctx context.Context, req *vtadminpb.FindSchemaRequest
return
}

schemas, err := api.getSchemas(ctx, c, tablets)
schemas, err := api.getSchemas(ctx, c, cluster.GetSchemaOptions{
Tablets: tablets,
TableSizeOptions: req.TableSizeOptions,
})
if err != nil {
err := fmt.Errorf("%w: while collecting schemas for cluster %s", err, c.ID)
rec.RecordError(err)
Expand Down Expand Up @@ -342,26 +347,20 @@ func (api *API) GetKeyspaces(ctx context.Context, req *vtadminpb.GetKeyspacesReq
go func(c *cluster.Cluster, ks *vtctldatapb.Keyspace) {
defer kwg.Done()

span, ctx := trace.NewSpan(ctx, "Cluster.FindAllShardsInKeyspace")
defer span.Finish()

cluster.AnnotateSpan(c, span)
span.Annotate("keyspace", ks.Name)

sr, err := c.Vtctld.FindAllShardsInKeyspace(ctx, &vtctldatapb.FindAllShardsInKeyspaceRequest{
Keyspace: ks.Name,
shards, err := c.FindAllShardsInKeyspace(ctx, ks.Name, cluster.FindAllShardsInKeyspaceOptions{
SkipDial: true,
})

if err != nil {
er.RecordError(fmt.Errorf("FindAllShardsInKeyspace(%s): %w", ks.Name, err))
er.RecordError(err)
return
}

km.Lock()
kss = append(kss, &vtadminpb.Keyspace{
Cluster: c.ToProto(),
Keyspace: ks,
Shards: sr.Shards,
Shards: shards,
})
km.Unlock()
}(c, ks)
Expand Down Expand Up @@ -394,30 +393,19 @@ func (api *API) GetSchema(ctx context.Context, req *vtadminpb.GetSchemaRequest)
span.Annotate("cluster_id", req.ClusterId)
span.Annotate("keyspace", req.Keyspace)
span.Annotate("table", req.Table)
vtadminproto.AnnotateSpanWithGetSchemaTableSizeOptions(req.TableSizeOptions, span)

clusters, _ := api.getClustersForRequest([]string{req.ClusterId})
if len(clusters) == 0 {
c, ok := api.clusterMap[req.ClusterId]
if !ok {
return nil, fmt.Errorf("%w: no cluster with id %s", errors.ErrUnsupportedCluster, req.ClusterId)
}

cluster := clusters[0]

tablet, err := cluster.FindTablet(ctx, func(t *vtadminpb.Tablet) bool {
return t.Tablet.Keyspace == req.Keyspace && t.State == vtadminpb.Tablet_SERVING
return c.GetSchema(ctx, req.Keyspace, cluster.GetSchemaOptions{
BaseRequest: &vtctldatapb.GetSchemaRequest{
Tables: []string{req.Table},
},
TableSizeOptions: req.TableSizeOptions,
})
if err != nil {
return nil, fmt.Errorf("%w: no serving tablet found for keyspace %s", err, req.Keyspace)
}

span.Annotate("tablet_alias", topoproto.TabletAliasString(tablet.Tablet.Alias))

if err := cluster.Vtctld.Dial(ctx); err != nil {
return nil, err
}

return cluster.GetSchema(ctx, &vtctldatapb.GetSchemaRequest{
Tables: []string{req.Table},
}, tablet)
}

// GetSchemas is part of the vtadminpb.VTAdminServer interface.
Expand Down Expand Up @@ -449,7 +437,10 @@ func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest
return
}

ss, err := api.getSchemas(ctx, c, tablets)
ss, err := api.getSchemas(ctx, c, cluster.GetSchemaOptions{
Tablets: tablets,
TableSizeOptions: req.TableSizeOptions,
})
if err != nil {
er.RecordError(err)
return
Expand All @@ -473,7 +464,7 @@ func (api *API) GetSchemas(ctx context.Context, req *vtadminpb.GetSchemasRequest
}

// getSchemas returns all of the schemas across all keyspaces in the given cluster.
func (api *API) getSchemas(ctx context.Context, c *cluster.Cluster, tablets []*vtadminpb.Tablet) ([]*vtadminpb.Schema, error) {
func (api *API) getSchemas(ctx context.Context, c *cluster.Cluster, opts cluster.GetSchemaOptions) ([]*vtadminpb.Schema, error) {
if err := c.Vtctld.Dial(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -503,14 +494,26 @@ func (api *API) getSchemas(ctx context.Context, c *cluster.Cluster, tablets []*v
go func(c *cluster.Cluster, ks *vtctldatapb.Keyspace) {
defer wg.Done()

ss, err := api.getSchemasForKeyspace(ctx, c, ks, tablets)
ss, err := c.GetSchema(ctx, ks.Name, opts)
if err != nil {
// Ignore keyspaces without any serving tablets.
if stderrors.Is(err, errors.ErrNoServingTablet) {
log.Infof(err.Error())
return
}

er.RecordError(err)
return
}

// Ignore keyspaces without schemas
if ss == nil {
log.Infof("No schemas for %s", ks.Name)
return
}

if len(ss.TableDefinitions) == 0 {
log.Infof("No tables in schema for %s", ks.Name)
return
}

Expand All @@ -529,41 +532,6 @@ func (api *API) getSchemas(ctx context.Context, c *cluster.Cluster, tablets []*v
return schemas, nil
}

func (api *API) getSchemasForKeyspace(ctx context.Context, c *cluster.Cluster, ks *vtctldatapb.Keyspace, tablets []*vtadminpb.Tablet) (*vtadminpb.Schema, error) {
// Choose the first serving tablet.
var kt *vtadminpb.Tablet
for _, t := range tablets {
if t.Tablet.Keyspace != ks.Name || t.State != vtadminpb.Tablet_SERVING {
continue
}

kt = t
}

// Skip schema lookup on this keyspace if there are no serving tablets.
if kt == nil {
return nil, nil
}

if err := c.Vtctld.Dial(ctx); err != nil {
return nil, err
}

s, err := c.GetSchema(ctx, &vtctldatapb.GetSchemaRequest{}, kt)
if err != nil {
return nil, err
}

// Ignore any schemas without table definitions; otherwise we return
// a vtadminpb.Schema object with only Cluster and Keyspace defined,
// which is not particularly useful.
if s == nil || len(s.TableDefinitions) == 0 {
return nil, nil
}

return s, nil
}

// GetTablet is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetTablet(ctx context.Context, req *vtadminpb.GetTabletRequest) (*vtadminpb.Tablet, error) {
span, ctx := trace.NewSpan(ctx, "API.GetTablet")
Expand Down Expand Up @@ -907,7 +875,9 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
go func(c *cluster.Cluster) {
defer wg.Done()

res, err := c.GetSchema(ctx, &vtctldatapb.GetSchemaRequest{}, tablet)
res, err := c.GetSchema(ctx, req.Keyspace, cluster.GetSchemaOptions{
Tablets: []*vtadminpb.Tablet{tablet},
})
if err != nil {
er.RecordError(fmt.Errorf("GetSchema(%s): %w", topoproto.TabletAliasString(tablet.Tablet.Alias), err))
return
Expand Down Expand Up @@ -959,23 +929,16 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
go func(c *cluster.Cluster) {
defer wg.Done()

span, ctx := trace.NewSpan(ctx, "Cluster.FindAllShardsInKeyspace")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
cluster.AnnotateSpan(c, span)

ksm, err := c.Vtctld.FindAllShardsInKeyspace(ctx, &vtctldatapb.FindAllShardsInKeyspaceRequest{
Keyspace: req.Keyspace,
shards, err := c.FindAllShardsInKeyspace(ctx, req.Keyspace, cluster.FindAllShardsInKeyspaceOptions{
SkipDial: true,
})

if err != nil {
er.RecordError(fmt.Errorf("FindAllShardsInKeyspace(%s): %w", req.Keyspace, err))
er.RecordError(err)
return
}

vtsm := make(map[string]*topodatapb.Shard)
for _, s := range ksm.Shards {
for _, s := range shards {
vtsm[s.Name] = s.Shard
}

Expand Down
Loading