Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func GetMetricsQueryType(query string) MetricsQueryType {
}

// MySQLThrottleMetric has the probed metric for a mysql instance
type MySQLThrottleMetric struct {
type MySQLThrottleMetric struct { // nolint:revive
ClusterName string
Key InstanceKey
Value float64
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/mysql/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Probe struct {
User string
Password string
MetricQuery string
TabletHost string
TabletPort int
CacheMillis int
QueryInProgress int64
}
Expand Down
57 changes: 49 additions & 8 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package throttle

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"math"
"math/rand"
"net/http"
Expand Down Expand Up @@ -38,10 +40,10 @@ import (

const (
leaderCheckInterval = 5 * time.Second
mysqlCollectInterval = 100 * time.Millisecond
mysqlCollectInterval = 250 * time.Millisecond
mysqlDormantCollectInterval = 5 * time.Second
mysqlRefreshInterval = 10 * time.Second
mysqlAggregateInterval = 100 * time.Millisecond
mysqlAggregateInterval = 125 * time.Millisecond

aggregatedMetricsExpiration = 5 * time.Second
aggregatedMetricsCleanup = 10 * time.Second
Expand Down Expand Up @@ -83,7 +85,7 @@ var (
)

// ThrottleCheckType allows a client to indicate what type of check it wants to issue. See available types below.
type ThrottleCheckType int
type ThrottleCheckType int // nolint:revive

const (
// ThrottleCheckPrimaryWrite indicates a check before making a write on a primary server
Expand Down Expand Up @@ -186,7 +188,7 @@ func NewThrottler(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topo
throttler.tickers = [](*timer.SuspendableTicker){}
throttler.nonLowPriorityAppRequestsThrottled = cache.New(nonDeprioritizedAppMapExpiration, nonDeprioritizedAppMapInterval)

throttler.httpClient = base.SetupHTTPClient(0)
throttler.httpClient = base.SetupHTTPClient(2 * mysqlCollectInterval)
throttler.initThrottleTabletTypes()
throttler.ThrottleApp("abusing-app", time.Now().Add(time.Hour*24*365*10), defaultThrottleRatio)
throttler.check = NewThrottlerCheck(throttler)
Expand Down Expand Up @@ -518,6 +520,43 @@ func (throttler *Throttler) Operate(ctx context.Context) {
}
}

func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, clusterName string, probe *mysql.Probe) (probeFunc func() *mysql.MySQLThrottleMetric) {
if probe.TabletHost == "" {
// nil function means no override; throttler will use default probe behavior, which is to open a direct
// connection to mysql and run a query
return nil
}
return func() *mysql.MySQLThrottleMetric {
// Hit a tablet's `check-self` via HTTP, and convert its CheckResult JSON output into a MySQLThrottleMetric
mySQLThrottleMetric := mysql.NewMySQLThrottleMetric()
mySQLThrottleMetric.ClusterName = clusterName
mySQLThrottleMetric.Key = probe.Key

tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=vitess", probe.TabletHost, probe.TabletPort)
Copy link
Contributor Author

@shlomi-noach shlomi-noach Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can a tablet serve https? Can it expect TLS? I'm sensing this http:// will break sometime.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could support https but it doesn't today. I don't see ListenAndServeTLS or ServeTLS used anywhere today in the codebase (except for orchestrator). It's a valid point though — at some point I think it's worth doing an "https everywhere" project that at least adds support for using it everywhere and makes the protocol used a variable when we form it like this, which we do in many other places already:

$ grep -R "http://" go/vt | grep -Ev "apache|colorbar|test.go|rice|Ref|stack|blogs|Advert|foo|orc|refman|google"
go/vt/vtgate/executor.go:	resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort))
go/vt/vtgate/status.go:    <td><a href="http://{{$status.Addr}}">{{$status.Name}}</a></td>
go/vt/vttablet/onlineddl/executor.go:	curl --max-time 10 -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS"'&eta='"$GH_OST_ETA_SECONDS"'&rowscopied='"$GH_OST_COPIED_ROWS"
go/vt/vttablet/onlineddl/executor.go:			fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=online-ddl:gh-ost:%s&p=low`, *servenv.Port, onlineDDL.UUID),
go/vt/vttablet/onlineddl/executor.go:	  get("http://localhost:{{VTTABLET_PORT}}/schema-migration/report-status?uuid={{MIGRATION_UUID}}&status={{OnlineDDLStatusRunning}}&dryrun={{DRYRUN}}");
go/vt/vttablet/onlineddl/executor.go:	    get("http://localhost:{{VTTABLET_PORT}}/schema-migration/report-status?uuid={{MIGRATION_UUID}}&status={{OnlineDDLStatusComplete}}&dryrun={{DRYRUN}}");
go/vt/vttablet/onlineddl/executor.go:	    get("http://localhost:{{VTTABLET_PORT}}/schema-migration/report-status?uuid={{MIGRATION_UUID}}&status={{OnlineDDLStatusFailed}}&dryrun={{DRYRUN}}");
go/vt/vttablet/onlineddl/executor.go:			if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app=online-ddl:pt-osc:{{MIGRATION_UUID}}&p=low")) {
go/vt/vttablet/endtoend/framework/server.go:	ServerAddress = fmt.Sprintf("http://%s", ln.Addr().String())
go/vt/wrangler/keyspace.go:		webURL = fmt.Sprintf("http://%v:%d/", ts.Tablet.Hostname, webPort)
go/vt/wrangler/version.go:	resp, err := http.Get("http://" + tabletAddr + "/debug/vars")
go/vt/vtctld/api.go:		tablet.URL = "http://" + netutil.JoinHostPort(t.Hostname, t.PortMap["vt"])
go/vt/discovery/legacy_healthcheck.go:// http://{{.GetTabletHostPort}} -> http://host.dc.domain:22
go/vt/discovery/healthcheck.go:	TabletURLTemplateString = flag.String("tablet_url_template", "http://{{.GetTabletHostPort}}", "format string describing debug tablet url formatting. See the Go code for getTabletDebugURL() how to customize this.")
go/vt/discovery/tablet_health.go:// http://{{.GetTabletHostPort}} -> http://host.dc.domain:22
go/vt/vitessdriver/doc.go:For more information, visit http://www.vitess.io.
go/vt/vttest/vtprocess.go:	url := fmt.Sprintf("http://%s/debug/vars", addr)
go/vt/vtadmin/README.md:# "http://127.0.0.1:14200".

resp, err := throttler.httpClient.Get(tabletCheckSelfURL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a timeout? Maybe interval * 0.5 or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
mySQLThrottleMetric.Err = err
return mySQLThrottleMetric
}
b, err := io.ReadAll(resp.Body)
if err != nil {
mySQLThrottleMetric.Err = err
return mySQLThrottleMetric
}
checkResult := &CheckResult{}
if err := json.Unmarshal(b, checkResult); err != nil {
mySQLThrottleMetric.Err = err
return mySQLThrottleMetric
}
mySQLThrottleMetric.Value = checkResult.Value

if checkResult.StatusCode == http.StatusInternalServerError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we ever get here? I would think we would have gone into one of the earlier err blocks. Doesn't hurt, but if we do handle http codes we could handle any of the err codes, 4XX and 5XX or even anything not 2XX.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably 5XX would be handled earlier and still I'd want to surface that. As for the other error codes, these are app-level codes that are generated by the underlying check. There is no need to propagate the error to the upper level, it's enough that we propagate the Value. In fact, it hurts if we report back an error, where there is no error per-se, just an app-level decision that "this check is a no-go". It's the upper layer, the collector's responsibility to collect all values and decide if the total state is "go" or "no-go".
The introduction of this mid HTTP layer complicates things because we now only have implicit/secondary understanding of the nature of errors on the MySQL side.
As it stands, the current logic passes all the tests correctly, which means it complies with existing behavior.

mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", checkResult.StatusCode)
}
return mySQLThrottleMetric
}
}

func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {
// synchronously, get lists of probes
for clusterName, probes := range throttler.mysqlInventory.ClustersProbes {
Expand All @@ -540,7 +579,7 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {
// (where we incidentally know there's a single probe)
overrideGetMySQLThrottleMetricFunc := throttler.readSelfMySQLThrottleMetric
if clusterName != selfStoreName {
overrideGetMySQLThrottleMetricFunc = nil
overrideGetMySQLThrottleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, clusterName, probe)
}
throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, overrideGetMySQLThrottleMetricFunc)
throttler.mysqlThrottleMetricChan <- throttleMetrics
Expand All @@ -554,7 +593,7 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {
// refreshMySQLInventory will re-structure the inventory based on reading config settings
func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {

addInstanceKey := func(key *mysql.InstanceKey, clusterName string, clusterSettings *config.MySQLClusterConfigurationSettings, probes *mysql.Probes) {
addInstanceKey := func(tabletHost string, tabletPort int, key *mysql.InstanceKey, clusterName string, clusterSettings *config.MySQLClusterConfigurationSettings, probes *mysql.Probes) {
for _, ignore := range clusterSettings.IgnoreHosts {
if strings.Contains(key.StringCode(), ignore) {
log.Infof("Throttler: instance key ignored: %+v", key)
Expand All @@ -570,6 +609,8 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
Key: *key,
User: clusterSettings.User,
Password: clusterSettings.Password,
TabletHost: tabletHost,
TabletPort: tabletPort,
MetricQuery: clusterSettings.MetricQuery,
CacheMillis: clusterSettings.CacheMillis,
}
Expand All @@ -592,7 +633,7 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
if clusterName == selfStoreName {
// special case: just looking at this tablet's MySQL server
// We will probe this "cluster" (of one server) is a special way.
addInstanceKey(mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes)
addInstanceKey("", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes)
throttler.mysqlClusterProbesChan <- clusterProbes
return
}
Expand All @@ -613,7 +654,7 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
}
if throttler.throttleTabletTypesMap[tablet.Type] {
key := mysql.InstanceKey{Hostname: tablet.MysqlHostname, Port: int(tablet.MysqlPort)}
addInstanceKey(&key, clusterName, clusterSettings, clusterProbes.InstanceProbes)
addInstanceKey(tablet.Hostname, int(tablet.PortMap["vt"]), &key, clusterName, clusterSettings, clusterProbes.InstanceProbes)
}
}
throttler.mysqlClusterProbesChan <- clusterProbes
Expand Down