diff --git a/asynchclient.go b/asynchclient.go index f0851f3..a5f0c63 100644 --- a/asynchclient.go +++ b/asynchclient.go @@ -748,7 +748,7 @@ func (c *asyncClient) Srandmember(arg0 string) (result FutureBytes, err Error) { // Redis ZADD command. func (c *asyncClient) Zadd(arg0 string, arg1 float64, arg2 []byte) (result FutureBool, err Error) { arg0bytes := []byte(arg0) - arg1bytes := []byte(fmt.Sprintf("%e", arg1)) + arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64)) arg2bytes := arg2 var resp *PendingResponse @@ -834,8 +834,8 @@ func (c *asyncClient) Zrevrange(arg0 string, arg1 int64, arg2 int64) (result Fut // Redis ZRANGEBYSCORE command. func (c *asyncClient) Zrangebyscore(arg0 string, arg1 float64, arg2 float64) (result FutureBytesArray, err Error) { arg0bytes := []byte(arg0) - arg1bytes := []byte(fmt.Sprintf("%e", arg1)) - arg2bytes := []byte(fmt.Sprintf("%e", arg2)) + arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64)) + arg2bytes := []byte(strconv.FormatFloat(arg2, 'e', 15, 64)) var resp *PendingResponse resp, err = c.conn.QueueRequest(&ZRANGEBYSCORE, [][]byte{arg0bytes, arg1bytes, arg2bytes}) @@ -843,7 +843,20 @@ func (c *asyncClient) Zrangebyscore(arg0 string, arg1 float64, arg2 float64) (re result = resp.future.(FutureBytesArray) } return result, err +} + +// Redis ZREMRANGEBYSCORE command. +func (c *asyncClient) Zremrangebyscore(arg0 string, arg1 float64, arg2 float64) (result FutureInt64, err Error) { + arg0bytes := []byte(arg0) + arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64)) + arg2bytes := []byte(strconv.FormatFloat(arg2, 'e', 15, 64)) + var resp *PendingResponse + resp, err = c.conn.QueueRequest(&ZREMRANGEBYSCORE, [][]byte{arg0bytes, arg1bytes, arg2bytes}) + if err == nil { + result = resp.future.(FutureInt64) + } + return result, err } // Redis HGET command. diff --git a/connection.go b/connection.go index 7eac625..53d85ab 100644 --- a/connection.go +++ b/connection.go @@ -41,6 +41,8 @@ const ( DefaultTCPKeepalive = true DefaultHeartbeatSecs = 1 * time.Second DefaultProtocol = REDIS_DB + DefaultReconnectDelay = 500 * time.Millisecond + DefaultMaxReconnects = 2 ) // Redis specific default settings @@ -76,20 +78,22 @@ func (p Protocol) String() string { // Defines the set of parameters that are used by the client connections // type ConnectionSpec struct { - host string // redis connection host - port int // redis connection port - password string // redis connection password - db int // Redis connection db # - rBufSize int // tcp read buffer size - wBufSize int // tcp write buffer size - rTimeout time.Duration // tcp read timeout - wTimeout time.Duration // tcp write timeout - keepalive bool // keepalive flag - lingerspec int // -n: finish io; 0: discard, +n: wait for n secs to finish - reqChanCap int // async request channel capacity - see DefaultReqChanSize - rspChanCap int // async response channel capacity - see DefaultRespChanSize - heartbeat time.Duration // 0 means no heartbeat - protocol Protocol + host string // redis connection host + port int // redis connection port + password string // redis connection password + db int // Redis connection db # + rBufSize int // tcp read buffer size + wBufSize int // tcp write buffer size + rTimeout time.Duration // tcp read timeout + wTimeout time.Duration // tcp write timeout + keepalive bool // keepalive flag + lingerspec int // -n: finish io; 0: discard, +n: wait for n secs to finish + reqChanCap int // async request channel capacity - see DefaultReqChanSize + rspChanCap int // async response channel capacity - see DefaultRespChanSize + heartbeat time.Duration // 0 means no heartbeat + protocol Protocol + reconnectDelay time.Duration // minimum delay between consecutive reconnect attempts + maxReconnects int // maximum number of failed reconnects before reconnect panics } // Creates a ConnectionSpec using default settings. @@ -110,6 +114,8 @@ func DefaultSpec() *ConnectionSpec { DefaultRespChanSize, DefaultHeartbeatSecs, DefaultProtocol, + DefaultReconnectDelay, + DefaultMaxReconnects, } } @@ -208,10 +214,12 @@ type Subscription struct { // General control structure used by connections. // type connHdl struct { - spec *ConnectionSpec - conn net.Conn // may want to change this to TCPConn - TODO REVU - reader *bufio.Reader - connected bool // TODO + spec *ConnectionSpec + conn net.Conn // may want to change this to TCPConn - TODO REVU + reader *bufio.Reader + connected bool // TODO + nextReconnect time.Time + reconnectCount int } // Returns minimal info string for logging, etc @@ -315,6 +323,40 @@ func (hdl *connHdl) disconnect() { } } +func (c *connHdl) reconnect() (success bool) { + // panic after too many failed attempts + if c.reconnectCount >= c.spec.maxReconnects { + panic("Too many failed reconnects.") + } + + // only try once every c.spec.reconnectDelay + if time.Now().Before(c.nextReconnect) { + return + } + + defer func() { + if e := recover(); e != nil { // reconnect failed + c.nextReconnect = time.Now().Add(c.spec.reconnectDelay) + c.reconnectCount += 1 + } + }() + + // try reconnect + c.disconnect() + *c = *newConnHdl(c.spec) // panics + + if c.connected { // reconnect succeeded + success = true + // reset reconnect stats + c.nextReconnect = time.Now() + c.reconnectCount = 0 + } else { + // trigger intern panic which gets caught by deferred recover + panic("Failed to reconnect.") + } + return +} + // Creates a new SyncConnection using the provided ConnectionSpec. // Note that this function will also connect to the specified redis server. func NewSyncConnection(spec *ConnectionSpec) (c SyncConnection, err Error) { @@ -338,7 +380,13 @@ func (c *connHdl) ServiceRequest(cmd *Command, args [][]byte) (resp Response, er defer func() { if re := recover(); re != nil { // REVU - needs to be logged - TODO - err = newSystemErrorWithCause("ServiceRequest", re.(error)) + + if success := c.reconnect(); success { + // try again if recconnect was successful + resp, err = c.ServiceRequest(cmd, args) + } else { + err = newSystemErrorWithCause("ServiceRequest", re.(error)) + } } }() @@ -565,7 +613,13 @@ func (c *asyncConnHdl) QueueRequest(cmd *Command, args [][]byte) (pending *Pendi defer func() { if re := recover(); re != nil { // REVU - needs to be logged - TODO - err = newSystemErrorWithCause("QueueRequest", re.(error)) + + if success := c.reconnect(); success { + // try again if recconnect was successful + pending, err = c.QueueRequest(cmd, args) + } else { + err = newSystemErrorWithCause("QueueRequest", re.(error)) + } } }() @@ -670,11 +724,44 @@ func (c *asyncConnHdl) connect() { } // REVU - TODO opt 2 for Quit here -// panics func (c *asyncConnHdl) disconnect() { + c.super.disconnect() + // should we close the channels here? +} + +func (c *asyncConnHdl) reconnect() (success bool) { + // panic after too many failed attempts + if c.super.reconnectCount >= c.super.spec.maxReconnects { + panic("Too many failed reconnects.") + } + + // only try once every c.super.spec.reconnectDelay + if time.Now().Before(c.super.nextReconnect) { + return + } + + defer func() { + if e := recover(); e != nil { // reconnect failed + c.super.nextReconnect = time.Now().Add(c.super.spec.reconnectDelay) + c.super.reconnectCount += 1 + } + }() - panic("asyncConnHdl.disconnect NOT IMLEMENTED!") - // return + // try reconnect + c.disconnect() + *c = *newAsyncConnHdl(c.super.spec) // panics + + if c.super.connected { // reconnect succeeded + c.startup() + success = true + // reset reconnect stats + c.super.nextReconnect = time.Now() + c.super.reconnectCount = 0 + } else { + // trigger intern panic which gets caught by deferred recover + panic("Failed to reconnect.") + } + return } // responsible for managing the various moving parts of the asyncConnHdl diff --git a/redis.go b/redis.go index 073294a..b131e6f 100644 --- a/redis.go +++ b/redis.go @@ -287,6 +287,9 @@ type Client interface { // Redis ZRANGEBYSCORE command. Zrangebyscore(key string, arg1 float64, arg2 float64) (result [][]byte, err Error) + // Redis ZREMRANGEBYSCORE command + Zremrangebyscore(key string, arg1 float64, arg2 float64) (result int64, err Error) + // Redis HGET command. Hget(key string, hashkey string) (result []byte, err Error) @@ -499,6 +502,9 @@ type AsyncClient interface { // Redis ZRANGEBYSCORE command. Zrangebyscore(key string, arg1 float64, arg2 float64) (result FutureBytesArray, err Error) + // Redis ZREMRANGEBYSCORE command + Zremrangebyscore(key string, arg1 float64, arg2 float64) (result FutureInt64, err Error) + // Redis FLUSHDB command. Flushdb() (status FutureBool, err Error) diff --git a/specification.go b/specification.go index 8d273ab..e7faf50 100644 --- a/specification.go +++ b/specification.go @@ -118,76 +118,77 @@ type Command struct { // The supported Command set, with one to one mapping to eponymous Redis command. // var ( - AUTH Command = Command{"AUTH", KEY, STATUS} - PING Command = Command{"PING", NO_ARG, STATUS} - QUIT Command = Command{"QUIT", NO_ARG, VIRTUAL} - SET Command = Command{"SET", KEY_VALUE, STATUS} - GET Command = Command{"GET", KEY, BULK} - GETSET Command = Command{"GETSET", KEY_VALUE, BULK} - MGET Command = Command{"MGET", MULTI_KEY, MULTI_BULK} - SETNX Command = Command{"SETNX", KEY_VALUE, BOOLEAN} - INCR Command = Command{"INCR", KEY, NUMBER} - INCRBY Command = Command{"INCRBY", KEY_NUM, NUMBER} - DECR Command = Command{"DECR", KEY, NUMBER} - DECRBY Command = Command{"DECRBY", KEY_NUM, NUMBER} - EXISTS Command = Command{"EXISTS", KEY, BOOLEAN} - DEL Command = Command{"DEL", KEY, BOOLEAN} - TYPE Command = Command{"TYPE", KEY, STRING} - KEYS Command = Command{"KEYS", KEY, MULTI_BULK} - RANDOMKEY Command = Command{"RANDOMKEY", NO_ARG, BULK} - RENAME Command = Command{"RENAME", KEY_KEY, STATUS} - RENAMENX Command = Command{"RENAMENX", KEY_KEY, BOOLEAN} - DBSIZE Command = Command{"DBSIZE", NO_ARG, NUMBER} - EXPIRE Command = Command{"EXPIRE", KEY_NUM, BOOLEAN} - TTL Command = Command{"TTL", KEY, NUMBER} - RPUSH Command = Command{"RPUSH", KEY_VALUE, STATUS} - LPUSH Command = Command{"LPUSH", KEY_VALUE, STATUS} - LLEN Command = Command{"LLEN", KEY, NUMBER} - LRANGE Command = Command{"LRANGE", KEY_NUM_NUM, MULTI_BULK} - LTRIM Command = Command{"LTRIM", KEY_NUM_NUM, STATUS} - LINDEX Command = Command{"LINDEX", KEY_NUM, BULK} - LSET Command = Command{"LSET", KEY_IDX_VALUE, STATUS} - LREM Command = Command{"LREM", KEY_CNT_VALUE, NUMBER} - LPOP Command = Command{"LPOP", KEY, BULK} - BLPOP Command = Command{"BLPOP", KEY, MULTI_BULK} - RPOP Command = Command{"RPOP", KEY, BULK} - BRPOP Command = Command{"BRPOP", KEY, MULTI_BULK} - RPOPLPUSH Command = Command{"RPOPLPUSH", KEY_VALUE, BULK} - BRPOPLPUSH Command = Command{"BRPOPLPUSH", KEY_VALUE, MULTI_BULK} - SADD Command = Command{"SADD", KEY_VALUE, BOOLEAN} - SREM Command = Command{"SREM", KEY_VALUE, BOOLEAN} - SCARD Command = Command{"SCARD", KEY, NUMBER} - SISMEMBER Command = Command{"SISMEMBER", KEY_VALUE, BOOLEAN} - SINTER Command = Command{"SINTER", MULTI_KEY, MULTI_BULK} - SINTERSTORE Command = Command{"SINTERSTORE", MULTI_KEY, STATUS} - SUNION Command = Command{"SUNION", MULTI_KEY, MULTI_BULK} - SUNIONSTORE Command = Command{"SUNIONSTORE", MULTI_KEY, STATUS} - SDIFF Command = Command{"SDIFF", MULTI_KEY, MULTI_BULK} - SDIFFSTORE Command = Command{"SDIFFSTORE", MULTI_KEY, STATUS} - SMEMBERS Command = Command{"SMEMBERS", KEY, MULTI_BULK} - SMOVE Command = Command{"SMOVE", KEY_KEY_VALUE, BOOLEAN} - SRANDMEMBER Command = Command{"SRANDMEMBER", KEY, BULK} - HGET Command = Command{"HGET", KEY_KEY, BULK} - HSET Command = Command{"HSET", KEY_KEY_VALUE, STATUS} - HGETALL Command = Command{"HGETALL", KEY, MULTI_BULK} - ZADD Command = Command{"ZADD", KEY_IDX_VALUE, BOOLEAN} - ZREM Command = Command{"ZREM", KEY_VALUE, BOOLEAN} - ZCARD Command = Command{"ZCARD", KEY, NUMBER} - ZSCORE Command = Command{"ZSCORE", KEY_VALUE, BULK} - ZRANGE Command = Command{"ZRANGE", KEY_NUM_NUM, MULTI_BULK} - ZREVRANGE Command = Command{"ZREVRANGE", KEY_NUM_NUM, MULTI_BULK} - ZRANGEBYSCORE Command = Command{"ZRANGEBYSCORE", KEY_NUM_NUM, MULTI_BULK} - SELECT Command = Command{"SELECT", KEY, STATUS} - FLUSHDB Command = Command{"FLUSHDB", NO_ARG, STATUS} - FLUSHALL Command = Command{"FLUSHALL", NO_ARG, STATUS} - MOVE Command = Command{"MOVE", KEY_NUM, BOOLEAN} - SORT Command = Command{"SORT", KEY_SPEC, MULTI_BULK} - SAVE Command = Command{"SAVE", NO_ARG, STATUS} - BGSAVE Command = Command{"BGSAVE", NO_ARG, STATUS} - LASTSAVE Command = Command{"LASTSAVE", NO_ARG, NUMBER} - SHUTDOWN Command = Command{"SHUTDOWN", NO_ARG, VIRTUAL} - INFO Command = Command{"INFO", NO_ARG, BULK} - MONITOR Command = Command{"MONITOR", NO_ARG, VIRTUAL} + AUTH Command = Command{"AUTH", KEY, STATUS} + PING Command = Command{"PING", NO_ARG, STATUS} + QUIT Command = Command{"QUIT", NO_ARG, VIRTUAL} + SET Command = Command{"SET", KEY_VALUE, STATUS} + GET Command = Command{"GET", KEY, BULK} + GETSET Command = Command{"GETSET", KEY_VALUE, BULK} + MGET Command = Command{"MGET", MULTI_KEY, MULTI_BULK} + SETNX Command = Command{"SETNX", KEY_VALUE, BOOLEAN} + INCR Command = Command{"INCR", KEY, NUMBER} + INCRBY Command = Command{"INCRBY", KEY_NUM, NUMBER} + DECR Command = Command{"DECR", KEY, NUMBER} + DECRBY Command = Command{"DECRBY", KEY_NUM, NUMBER} + EXISTS Command = Command{"EXISTS", KEY, BOOLEAN} + DEL Command = Command{"DEL", KEY, BOOLEAN} + TYPE Command = Command{"TYPE", KEY, STRING} + KEYS Command = Command{"KEYS", KEY, MULTI_BULK} + RANDOMKEY Command = Command{"RANDOMKEY", NO_ARG, BULK} + RENAME Command = Command{"RENAME", KEY_KEY, STATUS} + RENAMENX Command = Command{"RENAMENX", KEY_KEY, BOOLEAN} + DBSIZE Command = Command{"DBSIZE", NO_ARG, NUMBER} + EXPIRE Command = Command{"EXPIRE", KEY_NUM, BOOLEAN} + TTL Command = Command{"TTL", KEY, NUMBER} + RPUSH Command = Command{"RPUSH", KEY_VALUE, STATUS} + LPUSH Command = Command{"LPUSH", KEY_VALUE, STATUS} + LLEN Command = Command{"LLEN", KEY, NUMBER} + LRANGE Command = Command{"LRANGE", KEY_NUM_NUM, MULTI_BULK} + LTRIM Command = Command{"LTRIM", KEY_NUM_NUM, STATUS} + LINDEX Command = Command{"LINDEX", KEY_NUM, BULK} + LSET Command = Command{"LSET", KEY_IDX_VALUE, STATUS} + LREM Command = Command{"LREM", KEY_CNT_VALUE, NUMBER} + LPOP Command = Command{"LPOP", KEY, BULK} + BLPOP Command = Command{"BLPOP", KEY, MULTI_BULK} + RPOP Command = Command{"RPOP", KEY, BULK} + BRPOP Command = Command{"BRPOP", KEY, MULTI_BULK} + RPOPLPUSH Command = Command{"RPOPLPUSH", KEY_VALUE, BULK} + BRPOPLPUSH Command = Command{"BRPOPLPUSH", KEY_VALUE, MULTI_BULK} + SADD Command = Command{"SADD", KEY_VALUE, BOOLEAN} + SREM Command = Command{"SREM", KEY_VALUE, BOOLEAN} + SCARD Command = Command{"SCARD", KEY, NUMBER} + SISMEMBER Command = Command{"SISMEMBER", KEY_VALUE, BOOLEAN} + SINTER Command = Command{"SINTER", MULTI_KEY, MULTI_BULK} + SINTERSTORE Command = Command{"SINTERSTORE", MULTI_KEY, STATUS} + SUNION Command = Command{"SUNION", MULTI_KEY, MULTI_BULK} + SUNIONSTORE Command = Command{"SUNIONSTORE", MULTI_KEY, STATUS} + SDIFF Command = Command{"SDIFF", MULTI_KEY, MULTI_BULK} + SDIFFSTORE Command = Command{"SDIFFSTORE", MULTI_KEY, STATUS} + SMEMBERS Command = Command{"SMEMBERS", KEY, MULTI_BULK} + SMOVE Command = Command{"SMOVE", KEY_KEY_VALUE, BOOLEAN} + SRANDMEMBER Command = Command{"SRANDMEMBER", KEY, BULK} + HGET Command = Command{"HGET", KEY_KEY, BULK} + HSET Command = Command{"HSET", KEY_KEY_VALUE, STATUS} + HGETALL Command = Command{"HGETALL", KEY, MULTI_BULK} + ZADD Command = Command{"ZADD", KEY_IDX_VALUE, BOOLEAN} + ZREM Command = Command{"ZREM", KEY_VALUE, BOOLEAN} + ZCARD Command = Command{"ZCARD", KEY, NUMBER} + ZSCORE Command = Command{"ZSCORE", KEY_VALUE, BULK} + ZRANGE Command = Command{"ZRANGE", KEY_NUM_NUM, MULTI_BULK} + ZREVRANGE Command = Command{"ZREVRANGE", KEY_NUM_NUM, MULTI_BULK} + ZRANGEBYSCORE Command = Command{"ZRANGEBYSCORE", KEY_NUM_NUM, MULTI_BULK} + ZREMRANGEBYSCORE Command = Command{"ZREMRANGEBYSCORE", KEY_NUM_NUM, NUMBER} + SELECT Command = Command{"SELECT", KEY, STATUS} + FLUSHDB Command = Command{"FLUSHDB", NO_ARG, STATUS} + FLUSHALL Command = Command{"FLUSHALL", NO_ARG, STATUS} + MOVE Command = Command{"MOVE", KEY_NUM, BOOLEAN} + SORT Command = Command{"SORT", KEY_SPEC, MULTI_BULK} + SAVE Command = Command{"SAVE", NO_ARG, STATUS} + BGSAVE Command = Command{"BGSAVE", NO_ARG, STATUS} + LASTSAVE Command = Command{"LASTSAVE", NO_ARG, NUMBER} + SHUTDOWN Command = Command{"SHUTDOWN", NO_ARG, VIRTUAL} + INFO Command = Command{"INFO", NO_ARG, BULK} + MONITOR Command = Command{"MONITOR", NO_ARG, VIRTUAL} // TODO SORT (RequestType.MULTI_KEY, ResponseType.MULTI_BULK), PUBLISH Command = Command{"PUBLISH", KEY_VALUE, NUMBER} SUBSCRIBE Command = Command{"SUBSCRIBE", MULTI_KEY, MULTI_BULK} diff --git a/synchclient.go b/synchclient.go index 3af06d3..bf054fb 100644 --- a/synchclient.go +++ b/synchclient.go @@ -739,7 +739,7 @@ func (c *syncClient) Srandmember(arg0 string) (result []byte, err Error) { // Redis ZADD command. func (c *syncClient) Zadd(arg0 string, arg1 float64, arg2 []byte) (result bool, err Error) { arg0bytes := []byte(arg0) - arg1bytes := []byte(fmt.Sprintf("%e", arg1)) + arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64)) arg2bytes := arg2 var resp Response @@ -839,8 +839,8 @@ func (c *syncClient) Zrevrange(arg0 string, arg1 int64, arg2 int64) (result [][] // Redis ZRANGEBYSCORE command. func (c *syncClient) Zrangebyscore(arg0 string, arg1 float64, arg2 float64) (result [][]byte, err Error) { arg0bytes := []byte(arg0) - arg1bytes := []byte(fmt.Sprintf("%e", arg1)) - arg2bytes := []byte(fmt.Sprintf("%e", arg2)) + arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64)) + arg2bytes := []byte(strconv.FormatFloat(arg2, 'e', 15, 64)) var resp Response resp, err = c.conn.ServiceRequest(&ZRANGEBYSCORE, [][]byte{arg0bytes, arg1bytes, arg2bytes}) @@ -851,6 +851,21 @@ func (c *syncClient) Zrangebyscore(arg0 string, arg1 float64, arg2 float64) (res } +// Redis ZREMRANGEBYSCORE command. +func (c *syncClient) Zremrangebyscore(arg0 string, arg1 float64, arg2 float64) (result int64, err Error) { + arg0bytes := []byte(arg0) + arg1bytes := []byte(strconv.FormatFloat(arg1, 'e', 15, 64)) + arg2bytes := []byte(strconv.FormatFloat(arg2, 'e', 15, 64)) + + var resp Response + resp, err = c.conn.ServiceRequest(&ZREMRANGEBYSCORE, [][]byte{arg0bytes, arg1bytes, arg2bytes}) + if err == nil { + result = resp.GetNumberValue() + } + return result, err + +} + // Redis HGET command. func (c *syncClient) Hget(arg0 string, arg1 string) (result []byte, err Error) { arg0bytes := []byte(arg0)