diff --git a/.gitignore b/.gitignore index 9836eec0..e35af1e5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ # Created by .ignore support plugin (hsz.mobi) .idea dist -room_keys.json .vscode .tmp -vendor \ No newline at end of file +vendor diff --git a/Dockerfile b/Dockerfile index 37aa7d11..545ad95b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,6 @@ COPY . . RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o livego . FROM alpine:latest -LABEL maintainer="Ruben Cid Lara " RUN mkdir -p /app/config WORKDIR /app ENV RTMP_PORT 1935 diff --git a/configure/channel.go b/configure/channel.go index 38eb30a5..6781b3f2 100755 --- a/configure/channel.go +++ b/configure/channel.go @@ -1,49 +1,43 @@ package configure import ( - "encoding/json" "fmt" - "io/ioutil" "log" "math/rand" - "sync" - "time" "github.com/go-redis/redis/v7" + "github.com/patrickmn/go-cache" ) -var RoomKeys = LoadRoomKey(*GetKeyFile()) +var RoomKeys *RoomKeysType var roomUpdated = false -var saveInFile = true -var redisCli *redis.Client +var saveInLocal = true + +type RoomKeysType struct { + redisCli *redis.Client + localCache *cache.Cache +} func Init() { - saveInFile = GetRedisAddr() == nil - - rand.Seed(time.Now().UnixNano()) - if saveInFile { - go func() { - for { - time.Sleep(15 * time.Second) - if roomUpdated { - RoomKeys.Save(*roomKeySaveFile) - roomUpdated = false - } - } - }() + saveInLocal = GetRedisAddr() == nil + + RoomKeys = &RoomKeysType{ + localCache: cache.New(cache.NoExpiration, 0), + } + if saveInLocal { return } - redisCli = redis.NewClient(&redis.Options{ + RoomKeys.redisCli = redis.NewClient(&redis.Options{ Addr: *GetRedisAddr(), Password: *GetRedisPwd(), DB: 0, }) - _, err := redisCli.Ping().Result() + _, err := RoomKeys.redisCli.Ping().Result() if err != nil { panic(err) } @@ -51,62 +45,18 @@ func Init() { log.Printf("Redis connected") } -type RoomKeysType struct { - mapChanKey sync.Map - mapKeyChan sync.Map -} - -func LoadRoomKey(f string) *RoomKeysType { - result := &RoomKeysType{ - mapChanKey: sync.Map{}, - mapKeyChan: sync.Map{}, - } - raw := map[string]string{} - content, err := ioutil.ReadFile(f) - if err != nil { - log.Printf("Failed to read file %s for room keys", f) - return result - } - if json.Unmarshal(content, &raw) != nil { - log.Printf("Failed to unmarshal file %s for room keys", f) - return result - } - for room, key := range raw { - result.mapChanKey.Store(room, key) - result.mapKeyChan.Store(key, room) - } - return result -} - -func (r *RoomKeysType) Save(f string) { - raw := map[string]string{} - r.mapChanKey.Range(func(channel, key interface{}) bool { - raw[channel.(string)] = key.(string) - return true - }) - content, err := json.Marshal(raw) - if err != nil { - log.Println("Failed to marshal room keys") - return - } - if ioutil.WriteFile(f, content, 0644) != nil { - log.Println("Failed to save room keys") - return - } -} - // set/reset a random key for channel func (r *RoomKeysType) SetKey(channel string) (key string, err error) { - if !saveInFile { + if !saveInLocal { for { key = randStringRunes(48) - if _, err = redisCli.Get(key).Result(); err == redis.Nil { - err = redisCli.Set(channel, key, 0).Err() + if _, err = r.redisCli.Get(key).Result(); err == redis.Nil { + err = r.redisCli.Set(channel, key, 0).Err() if err != nil { return } - err = redisCli.Set(key, channel, 0).Err() + err = r.redisCli.Set(key, channel, 0).Err() return } else if err != nil { return @@ -116,9 +66,9 @@ func (r *RoomKeysType) SetKey(channel string) (key string, err error) { for { key = randStringRunes(48) - if _, found := r.mapKeyChan.Load(key); !found { - r.mapChanKey.Store(channel, key) - r.mapKeyChan.Store(key, channel) + if _, found := r.localCache.Get(key); !found { + r.localCache.SetDefault(channel, key) + r.localCache.SetDefault(key, channel) break } } @@ -127,8 +77,8 @@ func (r *RoomKeysType) SetKey(channel string) (key string, err error) { } func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { - if !saveInFile { - if newKey, err = redisCli.Get(channel).Result(); err == redis.Nil { + if !saveInLocal { + if newKey, err = r.redisCli.Get(channel).Result(); err == redis.Nil { newKey, err = r.SetKey(channel) log.Printf("[KEY] new channel [%s]: %s", channel, newKey) return @@ -139,7 +89,7 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { var key interface{} var found bool - if key, found = r.mapChanKey.Load(channel); found { + if key, found = r.localCache.Get(channel); found { return key.(string), nil } newKey, err = r.SetKey(channel) @@ -148,11 +98,11 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { } func (r *RoomKeysType) GetChannel(key string) (channel string, err error) { - if !saveInFile { - return redisCli.Get(key).Result() + if !saveInLocal { + return r.redisCli.Get(key).Result() } - chann, found := r.mapKeyChan.Load(key) + chann, found := r.localCache.Get(key) if found { return chann.(string), nil } else { @@ -161,28 +111,28 @@ func (r *RoomKeysType) GetChannel(key string) (channel string, err error) { } func (r *RoomKeysType) DeleteChannel(channel string) bool { - if !saveInFile { - return redisCli.Del(channel).Err() != nil + if !saveInLocal { + return r.redisCli.Del(channel).Err() != nil } - key, ok := r.mapChanKey.Load(channel) + key, ok := r.localCache.Get(channel) if ok { - r.mapChanKey.Delete(channel) - r.mapKeyChan.Delete(key) + r.localCache.Delete(channel) + r.localCache.Delete(key.(string)) return true } return false } func (r *RoomKeysType) DeleteKey(key string) bool { - if !saveInFile { - return redisCli.Del(key).Err() != nil + if !saveInLocal { + return r.redisCli.Del(key).Err() != nil } - channel, ok := r.mapKeyChan.Load(key) + channel, ok := r.localCache.Get(key) if ok { - r.mapChanKey.Delete(channel) - r.mapKeyChan.Delete(key) + r.localCache.Delete(channel.(string)) + r.localCache.Delete(key) return true } return false diff --git a/configure/liveconfig.go b/configure/liveconfig.go index af5350cb..6bdcc225 100644 --- a/configure/liveconfig.go +++ b/configure/liveconfig.go @@ -20,9 +20,8 @@ import ( } */ var ( - roomKeySaveFile = flag.String("KeyFile", "room_keys.json", "path to save room keys") - RedisAddr = flag.String("redis_addr", "", "redis addr to save room keys ex. localhost:6379") - RedisPwd = flag.String("redis_pwd", "", "redis password") + redisAddr = flag.String("redis_addr", "", "redis addr to save room keys ex. localhost:6379") + redisPwd = flag.String("redis_pwd", "", "redis password") ) type Application struct { @@ -31,14 +30,11 @@ type Application struct { Hlson string `json:"hlson"` StaticPush []string `json:"static_push"` } - type JWTCfg struct { Secret string `json:"secret"` Algorithm string `json:"algorithm"` } - type ServerCfg struct { - KeyFile string `json:"key_file"` RedisAddr string `json:"redis_addr"` RedisPwd string `json:"redis_pwd"` JWTCfg `json:"jwt"` @@ -69,32 +65,24 @@ func LoadConfig(configfilename string) error { return nil } -func GetKeyFile() *string { - if len(RtmpServercfg.KeyFile) > 0 { - *roomKeySaveFile = RtmpServercfg.KeyFile - } - - return roomKeySaveFile -} - func GetRedisAddr() *string { if len(RtmpServercfg.RedisAddr) > 0 { - *RedisAddr = RtmpServercfg.RedisAddr + *redisAddr = RtmpServercfg.RedisAddr } - if len(*RedisAddr) == 0 { + if len(*redisAddr) == 0 { return nil } - return RedisAddr + return redisAddr } func GetRedisPwd() *string { if len(RtmpServercfg.RedisPwd) > 0 { - *RedisPwd = RtmpServercfg.RedisPwd + *redisPwd = RtmpServercfg.RedisPwd } - return RedisPwd + return redisPwd } func CheckAppName(appname string) bool { diff --git a/go.mod b/go.mod index 5294b52a..ef38e3b8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/go-redis/redis/v7 v7.2.0 github.com/gorilla/mux v1.7.4 // indirect github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/satori/go.uuid v1.2.0 github.com/smartystreets/goconvey v1.6.4 // indirect github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum index 000ad9eb..1c05e47a 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= diff --git a/main.go b/main.go index 09575bc8..52c61dee 100755 --- a/main.go +++ b/main.go @@ -7,9 +7,9 @@ import ( "time" "livego/configure" + "livego/protocol/api" "livego/protocol/hls" "livego/protocol/httpflv" - "livego/protocol/httpopera" "livego/protocol/rtmp" ) @@ -89,20 +89,20 @@ func startHTTPFlv(stream *rtmp.RtmpStream) { }() } -func startHTTPOpera(stream *rtmp.RtmpStream) { +func startAPI(stream *rtmp.RtmpStream) { if *operaAddr != "" { opListen, err := net.Listen("tcp", *operaAddr) if err != nil { log.Fatal(err) } - opServer := httpopera.NewServer(stream, *rtmpAddr) + opServer := api.NewServer(stream, *rtmpAddr) go func() { defer func() { if r := recover(); r != nil { - log.Println("HTTP-Operation server panic: ", r) + log.Println("HTTP-API server panic: ", r) } }() - log.Println("HTTP-Operation listen On", *operaAddr) + log.Println("HTTP-API listen On", *operaAddr) opServer.Serve(opListen) }() } @@ -124,7 +124,7 @@ func main() { stream := rtmp.NewRtmpStream() hlsServer := startHls() startHTTPFlv(stream) - startHTTPOpera(stream) + startAPI(stream) startRtmp(stream, hlsServer) //startRtmp(stream, nil) diff --git a/protocol/httpopera/http_opera.go b/protocol/api/api.go similarity index 96% rename from protocol/httpopera/http_opera.go rename to protocol/api/api.go index 584f0e14..892861b1 100755 --- a/protocol/httpopera/http_opera.go +++ b/protocol/api/api.go @@ -1,4 +1,4 @@ -package httpopera +package api import ( "encoding/json" @@ -327,14 +327,14 @@ func (s *Server) handleReset(w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { res.Status = 400 - res.Data = "url: /control/reset?room=ROOM_NAME" + res.Data = "url: /control/reset?room=" return } room := r.Form.Get("room") if len(room) == 0 { res.Status = 400 - res.Data = "url: /control/get?room=ROOM_NAME" + res.Data = "url: /control/get?room=" return } @@ -359,7 +359,7 @@ func (s *Server) handleGet(w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { res.Status = 400 - res.Data = "url: /control/get?room=ROOM_NAME" + res.Data = "url: /control/get?room=" return } @@ -367,7 +367,7 @@ func (s *Server) handleGet(w http.ResponseWriter, r *http.Request) { if len(room) == 0 { res.Status = 400 - res.Data = "url: /control/get?room=ROOM_NAME" + res.Data = "url: /control/get?room=" return } @@ -390,7 +390,7 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { res.Status = 400 - res.Data = "url: /control/delete?room=ROOM_NAME" + res.Data = "url: /control/delete?room=" return } @@ -398,7 +398,7 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) { if len(room) == 0 { res.Status = 400 - res.Data = "url: /control/get?room=ROOM_NAME" + res.Data = "url: /control/get?room=" return } @@ -407,5 +407,5 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) { return } res.Status = 404 - res.Data = "Room not found" + res.Data = "room not found" }