-
Notifications
You must be signed in to change notification settings - Fork 7
/
redis.go
96 lines (85 loc) · 2.41 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package main
import (
"log"
"time"
"github.com/gomodule/redigo/redis"
)
// package scoped vars we can access wherever
var (
redisPool *redis.Pool
)
func myRedisSubscriptions() (<-chan redis.Message, <-chan redis.Message) {
// set up structures and channels to stream events out on
scoreUpdates := make(chan redis.Message)
detailUpdates := make(chan redis.Message)
go func() {
// Get a new Redis connection from pool. Since this is the first time
// the app tries to do something with Redis, die if we can't get a valid
// connection, since something is probably configured wrong.
conn := redisPool.Get()
_, err := conn.Do("PING")
if err != nil {
log.Fatal("Could not connect to Redis, check your configuration.")
}
// subscribe to and handle streams
const scoreKey = "stream.score_updates"
const detailKey = "stream.tweet_updates.*"
psc := redis.PubSubConn{Conn: conn}
psc.Subscribe(scoreKey)
psc.PSubscribe(detailKey)
for {
switch v := psc.Receive().(type) {
case redis.Message:
switch {
case v.Channel == scoreKey:
scoreUpdates <- v
case v.Pattern == detailKey:
detailUpdates <- v
default:
log.Println("Received a message on an unexpected channel ", v.Channel)
}
case error:
log.Println("redis subscribe connection errored?@&*(#)akjd")
// probable cause is connection was EOF
// reminder: in this context, "Close" means just return to pool
// pool will detect if connection is errored via testOnBorrow
conn.Close()
log.Println("attempting to get a new one in 5 seconds...")
time.Sleep(5 * time.Second)
conn = redisPool.Get()
psc = redis.PubSubConn{Conn: conn}
psc.Subscribe("stream.score_updates")
psc.PSubscribe("stream.tweet_updates.*")
}
}
}()
return scoreUpdates, detailUpdates
}
// https://godoc.org/github.com/gomodule/redigo/redis#Pool
func newPool(server, password string) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server)
if err != nil {
return nil, err
}
if password != "" {
if _, err := c.Do("AUTH", password); err != nil {
c.Close()
return nil, err
}
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func initRedisPool() {
host, pass := envRedis()
redisPool = newPool(host, pass)
}