Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qps/rate limit & reduce expire seek range & expire-key hash #204

Open
wants to merge 61 commits into
base: master
Choose a base branch
from
Open
Changes from 37 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
db84a39
fix empty command which cause titan to crash in parsing command
Sep 3, 2019
3e4b89e
change empty command's reply error from ErrEmptyArray to ErrEmptyCommand
Sep 3, 2019
c89b27f
close the connection when meet empty command
Sep 3, 2019
43bd6c3
some illegal clients send unreadable command which may cause titan cr…
Sep 6, 2019
95bd04b
if a connection send unknown commands 3 times, close it
Sep 6, 2019
63b131d
add expire left seconds metrics
Sep 18, 2019
a9c28fe
fix expire left time calculating
Sep 18, 2019
e5ff501
when set left/delay current seconds(expire), also set other seconds to 0
Sep 19, 2019
fb11c67
delete zadd args output
Oct 25, 2019
586b30d
first version of rate limit
Nov 7, 2019
27f54e7
1 use rate.limiter to implement limit and read tikv key/value to get …
Nov 21, 2019
e5018e4
qps also can be set burst, its limit also support k/K/m/M suffix
Nov 21, 2019
ecd78a2
1 change log level--limit not set, to debug
Nov 21, 2019
0648ab8
add limit default config items and fix error in config.go
Nov 21, 2019
f86245a
in startSyncNewLimit, just read all match limit once for every comman…
Nov 21, 2019
157d70a
change got limit log trace
Nov 22, 2019
fd0d8cc
add limit cleared log trace, just log limit is trigger when delay > 0
Nov 22, 2019
756a0e6
change limit/commandFunc cost seconds factor from 2 to 1.4
Nov 22, 2019
46bb6da
fix titan active time decoding bug
Nov 22, 2019
a839995
fix limit local percent bug
Nov 22, 2019
815efa4
fix titan active time parsing bug
Nov 22, 2019
9612571
add updateLimitPercent log
Nov 22, 2019
cc4255a
when create commandLimiter, also use localPercent to set limit
Nov 22, 2019
f00d8c5
fix limitersMgr localPercent using bug and lock before use it
Nov 25, 2019
da49bfa
reportLocalStat every globalBalancePeriod,even the commandLimiter is nil
Nov 25, 2019
30f7d2c
limit can also work on auth-disabled titan server
Nov 26, 2019
bcbd5a7
decrease the lock range in reportStat
Nov 26, 2019
1a50695
reportLocalStat run in itself go routine
Nov 26, 2019
a7753c4
1 change how to balance
Nov 29, 2019
fda1e66
update some config item's description and log
Dec 2, 2019
a474656
add limit changed log
Dec 2, 2019
8ed0698
commandLimiter store globalQpsLimit/globalRateLimit in int64, use it …
Dec 2, 2019
6ae93be
when all titan'a limiter qps is < devideUsage of its weight percent, …
Dec 3, 2019
70b3f15
add showlimit.sh and add a conf parameter for setlimit.sh
Dec 4, 2019
a22ca0f
1 resolve the problem that titan still process command for remote clo…
Dec 6, 2019
8403a21
1 BytesArray add judge for write eror
Dec 6, 2019
2d59108
refactor commandLimiter's qpsLimiter/ratelimiter and their globalLimi…
Dec 7, 2019
b034581
fix balance limit's bug
Dec 7, 2019
3b71e69
1 in Exec(), add transaction begin, cmd args num, cmd func, reply fun…
Dec 11, 2019
0fb99b7
simplify the msg recv/send log in debug level
Dec 11, 2019
37c1c68
fix bug of setting cmd arg num metric in Exec()
Dec 11, 2019
9de2ae9
add expire/gc round/seek/commit cost metrics
Dec 12, 2019
6c2328d
prevent connection more and more: when tikv command cost high, client…
Dec 22, 2019
bbfce19
fix bug of connection limit
Dec 25, 2019
93cd7df
1 add metrics range, fix cost exceed max value problem
Feb 18, 2020
7e2521b
Merge branch 'fpxu_dev' into 'master'
Feb 18, 2020
ad7cff1
1 add example for limit-connection and max-connection-wait in titan.toml
Feb 19, 2020
aeca4b3
1 fix unlock bug in limit-connection
Feb 19, 2020
bb617b0
don't lock s.serverctx.lock when limit-connection disabled
Feb 19, 2020
003e6b6
expire seek end in at:now keys to reduce rocksdb tomstone problem
Feb 25, 2020
4423eaf
fix a failed case in tests/redis/unit/expire.tcl
Feb 26, 2020
cc15799
expire seek start in last processed expireKeys to reduce rocksdb toms…
Feb 27, 2020
d9a1baf
fix expire log
Feb 27, 2020
83a370d
also reduce expire seek range when no expire keys or all expire keys …
Feb 27, 2020
d75dd34
also reduce expire seek range when no expire keys or all expire keys …
Feb 28, 2020
3519ff9
comment unsupported string functions in command list, and delete its …
Mar 2, 2020
c4eea83
comment unit test of unsupported string functions in strings_test.go
Mar 2, 2020
bd43ec2
hash expire-key, it will improve the keys num handled every seconds, …
Apr 21, 2020
fe6e84b
1 fix runExpire's problem that unhashed goroutine scan hashed expire-key
Apr 24, 2020
977b457
unhash expire goroutine use individual limit configuration item、expir…
Apr 26, 2020
9161d7e
buckets of comand/limit/commandFunc/txn commit cost is too many to l…
Apr 26, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions bin/titan/main.go
Original file line number Diff line number Diff line change
@@ -68,11 +68,19 @@ func main() {
}

svr := metrics.NewServer(&config.Status)

limitersMgr, err := db.NewLimitersMgr(store, &config.Tikv.RateLimit)
if err != nil {
zap.L().Fatal("create limitersMgr failed", zap.Error(err))
os.Exit(1)
}
serv := titan.New(&context.ServerContext{
RequirePass: config.Server.Auth,
Store: store,
ListZipThreshold: config.Server.ListZipThreshold,
RequirePass: config.Server.Auth,
Store: store,
ListZipThreshold: config.Server.ListZipThreshold,
LimitersMgr: limitersMgr,
LimitConnection: config.Server.LimitConnection,
MaxConnection: config.Server.MaxConnection,
MaxConnectionWait: config.Server.MaxConnectionWait,
})

var servOpts, statusOpts []continuous.ServerOption
53 changes: 23 additions & 30 deletions client.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ import (
"io/ioutil"
"net"
"strings"
"sync"
"time"

"github.com/distributedio/titan/command"
@@ -22,54 +21,43 @@ type client struct {
exec *command.Executor
r *bufio.Reader

eofLock sync.Mutex //the lock of reading_writing 'eof'
eof bool //is over when read data from socket
remoteClosed bool //is the connection closed by remote peer?
}

func newClient(cliCtx *context.ClientContext, s *Server, exec *command.Executor) *client {
return &client{
cliCtx: cliCtx,
server: s,
exec: exec,
eof: false,
cliCtx: cliCtx,
server: s,
exec: exec,
remoteClosed: false,
}
}

func (c *client) readEof() {
c.eofLock.Lock()
defer c.eofLock.Unlock()

c.eof = true
}

func (c *client) isEof() bool {
c.eofLock.Lock()
defer c.eofLock.Unlock()

return c.eof
}

// Write to conn and log error if needed
func (c *client) Write(p []byte) (int, error) {
zap.L().Debug("write to client", zap.Int64("clientid", c.cliCtx.ID), zap.String("msg", string(p)))
n, err := c.conn.Write(p)
if err != nil {
c.conn.Close()
if err == io.EOF {
zap.L().Info("close connection", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID))
zap.L().Info("connection was half-closed by remote peer", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace))
} else {
//may be unknown error with message "connection reset by peer"
zap.L().Error("write net failed", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID),
zap.String("namespace", c.cliCtx.Namespace),
zap.Bool("multi", c.cliCtx.Multi),
zap.Bool("watching", c.cliCtx.Txn != nil),
zap.String("command", c.cliCtx.LastCmd),
zap.String("error", err.Error()))
return 0, err
}
//client.serve() will get the channel close event, close the connection, exit current go routine
//if the remote client use pipeline to invoke command, then close the connection(timeout etc), titan still get command from client.bufio.Reader and process
//setting client.remoteClosed to true will help client.serve() to interrupt command processing
c.remoteClosed = true
}
return n, nil
//return err for above write() error, then replying many times command can break its sending to a half-closed connection, etc BytesArray(lrange invoke it).
return n, err
}

func (c *client) serve(conn net.Conn) error {
@@ -84,16 +72,21 @@ func (c *client) serve(conn net.Conn) error {
case <-c.cliCtx.Done:
return c.conn.Close()
default:
if c.remoteClosed {
zap.L().Info("close connection", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace))
return c.conn.Close()
}
cmd, err = c.readCommand()
if err != nil {
c.conn.Close()
if err == io.EOF {
zap.L().Info("close connection", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID))
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace))
return nil
}
zap.L().Error("read command failed", zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID), zap.Error(err))
zap.Int64("clientid", c.cliCtx.ID), zap.String("namespace", c.cliCtx.Namespace), zap.Error(err))
return err
}
}
@@ -137,7 +130,6 @@ func (c *client) serve(conn net.Conn) error {
}

ctx.Context = context.New(c.cliCtx, c.server.servCtx)
zap.L().Debug("recv msg", zap.String("command", ctx.Name), zap.Strings("arguments", ctx.Args))

// Skip reply if necessary
if c.cliCtx.SkipN != 0 {
@@ -150,7 +142,8 @@ func (c *client) serve(conn net.Conn) error {
env.Write(zap.String("addr", c.cliCtx.RemoteAddr),
zap.Int64("clientid", c.cliCtx.ID),
zap.String("traceid", ctx.TraceID),
zap.String("command", ctx.Name))
zap.String("command", ctx.Name),
zap.Strings("arguments", ctx.Args))
}

c.exec.Execute(ctx)
31 changes: 16 additions & 15 deletions command/command.go
Original file line number Diff line number Diff line change
@@ -61,19 +61,18 @@ func Integer(w io.Writer, v int64) OnCommit {
// BytesArray replies a [][]byte when commit
func BytesArray(w io.Writer, a [][]byte) OnCommit {
return func() {
start := time.Now()
resp.ReplyArray(w, len(a))
zap.L().Debug("reply array size", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
start = time.Now()
if _, err := resp.ReplyArray(w, len(a)); err != nil {
return
}
for i := range a {
if a[i] == nil {
resp.ReplyNullBulkString(w)
if err := resp.ReplyNullBulkString(w); err != nil {
return
}
continue
}
resp.ReplyBulkString(w, string(a[i]))
if i%10 == 9 {
zap.L().Debug("reply 10 bulk string", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
start = time.Now()
if err := resp.ReplyBulkString(w, string(a[i])); err != nil {
return
}
}
}
@@ -92,6 +91,10 @@ type TxnCommand func(ctx *Context, txn *db.Transaction) (OnCommit, error)
func Call(ctx *Context) {
ctx.Name = strings.ToLower(ctx.Name)

if _, ok := txnCommands[ctx.Name]; ok && ctx.Server.LimitersMgr != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

txnCommands is not used anymore, use commands instead.

ctx.Server.LimitersMgr.CheckLimit(ctx.Client.Namespace, ctx.Name, ctx.Args)
}

if ctx.Name != "auth" &&
ctx.Server.RequirePass != "" &&
ctx.Client.Authenticated == false {
@@ -187,13 +190,11 @@ func AutoCommit(cmd TxnCommand) Command {
key := ""
if len(ctx.Args) > 0 {
key = ctx.Args[0]
if len(ctx.Args) > 1 {
mt.CommandArgsNumHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(float64(len(ctx.Args) - 1))
}
mt.CommandArgsNumHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(float64(len(ctx.Args)))
}
cost := time.Since(start).Seconds()
zap.L().Debug("transation begin", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mt.TxnBeginHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("transation begin", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
if err != nil {
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Inc()
resp.ReplyError(ctx.Out, "ERR "+err.Error())
@@ -208,8 +209,8 @@ func AutoCommit(cmd TxnCommand) Command {
start = time.Now()
onCommit, err := cmd(ctx, txn)
cost = time.Since(start).Seconds()
zap.L().Debug("command done", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mt.CommandFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("command done", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
if err != nil {
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Inc()
resp.ReplyError(ctx.Out, err.Error())
@@ -257,8 +258,8 @@ func AutoCommit(cmd TxnCommand) Command {
onCommit()
}
cost = time.Since(start).Seconds()
zap.L().Debug("onCommit ", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mt.ReplyFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("onCommit ", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000)))
mtFunc()
return nil
})
20 changes: 17 additions & 3 deletions command/transactions.go
Original file line number Diff line number Diff line change
@@ -38,9 +38,15 @@ func Exec(ctx *Context) {
var outputs []*bytes.Buffer
var onCommits []OnCommit
err = retry.Ensure(ctx, func() error {
mt := metrics.GetMetrics()
if !watching {
start := time.Now()
txn, err = ctx.Client.DB.Begin()
cost := time.Since(start).Seconds()
mt.TxnBeginHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("transation begin", zap.String("name", ctx.Name), zap.Int64("cost(us)", int64(cost*1000000)))
if err != nil {
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Inc()
zap.L().Error("begin txn failed",
zap.Int64("clientid", ctx.Client.ID),
zap.String("command", ctx.Name),
@@ -63,12 +69,18 @@ func Exec(ctx *Context) {
Out: out,
Context: ctx.Context,
}
if len(cmd.Args) > 0 {
mt.CommandArgsNumHistogramVec.WithLabelValues(ctx.Client.Namespace, cmd.Name).Observe(float64(len(cmd.Args)))
}
name := strings.ToLower(cmd.Name)
if _, ok := txnCommands[name]; ok {
start := time.Now()
onCommit, err = TxnCall(subCtx, txn)
zap.L().Debug("execute", zap.String("command", subCtx.Name), zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000))
cost := time.Since(start).Seconds()
mt.CommandFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, cmd.Name).Observe(cost)
zap.L().Debug("execute", zap.String("command", cmd.Name), zap.Int64("cost(us)", int64(cost*1000000)))
if err != nil {
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, cmd.Name).Inc()
resp.ReplyError(out, err.Error())
}
} else {
@@ -79,7 +91,6 @@ func Exec(ctx *Context) {
commandCount++
}
start := time.Now()
mt := metrics.GetMetrics()
mt.MultiCommandHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(float64(commandCount))
defer func() {
cost := time.Since(start).Seconds()
@@ -123,7 +134,7 @@ func Exec(ctx *Context) {
return
}


start := time.Now()
resp.ReplyArray(ctx.Out, size)
// run OnCommit that fill reply to outputs
for i := range onCommits {
@@ -141,6 +152,9 @@ func Exec(ctx *Context) {
break
}
}
cost := time.Since(start).Seconds()
metrics.GetMetrics().ReplyFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost)
zap.L().Debug("onCommit ", zap.String("name", ctx.Name), zap.Int64("cost(us)", int64(cost*1000000)))
}

// Watch starts a transaction, watch is a global transaction and is not key associated(this is different from redis)
43 changes: 30 additions & 13 deletions conf/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package conf

import "time"
import (
"time"
)

// Titan configuration center
type Titan struct {
@@ -24,22 +26,25 @@ type Hash struct {

// Server config is the config of titan server
type Server struct {
Auth string `cfg:"auth;;;client connetion auth"`
Listen string `cfg:"listen; 0.0.0.0:7369; netaddr; address to listen"`
SSLCertFile string `cfg:"ssl-cert-file;;;server SSL certificate file (enables SSL support)"`
SSLKeyFile string `cfg:"ssl-key-file;;;server SSL key file"`
MaxConnection int64 `cfg:"max-connection;1000;numeric;client connection count"`
ListZipThreshold int `cfg:"list-zip-threshold;100;numeric;the max limit length of elements in list"`
Auth string `cfg:"auth;;;client connetion auth"`
Listen string `cfg:"listen; 0.0.0.0:7369; netaddr; address to listen"`
SSLCertFile string `cfg:"ssl-cert-file;;;server SSL certificate file (enables SSL support)"`
SSLKeyFile string `cfg:"ssl-key-file;;;server SSL key file"`
LimitConnection bool `cfg:"limit-connection; false; boolean; limit max connection num when it's true"`
MaxConnection int64 `cfg:"max-connection;500;numeric;client connection count"`
ListZipThreshold int `cfg:"list-zip-threshold;100;numeric;the max limit length of elements in list"`
MaxConnectionWait int64 `cfg:"max-connection-wait;1000;numeric;wait ms before close connection when exceed max connection"`
}

// Tikv config is the config of tikv sdk
type Tikv struct {
PdAddrs string `cfg:"pd-addrs;required; ;pd address in tidb"`
DB DB `cfg:"db"`
GC GC `cfg:"gc"`
Expire Expire `cfg:"expire"`
ZT ZT `cfg:"zt"`
TikvGC TikvGC `cfg:"tikv-gc"`
PdAddrs string `cfg:"pd-addrs;required; ;pd address in tidb"`
DB DB `cfg:"db"`
GC GC `cfg:"gc"`
Expire Expire `cfg:"expire"`
ZT ZT `cfg:"zt"`
TikvGC TikvGC `cfg:"tikv-gc"`
RateLimit RateLimit `cfg:"rate-limit"`
}

// TikvGC config is the config of implement tikv sdk gcwork
@@ -99,3 +104,15 @@ type Status struct {
SSLCertFile string `cfg:"ssl-cert-file;;;status server SSL certificate file (enables SSL support)"`
SSLKeyFile string `cfg:"ssl-key-file;;;status server SSL key file"`
}

type RateLimit struct {
InterfaceName string `cfg:"interface-name; eth0; ; the interface name to get ip and write local titan status to tikv for balancing rate limit"`
LimiterNamespace string `cfg:"limiter-namespace; sys_ratelimit;; the namespace of getting limit/balance data"`
GlobalBalancePeriod time.Duration `cfg:"global-balance-period; 15s;; the period in seconds to balance rate limiting with other titan nodes"`
TitanStatusLifetime time.Duration `cfg:"titanstatus-life-time; 1m;; how long if a titan didn't update its status, we consider it dead"`
SyncSetPeriod time.Duration `cfg:"sync-set-period; 3s;; the period in seconds to sync new limit set in tikv"`
UsageToDivide float64 `cfg:"usage-to-divide; 0.6;; if the qps/weighted limit < the percent, will divide change Factor to balance limit"`
UsageToMultiply float64 `cfg:"usage-to-multiply; 0.9;; if the qps/weighted limit >= the percent, will multiply change Factor to balance limit"`
WeightChangeFactor float64 `cfg:"weight-change-factor; 1.5;; the factor to devide/multipy in current weight"`
InitialPercent float64 `cfg:"initial-percent; 0.33;; the limit is set in the percent when a commandLimiter is created"`
}
10 changes: 10 additions & 0 deletions conf/mockconfig.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,16 @@ func MockConf() *Titan {
SafePointLifeTime: 10 * time.Minute,
Concurrency: 2,
},
RateLimit: RateLimit{
LimiterNamespace: "sys_ratelimit",
SyncSetPeriod: 1 * time.Second,
GlobalBalancePeriod: 3 * time.Second,
TitanStatusLifetime: 6 * time.Second,
UsageToDivide: 0.6,
UsageToMultiply: 0.9,
WeightChangeFactor: 1.5,
InitialPercent: 1,
},
},
}
}
Loading