Skip to content

Commit

Permalink
Use cache
Browse files Browse the repository at this point in the history
  • Loading branch information
GNURub committed Apr 10, 2020
1 parent 1b3a45d commit d875076
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 125 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Created by .ignore support plugin (hsz.mobi)
.idea
dist
room_keys.json
.vscode
.tmp
vendor
vendor
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o livego .

FROM alpine:latest
LABEL maintainer="Ruben Cid Lara <[email protected]>"
RUN mkdir -p /app/config
WORKDIR /app
ENV RTMP_PORT 1935
Expand Down
128 changes: 39 additions & 89 deletions configure/channel.go
Original file line number Diff line number Diff line change
@@ -1,112 +1,62 @@
package configure

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"sync"
"time"

"github.com/go-redis/redis/v7"
"github.com/patrickmn/go-cache"
)

var RoomKeys = LoadRoomKey(*GetKeyFile())
var RoomKeys *RoomKeysType

var roomUpdated = false

var saveInFile = true
var redisCli *redis.Client
var saveInLocal = true

type RoomKeysType struct {
redisCli *redis.Client
localCache *cache.Cache
}

func Init() {
saveInFile = GetRedisAddr() == nil

rand.Seed(time.Now().UnixNano())

This comment has been minimized.

Copy link
@DKingAlpha

DKingAlpha Jul 4, 2020

Collaborator

#117 rand seed initialization removed. @GNURub

if saveInFile {
go func() {
for {
time.Sleep(15 * time.Second)
if roomUpdated {
RoomKeys.Save(*roomKeySaveFile)
roomUpdated = false
}
}
}()
saveInLocal = GetRedisAddr() == nil

RoomKeys = &RoomKeysType{
localCache: cache.New(cache.NoExpiration, 0),
}

if saveInLocal {
return
}

redisCli = redis.NewClient(&redis.Options{
RoomKeys.redisCli = redis.NewClient(&redis.Options{
Addr: *GetRedisAddr(),
Password: *GetRedisPwd(),
DB: 0,
})

_, err := redisCli.Ping().Result()
_, err := RoomKeys.redisCli.Ping().Result()
if err != nil {
panic(err)
}

log.Printf("Redis connected")
}

type RoomKeysType struct {
mapChanKey sync.Map
mapKeyChan sync.Map
}

func LoadRoomKey(f string) *RoomKeysType {
result := &RoomKeysType{
mapChanKey: sync.Map{},
mapKeyChan: sync.Map{},
}
raw := map[string]string{}
content, err := ioutil.ReadFile(f)
if err != nil {
log.Printf("Failed to read file %s for room keys", f)
return result
}
if json.Unmarshal(content, &raw) != nil {
log.Printf("Failed to unmarshal file %s for room keys", f)
return result
}
for room, key := range raw {
result.mapChanKey.Store(room, key)
result.mapKeyChan.Store(key, room)
}
return result
}

func (r *RoomKeysType) Save(f string) {
raw := map[string]string{}
r.mapChanKey.Range(func(channel, key interface{}) bool {
raw[channel.(string)] = key.(string)
return true
})
content, err := json.Marshal(raw)
if err != nil {
log.Println("Failed to marshal room keys")
return
}
if ioutil.WriteFile(f, content, 0644) != nil {
log.Println("Failed to save room keys")
return
}
}

// set/reset a random key for channel
func (r *RoomKeysType) SetKey(channel string) (key string, err error) {
if !saveInFile {
if !saveInLocal {
for {
key = randStringRunes(48)
if _, err = redisCli.Get(key).Result(); err == redis.Nil {
err = redisCli.Set(channel, key, 0).Err()
if _, err = r.redisCli.Get(key).Result(); err == redis.Nil {
err = r.redisCli.Set(channel, key, 0).Err()
if err != nil {
return
}

err = redisCli.Set(key, channel, 0).Err()
err = r.redisCli.Set(key, channel, 0).Err()
return
} else if err != nil {
return
Expand All @@ -116,9 +66,9 @@ func (r *RoomKeysType) SetKey(channel string) (key string, err error) {

for {
key = randStringRunes(48)
if _, found := r.mapKeyChan.Load(key); !found {
r.mapChanKey.Store(channel, key)
r.mapKeyChan.Store(key, channel)
if _, found := r.localCache.Get(key); !found {
r.localCache.SetDefault(channel, key)
r.localCache.SetDefault(key, channel)
break
}
}
Expand All @@ -127,8 +77,8 @@ func (r *RoomKeysType) SetKey(channel string) (key string, err error) {
}

func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
if !saveInFile {
if newKey, err = redisCli.Get(channel).Result(); err == redis.Nil {
if !saveInLocal {
if newKey, err = r.redisCli.Get(channel).Result(); err == redis.Nil {
newKey, err = r.SetKey(channel)
log.Printf("[KEY] new channel [%s]: %s", channel, newKey)
return
Expand All @@ -139,7 +89,7 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {

var key interface{}
var found bool
if key, found = r.mapChanKey.Load(channel); found {
if key, found = r.localCache.Get(channel); found {
return key.(string), nil
}
newKey, err = r.SetKey(channel)
Expand All @@ -148,11 +98,11 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
}

func (r *RoomKeysType) GetChannel(key string) (channel string, err error) {
if !saveInFile {
return redisCli.Get(key).Result()
if !saveInLocal {
return r.redisCli.Get(key).Result()
}

chann, found := r.mapKeyChan.Load(key)
chann, found := r.localCache.Get(key)
if found {
return chann.(string), nil
} else {
Expand All @@ -161,28 +111,28 @@ func (r *RoomKeysType) GetChannel(key string) (channel string, err error) {
}

func (r *RoomKeysType) DeleteChannel(channel string) bool {
if !saveInFile {
return redisCli.Del(channel).Err() != nil
if !saveInLocal {
return r.redisCli.Del(channel).Err() != nil
}

key, ok := r.mapChanKey.Load(channel)
key, ok := r.localCache.Get(channel)
if ok {
r.mapChanKey.Delete(channel)
r.mapKeyChan.Delete(key)
r.localCache.Delete(channel)
r.localCache.Delete(key.(string))
return true
}
return false
}

func (r *RoomKeysType) DeleteKey(key string) bool {
if !saveInFile {
return redisCli.Del(key).Err() != nil
if !saveInLocal {
return r.redisCli.Del(key).Err() != nil
}

channel, ok := r.mapKeyChan.Load(key)
channel, ok := r.localCache.Get(key)
if ok {
r.mapChanKey.Delete(channel)
r.mapKeyChan.Delete(key)
r.localCache.Delete(channel.(string))
r.localCache.Delete(key)
return true
}
return false
Expand Down
26 changes: 7 additions & 19 deletions configure/liveconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
}
*/
var (
roomKeySaveFile = flag.String("KeyFile", "room_keys.json", "path to save room keys")
RedisAddr = flag.String("redis_addr", "", "redis addr to save room keys ex. localhost:6379")
RedisPwd = flag.String("redis_pwd", "", "redis password")
redisAddr = flag.String("redis_addr", "", "redis addr to save room keys ex. localhost:6379")
redisPwd = flag.String("redis_pwd", "", "redis password")
)

type Application struct {
Expand All @@ -31,14 +30,11 @@ type Application struct {
Hlson string `json:"hlson"`
StaticPush []string `json:"static_push"`
}

type JWTCfg struct {
Secret string `json:"secret"`
Algorithm string `json:"algorithm"`
}

type ServerCfg struct {
KeyFile string `json:"key_file"`
RedisAddr string `json:"redis_addr"`
RedisPwd string `json:"redis_pwd"`
JWTCfg `json:"jwt"`
Expand Down Expand Up @@ -69,32 +65,24 @@ func LoadConfig(configfilename string) error {
return nil
}

func GetKeyFile() *string {
if len(RtmpServercfg.KeyFile) > 0 {
*roomKeySaveFile = RtmpServercfg.KeyFile
}

return roomKeySaveFile
}

func GetRedisAddr() *string {
if len(RtmpServercfg.RedisAddr) > 0 {
*RedisAddr = RtmpServercfg.RedisAddr
*redisAddr = RtmpServercfg.RedisAddr
}

if len(*RedisAddr) == 0 {
if len(*redisAddr) == 0 {
return nil
}

return RedisAddr
return redisAddr
}

func GetRedisPwd() *string {
if len(RtmpServercfg.RedisPwd) > 0 {
*RedisPwd = RtmpServercfg.RedisPwd
*redisPwd = RtmpServercfg.RedisPwd
}

return RedisPwd
return redisPwd
}

func CheckAppName(appname string) bool {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-redis/redis/v7 v7.2.0
github.com/gorilla/mux v1.7.4 // indirect
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/satori/go.uuid v1.2.0
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"livego/configure"
"livego/protocol/api"
"livego/protocol/hls"
"livego/protocol/httpflv"
"livego/protocol/httpopera"
"livego/protocol/rtmp"
)

Expand Down Expand Up @@ -89,20 +89,20 @@ func startHTTPFlv(stream *rtmp.RtmpStream) {
}()
}

func startHTTPOpera(stream *rtmp.RtmpStream) {
func startAPI(stream *rtmp.RtmpStream) {
if *operaAddr != "" {
opListen, err := net.Listen("tcp", *operaAddr)
if err != nil {
log.Fatal(err)
}
opServer := httpopera.NewServer(stream, *rtmpAddr)
opServer := api.NewServer(stream, *rtmpAddr)
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("HTTP-Operation server panic: ", r)
log.Println("HTTP-API server panic: ", r)
}
}()
log.Println("HTTP-Operation listen On", *operaAddr)
log.Println("HTTP-API listen On", *operaAddr)
opServer.Serve(opListen)
}()
}
Expand All @@ -124,7 +124,7 @@ func main() {
stream := rtmp.NewRtmpStream()
hlsServer := startHls()
startHTTPFlv(stream)
startHTTPOpera(stream)
startAPI(stream)

startRtmp(stream, hlsServer)
//startRtmp(stream, nil)
Expand Down
Loading

0 comments on commit d875076

Please sign in to comment.