Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions asynchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func (c *asyncClient) Srandmember(arg0 string) (result FutureBytes, err Error) {
// Redis ZADD command.
func (c *asyncClient) Zadd(arg0 string, arg1 float64, arg2 []byte) (result FutureBool, err Error) {
arg0bytes := []byte(arg0)
arg1bytes := []byte(fmt.Sprintf("%e", arg1))
arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64))
arg2bytes := arg2

var resp *PendingResponse
Expand Down Expand Up @@ -834,16 +834,29 @@ func (c *asyncClient) Zrevrange(arg0 string, arg1 int64, arg2 int64) (result Fut
// Redis ZRANGEBYSCORE command.
func (c *asyncClient) Zrangebyscore(arg0 string, arg1 float64, arg2 float64) (result FutureBytesArray, err Error) {
arg0bytes := []byte(arg0)
arg1bytes := []byte(fmt.Sprintf("%e", arg1))
arg2bytes := []byte(fmt.Sprintf("%e", arg2))
arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64))
arg2bytes := []byte(strconv.FormatFloat(arg2, 'e', 15, 64))

var resp *PendingResponse
resp, err = c.conn.QueueRequest(&ZRANGEBYSCORE, [][]byte{arg0bytes, arg1bytes, arg2bytes})
if err == nil {
result = resp.future.(FutureBytesArray)
}
return result, err
}

// Redis ZREMRANGEBYSCORE command.
func (c *asyncClient) Zremrangebyscore(arg0 string, arg1 float64, arg2 float64) (result FutureInt64, err Error) {
arg0bytes := []byte(arg0)
arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64))
arg2bytes := []byte(strconv.FormatFloat(arg2, 'e', 15, 64))

var resp *PendingResponse
resp, err = c.conn.QueueRequest(&ZREMRANGEBYSCORE, [][]byte{arg0bytes, arg1bytes, arg2bytes})
if err == nil {
result = resp.future.(FutureInt64)
}
return result, err
}

// Redis HGET command.
Expand Down
133 changes: 110 additions & 23 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
DefaultTCPKeepalive = true
DefaultHeartbeatSecs = 1 * time.Second
DefaultProtocol = REDIS_DB
DefaultReconnectDelay = 500 * time.Millisecond
DefaultMaxReconnects = 2
)

// Redis specific default settings
Expand Down Expand Up @@ -76,20 +78,22 @@ func (p Protocol) String() string {
// Defines the set of parameters that are used by the client connections
//
type ConnectionSpec struct {
host string // redis connection host
port int // redis connection port
password string // redis connection password
db int // Redis connection db #
rBufSize int // tcp read buffer size
wBufSize int // tcp write buffer size
rTimeout time.Duration // tcp read timeout
wTimeout time.Duration // tcp write timeout
keepalive bool // keepalive flag
lingerspec int // -n: finish io; 0: discard, +n: wait for n secs to finish
reqChanCap int // async request channel capacity - see DefaultReqChanSize
rspChanCap int // async response channel capacity - see DefaultRespChanSize
heartbeat time.Duration // 0 means no heartbeat
protocol Protocol
host string // redis connection host
port int // redis connection port
password string // redis connection password
db int // Redis connection db #
rBufSize int // tcp read buffer size
wBufSize int // tcp write buffer size
rTimeout time.Duration // tcp read timeout
wTimeout time.Duration // tcp write timeout
keepalive bool // keepalive flag
lingerspec int // -n: finish io; 0: discard, +n: wait for n secs to finish
reqChanCap int // async request channel capacity - see DefaultReqChanSize
rspChanCap int // async response channel capacity - see DefaultRespChanSize
heartbeat time.Duration // 0 means no heartbeat
protocol Protocol
reconnectDelay time.Duration // minimum delay between consecutive reconnect attempts
maxReconnects int // maximum number of failed reconnects before reconnect panics
}

// Creates a ConnectionSpec using default settings.
Expand All @@ -110,6 +114,8 @@ func DefaultSpec() *ConnectionSpec {
DefaultRespChanSize,
DefaultHeartbeatSecs,
DefaultProtocol,
DefaultReconnectDelay,
DefaultMaxReconnects,
}
}

Expand Down Expand Up @@ -208,10 +214,12 @@ type Subscription struct {
// General control structure used by connections.
//
type connHdl struct {
spec *ConnectionSpec
conn net.Conn // may want to change this to TCPConn - TODO REVU
reader *bufio.Reader
connected bool // TODO
spec *ConnectionSpec
conn net.Conn // may want to change this to TCPConn - TODO REVU
reader *bufio.Reader
connected bool // TODO
nextReconnect time.Time
reconnectCount int
}

// Returns minimal info string for logging, etc
Expand Down Expand Up @@ -315,6 +323,40 @@ func (hdl *connHdl) disconnect() {
}
}

func (c *connHdl) reconnect() (success bool) {
// panic after too many failed attempts
if c.reconnectCount >= c.spec.maxReconnects {
panic("Too many failed reconnects.")
}

// only try once every c.spec.reconnectDelay
if time.Now().Before(c.nextReconnect) {
return
}

defer func() {
if e := recover(); e != nil { // reconnect failed
c.nextReconnect = time.Now().Add(c.spec.reconnectDelay)
c.reconnectCount += 1
}
}()

// try reconnect
c.disconnect()
*c = *newConnHdl(c.spec) // panics

if c.connected { // reconnect succeeded
success = true
// reset reconnect stats
c.nextReconnect = time.Now()
c.reconnectCount = 0
} else {
// trigger intern panic which gets caught by deferred recover
panic("Failed to reconnect.")
}
return
}

// Creates a new SyncConnection using the provided ConnectionSpec.
// Note that this function will also connect to the specified redis server.
func NewSyncConnection(spec *ConnectionSpec) (c SyncConnection, err Error) {
Expand All @@ -338,7 +380,13 @@ func (c *connHdl) ServiceRequest(cmd *Command, args [][]byte) (resp Response, er
defer func() {
if re := recover(); re != nil {
// REVU - needs to be logged - TODO
err = newSystemErrorWithCause("ServiceRequest", re.(error))

if success := c.reconnect(); success {
// try again if recconnect was successful
resp, err = c.ServiceRequest(cmd, args)
} else {
err = newSystemErrorWithCause("ServiceRequest", re.(error))
}
}
}()

Expand Down Expand Up @@ -565,7 +613,13 @@ func (c *asyncConnHdl) QueueRequest(cmd *Command, args [][]byte) (pending *Pendi
defer func() {
if re := recover(); re != nil {
// REVU - needs to be logged - TODO
err = newSystemErrorWithCause("QueueRequest", re.(error))

if success := c.reconnect(); success {
// try again if recconnect was successful
pending, err = c.QueueRequest(cmd, args)
} else {
err = newSystemErrorWithCause("QueueRequest", re.(error))
}
}
}()

Expand Down Expand Up @@ -670,11 +724,44 @@ func (c *asyncConnHdl) connect() {
}

// REVU - TODO opt 2 for Quit here
// panics
func (c *asyncConnHdl) disconnect() {
c.super.disconnect()
// should we close the channels here?
}

func (c *asyncConnHdl) reconnect() (success bool) {
// panic after too many failed attempts
if c.super.reconnectCount >= c.super.spec.maxReconnects {
panic("Too many failed reconnects.")
}

// only try once every c.super.spec.reconnectDelay
if time.Now().Before(c.super.nextReconnect) {
return
}

defer func() {
if e := recover(); e != nil { // reconnect failed
c.super.nextReconnect = time.Now().Add(c.super.spec.reconnectDelay)
c.super.reconnectCount += 1
}
}()

panic("asyncConnHdl.disconnect NOT IMLEMENTED!")
// return
// try reconnect
c.disconnect()
*c = *newAsyncConnHdl(c.super.spec) // panics

if c.super.connected { // reconnect succeeded
c.startup()
success = true
// reset reconnect stats
c.super.nextReconnect = time.Now()
c.super.reconnectCount = 0
} else {
// trigger intern panic which gets caught by deferred recover
panic("Failed to reconnect.")
}
return
}

// responsible for managing the various moving parts of the asyncConnHdl
Expand Down
6 changes: 6 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ type Client interface {
// Redis ZRANGEBYSCORE command.
Zrangebyscore(key string, arg1 float64, arg2 float64) (result [][]byte, err Error)

// Redis ZREMRANGEBYSCORE command
Zremrangebyscore(key string, arg1 float64, arg2 float64) (result int64, err Error)

// Redis HGET command.
Hget(key string, hashkey string) (result []byte, err Error)

Expand Down Expand Up @@ -499,6 +502,9 @@ type AsyncClient interface {
// Redis ZRANGEBYSCORE command.
Zrangebyscore(key string, arg1 float64, arg2 float64) (result FutureBytesArray, err Error)

// Redis ZREMRANGEBYSCORE command
Zremrangebyscore(key string, arg1 float64, arg2 float64) (result FutureInt64, err Error)

// Redis FLUSHDB command.
Flushdb() (status FutureBool, err Error)

Expand Down
141 changes: 71 additions & 70 deletions specification.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,76 +118,77 @@ type Command struct {
// The supported Command set, with one to one mapping to eponymous Redis command.
//
var (
AUTH Command = Command{"AUTH", KEY, STATUS}
PING Command = Command{"PING", NO_ARG, STATUS}
QUIT Command = Command{"QUIT", NO_ARG, VIRTUAL}
SET Command = Command{"SET", KEY_VALUE, STATUS}
GET Command = Command{"GET", KEY, BULK}
GETSET Command = Command{"GETSET", KEY_VALUE, BULK}
MGET Command = Command{"MGET", MULTI_KEY, MULTI_BULK}
SETNX Command = Command{"SETNX", KEY_VALUE, BOOLEAN}
INCR Command = Command{"INCR", KEY, NUMBER}
INCRBY Command = Command{"INCRBY", KEY_NUM, NUMBER}
DECR Command = Command{"DECR", KEY, NUMBER}
DECRBY Command = Command{"DECRBY", KEY_NUM, NUMBER}
EXISTS Command = Command{"EXISTS", KEY, BOOLEAN}
DEL Command = Command{"DEL", KEY, BOOLEAN}
TYPE Command = Command{"TYPE", KEY, STRING}
KEYS Command = Command{"KEYS", KEY, MULTI_BULK}
RANDOMKEY Command = Command{"RANDOMKEY", NO_ARG, BULK}
RENAME Command = Command{"RENAME", KEY_KEY, STATUS}
RENAMENX Command = Command{"RENAMENX", KEY_KEY, BOOLEAN}
DBSIZE Command = Command{"DBSIZE", NO_ARG, NUMBER}
EXPIRE Command = Command{"EXPIRE", KEY_NUM, BOOLEAN}
TTL Command = Command{"TTL", KEY, NUMBER}
RPUSH Command = Command{"RPUSH", KEY_VALUE, STATUS}
LPUSH Command = Command{"LPUSH", KEY_VALUE, STATUS}
LLEN Command = Command{"LLEN", KEY, NUMBER}
LRANGE Command = Command{"LRANGE", KEY_NUM_NUM, MULTI_BULK}
LTRIM Command = Command{"LTRIM", KEY_NUM_NUM, STATUS}
LINDEX Command = Command{"LINDEX", KEY_NUM, BULK}
LSET Command = Command{"LSET", KEY_IDX_VALUE, STATUS}
LREM Command = Command{"LREM", KEY_CNT_VALUE, NUMBER}
LPOP Command = Command{"LPOP", KEY, BULK}
BLPOP Command = Command{"BLPOP", KEY, MULTI_BULK}
RPOP Command = Command{"RPOP", KEY, BULK}
BRPOP Command = Command{"BRPOP", KEY, MULTI_BULK}
RPOPLPUSH Command = Command{"RPOPLPUSH", KEY_VALUE, BULK}
BRPOPLPUSH Command = Command{"BRPOPLPUSH", KEY_VALUE, MULTI_BULK}
SADD Command = Command{"SADD", KEY_VALUE, BOOLEAN}
SREM Command = Command{"SREM", KEY_VALUE, BOOLEAN}
SCARD Command = Command{"SCARD", KEY, NUMBER}
SISMEMBER Command = Command{"SISMEMBER", KEY_VALUE, BOOLEAN}
SINTER Command = Command{"SINTER", MULTI_KEY, MULTI_BULK}
SINTERSTORE Command = Command{"SINTERSTORE", MULTI_KEY, STATUS}
SUNION Command = Command{"SUNION", MULTI_KEY, MULTI_BULK}
SUNIONSTORE Command = Command{"SUNIONSTORE", MULTI_KEY, STATUS}
SDIFF Command = Command{"SDIFF", MULTI_KEY, MULTI_BULK}
SDIFFSTORE Command = Command{"SDIFFSTORE", MULTI_KEY, STATUS}
SMEMBERS Command = Command{"SMEMBERS", KEY, MULTI_BULK}
SMOVE Command = Command{"SMOVE", KEY_KEY_VALUE, BOOLEAN}
SRANDMEMBER Command = Command{"SRANDMEMBER", KEY, BULK}
HGET Command = Command{"HGET", KEY_KEY, BULK}
HSET Command = Command{"HSET", KEY_KEY_VALUE, STATUS}
HGETALL Command = Command{"HGETALL", KEY, MULTI_BULK}
ZADD Command = Command{"ZADD", KEY_IDX_VALUE, BOOLEAN}
ZREM Command = Command{"ZREM", KEY_VALUE, BOOLEAN}
ZCARD Command = Command{"ZCARD", KEY, NUMBER}
ZSCORE Command = Command{"ZSCORE", KEY_VALUE, BULK}
ZRANGE Command = Command{"ZRANGE", KEY_NUM_NUM, MULTI_BULK}
ZREVRANGE Command = Command{"ZREVRANGE", KEY_NUM_NUM, MULTI_BULK}
ZRANGEBYSCORE Command = Command{"ZRANGEBYSCORE", KEY_NUM_NUM, MULTI_BULK}
SELECT Command = Command{"SELECT", KEY, STATUS}
FLUSHDB Command = Command{"FLUSHDB", NO_ARG, STATUS}
FLUSHALL Command = Command{"FLUSHALL", NO_ARG, STATUS}
MOVE Command = Command{"MOVE", KEY_NUM, BOOLEAN}
SORT Command = Command{"SORT", KEY_SPEC, MULTI_BULK}
SAVE Command = Command{"SAVE", NO_ARG, STATUS}
BGSAVE Command = Command{"BGSAVE", NO_ARG, STATUS}
LASTSAVE Command = Command{"LASTSAVE", NO_ARG, NUMBER}
SHUTDOWN Command = Command{"SHUTDOWN", NO_ARG, VIRTUAL}
INFO Command = Command{"INFO", NO_ARG, BULK}
MONITOR Command = Command{"MONITOR", NO_ARG, VIRTUAL}
AUTH Command = Command{"AUTH", KEY, STATUS}
PING Command = Command{"PING", NO_ARG, STATUS}
QUIT Command = Command{"QUIT", NO_ARG, VIRTUAL}
SET Command = Command{"SET", KEY_VALUE, STATUS}
GET Command = Command{"GET", KEY, BULK}
GETSET Command = Command{"GETSET", KEY_VALUE, BULK}
MGET Command = Command{"MGET", MULTI_KEY, MULTI_BULK}
SETNX Command = Command{"SETNX", KEY_VALUE, BOOLEAN}
INCR Command = Command{"INCR", KEY, NUMBER}
INCRBY Command = Command{"INCRBY", KEY_NUM, NUMBER}
DECR Command = Command{"DECR", KEY, NUMBER}
DECRBY Command = Command{"DECRBY", KEY_NUM, NUMBER}
EXISTS Command = Command{"EXISTS", KEY, BOOLEAN}
DEL Command = Command{"DEL", KEY, BOOLEAN}
TYPE Command = Command{"TYPE", KEY, STRING}
KEYS Command = Command{"KEYS", KEY, MULTI_BULK}
RANDOMKEY Command = Command{"RANDOMKEY", NO_ARG, BULK}
RENAME Command = Command{"RENAME", KEY_KEY, STATUS}
RENAMENX Command = Command{"RENAMENX", KEY_KEY, BOOLEAN}
DBSIZE Command = Command{"DBSIZE", NO_ARG, NUMBER}
EXPIRE Command = Command{"EXPIRE", KEY_NUM, BOOLEAN}
TTL Command = Command{"TTL", KEY, NUMBER}
RPUSH Command = Command{"RPUSH", KEY_VALUE, STATUS}
LPUSH Command = Command{"LPUSH", KEY_VALUE, STATUS}
LLEN Command = Command{"LLEN", KEY, NUMBER}
LRANGE Command = Command{"LRANGE", KEY_NUM_NUM, MULTI_BULK}
LTRIM Command = Command{"LTRIM", KEY_NUM_NUM, STATUS}
LINDEX Command = Command{"LINDEX", KEY_NUM, BULK}
LSET Command = Command{"LSET", KEY_IDX_VALUE, STATUS}
LREM Command = Command{"LREM", KEY_CNT_VALUE, NUMBER}
LPOP Command = Command{"LPOP", KEY, BULK}
BLPOP Command = Command{"BLPOP", KEY, MULTI_BULK}
RPOP Command = Command{"RPOP", KEY, BULK}
BRPOP Command = Command{"BRPOP", KEY, MULTI_BULK}
RPOPLPUSH Command = Command{"RPOPLPUSH", KEY_VALUE, BULK}
BRPOPLPUSH Command = Command{"BRPOPLPUSH", KEY_VALUE, MULTI_BULK}
SADD Command = Command{"SADD", KEY_VALUE, BOOLEAN}
SREM Command = Command{"SREM", KEY_VALUE, BOOLEAN}
SCARD Command = Command{"SCARD", KEY, NUMBER}
SISMEMBER Command = Command{"SISMEMBER", KEY_VALUE, BOOLEAN}
SINTER Command = Command{"SINTER", MULTI_KEY, MULTI_BULK}
SINTERSTORE Command = Command{"SINTERSTORE", MULTI_KEY, STATUS}
SUNION Command = Command{"SUNION", MULTI_KEY, MULTI_BULK}
SUNIONSTORE Command = Command{"SUNIONSTORE", MULTI_KEY, STATUS}
SDIFF Command = Command{"SDIFF", MULTI_KEY, MULTI_BULK}
SDIFFSTORE Command = Command{"SDIFFSTORE", MULTI_KEY, STATUS}
SMEMBERS Command = Command{"SMEMBERS", KEY, MULTI_BULK}
SMOVE Command = Command{"SMOVE", KEY_KEY_VALUE, BOOLEAN}
SRANDMEMBER Command = Command{"SRANDMEMBER", KEY, BULK}
HGET Command = Command{"HGET", KEY_KEY, BULK}
HSET Command = Command{"HSET", KEY_KEY_VALUE, STATUS}
HGETALL Command = Command{"HGETALL", KEY, MULTI_BULK}
ZADD Command = Command{"ZADD", KEY_IDX_VALUE, BOOLEAN}
ZREM Command = Command{"ZREM", KEY_VALUE, BOOLEAN}
ZCARD Command = Command{"ZCARD", KEY, NUMBER}
ZSCORE Command = Command{"ZSCORE", KEY_VALUE, BULK}
ZRANGE Command = Command{"ZRANGE", KEY_NUM_NUM, MULTI_BULK}
ZREVRANGE Command = Command{"ZREVRANGE", KEY_NUM_NUM, MULTI_BULK}
ZRANGEBYSCORE Command = Command{"ZRANGEBYSCORE", KEY_NUM_NUM, MULTI_BULK}
ZREMRANGEBYSCORE Command = Command{"ZREMRANGEBYSCORE", KEY_NUM_NUM, NUMBER}
SELECT Command = Command{"SELECT", KEY, STATUS}
FLUSHDB Command = Command{"FLUSHDB", NO_ARG, STATUS}
FLUSHALL Command = Command{"FLUSHALL", NO_ARG, STATUS}
MOVE Command = Command{"MOVE", KEY_NUM, BOOLEAN}
SORT Command = Command{"SORT", KEY_SPEC, MULTI_BULK}
SAVE Command = Command{"SAVE", NO_ARG, STATUS}
BGSAVE Command = Command{"BGSAVE", NO_ARG, STATUS}
LASTSAVE Command = Command{"LASTSAVE", NO_ARG, NUMBER}
SHUTDOWN Command = Command{"SHUTDOWN", NO_ARG, VIRTUAL}
INFO Command = Command{"INFO", NO_ARG, BULK}
MONITOR Command = Command{"MONITOR", NO_ARG, VIRTUAL}
// TODO SORT (RequestType.MULTI_KEY, ResponseType.MULTI_BULK),
PUBLISH Command = Command{"PUBLISH", KEY_VALUE, NUMBER}
SUBSCRIBE Command = Command{"SUBSCRIBE", MULTI_KEY, MULTI_BULK}
Expand Down
Loading