Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,361 changes: 2,000 additions & 361 deletions go/vt/proto/vtadmin/vtadmin.pb.go

Large diffs are not rendered by default.

114 changes: 92 additions & 22 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
Expand Down Expand Up @@ -106,6 +107,8 @@ func NewAPI(clusters []*cluster.Cluster, opts grpcserver.Options, httpOpts vtadm
router.HandleFunc("/vschema/{cluster_id}/{keyspace}", httpAPI.Adapt(vtadminhttp.GetVSchema)).Name("API.GetVSchema")
router.HandleFunc("/vschemas", httpAPI.Adapt(vtadminhttp.GetVSchemas)).Name("API.GetVSchemas")
router.HandleFunc("/vtexplain", httpAPI.Adapt(vtadminhttp.VTExplain)).Name("API.VTExplain")
router.HandleFunc("/workflow/{cluster_id}/{keyspace}/{name}", httpAPI.Adapt(vtadminhttp.GetWorkflow)).Name("API.GetWorkflow")
router.HandleFunc("/workflows", httpAPI.Adapt(vtadminhttp.GetWorkflows)).Name("API.GetWorkflows")

// Middlewares are executed in order of addition. Our ordering (all
// middlewares being optional) is:
Expand Down Expand Up @@ -662,28 +665,6 @@ func (api *API) GetTablets(ctx context.Context, req *vtadminpb.GetTabletsRequest
}, nil
}

func (api *API) getClustersForRequest(ids []string) ([]*cluster.Cluster, []string) {
if len(ids) == 0 {
clusterIDs := make([]string, 0, len(api.clusters))

for k := range api.clusterMap {
clusterIDs = append(clusterIDs, k)
}

return api.clusters, clusterIDs
}

clusters := make([]*cluster.Cluster, 0, len(ids))

for _, id := range ids {
if c, ok := api.clusterMap[id]; ok {
clusters = append(clusters, c)
}
}

return clusters, ids
}

// GetVSchema is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetVSchema(ctx context.Context, req *vtadminpb.GetVSchemaRequest) (*vtadminpb.VSchema, error) {
span, ctx := trace.NewSpan(ctx, "API.GetVSchema")
Expand Down Expand Up @@ -801,6 +782,73 @@ func (api *API) GetVSchemas(ctx context.Context, req *vtadminpb.GetVSchemasReque
}, nil
}

// GetWorkflow is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetWorkflow(ctx context.Context, req *vtadminpb.GetWorkflowRequest) (*vtadminpb.Workflow, error) {
span, ctx := trace.NewSpan(ctx, "API.GetWorkflow")
defer span.Finish()

c, ok := api.clusterMap[req.ClusterId]
if !ok {
return nil, fmt.Errorf("%w: no such cluster %s", errors.ErrUnsupportedCluster, req.ClusterId)
}

cluster.AnnotateSpan(c, span)
span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow_name", req.Name)
span.Annotate("active_only", req.ActiveOnly)

return c.GetWorkflow(ctx, req.Keyspace, req.Name, cluster.GetWorkflowOptions{
ActiveOnly: req.ActiveOnly,
})
}

// GetWorkflows is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetWorkflows(ctx context.Context, req *vtadminpb.GetWorkflowsRequest) (*vtadminpb.GetWorkflowsResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.GetWorkflows")
defer span.Finish()

clusters, _ := api.getClustersForRequest(req.ClusterIds)

var (
m sync.Mutex
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
results = map[string]*vtadminpb.ClusterWorkflows{}
)

for _, c := range clusters {
wg.Add(1)

go func(c *cluster.Cluster) {
defer wg.Done()

workflows, err := c.GetWorkflows(ctx, req.Keyspaces, cluster.GetWorkflowsOptions{
ActiveOnly: req.ActiveOnly,
IgnoreKeyspaces: sets.NewString(req.IgnoreKeyspaces...),
})
if err != nil {
rec.RecordError(err)

return
}

m.Lock()
results[c.ID] = workflows
m.Unlock()
}(c)
}

wg.Wait()

if rec.HasErrors() {
return nil, rec.Error()
}

return &vtadminpb.GetWorkflowsResponse{
WorkflowsByCluster: results,
}, nil
}

// VTExplain is part of the vtadminpb.VTAdminServer interface.
func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest) (*vtadminpb.VTExplainResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.VTExplain")
Expand Down Expand Up @@ -962,3 +1010,25 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
Response: response,
}, nil
}

func (api *API) getClustersForRequest(ids []string) ([]*cluster.Cluster, []string) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just moving this to the bottom of the file; it felt a bit out of place being stuck between two public functions without really being a helper function for either of them in particular

if len(ids) == 0 {
clusterIDs := make([]string, 0, len(api.clusters))

for k := range api.clusterMap {
clusterIDs = append(clusterIDs, k)
}

return api.clusters, clusterIDs
}

clusters := make([]*cluster.Cluster, 0, len(ids))

for _, id := range ids {
if c, ok := api.clusterMap[id]; ok {
clusters = append(clusters, c)
}
}

return clusters, ids
}
Loading