diff --git a/dev.env b/dev.env index f1e6e71b896..781f54be7ba 100644 --- a/dev.env +++ b/dev.env @@ -1,13 +1,13 @@ # No shebang line as this script is sourced from an external shell. # Copyright 2019 The Vitess Authors. -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,6 +34,6 @@ export PATH # According to https://github.com/etcd-io/etcd/blob/a621d807f061e1dd635033a8d6bc261461429e27/Documentation/op-guide/supported-platform.md, # currently, etcd is unstable on arm64, so ETCD_UNSUPPORTED_ARCH should be set. -if [ "$(uname -m)" == aarch64 ]; then +if [ "$(uname -m)" = aarch64 ]; then export ETCD_UNSUPPORTED_ARCH=arm64 fi diff --git a/go/cache/cache.go b/go/cache/cache.go index 3f81cec50e7..0447e8a690a 100644 --- a/go/cache/cache.go +++ b/go/cache/cache.go @@ -76,5 +76,5 @@ type Config struct { var DefaultConfig = &Config{ MaxEntries: 5000, MaxMemoryUsage: 32 * 1024 * 1024, - LFU: false, + LFU: true, } diff --git a/go/cache/ristretto/bloom/bbloom.go b/go/cache/ristretto/bloom/bbloom.go index 586adec9cb6..ce5daa6864d 100644 --- a/go/cache/ristretto/bloom/bbloom.go +++ b/go/cache/ristretto/bloom/bbloom.go @@ -21,9 +21,6 @@ package bloom import ( - "bytes" - "encoding/json" - "log" "math" "unsafe" ) @@ -43,26 +40,16 @@ func getSize(ui64 uint64) (size uint64, exponent uint64) { return size, exponent } -func calcSizeByWrongPositives(numEntries, wrongs float64) (uint64, uint64) { - size := -1 * numEntries * math.Log(wrongs) / math.Pow(float64(0.69314718056), 2) - locs := math.Ceil(float64(0.69314718056) * size / numEntries) - return uint64(size), uint64(locs) +// NewBloomFilterWithErrorRate returns a new bloomfilter with optimal size for the given +// error rate +func NewBloomFilterWithErrorRate(numEntries uint64, wrongs float64) *Bloom { + size := -1 * float64(numEntries) * math.Log(wrongs) / math.Pow(0.69314718056, 2) + locs := math.Ceil(0.69314718056 * size / float64(numEntries)) + return NewBloomFilter(uint64(size), uint64(locs)) } // NewBloomFilter returns a new bloomfilter. -func NewBloomFilter(params ...float64) (bloomfilter *Bloom) { - var entries, locs uint64 - if len(params) == 2 { - if params[1] < 1 { - entries, locs = calcSizeByWrongPositives(params[0], params[1]) - } else { - entries, locs = uint64(params[0]), uint64(params[1]) - } - } else { - log.Fatal("usage: New(float64(number_of_entries), float64(number_of_hashlocations))" + - " i.e. New(float64(1000), float64(3)) or New(float64(number_of_entries)," + - " float64(number_of_hashlocations)) i.e. New(float64(1000), float64(0.03))") - } +func NewBloomFilter(entries, locs uint64) (bloomfilter *Bloom) { size, exponent := getSize(entries) bloomfilter = &Bloom{ sizeExp: exponent, @@ -162,49 +149,3 @@ func (bl *Bloom) IsSet(idx uint64) bool { r := ((*(*uint8)(ptr)) >> (idx % 8)) & 1 return r == 1 } - -// bloomJSONImExport -// Im/Export structure used by JSONMarshal / JSONUnmarshal -type bloomJSONImExport struct { - FilterSet []byte - SetLocs uint64 -} - -// NewWithBoolset takes a []byte slice and number of locs per entry, -// returns the bloomfilter with a bitset populated according to the input []byte. -func newWithBoolset(bs *[]byte, locs uint64) *Bloom { - bloomfilter := NewBloomFilter(float64(len(*bs)<<3), float64(locs)) - for i, b := range *bs { - *(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&bloomfilter.bitset[0])) + uintptr(i))) = b - } - return bloomfilter -} - -// JSONUnmarshal takes JSON-Object (type bloomJSONImExport) as []bytes -// returns bloom32 / bloom64 object. -func JSONUnmarshal(dbData []byte) (*Bloom, error) { - bloomImEx := bloomJSONImExport{} - if err := json.Unmarshal(dbData, &bloomImEx); err != nil { - return nil, err - } - buf := bytes.NewBuffer(bloomImEx.FilterSet) - bs := buf.Bytes() - bf := newWithBoolset(&bs, bloomImEx.SetLocs) - return bf, nil -} - -// JSONMarshal returns JSON-object (type bloomJSONImExport) as []byte. -func (bl Bloom) JSONMarshal() []byte { - bloomImEx := bloomJSONImExport{} - bloomImEx.SetLocs = bl.setLocs - bloomImEx.FilterSet = make([]byte, len(bl.bitset)<<3) - for i := range bloomImEx.FilterSet { - bloomImEx.FilterSet[i] = *(*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(&bl.bitset[0])) + - uintptr(i))) - } - data, err := json.Marshal(bloomImEx) - if err != nil { - log.Fatal("json.Marshal failed: ", err) - } - return data -} diff --git a/go/cache/ristretto/bloom/bbloom_test.go b/go/cache/ristretto/bloom/bbloom_test.go index ac0cb9c9104..960fb034e63 100644 --- a/go/cache/ristretto/bloom/bbloom_test.go +++ b/go/cache/ristretto/bloom/bbloom_test.go @@ -6,14 +6,12 @@ import ( "os" "testing" - "github.com/stretchr/testify/require" - "vitess.io/vitess/go/hack" ) var ( wordlist1 [][]byte - n = 1 << 16 + n = uint64(1 << 16) bf *Bloom ) @@ -31,7 +29,7 @@ func TestMain(m *testing.M) { } func TestM_NumberOfWrongs(t *testing.T) { - bf = NewBloomFilter(float64(n*10), float64(7)) + bf = NewBloomFilter(n*10, 7) cnt := 0 for i := range wordlist1 { @@ -44,43 +42,14 @@ func TestM_NumberOfWrongs(t *testing.T) { } -func TestM_JSON(t *testing.T) { - const shallBe = int(1 << 16) - - bf = NewBloomFilter(float64(n*10), float64(7)) - - cnt := 0 - for i := range wordlist1 { - hash := hack.RuntimeMemhash(wordlist1[i], 0) - if !bf.AddIfNotHas(hash) { - cnt++ - } - } - - jsonm := bf.JSONMarshal() - - // create new bloomfilter from bloomfilter's JSON representation - bf2, err := JSONUnmarshal(jsonm) - require.NoError(t, err) - - cnt2 := 0 - for i := range wordlist1 { - hash := hack.RuntimeMemhash(wordlist1[i], 0) - if !bf2.AddIfNotHas(hash) { - cnt2++ - } - } - require.Equal(t, shallBe, cnt2) -} - func BenchmarkM_New(b *testing.B) { for r := 0; r < b.N; r++ { - _ = NewBloomFilter(float64(n*10), float64(7)) + _ = NewBloomFilter(n*10, 7) } } func BenchmarkM_Clear(b *testing.B) { - bf = NewBloomFilter(float64(n*10), float64(7)) + bf = NewBloomFilter(n*10, 7) for i := range wordlist1 { hash := hack.RuntimeMemhash(wordlist1[i], 0) bf.Add(hash) @@ -92,7 +61,7 @@ func BenchmarkM_Clear(b *testing.B) { } func BenchmarkM_Add(b *testing.B) { - bf = NewBloomFilter(float64(n*10), float64(7)) + bf = NewBloomFilter(n*10, 7) b.ResetTimer() for r := 0; r < b.N; r++ { for i := range wordlist1 { diff --git a/go/cache/ristretto/cache.go b/go/cache/ristretto/cache.go index 62f086f69c2..2d2368f1427 100644 --- a/go/cache/ristretto/cache.go +++ b/go/cache/ristretto/cache.go @@ -45,7 +45,8 @@ func defaultStringHash(key string) (uint64, uint64) { type itemCallback func(*Item) -const itemSize = int64(unsafe.Sizeof(storeItem{})) +// CacheItemSize is the overhead in bytes for every stored cache item +const CacheItemSize = int64(unsafe.Sizeof(storeItem{})) // Cache is a thread-safe implementation of a hashmap with a TinyLFU admission // policy and a Sampled LFU eviction policy. You can use the same Cache instance @@ -446,7 +447,7 @@ func (c *Cache) processItems() { } if !c.ignoreInternalCost { // Add the cost of internally storing the object. - i.Cost += itemSize + i.Cost += CacheItemSize } switch i.flag { diff --git a/go/cache/ristretto/policy.go b/go/cache/ristretto/policy.go index 9ebf0b38d72..38ffbf6d3d6 100644 --- a/go/cache/ristretto/policy.go +++ b/go/cache/ristretto/policy.go @@ -375,7 +375,7 @@ type tinyLFU struct { func newTinyLFU(numCounters int64) *tinyLFU { return &tinyLFU{ freq: newCmSketch(numCounters), - door: bloom.NewBloomFilter(float64(numCounters), 0.01), + door: bloom.NewBloomFilterWithErrorRate(uint64(numCounters), 0.01), resetAt: numCounters, } } diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index 82c1e320c06..863ee2bbd91 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -50,8 +50,8 @@ const appendEntry = -1 type DB struct { // Fields set at construction time. - // t is our testing.T instance - t *testing.T + // t is our testing.TB instance + t testing.TB // listener is our mysql.Listener. listener *mysql.Listener @@ -151,7 +151,7 @@ type ExpectedExecuteFetch struct { } // New creates a server, and starts listening. -func New(t *testing.T) *DB { +func New(t testing.TB) *DB { // Pick a path for our socket. socketDir, err := ioutil.TempDir("", "fakesqldb") if err != nil { diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 9848fd3cb26..0b8aa3a9ac4 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/cache" + "vitess.io/vitess/go/cache/ristretto" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -178,9 +179,8 @@ func TestConsolidatorReplicasOnly(t *testing.T) { func TestQueryPlanCache(t *testing.T) { if cache.DefaultConfig.LFU { - const cacheItemSize = 40 - const cachedPlanSize = 2275 + cacheItemSize - const cachePlanSize2 = 2254 + cacheItemSize + const cachedPlanSize = 2352 + int(ristretto.CacheItemSize) + const cachePlanSize2 = 2326 + int(ristretto.CacheItemSize) testQueryPlanCache(t, cachedPlanSize, cachePlanSize2) } else { testQueryPlanCache(t, 1, 1) @@ -203,7 +203,7 @@ func testQueryPlanCache(t *testing.T, cachedPlanSize, cachePlanSize2 int) { client := framework.NewClient() _, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars) _, _ = client.Execute("select * from vitess_test where intval=:ival2", bindVars) - time.Sleep(100 * time.Millisecond) + assert.Equal(t, 1, framework.Server.QueryPlanCacheLen()) vend := framework.DebugVars() verifyIntValue(t, vend, "QueryCacheLength", 1) @@ -212,13 +212,14 @@ func testQueryPlanCache(t *testing.T, cachedPlanSize, cachePlanSize2 int) { framework.Server.SetQueryPlanCacheCap(64 * 1024) _, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars) - time.Sleep(100 * time.Millisecond) + require.Equal(t, 2, framework.Server.QueryPlanCacheLen()) vend = framework.DebugVars() verifyIntValue(t, vend, "QueryCacheLength", 2) verifyIntValue(t, vend, "QueryCacheSize", cachedPlanSize*2) + _, _ = client.Execute("select * from vitess_test where intval=1", bindVars) - time.Sleep(100 * time.Millisecond) + require.Equal(t, 3, framework.Server.QueryPlanCacheLen()) vend = framework.DebugVars() verifyIntValue(t, vend, "QueryCacheLength", 3) diff --git a/go/vt/vttablet/endtoend/queries_test.go b/go/vt/vttablet/endtoend/queries_test.go index 347596c9a7a..4e3bf4ff6bb 100644 --- a/go/vt/vttablet/endtoend/queries_test.go +++ b/go/vt/vttablet/endtoend/queries_test.go @@ -18,6 +18,7 @@ package endtoend import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -1783,6 +1784,10 @@ func TestQueries(t *testing.T) { }, }, } + + // Wait for the vtgate caches to flush + time.Sleep(1 * time.Second) + for _, tcase := range testCases { if err := tcase.Test("", client); err != nil { t.Error(err) diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 94f37090d47..5e4e5c22b05 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -413,6 +413,12 @@ func (qe *QueryEngine) QueryPlanCacheCap() int { return int(qe.plans.MaxCapacity()) } +// QueryPlanCacheLen returns the length (size in entries) of the query cache +func (qe *QueryEngine) QueryPlanCacheLen() int { + qe.plans.Wait() + return qe.plans.Len() +} + // AddStats adds the given stats for the planName.tableName func (qe *QueryEngine) AddStats(planName, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowCount, errorCount int64) { // table names can contain "." characters, replace them! diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index 58d78ed30b3..9d4803b0f2d 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -372,6 +372,84 @@ func TestConsolidationsUIRedaction(t *testing.T) { } } +func BenchmarkPlanCacheThroughput(b *testing.B) { + db := fakesqldb.New(b) + defer db.Close() + + for query, result := range schematest.Queries() { + db.AddQuery(query, result) + } + + db.AddQueryPattern(".*", &sqltypes.Result{}) + + qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + qe.se.Open() + qe.Open() + defer qe.Close() + + ctx := context.Background() + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + + for i := 0; i < b.N; i++ { + query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.Intn(500)) + _, err := qe.GetPlan(ctx, logStats, query, false, false /* inReservedConn */) + if err != nil { + b.Fatal(err) + } + } +} + +func benchmarkPlanCache(b *testing.B, db *fakesqldb.DB, lfu bool, par int) { + b.Helper() + + dbcfgs := newDBConfigs(db) + config := tabletenv.NewDefaultConfig() + config.DB = dbcfgs + config.QueryCacheLFU = lfu + + env := tabletenv.NewEnv(config, "TabletServerTest") + se := schema.NewEngine(env) + qe := NewQueryEngine(env, se) + + se.InitDBConfig(dbcfgs.DbaWithDB()) + require.NoError(b, se.Open()) + require.NoError(b, qe.Open()) + defer qe.Close() + + b.SetParallelism(par) + b.RunParallel(func(pb *testing.PB) { + ctx := context.Background() + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + + for pb.Next() { + query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.Intn(500)) + _, err := qe.GetPlan(ctx, logStats, query, false, false /* inReservedConn */) + require.NoErrorf(b, err, "bad query: %s", query) + } + }) +} + +func BenchmarkPlanCacheContention(b *testing.B) { + db := fakesqldb.New(b) + defer db.Close() + + for query, result := range schematest.Queries() { + db.AddQuery(query, result) + } + + db.AddQueryPattern(".*", &sqltypes.Result{}) + + for par := 1; par <= 8; par *= 2 { + b.Run(fmt.Sprintf("ContentionLRU-%d", par), func(b *testing.B) { + benchmarkPlanCache(b, db, false, par) + }) + + b.Run(fmt.Sprintf("ContentionLFU-%d", par), func(b *testing.B) { + benchmarkPlanCache(b, db, true, par) + }) + } +} + func TestPlanCachePollution(t *testing.T) { plotPath := os.Getenv("CACHE_PLOT_PATH") if plotPath == "" { diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 76d9bd6ac17..4b654ffb7ce 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -131,6 +131,7 @@ oltpReadPool: idleTimeoutSeconds: 1800 maxWaiters: 5000 size: 16 +queryCacheLFU: true queryCacheMemory: 33554432 queryCacheSize: 5000 replicationTracker: diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index aa4c8929d71..1889cae0543 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1732,9 +1732,14 @@ func (tsv *TabletServer) SetQueryPlanCacheCap(val int) { tsv.qe.SetQueryPlanCacheCap(val) } -// QueryPlanCacheCap returns the pool size. +// QueryPlanCacheCap returns the plan cache capacity func (tsv *TabletServer) QueryPlanCacheCap() int { - return int(tsv.qe.QueryPlanCacheCap()) + return tsv.qe.QueryPlanCacheCap() +} + +// QueryPlanCacheLen returns the plan cache length +func (tsv *TabletServer) QueryPlanCacheLen() int { + return tsv.qe.QueryPlanCacheLen() } // SetMaxResultSize changes the max result size to the specified value.