Skip to content

Commit 4ee7348

Browse files
committed
add feature control the max subscriber per key
1 parent df1428b commit 4ee7348

File tree

6 files changed

+132
-84
lines changed

6 files changed

+132
-84
lines changed

config.go

+34-32
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,23 @@ func init() {
1616
}
1717

1818
type Config struct {
19-
Addr string `json:"addr"`
20-
Port int `json:"port"`
21-
Pprof int `json:"pprof"`
22-
PprofAddr string `json:"pprof_addr"`
23-
PprofPort int `json:"pprof_port"`
24-
PubAddr string `json:"pub_addr"`
25-
PubPort int `json:"pub_port"`
26-
LongpollingTimeout int `json:"longpolling_timeout"`
27-
MessageTimeout int `json:"message_timeout"`
28-
Log string `json:"log"`
29-
RedisNetwork string `json:"redis_network"`
30-
RedisAddr string `json:"redis_addr"`
31-
RedisTimeout int `json:"redis_timeout"`
32-
RedisPoolSize int `json:"redis_poolsize"`
33-
RedisMQSize int `json:"redis_mqsize"`
34-
MaxProcs int `json:"max_procs"`
19+
Addr string `json:"addr"`
20+
Port int `json:"port"`
21+
Pprof int `json:"pprof"`
22+
PprofAddr string `json:"pprof_addr"`
23+
PprofPort int `json:"pprof_port"`
24+
PubAddr string `json:"pub_addr"`
25+
PubPort int `json:"pub_port"`
26+
LongpollingTimeout int `json:"longpolling_timeout"`
27+
MessageTimeout int `json:"message_timeout"`
28+
Log string `json:"log"`
29+
RedisNetwork string `json:"redis_network"`
30+
RedisAddr string `json:"redis_addr"`
31+
RedisTimeout int `json:"redis_timeout"`
32+
RedisPoolSize int `json:"redis_poolsize"`
33+
RedisMQSize int `json:"redis_mqsize"`
34+
MaxProcs int `json:"max_procs"`
35+
MaxSubscriberPerKey int `json:"max_subscriber_per_key"`
3536
}
3637

3738
func InitConfig(file string) (*Config, error) {
@@ -42,22 +43,23 @@ func InitConfig(file string) (*Config, error) {
4243
}
4344

4445
cf := &Config{
45-
Addr: "localhost",
46-
Port: 8080,
47-
PprofAddr: "localhost",
48-
PprofPort: 8080,
49-
PubAddr: "localhost",
50-
PubPort: 8080,
51-
Pprof: 1,
52-
LongpollingTimeout: 300,
53-
MessageTimeout: 7200,
54-
Log: "./gopush.log",
55-
RedisNetwork: "tcp",
56-
RedisAddr: "localhost:6379",
57-
RedisTimeout: 28800,
58-
RedisPoolSize: 50,
59-
RedisMQSize: 20,
60-
MaxProcs: runtime.NumCPU(),
46+
Addr: "localhost",
47+
Port: 8080,
48+
PprofAddr: "localhost",
49+
PprofPort: 8080,
50+
PubAddr: "localhost",
51+
PubPort: 8080,
52+
Pprof: 1,
53+
LongpollingTimeout: 300,
54+
MessageTimeout: 7200,
55+
Log: "./gopush.log",
56+
RedisNetwork: "tcp",
57+
RedisAddr: "localhost:6379",
58+
RedisTimeout: 28800,
59+
RedisPoolSize: 50,
60+
RedisMQSize: 20,
61+
MaxSubscriberPerKey: 0, // no limit
62+
MaxProcs: runtime.NumCPU(),
6163
}
6264
if err = json.Unmarshal(c, cf); err != nil {
6365
Log.Printf("json.Unmarshal(\"%s\", cf) failed (%s)", string(c), err.Error())

gopush.conf

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@
1414
"redis_timeout": 28800,
1515
"redis_poolsize": 50,
1616
"redis_mqsize": 20,
17-
"max_procs": 4
17+
"max_procs": 4,
18+
"max_subscriber_per_key": 3
1819
}

job.go

+4-10
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,10 @@ var (
1212
func CleanConnKeyJob() {
1313
for {
1414
key := <-ConnectedKeyCh
15-
16-
for {
17-
if err := RedisHDel(ConnectedKey, key); err != nil {
18-
Log.Printf("RedisHDel(\"%s\", \"%s\") failed (%s)", ConnectedKey, key, err.Error())
19-
// if failed, sleep 1 second and retry
20-
time.Sleep(1 * time.Second)
21-
continue
22-
}
23-
24-
break
15+
if err := RedisDecr(ConnectedKey, key); err != nil {
16+
Log.Printf("RedisDecrBy(\"%s\", \"%s\") failed (%s)", ConnectedKey, key, err.Error())
17+
// if failed, sleep 1 second and retry
18+
time.Sleep(1 * time.Second)
2519
}
2620
}
2721
}

main.go

-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ func main() {
4545
runtime.GOMAXPROCS(Conf.MaxProcs)
4646
// init redis
4747
InitRedis()
48-
// init clean connected key job
49-
go CleanConnKeyJob()
5048
// for test client
5149
http.HandleFunc("/client", Client)
5250
// sub

push.go

+34-39
Original file line numberDiff line numberDiff line change
@@ -112,53 +112,32 @@ func Subscribe(ws *websocket.Conn) {
112112
return
113113
}
114114
// Generate Key
115-
key = pusher.Key(key)
115+
Log.Printf("Client (%s) subscribe to key %s", ws.Request().RemoteAddr, key)
116116
// check multi cli connected
117-
exists, err := RedisHExists(ConnectedKey, key)
118-
if err != nil {
119-
Log.Printf("RedisHExists(\"%s\", \"%s\") failed (%s)", ConnectedKey, key, err.Error())
120-
if err = responseWriter(ws, InternalErr, result); err != nil {
121-
Log.Printf("responseWriter failed (%s)", err.Error())
122-
}
123-
124-
return
125-
}
117+
if Conf.MaxSubscriberPerKey > 0 {
118+
ok, err := multiCliCheck(key)
119+
if err != nil {
120+
if err = responseWriter(ws, InternalErr, result); err != nil {
121+
Log.Printf("responseWriter failed (%s)", err.Error())
122+
}
126123

127-
if exists == 1 {
128-
Log.Printf("key : %s already has a client sub to", key)
129-
if err = responseWriter(ws, MultiCliErr, result); err != nil {
130-
Log.Printf("responseWriter failed (%s)", err.Error())
124+
return
131125
}
132126

133-
return
134-
}
135-
// set connected flag
136-
cliNum, err := RedisHSetnx(ConnectedKey, key, "")
137-
if err != nil {
138-
Log.Printf("RedisHSetnx(\"%s\", \"%s\", \"\") failed (%s)", ConnectedKey, key, err.Error())
139-
if err = responseWriter(ws, InternalErr, result); err != nil {
140-
Log.Printf("responseWriter failed (%s)", err.Error())
141-
}
127+
defer func() {
128+
if err = RedisDecr(ConnectedKey, key); err != nil {
129+
Log.Printf("RedisDecr(\"%s\", \"%s\") failed (%s)", ConnectedKey, key, err.Error())
130+
}
131+
}()
142132

143-
return
144-
}
133+
if !ok {
134+
if err = responseWriter(ws, MultiCliErr, result); err != nil {
135+
Log.Printf("responseWriter failed (%s)", err.Error())
136+
}
145137

146-
if cliNum == 0 {
147-
Log.Printf("key : %s already has a client sub to", key)
148-
if err = responseWriter(ws, MultiCliErr, result); err != nil {
149-
Log.Printf("responseWriter failed (%s)", err.Error())
138+
return
150139
}
151-
152-
return
153140
}
154-
155-
defer func() {
156-
if err := RedisHDel(ConnectedKey, key); err != nil {
157-
Log.Printf("RedisHDel(\"%s\", \"%s\") failed (%s)", ConnectedKey, key, err.Error())
158-
// retry to delete the key
159-
ConnectedKeyCh <- key
160-
}
161-
}()
162141
// redis routine for receive pub message or error
163142
redisC, psc, err := RedisSub(key)
164143
if err != nil {
@@ -207,6 +186,22 @@ func Subscribe(ws *websocket.Conn) {
207186
}
208187
}
209188

189+
func multiCliCheck(key string) (bool, error) {
190+
// incr client num
191+
cliNum, err := RedisIncr(ConnectedKey, key)
192+
if err != nil {
193+
Log.Printf("RedisIncr(\"%s\", \"%s\") failed (%s)", ConnectedKey, key, err.Error())
194+
return false, err
195+
}
196+
// check
197+
if cliNum > Conf.MaxSubscriberPerKey {
198+
Log.Printf("key %s has %d subscribers exceed %d", key, cliNum, Conf.MaxSubscriberPerKey)
199+
return false, nil
200+
}
201+
202+
return true, nil
203+
}
204+
210205
func netRead(ws *websocket.Conn) chan error {
211206
c := make(chan error, 1)
212207
// client close or network error, go routine exit

redis.go

+58
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ func InitRedis() {
3232
return c, err
3333
},
3434
}
35+
36+
if Conf.MaxSubscriberPerKey > 0 {
37+
// init clean connected key job
38+
go CleanConnKeyJob()
39+
}
3540
}
3641

3742
func RedisUnSub(key string, psc redis.PubSubConn) error {
@@ -300,6 +305,59 @@ func RedisHExists(key, field string) (int, error) {
300305
return v, nil
301306
}
302307

308+
func RedisHGet(key, field string) (int, error) {
309+
c := redisPool.Get()
310+
defer c.Close()
311+
reply, err := c.Do("HGET", key, field)
312+
if err != nil {
313+
Log.Printf("c.Do(\"HGET\", \"%s\") failed (%s)", key, err.Error())
314+
return 0, err
315+
}
316+
317+
if reply == nil {
318+
return -1, nil
319+
}
320+
321+
v, err := redis.Int(reply, nil)
322+
if err != nil {
323+
Log.Printf("redis.Int() failed (%s)", err.Error())
324+
return 0, err
325+
}
326+
327+
return v, nil
328+
}
329+
330+
func RedisIncr(key, field string) (int, error) {
331+
c := redisPool.Get()
332+
defer c.Close()
333+
reply, err := c.Do("HINCRBY", key, field, 1)
334+
if err != nil {
335+
Log.Printf("c.Do(\"HINCRBY\", \"%s\", 1) failed (%s)", key, err.Error())
336+
return 0, err
337+
}
338+
339+
v, err := redis.Int(reply, nil)
340+
if err != nil {
341+
Log.Printf("redis.Int() failed (%s)", err.Error())
342+
return 0, err
343+
}
344+
345+
return v, nil
346+
}
347+
348+
func RedisDecr(key, field string) error {
349+
c := redisPool.Get()
350+
defer c.Close()
351+
_, err := c.Do("HINCRBY", key, field, -1)
352+
if err != nil {
353+
Log.Printf("c.Do(\"HINCRBY\", \"%s\", -1) failed (%s)", key, err.Error())
354+
ConnectedKeyCh <- key
355+
return err
356+
}
357+
358+
return nil
359+
}
360+
303361
func RedisHDel(key, field string) error {
304362
c := redisPool.Get()
305363
defer c.Close()

0 commit comments

Comments
 (0)