Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/vt/tabletserver/cache_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func NewCachePool(
name string,
rowCacheConfig RowCacheConfig,
idleTimeout time.Duration,
statsURL string) *CachePool {
statsURL string,
enablePublishStats bool) *CachePool {
cp := &CachePool{name: name, idleTimeout: idleTimeout, statsURL: statsURL}
if name != "" {
if name != "" && enablePublishStats {
cp.memcacheStats = NewMemcacheStats(
rowCacheConfig.StatsPrefix+name, 10*time.Second, enableMain,
func(key string) string {
Expand Down
22 changes: 11 additions & 11 deletions go/vt/tabletserver/cache_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func TestCachePoolWithEmptyBinary(t *testing.T) {
fakecacheservice.Register()
fakesqldb.Register()
cachePool := newTestCachePool(RowCacheConfig{})
cachePool := newTestCachePool(RowCacheConfig{}, false)
cachePool.Close()
}

Expand All @@ -31,7 +31,7 @@ func TestCachePool(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
if !cachePool.IsClosed() {
t.Fatalf("cache pool is not closed")
}
Expand All @@ -52,7 +52,7 @@ func TestCachePoolOpenTwice(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
cachePool.Open()
defer cachePool.Close()
defer func() {
Expand All @@ -70,7 +70,7 @@ func TestCachePoolOpenWithEmptyBinary(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
defer func() {
if e := recover(); e == nil {
t.Fatalf("open a cache pool with empty rowCacheConfig.Binary should panic")
Expand All @@ -88,7 +88,7 @@ func TestCachePoolOpenWithInvalidBinary(t *testing.T) {
Binary: "invalid_binary",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
defer func() {
if e := recover(); e == nil {
t.Fatalf("open a cache pool with an invalid rowCacheConfig.Binary should panic")
Expand All @@ -105,7 +105,7 @@ func TestCachePoolState(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, true)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
cachePool.Open()
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestCachePoolStateWithoutOpen(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
if cachePool.StatsJSON() != "{}" {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestCachePoolGetFailedBecauseCachePoolIsClosed(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
ctx := context.Background()
Expand All @@ -194,7 +194,7 @@ func TestCachePoolStatsURL(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
cachePool.Open()
Expand All @@ -204,9 +204,9 @@ func TestCachePoolStatsURL(t *testing.T) {
cachePool.ServeHTTP(response, request)
}

func newTestCachePool(rowcacheConfig RowCacheConfig) *CachePool {
func newTestCachePool(rowcacheConfig RowCacheConfig, enablePublishStats bool) *CachePool {
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-%d-", randID)
statsURL := fmt.Sprintf("/debug/cache-%d", randID)
return NewCachePool(name, rowcacheConfig, 1*time.Second, statsURL)
return NewCachePool(name, rowcacheConfig, 1*time.Second, statsURL, enablePublishStats)
}
17 changes: 10 additions & 7 deletions go/vt/tabletserver/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type ConnPool struct {
func NewConnPool(
name string,
capacity int,
idleTimeout time.Duration) *ConnPool {
idleTimeout time.Duration,
enablePublishStats bool) *ConnPool {
cp := &ConnPool{
capacity: capacity,
idleTimeout: idleTimeout,
Expand All @@ -50,12 +51,14 @@ func NewConnPool(
if name == "" {
return cp
}
stats.Publish(name+"Capacity", stats.IntFunc(cp.Capacity))
stats.Publish(name+"Available", stats.IntFunc(cp.Available))
stats.Publish(name+"MaxCap", stats.IntFunc(cp.MaxCap))
stats.Publish(name+"WaitCount", stats.IntFunc(cp.WaitCount))
stats.Publish(name+"WaitTime", stats.DurationFunc(cp.WaitTime))
stats.Publish(name+"IdleTimeout", stats.DurationFunc(cp.IdleTimeout))
if enablePublishStats {
stats.Publish(name+"Capacity", stats.IntFunc(cp.Capacity))
stats.Publish(name+"Available", stats.IntFunc(cp.Available))
stats.Publish(name+"MaxCap", stats.IntFunc(cp.MaxCap))
stats.Publish(name+"WaitCount", stats.IntFunc(cp.WaitCount))
stats.Publish(name+"WaitTime", stats.DurationFunc(cp.WaitTime))
stats.Publish(name+"IdleTimeout", stats.DurationFunc(cp.IdleTimeout))
}
return cp
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletserver/connpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestConnectivity(t *testing.T) {
killStats = stats.NewCounters("TestKills")
internalErrors = stats.NewCounters("TestInternalErrors")
mysqlStats = stats.NewTimings("TestMySQLStats")
pool := NewConnPool("p1", 1, 30*time.Second)
pool := NewConnPool("p1", 1, 30*time.Second, false)
pool.Open(appParams, dbaParams)

conn, err := pool.Get(ctx)
Expand Down
21 changes: 14 additions & 7 deletions go/vt/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func NewQueryEngine(config Config) *QueryEngine {
},
time.Duration(config.SchemaReloadTime*1e9),
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)

mysqlStats = stats.NewTimings(config.StatsPrefix + "Mysql")
Expand All @@ -137,16 +138,19 @@ func NewQueryEngine(config Config) *QueryEngine {
config.RowCache,
time.Duration(config.IdleTimeout*1e9),
config.DebugURLPrefix+"/memcache/",
config.EnablePublishStats,
)
qe.connPool = NewConnPool(
config.PoolNamePrefix+"ConnPool",
config.PoolSize,
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)
qe.streamConnPool = NewConnPool(
config.PoolNamePrefix+"StreamConnPool",
config.StreamPoolSize,
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)

// Services
Expand All @@ -157,6 +161,7 @@ func NewQueryEngine(config Config) *QueryEngine {
time.Duration(config.TransactionTimeout*1e9),
time.Duration(config.TxPoolTimeout*1e9),
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)
qe.consolidator = sync2.NewConsolidator()
http.Handle(config.DebugURLPrefix+"/consolidations", qe.consolidator)
Expand All @@ -178,10 +183,6 @@ func NewQueryEngine(config Config) *QueryEngine {
qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)

// Stats
stats.Publish(config.StatsPrefix+"MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish(config.StatsPrefix+"MaxDMLRows", stats.IntFunc(qe.maxDMLRows.Get))
stats.Publish(config.StatsPrefix+"StreamBufferSize", stats.IntFunc(qe.streamBufferSize.Get))
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(qe.queryTimeout.Get))
queryStats = stats.NewTimings(config.StatsPrefix + "Queries")
qpsRates = stats.NewRates(config.StatsPrefix+"QPS", queryStats, 15, 60*time.Second)
waitStats = stats.NewTimings(config.StatsPrefix + "Waits")
Expand All @@ -190,11 +191,17 @@ func NewQueryEngine(config Config) *QueryEngine {
errorStats = stats.NewCounters(config.StatsPrefix + "Errors")
internalErrors = stats.NewCounters(config.StatsPrefix + "InternalErrors")
resultStats = stats.NewHistogram(config.StatsPrefix+"Results", resultBuckets)
stats.Publish(config.StatsPrefix+"RowcacheSpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / spotCheckMultiplier
}))
spotCheckCount = stats.NewInt(config.StatsPrefix + "RowcacheSpotCheckCount")

if config.EnablePublishStats {
stats.Publish(config.StatsPrefix+"MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish(config.StatsPrefix+"MaxDMLRows", stats.IntFunc(qe.maxDMLRows.Get))
stats.Publish(config.StatsPrefix+"StreamBufferSize", stats.IntFunc(qe.streamBufferSize.Get))
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(qe.queryTimeout.Get))
stats.Publish(config.StatsPrefix+"RowcacheSpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / spotCheckMultiplier
}))
}
return qe
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ func newTestQueryExecutor(sql string, ctx context.Context, flags executorFlags)
config.PoolSize = 100
config.TransactionCap = 100
config.SpotCheckRatio = 1.0
config.EnablePublishStats = false
if flags&enableStrict > 0 {
config.StrictMode = true
} else {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/tabletserver/queryctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func init() {
flag.BoolVar(&qsConfig.StrictMode, "queryserver-config-strict-mode", DefaultQsConfig.StrictMode, "allow only predictable DMLs and enforces MySQL's STRICT_TRANS_TABLES")
flag.BoolVar(&qsConfig.StrictTableAcl, "queryserver-config-strict-table-acl", DefaultQsConfig.StrictTableAcl, "only allow queries that pass table acl checks")
flag.BoolVar(&qsConfig.TerseErrors, "queryserver-config-terse-errors", DefaultQsConfig.TerseErrors, "prevent bind vars from escaping in returned errors")
flag.BoolVar(&qsConfig.EnablePublishStats, "queryserver-config-enable-publish-stats", DefaultQsConfig.EnablePublishStats, "set this flag to true makes queryservice publish monitoring stats")
flag.StringVar(&qsConfig.RowCache.Binary, "rowcache-bin", DefaultQsConfig.RowCache.Binary, "rowcache binary file")
flag.IntVar(&qsConfig.RowCache.Memory, "rowcache-memory", DefaultQsConfig.RowCache.Memory, "rowcache max memory usage in MB")
flag.StringVar(&qsConfig.RowCache.Socket, "rowcache-socket", DefaultQsConfig.RowCache.Socket, "socket filename hint: a unique filename will be generated based on this input")
Expand Down Expand Up @@ -114,6 +115,7 @@ type Config struct {
StrictMode bool
StrictTableAcl bool
TerseErrors bool
EnablePublishStats bool
StatsPrefix string
DebugURLPrefix string
PoolNamePrefix string
Expand Down Expand Up @@ -145,6 +147,7 @@ var DefaultQsConfig = Config{
StrictMode: true,
StrictTableAcl: false,
TerseErrors: false,
EnablePublishStats: true,
StatsPrefix: "",
DebugURLPrefix: "/debug",
PoolNamePrefix: "",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletserver/queryz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func TestQueryzHandler(t *testing.T) {
resp := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/schemaz", nil)
schemaInfo := newTestSchemaInfo(100, 10*time.Second, 10*time.Second)
schemaInfo := newTestSchemaInfo(100, 10*time.Second, 10*time.Second, false)

plan1 := &ExecPlan{
ExecPlan: &planbuilder.ExecPlan{
Expand Down
33 changes: 18 additions & 15 deletions go/vt/tabletserver/schema_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,27 +117,30 @@ func NewSchemaInfo(
statsPrefix string,
endpoints map[string]string,
reloadTime time.Duration,
idleTimeout time.Duration) *SchemaInfo {
idleTimeout time.Duration,
enablePublishStats bool) *SchemaInfo {
si := &SchemaInfo{
queries: cache.NewLRUCache(int64(queryCacheSize)),
connPool: NewConnPool("", 2, idleTimeout),
connPool: NewConnPool("", 2, idleTimeout, enablePublishStats),
ticks: timer.NewTimer(reloadTime),
endpoints: endpoints,
reloadTime: reloadTime,
}
stats.Publish(statsPrefix+"QueryCacheLength", stats.IntFunc(si.queries.Length))
stats.Publish(statsPrefix+"QueryCacheSize", stats.IntFunc(si.queries.Size))
stats.Publish(statsPrefix+"QueryCacheCapacity", stats.IntFunc(si.queries.Capacity))
stats.Publish(statsPrefix+"QueryCacheOldest", stats.StringFunc(func() string {
return fmt.Sprintf("%v", si.queries.Oldest())
}))
stats.Publish(statsPrefix+"SchemaReloadTime", stats.DurationFunc(si.ticks.Interval))
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheStats", []string{"Table", "Stats"}, si.getRowcacheStats)
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheInvalidations", []string{"Table"}, si.getRowcacheInvalidations)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryCounts", []string{"Table", "Plan"}, si.getQueryCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryTimesNs", []string{"Table", "Plan"}, si.getQueryTime)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryRowCounts", []string{"Table", "Plan"}, si.getQueryRowCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryErrorCounts", []string{"Table", "Plan"}, si.getQueryErrorCount)
if enablePublishStats {
stats.Publish(statsPrefix+"QueryCacheLength", stats.IntFunc(si.queries.Length))
stats.Publish(statsPrefix+"QueryCacheSize", stats.IntFunc(si.queries.Size))
stats.Publish(statsPrefix+"QueryCacheCapacity", stats.IntFunc(si.queries.Capacity))
stats.Publish(statsPrefix+"QueryCacheOldest", stats.StringFunc(func() string {
return fmt.Sprintf("%v", si.queries.Oldest())
}))
stats.Publish(statsPrefix+"SchemaReloadTime", stats.DurationFunc(si.ticks.Interval))
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheStats", []string{"Table", "Stats"}, si.getRowcacheStats)
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheInvalidations", []string{"Table"}, si.getRowcacheInvalidations)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryCounts", []string{"Table", "Plan"}, si.getQueryCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryTimesNs", []string{"Table", "Plan"}, si.getQueryTime)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryRowCounts", []string{"Table", "Plan"}, si.getQueryRowCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryErrorCounts", []string{"Table", "Plan"}, si.getQueryErrorCount)
}
for _, ep := range endpoints {
http.Handle(ep, si)
}
Expand Down
Loading