Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/vt/tabletserver/cache_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestCachePoolState(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePoolWithStats(rowCacheConfig)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
cachePool.Open()
Expand Down Expand Up @@ -204,9 +204,15 @@ func TestCachePoolStatsURL(t *testing.T) {
cachePool.ServeHTTP(response, request)
}

func newTestCachePool(rowcacheConfig RowCacheConfig) *CachePool {
func newTestCachePoolWithStats(rowcacheConfig RowCacheConfig) *CachePool {
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-%d-", randID)
statsURL := fmt.Sprintf("/debug/cache-%d", randID)
return NewCachePool(name, rowcacheConfig, 1*time.Second, statsURL)
}

func newTestCachePool(rowcacheConfig RowCacheConfig) *CachePool {
randID := rand.Int63()
statsURL := fmt.Sprintf("/debug/cache-%d", randID)
return NewCachePool("", rowcacheConfig, 1*time.Second, statsURL)
}
48 changes: 34 additions & 14 deletions go/vt/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const spotCheckMultiplier = 1e6
// panic with NewTabletError as the error type.
// TODO(sougou): Switch to error return scheme.
type QueryEngine struct {
name string
schemaInfo *SchemaInfo
dbconfigs *dbconfigs.DBConfigs

Expand Down Expand Up @@ -114,9 +115,25 @@ func getOrPanic(ctx context.Context, pool *ConnPool) *DBConn {
// NewQueryEngine creates a new QueryEngine.
// This is a singleton class.
// You must call this only once.
func NewQueryEngine(config Config) *QueryEngine {
qe := &QueryEngine{}
func NewQueryEngine(config Config, name string) *QueryEngine {
qe := &QueryEngine{name: name}

var schemaInfoName string
var cachePoolName string
var connPoolName string
var streamConnPoolName string
var txPoolName string

if name != "" {
schemaInfoName = config.PoolNamePrefix + "SchemaInfo"
cachePoolName = config.PoolNamePrefix + "Rowcache"
connPoolName = config.PoolNamePrefix + "ConnPool"
streamConnPoolName = config.PoolNamePrefix + "StreamConnPool"
txPoolName = config.PoolNamePrefix + "TransactionPool"
}

qe.schemaInfo = NewSchemaInfo(
schemaInfoName,
config.QueryCacheSize,
config.StatsPrefix,
map[string]string{
Expand All @@ -133,25 +150,25 @@ func NewQueryEngine(config Config) *QueryEngine {

// Pools
qe.cachePool = NewCachePool(
config.PoolNamePrefix+"Rowcache",
cachePoolName,
config.RowCache,
time.Duration(config.IdleTimeout*1e9),
config.DebugURLPrefix+"/memcache/",
)
qe.connPool = NewConnPool(
config.PoolNamePrefix+"ConnPool",
connPoolName,
config.PoolSize,
time.Duration(config.IdleTimeout*1e9),
)
qe.streamConnPool = NewConnPool(
config.PoolNamePrefix+"StreamConnPool",
streamConnPoolName,
config.StreamPoolSize,
time.Duration(config.IdleTimeout*1e9),
)

// Services
qe.txPool = NewTxPool(
config.PoolNamePrefix+"TransactionPool",
txPoolName,
config.StatsPrefix,
config.TransactionCap,
time.Duration(config.TransactionTimeout*1e9),
Expand All @@ -177,11 +194,6 @@ func NewQueryEngine(config Config) *QueryEngine {
// loggers
qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)

// Stats
stats.Publish(config.StatsPrefix+"MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish(config.StatsPrefix+"MaxDMLRows", stats.IntFunc(qe.maxDMLRows.Get))
stats.Publish(config.StatsPrefix+"StreamBufferSize", stats.IntFunc(qe.streamBufferSize.Get))
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(qe.queryTimeout.Get))
queryStats = stats.NewTimings(config.StatsPrefix + "Queries")
qpsRates = stats.NewRates(config.StatsPrefix+"QPS", queryStats, 15, 60*time.Second)
waitStats = stats.NewTimings(config.StatsPrefix + "Waits")
Expand All @@ -190,11 +202,19 @@ func NewQueryEngine(config Config) *QueryEngine {
errorStats = stats.NewCounters(config.StatsPrefix + "Errors")
internalErrors = stats.NewCounters(config.StatsPrefix + "InternalErrors")
resultStats = stats.NewHistogram(config.StatsPrefix+"Results", resultBuckets)
stats.Publish(config.StatsPrefix+"RowcacheSpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / spotCheckMultiplier
}))
spotCheckCount = stats.NewInt(config.StatsPrefix + "RowcacheSpotCheckCount")

// Stats
if name != "" {
stats.Publish(config.StatsPrefix+"MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish(config.StatsPrefix+"MaxDMLRows", stats.IntFunc(qe.maxDMLRows.Get))
stats.Publish(config.StatsPrefix+"StreamBufferSize", stats.IntFunc(qe.streamBufferSize.Get))
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(qe.queryTimeout.Get))
stats.Publish(config.StatsPrefix+"RowcacheSpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / spotCheckMultiplier
}))
}

return qe
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func newTestQueryExecutor(sql string, ctx context.Context, flags executorFlags)
} else {
config.StrictTableAcl = false
}
sqlQuery := NewSqlQuery(config)
sqlQuery := NewSqlQuery(config, "")
txID := int64(0)
dbconfigs := newTestDBConfigs()
if flags&enableRowCache > 0 {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletserver/queryctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ type realQueryServiceControl struct {
// NewQueryServiceControl returns a real implementation of QueryServiceControl
func NewQueryServiceControl() QueryServiceControl {
return &realQueryServiceControl{
sqlQueryRPCService: NewSqlQuery(qsConfig),
sqlQueryRPCService: NewSqlQuery(qsConfig, "SqlQuery"),
}
}

Expand Down
22 changes: 15 additions & 7 deletions go/vt/tabletserver/schema_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type SchemaOverride struct {
// keep itself and the rowcache up-to-date.
type SchemaInfo struct {
mu sync.Mutex
name string
tables map[string]*TableInfo
overrides []SchemaOverride
queries *cache.LRUCache
Expand All @@ -113,12 +114,14 @@ type SchemaInfo struct {

// NewSchemaInfo creates a new SchemaInfo.
func NewSchemaInfo(
name string,
queryCacheSize int,
statsPrefix string,
endpoints map[string]string,
reloadTime time.Duration,
idleTimeout time.Duration) *SchemaInfo {
si := &SchemaInfo{
name: name,
queries: cache.NewLRUCache(int64(queryCacheSize)),
connPool: NewConnPool("", 2, idleTimeout),
ticks: timer.NewTimer(reloadTime),
Expand All @@ -131,16 +134,21 @@ func NewSchemaInfo(
stats.Publish(statsPrefix+"QueryCacheOldest", stats.StringFunc(func() string {
return fmt.Sprintf("%v", si.queries.Oldest())
}))
stats.Publish(statsPrefix+"SchemaReloadTime", stats.DurationFunc(si.ticks.Interval))
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheStats", []string{"Table", "Stats"}, si.getRowcacheStats)
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheInvalidations", []string{"Table"}, si.getRowcacheInvalidations)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryCounts", []string{"Table", "Plan"}, si.getQueryCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryTimesNs", []string{"Table", "Plan"}, si.getQueryTime)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryRowCounts", []string{"Table", "Plan"}, si.getQueryRowCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryErrorCounts", []string{"Table", "Plan"}, si.getQueryErrorCount)

for _, ep := range endpoints {
http.Handle(ep, si)
}

if name != "" {
stats.Publish(statsPrefix+"SchemaReloadTime", stats.DurationFunc(si.ticks.Interval))
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheStats", []string{"Table", "Stats"}, si.getRowcacheStats)
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheInvalidations", []string{"Table"}, si.getRowcacheInvalidations)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryCounts", []string{"Table", "Plan"}, si.getQueryCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryTimesNs", []string{"Table", "Plan"}, si.getQueryTime)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryRowCounts", []string{"Table", "Plan"}, si.getQueryRowCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryErrorCounts", []string{"Table", "Plan"}, si.getQueryErrorCount)
}

return si
}

Expand Down
18 changes: 14 additions & 4 deletions go/vt/tabletserver/schema_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,10 @@ func TestSchemaInfoQueryCache(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second)
schemaInfo := newTestSchemaInfoWithStats(10, 10*time.Second, 10*time.Second)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePoolWithStats()
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
Expand Down Expand Up @@ -452,10 +452,10 @@ func TestSchemaInfoExportVars(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfoWithStats(10, 1*time.Second, 1*time.Second)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePoolWithStats()
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, []SchemaOverride{}, cachePool, true)
Expand Down Expand Up @@ -507,6 +507,16 @@ func TestSchemaInfoStatsURL(t *testing.T) {
}

func newTestSchemaInfoCachePool() *CachePool {
rowCacheConfig := RowCacheConfig{
Binary: "ls",
Connections: 100,
}
randID := rand.Int63()
statsURL := fmt.Sprintf("/debug/cache-%d", randID)
return NewCachePool("", rowCacheConfig, 1*time.Second, statsURL)
}

func newTestSchemaInfoCachePoolWithStats() *CachePool {
rowCacheConfig := RowCacheConfig{
Binary: "ls",
Connections: 100,
Expand Down
26 changes: 17 additions & 9 deletions go/vt/tabletserver/sqlquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var stateName = []string{

// SqlQuery implements the RPC interface for the query service.
type SqlQuery struct {
name string
config Config
// mu is used to access state. It's also used to ensure
// that state does not change out of StateServing or StateShuttingTx
Expand All @@ -86,18 +87,25 @@ type SqlQuery struct {

// NewSqlQuery creates an instance of SqlQuery. Only one instance
// of SqlQuery can be created per process.
func NewSqlQuery(config Config) *SqlQuery {
func NewSqlQuery(config Config, name string) *SqlQuery {
sq := &SqlQuery{
config: config,
name: name,
}
var queryEngineName string
if name != "" {
queryEngineName = name + "QueryEngine"
}
sq.qe = NewQueryEngine(config, queryEngineName)
if name != "" {
stats.Publish(config.StatsPrefix+"TabletState", stats.IntFunc(func() int64 {
sq.mu.Lock()
state := sq.state
sq.mu.Unlock()
return state
}))
stats.Publish(config.StatsPrefix+"TabletStateName", stats.StringFunc(sq.GetState))
}
sq.qe = NewQueryEngine(config)
stats.Publish(config.StatsPrefix+"TabletState", stats.IntFunc(func() int64 {
sq.mu.Lock()
state := sq.state
sq.mu.Unlock()
return state
}))
stats.Publish(config.StatsPrefix+"TabletStateName", stats.StringFunc(sq.GetState))
return sq
}

Expand Down
Loading