diff --git a/.gitignore b/.gitignore index 9836eec0..3e26a3b7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # Created by .ignore support plugin (hsz.mobi) .idea dist -room_keys.json .vscode .tmp -vendor \ No newline at end of file +vendor +livego diff --git a/CHANGELOG.md b/CHANGELOG.md index b89bd581..460529fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,36 +9,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - JSON Web Token support. ``` json - // .livego.json + // livego.json { "jwt": { "secret": "testing", - "algorithm": "HS256s" + "algorithm": "HS256" }, "server": [ { "appname": "live", - "liveon": "on", - "hlson": "on" + "live": true, + "hls": true } ] } ``` - Use redis for store room keys ``` json - // .livego.json + // livego.json { "redis_addr": "localhost:6379", "server": [ { "appname": "live", - "liveon": "on", - "hlson": "on" + "live": true, + "hls": true } ] } ``` +- Makefile ### Changed - Show `players`. -- Show `stream_id`. \ No newline at end of file +- Show `stream_id`. +- Deleted keys saved in physical file, now the keys are in cached using `go-cache` by default. +- Using `logrus` like log system. +- Using method `.Get(queryParamName)` to get an url query param. +- Replaced `errors.New(...)` to `fmt.Errorf(...)`. +- Replaced types string on config params `liveon` and `hlson` to booleans `live: true/false` and `hls: true/false` +- Using viper for config, allow use file, cloud providers, environment vars or flags. +- Using yaml config by default. diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..7266291b --- /dev/null +++ b/Makefile @@ -0,0 +1,36 @@ +GOCMD ?= go +GOBUILD = $(GOCMD) build +GOCLEAN = $(GOCMD) clean +GOTEST = $(GOCMD) test +GOGET = $(GOCMD) get +BINARY_NAME = livego +BINARY_UNIX = $(BINARY_NAME)_unix + +DOCKER_ACC ?= gwuhaolin +DOCKER_REPO ?= livego + +TAG ?= $(shell git describe --tags --abbrev=0 2>/dev/null) + +default: all + +all: test build dockerize +build: + $(GOBUILD) -o $(BINARY_NAME) -v -ldflags="-X main.VERSION=$(TAG)" + +test: + $(GOTEST) -v ./... + +clean: + $(GOCLEAN) + rm -f $(BINARY_NAME) + rm -f $(BINARY_UNIX) + +run: build + ./$(BINARY_NAME) + +build-linux: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 $(GOBUILD) -o $(BINARY_UNIX) -v + +dockerize: + docker build -t $(DOCKER_ACC)/$(DOCKER_REPO):$(TAG) . + docker push $(DOCKER_ACC)/$(DOCKER_REPO):$(TAG) diff --git a/README.md b/README.md index 0b07b6c4..15cdfc25 100644 --- a/README.md +++ b/README.md @@ -30,10 +30,24 @@ Run `docker run -p 1935:1935 -p 7001:7001 -p 7002:7002 -d --name livego gwuhaoli #### Compile from source 1. Download the source code `git clone https://github.com/gwuhaolin/livego.git` -2. Go to the livego directory and execute `go build` +2. Go to the livego directory and execute `go build` or `make build` ## Use -2. Start the service: execute the livego binary file to start the livego service; +```bash +./livego -h +Usage of ./livego: + --api_addr string HTTP manage interface server listen address (default ":8090") + --config_file string configure filename (default "livego.yaml") + --flv_dir string output flv file at flvDir/APP/KEY_TIME.flv (default "tmp") + --gop_num int gop num (default 1) + --hls_addr string HLS server listen address (default ":7002") + --httpflv_addr string HTTP-FLV server listen address (default ":7001") + --level string Log level (default "info") + --read_timeout int read time out (default 10) + --rtmp_addr string RTMP server listen address (default ":1935") + --write_timeout int write time out (default 10) +``` +2. Start the service: execute the livego binary file or `make run` to start the livego service; 3. Upstream push: Push the video stream to `rtmp://localhost:1935/live/movie` through the` RTMP` protocol, for example, use `ffmpeg -re -i demo.flv -c copy -f flv rtmp://localhost:1935/live/movie` push; 4. Downstream playback: The following three playback protocols are supported, and the playback address is as follows: -`RTMP`:`rtmp://localhost:1935/live/movie` diff --git a/configure/channel.go b/configure/channel.go index 38eb30a5..270c52aa 100755 --- a/configure/channel.go +++ b/configure/channel.go @@ -1,112 +1,58 @@ package configure import ( - "encoding/json" "fmt" - "io/ioutil" - "log" - "math/rand" - "sync" - "time" + + "livego/utils/uid" "github.com/go-redis/redis/v7" + "github.com/patrickmn/go-cache" + log "github.com/sirupsen/logrus" ) -var RoomKeys = LoadRoomKey(*GetKeyFile()) +type RoomKeysType struct { + redisCli *redis.Client + localCache *cache.Cache +} -var roomUpdated = false +var RoomKeys = &RoomKeysType{ + localCache: cache.New(cache.NoExpiration, 0), +} -var saveInFile = true -var redisCli *redis.Client +var saveInLocal = true func Init() { - saveInFile = GetRedisAddr() == nil - - rand.Seed(time.Now().UnixNano()) - if saveInFile { - go func() { - for { - time.Sleep(15 * time.Second) - if roomUpdated { - RoomKeys.Save(*roomKeySaveFile) - roomUpdated = false - } - } - }() - + saveInLocal = len(Config.GetString("redis_addr")) == 0 + if saveInLocal { return } - redisCli = redis.NewClient(&redis.Options{ - Addr: *GetRedisAddr(), - Password: *GetRedisPwd(), + RoomKeys.redisCli = redis.NewClient(&redis.Options{ + Addr: Config.GetString("redis_addr"), + Password: Config.GetString("redis_pwd"), DB: 0, }) - _, err := redisCli.Ping().Result() - if err != nil { - panic(err) - } - - log.Printf("Redis connected") -} - -type RoomKeysType struct { - mapChanKey sync.Map - mapKeyChan sync.Map -} - -func LoadRoomKey(f string) *RoomKeysType { - result := &RoomKeysType{ - mapChanKey: sync.Map{}, - mapKeyChan: sync.Map{}, - } - raw := map[string]string{} - content, err := ioutil.ReadFile(f) + _, err := RoomKeys.redisCli.Ping().Result() if err != nil { - log.Printf("Failed to read file %s for room keys", f) - return result - } - if json.Unmarshal(content, &raw) != nil { - log.Printf("Failed to unmarshal file %s for room keys", f) - return result + log.Panic("Redis: ", err) } - for room, key := range raw { - result.mapChanKey.Store(room, key) - result.mapKeyChan.Store(key, room) - } - return result -} -func (r *RoomKeysType) Save(f string) { - raw := map[string]string{} - r.mapChanKey.Range(func(channel, key interface{}) bool { - raw[channel.(string)] = key.(string) - return true - }) - content, err := json.Marshal(raw) - if err != nil { - log.Println("Failed to marshal room keys") - return - } - if ioutil.WriteFile(f, content, 0644) != nil { - log.Println("Failed to save room keys") - return - } + log.Info("Redis connected") } // set/reset a random key for channel func (r *RoomKeysType) SetKey(channel string) (key string, err error) { - if !saveInFile { + if !saveInLocal { for { - key = randStringRunes(48) - if _, err = redisCli.Get(key).Result(); err == redis.Nil { - err = redisCli.Set(channel, key, 0).Err() + key = uid.RandStringRunes(48) + if _, err = r.redisCli.Get(key).Result(); err == redis.Nil { + err = r.redisCli.Set(channel, key, 0).Err() if err != nil { return } - err = redisCli.Set(key, channel, 0).Err() + err = r.redisCli.Set(key, channel, 0).Err() return } else if err != nil { return @@ -115,22 +61,21 @@ func (r *RoomKeysType) SetKey(channel string) (key string, err error) { } for { - key = randStringRunes(48) - if _, found := r.mapKeyChan.Load(key); !found { - r.mapChanKey.Store(channel, key) - r.mapKeyChan.Store(key, channel) + key = uid.RandStringRunes(48) + if _, found := r.localCache.Get(key); !found { + r.localCache.SetDefault(channel, key) + r.localCache.SetDefault(key, channel) break } } - roomUpdated = true return } func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { - if !saveInFile { - if newKey, err = redisCli.Get(channel).Result(); err == redis.Nil { + if !saveInLocal { + if newKey, err = r.redisCli.Get(channel).Result(); err == redis.Nil { newKey, err = r.SetKey(channel) - log.Printf("[KEY] new channel [%s]: %s", channel, newKey) + log.Debugf("[KEY] new channel [%s]: %s", channel, newKey) return } @@ -139,20 +84,20 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { var key interface{} var found bool - if key, found = r.mapChanKey.Load(channel); found { + if key, found = r.localCache.Get(channel); found { return key.(string), nil } newKey, err = r.SetKey(channel) - log.Printf("[KEY] new channel [%s]: %s", channel, newKey) + log.Debugf("[KEY] new channel [%s]: %s", channel, newKey) return } func (r *RoomKeysType) GetChannel(key string) (channel string, err error) { - if !saveInFile { - return redisCli.Get(key).Result() + if !saveInLocal { + return r.redisCli.Get(key).Result() } - chann, found := r.mapKeyChan.Load(key) + chann, found := r.localCache.Get(key) if found { return chann.(string), nil } else { @@ -161,40 +106,29 @@ func (r *RoomKeysType) GetChannel(key string) (channel string, err error) { } func (r *RoomKeysType) DeleteChannel(channel string) bool { - if !saveInFile { - return redisCli.Del(channel).Err() != nil + if !saveInLocal { + return r.redisCli.Del(channel).Err() != nil } - key, ok := r.mapChanKey.Load(channel) + key, ok := r.localCache.Get(channel) if ok { - r.mapChanKey.Delete(channel) - r.mapKeyChan.Delete(key) + r.localCache.Delete(channel) + r.localCache.Delete(key.(string)) return true } return false } func (r *RoomKeysType) DeleteKey(key string) bool { - if !saveInFile { - return redisCli.Del(key).Err() != nil + if !saveInLocal { + return r.redisCli.Del(key).Err() != nil } - channel, ok := r.mapKeyChan.Load(key) + channel, ok := r.localCache.Get(key) if ok { - r.mapChanKey.Delete(channel) - r.mapKeyChan.Delete(key) + r.localCache.Delete(channel.(string)) + r.localCache.Delete(key) return true } return false } - -// helpers -var letterRunes = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - -func randStringRunes(n int) string { - b := make([]rune, n) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - return string(b) -} diff --git a/configure/liveconfig.go b/configure/liveconfig.go index eeb29f9e..de2e49bb 100644 --- a/configure/liveconfig.go +++ b/configure/liveconfig.go @@ -1,10 +1,14 @@ package configure import ( + "bytes" "encoding/json" - "flag" - "io/ioutil" - "log" + "strings" + + "github.com/kr/pretty" + log "github.com/sirupsen/logrus" + "github.com/spf13/pflag" + "github.com/spf13/viper" ) /* @@ -12,112 +16,138 @@ import ( "server": [ { "appname": "live", - "liveon": "on", - "hlson": "on", + "live": true, + "hls": true, "static_push": [] } ] } */ -var ( - roomKeySaveFile = flag.String("KeyFile", "room_keys.json", "path to save room keys") - RedisAddr = flag.String("redis_addr", "", "redis addr to save room keys ex. localhost:6379") - RedisPwd = flag.String("redis_pwd", "", "redis password") -) type Application struct { - Appname string `json:"appname"` - Liveon string `json:"liveon"` - Hlson string `json:"hlson"` - StaticPush []string `json:"static_push"` + Appname string `mapstructure:"appname"` + Live bool `mapstructure:"live"` + Hls bool `mapstructure:"hls"` + StaticPush []string `mapstructure:"static_push"` } -type JWTCfg struct { - Secret string `json:"secret"` - Algorithm string `json:"algorithm"` -} +type Applications []Application +type JWT struct { + Secret string `mapstructure:"secret"` + Algorithm string `mapstructure:"algorithm"` +} type ServerCfg struct { - KeyFile string `json:"key_file"` - RedisAddr string `json:"redis_addr"` - RedisPwd string `json:"redis_pwd"` - JWTCfg `json:"jwt"` - Server []Application `json:"server"` + Level string `mapstructure:"level"` + ConfigFile string `mapstructure:"config_file"` + FLVDir string `mapstructure:"flv_dir"` + RTMPAddr string `mapstructure:"rtmp_addr"` + HTTPFLVAddr string `mapstructure:"httpflv_addr"` + HLSAddr string `mapstructure:"hls_addr"` + APIAddr string `mapstructure:"api_addr"` + RedisAddr string `mapstructure:"redis_addr"` + RedisPwd string `mapstructure:"redis_pwd"` + ReadTimeout int `mapstructure:"read_timeout"` + WriteTimeout int `mapstructure:"write_timeout"` + GopNum int `mapstructure:"gop_num"` + JWT JWT `mapstructure:"jwt"` + Server []Application `mapstructure:"server"` } // default config -var RtmpServercfg = ServerCfg{ +var defaultConf = ServerCfg{ + ConfigFile: "livego.yaml", + RTMPAddr: ":1935", + HTTPFLVAddr: ":7001", + HLSAddr: ":7002", + APIAddr: ":8090", + WriteTimeout: 10, + ReadTimeout: 10, + GopNum: 1, Server: []Application{{ Appname: "livego", - Liveon: "on", - Hlson: "on", + Live: true, + Hls: true, StaticPush: nil, }}, } -func LoadConfig(configfilename string) { - log.Printf("starting load configure file %s", configfilename) - data, err := ioutil.ReadFile(configfilename) - if err != nil { - log.Printf("ReadFile %s error:%v", configfilename, err) - } - - err = json.Unmarshal(data, &RtmpServercfg) - if err != nil { - log.Printf("json.Unmarshal error:%v", err) - } - log.Printf("get config json data:%v", RtmpServercfg) - - Init() -} +var Config = viper.New() -func GetKeyFile() *string { - if len(RtmpServercfg.KeyFile) > 0 { - *roomKeySaveFile = RtmpServercfg.KeyFile +func initLog() { + if l, err := log.ParseLevel(Config.GetString("level")); err == nil { + log.SetLevel(l) + log.SetReportCaller(l == log.DebugLevel) } - - return roomKeySaveFile } -func GetRedisAddr() *string { - if len(RtmpServercfg.RedisAddr) > 0 { - *RedisAddr = RtmpServercfg.RedisAddr - } - - if len(*RedisAddr) == 0 { - return nil +func LoadConfig() { + defer Init() + + // Default config + b, _ := json.Marshal(defaultConf) + defaultConfig := bytes.NewReader(b) + Config.MergeConfig(defaultConfig) + + // Flags + pflag.String("rtmp_addr", ":1935", "RTMP server listen address") + pflag.String("httpflv_addr", ":7001", "HTTP-FLV server listen address") + pflag.String("hls_addr", ":7002", "HLS server listen address") + pflag.String("api_addr", ":8090", "HTTP manage interface server listen address") + pflag.String("config_file", "livego.yaml", "configure filename") + pflag.String("level", "info", "Log level") + pflag.String("flv_dir", "tmp", "output flv file at flvDir/APP/KEY_TIME.flv") + pflag.Int("read_timeout", 10, "read time out") + pflag.Int("write_timeout", 10, "write time out") + pflag.Int("gop_num", 1, "gop num") + pflag.Parse() + Config.BindPFlags(pflag.CommandLine) + + // File + Config.SetConfigFile(Config.GetString("config_file")) + Config.AddConfigPath(".") + err := Config.ReadInConfig() + if err != nil { + log.Error(err) + log.Info("Using default config") } - return RedisAddr -} + // Environment + replacer := strings.NewReplacer(".", "_") + Config.SetEnvKeyReplacer(replacer) + Config.AllowEmptyEnv(true) + Config.AutomaticEnv() -func GetRedisPwd() *string { - if len(RtmpServercfg.RedisPwd) > 0 { - *RedisPwd = RtmpServercfg.RedisPwd - } + // Log + initLog() - return RedisPwd + c := ServerCfg{} + Config.Unmarshal(&c) + log.Debugf("Current configurations: \n%# v", pretty.Formatter(c)) } func CheckAppName(appname string) bool { - for _, app := range RtmpServercfg.Server { - if (app.Appname == appname) && (app.Liveon == "on") { - return true + apps := Applications{} + Config.UnmarshalKey("server", &apps) + for _, app := range apps { + if app.Appname == appname { + return app.Live } } return false } func GetStaticPushUrlList(appname string) ([]string, bool) { - for _, app := range RtmpServercfg.Server { - if (app.Appname == appname) && (app.Liveon == "on") { + apps := Applications{} + Config.UnmarshalKey("server", &apps) + for _, app := range apps { + if (app.Appname == appname) && app.Live { if len(app.StaticPush) > 0 { return app.StaticPush, true } else { return nil, false } } - } return nil, false } diff --git a/container/flv/demuxer.go b/container/flv/demuxer.go index bfcc7842..896d39ad 100755 --- a/container/flv/demuxer.go +++ b/container/flv/demuxer.go @@ -1,13 +1,12 @@ package flv import ( - "errors" - + "fmt" "livego/av" ) var ( - ErrAvcEndSEQ = errors.New("avc end sequence") + ErrAvcEndSEQ = fmt.Errorf("avc end sequence") ) type Demuxer struct { diff --git a/container/flv/muxer.go b/container/flv/muxer.go index a7eaa884..f7ef137d 100755 --- a/container/flv/muxer.go +++ b/container/flv/muxer.go @@ -1,23 +1,23 @@ package flv import ( - "flag" "fmt" - "log" "os" "path" "strings" "time" "livego/av" + "livego/configure" "livego/protocol/amf" "livego/utils/pio" "livego/utils/uid" + + log "github.com/sirupsen/logrus" ) var ( flvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09} - flvDir = flag.String("flvDir", "tmp", "output flv file at flvDir/APP/KEY_TIME.flv") ) /* @@ -25,13 +25,13 @@ func NewFlv(handler av.Handler, info av.Info) { patths := strings.SplitN(info.Key, "/", 2) if len(patths) != 2 { - log.Println("invalid info") + log.Warning("invalid info") return } w, err := os.OpenFile(*flvFile, os.O_CREATE|os.O_RDWR, 0755) if err != nil { - log.Println("open file error: ", err) + log.Error("open file error: ", err) } writer := NewFLVWriter(patths[0], patths[1], info.URL, w) @@ -40,7 +40,7 @@ func NewFlv(handler av.Handler, info av.Info) { writer.Wait() // close flv file - log.Println("close flv file") + log.Debug("close flv file") writer.ctx.Close() } */ @@ -147,25 +147,27 @@ type FlvDvr struct{} func (f *FlvDvr) GetWriter(info av.Info) av.WriteCloser { paths := strings.SplitN(info.Key, "/", 2) if len(paths) != 2 { - log.Println("invalid info") + log.Warning("invalid info") return nil } - err := os.MkdirAll(path.Join(*flvDir, paths[0]), 0755) + flvDir := configure.Config.GetString("flv_dir") + + err := os.MkdirAll(path.Join(flvDir, paths[0]), 0755) if err != nil { - log.Println("mkdir error:", err) + log.Error("mkdir error: ", err) return nil } - fileName := fmt.Sprintf("%s_%d.%s", path.Join(*flvDir, info.Key), time.Now().Unix(), "flv") - log.Println("flv dvr save stream to: ", fileName) + fileName := fmt.Sprintf("%s_%d.%s", path.Join(flvDir, info.Key), time.Now().Unix(), "flv") + log.Debug("flv dvr save stream to: ", fileName) w, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0755) if err != nil { - log.Println("open file error: ", err) + log.Error("open file error: ", err) return nil } writer := NewFLVWriter(paths[0], paths[1], info.URL, w) - log.Println("new flv dvr: ", writer.Info()) + log.Debug("new flv dvr: ", writer.Info()) return writer } diff --git a/go.mod b/go.mod index 5294b52a..e1cf69ea 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,13 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/go-redis/redis/v7 v7.2.0 github.com/gorilla/mux v1.7.4 // indirect + github.com/kr/pretty v0.1.0 github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/satori/go.uuid v1.2.0 - github.com/smartystreets/goconvey v1.6.4 // indirect + github.com/sirupsen/logrus v1.5.0 + github.com/spf13/pflag v1.0.3 + github.com/spf13/viper v1.6.3 github.com/stretchr/testify v1.4.0 github.com/urfave/negroni v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index 000ad9eb..a4a636cf 100644 --- a/go.sum +++ b/go.sum @@ -1,29 +1,85 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/auth0/go-jwt-middleware v0.0.0-20190805220309-36081240882b h1:CvoEHGmxWl5kONC5icxwqV899dkf4VjOScbxLpllEnw= github.com/auth0/go-jwt-middleware v0.0.0-20190805220309-36081240882b/go.mod h1:LWMyo4iOLWXHGdBki7NIht1kHru/0wM179h+d3g8ATM= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs= github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= +github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -31,42 +87,114 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= +github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.6.3 h1:pDDu1OyEDTKzpJwdq4TiuLyMsUgRa/BT5cn5O62NoHs= +github.com/spf13/viper v1.6.3/go.mod h1:jUMtyi0/lB5yZH/FjyGAoH7IMNrIhlBf6pXZmbMDvzw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= +github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY= golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= +gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/livego.json b/livego.json deleted file mode 100644 index 5a6e195a..00000000 --- a/livego.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "server": [ - { - "appname": "livego", - "liveon": "on", - "hlson": "on" - } - ] -} diff --git a/livego.yaml b/livego.yaml new file mode 100644 index 00000000..8a034de3 --- /dev/null +++ b/livego.yaml @@ -0,0 +1,22 @@ +# # Logger level +# level: info + +# # FLV Options +# flv_dir: "./tmp" +# httpflv_addr: ":7001" + +# # RTMP Options +# rtmp_addr: ":1935" +# read_timeout: 10 +# write_timeout: 10 + +# # HLS Options +# hls_addr: ":7002" + +# # API Options +# api_addr: ":8090" + +server: +- appname: livego + live: true + hls: true diff --git a/main.go b/main.go index 8cf1790a..e3a7cb22 100755 --- a/main.go +++ b/main.go @@ -1,34 +1,25 @@ package main import ( - "flag" - "log" - "net" - "time" - + "fmt" "livego/configure" + "livego/protocol/api" "livego/protocol/hls" "livego/protocol/httpflv" - "livego/protocol/httpopera" "livego/protocol/rtmp" -) + "net" + "path" + "runtime" + "time" -var ( - version = "master" - rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address") - httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address") - hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address") - operaAddr = flag.String("manage-addr", ":8090", "HTTP manage interface server listen address") - configfilename = flag.String("config-file", "livego.json", "configure filename") + log "github.com/sirupsen/logrus" ) -func init() { - log.SetFlags(log.Lshortfile | log.Ltime | log.Ldate) - flag.Parse() -} +var VERSION = "master" func startHls() *hls.Server { - hlsListen, err := net.Listen("tcp", *hlsAddr) + hlsAddr := configure.Config.GetString("hls_addr") + hlsListen, err := net.Listen("tcp", hlsAddr) if err != nil { log.Fatal(err) } @@ -37,17 +28,21 @@ func startHls() *hls.Server { go func() { defer func() { if r := recover(); r != nil { - log.Println("HLS server panic: ", r) + log.Error("HLS server panic: ", r) } }() - log.Println("HLS listen On", *hlsAddr) + log.Info("HLS listen On ", hlsAddr) hlsServer.Serve(hlsListen) }() return hlsServer } +var rtmpAddr string + func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { - rtmpListen, err := net.Listen("tcp", *rtmpAddr) + rtmpAddr = configure.Config.GetString("rtmp_addr") + + rtmpListen, err := net.Listen("tcp", rtmpAddr) if err != nil { log.Fatal(err) } @@ -56,23 +51,25 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { if hlsServer == nil { rtmpServer = rtmp.NewRtmpServer(stream, nil) - log.Printf("hls server disable....") + log.Info("HLS server disable....") } else { rtmpServer = rtmp.NewRtmpServer(stream, hlsServer) - log.Printf("hls server enable....") + log.Info("HLS server enable....") } defer func() { if r := recover(); r != nil { - log.Println("RTMP server panic: ", r) + log.Error("RTMP server panic: ", r) } }() - log.Println("RTMP Listen On", *rtmpAddr) + log.Info("RTMP Listen On ", rtmpAddr) rtmpServer.Serve(rtmpListen) } func startHTTPFlv(stream *rtmp.RtmpStream) { - flvListen, err := net.Listen("tcp", *httpFlvAddr) + httpflvAddr := configure.Config.GetString("httpflv_addr") + + flvListen, err := net.Listen("tcp", httpflvAddr) if err != nil { log.Fatal(err) } @@ -81,48 +78,68 @@ func startHTTPFlv(stream *rtmp.RtmpStream) { go func() { defer func() { if r := recover(); r != nil { - log.Println("HTTP-FLV server panic: ", r) + log.Error("HTTP-FLV server panic: ", r) } }() - log.Println("HTTP-FLV listen On", *httpFlvAddr) + log.Info("HTTP-FLV listen On ", httpflvAddr) hdlServer.Serve(flvListen) }() } -func startHTTPOpera(stream *rtmp.RtmpStream) { - if *operaAddr != "" { - opListen, err := net.Listen("tcp", *operaAddr) +func startAPI(stream *rtmp.RtmpStream) { + apiAddr := configure.Config.GetString("api_addr") + + if apiAddr != "" { + opListen, err := net.Listen("tcp", apiAddr) if err != nil { log.Fatal(err) } - opServer := httpopera.NewServer(stream, *rtmpAddr) + opServer := api.NewServer(stream, rtmpAddr) go func() { defer func() { if r := recover(); r != nil { - log.Println("HTTP-Operation server panic: ", r) + log.Error("HTTP-API server panic: ", r) } }() - log.Println("HTTP-Operation listen On", *operaAddr) + log.Info("HTTP-API listen On ", apiAddr) opServer.Serve(opListen) }() } } +func init() { + log.SetFormatter(&log.TextFormatter{ + FullTimestamp: true, + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + filename := path.Base(f.File) + return fmt.Sprintf("%s()", f.Function), fmt.Sprintf(" %s:%d", filename, f.Line) + }, + }) +} + func main() { defer func() { if r := recover(); r != nil { - log.Println("livego panic: ", r) + log.Error("livego panic: ", r) time.Sleep(1 * time.Second) } }() - log.Println("start livego, version", version) - configure.LoadConfig(*configfilename) + + configure.LoadConfig() + + log.Infof(` + _ _ ____ + | | (_)_ _____ / ___| ___ + | | | \ \ / / _ \ | _ / _ \ + | |___| |\ V / __/ |_| | (_) | + |_____|_| \_/ \___|\____|\___/ + version: %s + `, VERSION) stream := rtmp.NewRtmpStream() hlsServer := startHls() startHTTPFlv(stream) - startHTTPOpera(stream) + startAPI(stream) startRtmp(stream, hlsServer) - //startRtmp(stream, nil) } diff --git a/parser/aac/parser.go b/parser/aac/parser.go index 7ad9f05f..f05bcf68 100755 --- a/parser/aac/parser.go +++ b/parser/aac/parser.go @@ -1,7 +1,7 @@ package aac import ( - "errors" + "fmt" "io" "livego/av" @@ -26,8 +26,8 @@ type mpegCfgInfo struct { var aacRates = []int{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350} var ( - specificBufInvalid = errors.New("audio mpegspecific error") - audioBufInvalid = errors.New("audiodata invalid") + specificBufInvalid = fmt.Errorf("audio mpegspecific error") + audioBufInvalid = fmt.Errorf("audiodata invalid") ) const ( diff --git a/parser/h264/parser.go b/parser/h264/parser.go index b3c5481f..9a94782a 100755 --- a/parser/h264/parser.go +++ b/parser/h264/parser.go @@ -2,7 +2,7 @@ package h264 import ( "bytes" - "errors" + "fmt" "io" ) @@ -34,14 +34,14 @@ const ( ) var ( - decDataNil = errors.New("dec buf is nil") - spsDataError = errors.New("sps data error") - ppsHeaderError = errors.New("pps header error") - ppsDataError = errors.New("pps data error") - naluHeaderInvalid = errors.New("nalu header invalid") - videoDataInvalid = errors.New("video data not match") - dataSizeNotMatch = errors.New("data size not match") - naluBodyLenError = errors.New("nalu body len error") + decDataNil = fmt.Errorf("dec buf is nil") + spsDataError = fmt.Errorf("sps data error") + ppsHeaderError = fmt.Errorf("pps header error") + ppsDataError = fmt.Errorf("pps data error") + naluHeaderInvalid = fmt.Errorf("nalu header invalid") + videoDataInvalid = fmt.Errorf("video data not match") + dataSizeNotMatch = fmt.Errorf("data size not match") + naluBodyLenError = fmt.Errorf("nalu body len error") ) var startCode = []byte{0x00, 0x00, 0x00, 0x01} @@ -132,7 +132,7 @@ func (parser *Parser) isNaluHeader(src []byte) bool { func (parser *Parser) naluSize(src []byte) (int, error) { if len(src) < naluBytesLen { - return 0, errors.New("nalusizedata invalid") + return 0, fmt.Errorf("nalusizedata invalid") } buf := src[:naluBytesLen] size := int(0) diff --git a/parser/h264/parser_test.go b/parser/h264/parser_test.go index 40d43e40..e73068ae 100755 --- a/parser/h264/parser_test.go +++ b/parser/h264/parser_test.go @@ -2,7 +2,7 @@ package h264 import ( "bytes" - "errors" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -47,7 +47,7 @@ func TestH264NalueSizeException(t *testing.T) { d := NewParser() w := bytes.NewBuffer(nil) err := d.Parse(nalu, false, w) - at.Equal(err, errors.New("video data not match")) + at.Equal(err, fmt.Errorf("video data not match")) } func TestH264Mp4Demux(t *testing.T) { diff --git a/parser/mp3/parser.go b/parser/mp3/parser.go index 2f854b36..cebca10b 100755 --- a/parser/mp3/parser.go +++ b/parser/mp3/parser.go @@ -1,6 +1,8 @@ package mp3 -import "errors" +import ( + "fmt" +) type Parser struct { samplingFrequency int @@ -17,8 +19,8 @@ func NewParser() *Parser { // '11' reserved var mp3Rates = []int{44100, 48000, 32000} var ( - errMp3DataInvalid = errors.New("mp3data invalid") - errIndexInvalid = errors.New("invalid rate index") + errMp3DataInvalid = fmt.Errorf("mp3data invalid") + errIndexInvalid = fmt.Errorf("invalid rate index") ) func (parser *Parser) Parse(src []byte) error { diff --git a/parser/parser.go b/parser/parser.go index 31bdd4fa..8fa8f9b9 100755 --- a/parser/parser.go +++ b/parser/parser.go @@ -1,7 +1,7 @@ package parser import ( - "errors" + "fmt" "io" "livego/av" @@ -11,7 +11,7 @@ import ( ) var ( - errNoAudio = errors.New("demuxer no audio") + errNoAudio = fmt.Errorf("demuxer no audio") ) type CodecParser struct { diff --git a/protocol/amf/amf.go b/protocol/amf/amf.go index e17fd47b..f941011b 100755 --- a/protocol/amf/amf.go +++ b/protocol/amf/amf.go @@ -1,7 +1,6 @@ package amf import ( - "errors" "fmt" "io" ) @@ -26,7 +25,7 @@ func (d *Decoder) Decode(r io.Reader, ver Version) (interface{}, error) { return d.DecodeAmf3(r) } - return nil, errors.New(fmt.Sprintf("decode amf: unsupported version %d", ver)) + return nil, fmt.Errorf("decode amf: unsupported version %d", ver) } func (e *Encoder) EncodeBatch(w io.Writer, ver Version, val ...interface{}) (int, error) { @@ -46,5 +45,5 @@ func (e *Encoder) Encode(w io.Writer, val interface{}, ver Version) (int, error) return e.EncodeAmf3(w, val) } - return 0, Error("encode amf: unsupported version %d", ver) + return 0, fmt.Errorf("encode amf: unsupported version %d", ver) } diff --git a/protocol/amf/amf_test.go b/protocol/amf/amf_test.go index 9796d672..dabaaf63 100755 --- a/protocol/amf/amf_test.go +++ b/protocol/amf/amf_test.go @@ -2,7 +2,6 @@ package amf import ( "bytes" - "errors" "fmt" "reflect" "testing" @@ -17,12 +16,12 @@ func EncodeAndDecode(val interface{}, ver Version) (result interface{}, err erro _, err = enc.Encode(buf, val, ver) if err != nil { - return nil, errors.New(fmt.Sprintf("error in encode: %s", err)) + return nil, fmt.Errorf("error in encode: %s", err) } result, err = dec.Decode(buf, ver) if err != nil { - return nil, errors.New(fmt.Sprintf("error in decode: %s", err)) + return nil, fmt.Errorf("error in decode: %s", err) } return @@ -108,7 +107,7 @@ func TestAmf0Array(t *testing.T) { res, err := EncodeAndDecode(arr, 0) if err != nil { - t.Error("amf0 object: %s", err) + t.Errorf("amf0 object: %s", err) } result, ok := res.(Array) @@ -170,7 +169,7 @@ func TestAmf3Array(t *testing.T) { res, err := EncodeAndDecode(arr, 3) if err != nil { - t.Error("amf3 object: %s", err) + t.Errorf("amf3 object: %s", err) } result, ok := res.(Array) diff --git a/protocol/amf/decoder_amf0.go b/protocol/amf/decoder_amf0.go index 2de5e21c..4b60b679 100755 --- a/protocol/amf/decoder_amf0.go +++ b/protocol/amf/decoder_amf0.go @@ -2,6 +2,7 @@ package amf import ( "encoding/binary" + "fmt" "io" ) @@ -22,13 +23,13 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) { case AMF0_OBJECT_MARKER: return d.DecodeAmf0Object(r, false) case AMF0_MOVIECLIP_MARKER: - return nil, Error("decode amf0: unsupported type movieclip") + return nil, fmt.Errorf("decode amf0: unsupported type movieclip") case AMF0_NULL_MARKER: return d.DecodeAmf0Null(r, false) case AMF0_UNDEFINED_MARKER: return d.DecodeAmf0Undefined(r, false) case AMF0_REFERENCE_MARKER: - return nil, Error("decode amf0: unsupported type reference") + return nil, fmt.Errorf("decode amf0: unsupported type reference") case AMF0_ECMA_ARRAY_MARKER: return d.DecodeAmf0EcmaArray(r, false) case AMF0_STRICT_ARRAY_MARKER: @@ -40,7 +41,7 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) { case AMF0_UNSUPPORTED_MARKER: return d.DecodeAmf0Unsupported(r, false) case AMF0_RECORDSET_MARKER: - return nil, Error("decode amf0: unsupported type recordset") + return nil, fmt.Errorf("decode amf0: unsupported type recordset") case AMF0_XML_DOCUMENT_MARKER: return d.DecodeAmf0XmlDocument(r, false) case AMF0_TYPED_OBJECT_MARKER: @@ -49,7 +50,7 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) { return d.DecodeAmf3(r) } - return nil, Error("decode amf0: unsupported type %d", marker) + return nil, fmt.Errorf("decode amf0: unsupported type %d", marker) } // marker: 1 byte 0x00 @@ -61,7 +62,7 @@ func (d *Decoder) DecodeAmf0Number(r io.Reader, decodeMarker bool) (result float err = binary.Read(r, binary.BigEndian, &result) if err != nil { - return float64(0), Error("amf0 decode: unable to read number: %s", err) + return float64(0), fmt.Errorf("amf0 decode: unable to read number: %s", err) } return @@ -85,7 +86,7 @@ func (d *Decoder) DecodeAmf0Boolean(r io.Reader, decodeMarker bool) (result bool return true, nil } - return false, Error("decode amf0: unexpected value %v for boolean", b) + return false, fmt.Errorf("decode amf0: unexpected value %v for boolean", b) } // marker: 1 byte 0x02 @@ -100,12 +101,12 @@ func (d *Decoder) DecodeAmf0String(r io.Reader, decodeMarker bool) (result strin var length uint16 err = binary.Read(r, binary.BigEndian, &length) if err != nil { - return "", Error("decode amf0: unable to decode string length: %s", err) + return "", fmt.Errorf("decode amf0: unable to decode string length: %s", err) } var bytes = make([]byte, length) if bytes, err = ReadBytes(r, int(length)); err != nil { - return "", Error("decode amf0: unable to decode string value: %s", err) + return "", fmt.Errorf("decode amf0: unable to decode string value: %s", err) } return string(bytes), nil @@ -131,7 +132,7 @@ func (d *Decoder) DecodeAmf0Object(r io.Reader, decodeMarker bool) (Object, erro if key == "" { if err = AssertMarker(r, true, AMF0_OBJECT_END_MARKER); err != nil { - return nil, Error("decode amf0: expected object end marker: %s", err) + return nil, fmt.Errorf("decode amf0: expected object end marker: %s", err) } break @@ -139,7 +140,7 @@ func (d *Decoder) DecodeAmf0Object(r io.Reader, decodeMarker bool) (Object, erro value, err := d.DecodeAmf0(r) if err != nil { - return nil, Error("decode amf0: unable to decode object value: %s", err) + return nil, fmt.Errorf("decode amf0: unable to decode object value: %s", err) } result[key] = value @@ -176,11 +177,11 @@ func (d *Decoder) DecodeAmf0Reference(r io.Reader, decodeMarker bool) (interface err = binary.Read(r, binary.BigEndian, &ref) if err != nil { - return nil, Error("decode amf0: unable to decode reference id: %s", err) + return nil, fmt.Errorf("decode amf0: unable to decode reference id: %s", err) } if int(ref) > len(d.refCache) { - return nil, Error("decode amf0: bad reference %d (current length %d)", ref, len(d.refCache)) + return nil, fmt.Errorf("decode amf0: bad reference %d (current length %d)", ref, len(d.refCache)) } result := d.refCache[ref] @@ -205,7 +206,7 @@ func (d *Decoder) DecodeAmf0EcmaArray(r io.Reader, decodeMarker bool) (Object, e result, err := d.DecodeAmf0Object(r, false) if err != nil { - return nil, Error("decode amf0: unable to decode ecma array object: %s", err) + return nil, fmt.Errorf("decode amf0: unable to decode ecma array object: %s", err) } return result, nil @@ -223,7 +224,7 @@ func (d *Decoder) DecodeAmf0StrictArray(r io.Reader, decodeMarker bool) (result var length uint32 err = binary.Read(r, binary.BigEndian, &length) if err != nil { - return nil, Error("decode amf0: unable to decode strict array length: %s", err) + return nil, fmt.Errorf("decode amf0: unable to decode strict array length: %s", err) } d.refCache = append(d.refCache, result) @@ -231,7 +232,7 @@ func (d *Decoder) DecodeAmf0StrictArray(r io.Reader, decodeMarker bool) (result for i := uint32(0); i < length; i++ { tmp, err := d.DecodeAmf0(r) if err != nil { - return nil, Error("decode amf0: unable to decode strict array object: %s", err) + return nil, fmt.Errorf("decode amf0: unable to decode strict array object: %s", err) } result = append(result, tmp) } @@ -250,11 +251,11 @@ func (d *Decoder) DecodeAmf0Date(r io.Reader, decodeMarker bool) (result float64 } if result, err = d.DecodeAmf0Number(r, false); err != nil { - return float64(0), Error("decode amf0: unable to decode float in date: %s", err) + return float64(0), fmt.Errorf("decode amf0: unable to decode float in date: %s", err) } if _, err = ReadBytes(r, 2); err != nil { - return float64(0), Error("decode amf0: unable to read 2 trail bytes in date: %s", err) + return float64(0), fmt.Errorf("decode amf0: unable to read 2 trail bytes in date: %s", err) } return @@ -272,12 +273,12 @@ func (d *Decoder) DecodeAmf0LongString(r io.Reader, decodeMarker bool) (result s var length uint32 err = binary.Read(r, binary.BigEndian, &length) if err != nil { - return "", Error("decode amf0: unable to decode long string length: %s", err) + return "", fmt.Errorf("decode amf0: unable to decode long string length: %s", err) } var bytes = make([]byte, length) if bytes, err = ReadBytes(r, int(length)); err != nil { - return "", Error("decode amf0: unable to decode long string value: %s", err) + return "", fmt.Errorf("decode amf0: unable to decode long string value: %s", err) } return string(bytes), nil @@ -323,12 +324,12 @@ func (d *Decoder) DecodeAmf0TypedObject(r io.Reader, decodeMarker bool) (TypedOb result.Type, err = d.DecodeAmf0String(r, false) if err != nil { - return result, Error("decode amf0: typed object unable to determine type: %s", err) + return result, fmt.Errorf("decode amf0: typed object unable to determine type: %s", err) } result.Object, err = d.DecodeAmf0Object(r, false) if err != nil { - return result, Error("decode amf0: typed object unable to determine object: %s", err) + return result, fmt.Errorf("decode amf0: typed object unable to determine object: %s", err) } return result, nil diff --git a/protocol/amf/decoder_amf3.go b/protocol/amf/decoder_amf3.go index 7c8260ad..4013b7bd 100755 --- a/protocol/amf/decoder_amf3.go +++ b/protocol/amf/decoder_amf3.go @@ -2,6 +2,7 @@ package amf import ( "encoding/binary" + "fmt" "io" "time" ) @@ -42,7 +43,7 @@ func (d *Decoder) DecodeAmf3(r io.Reader) (interface{}, error) { return d.DecodeAmf3ByteArray(r, false) } - return nil, Error("decode amf3: unsupported type %d", marker) + return nil, fmt.Errorf("decode amf3: unsupported type %d", marker) } // marker: 1 byte 0x00 @@ -103,7 +104,7 @@ func (d *Decoder) DecodeAmf3Double(r io.Reader, decodeMarker bool) (result float err = binary.Read(r, binary.BigEndian, &result) if err != nil { - return float64(0), Error("amf3 decode: unable to read double: %s", err) + return float64(0), fmt.Errorf("amf3 decode: unable to read double: %s", err) } return @@ -122,7 +123,7 @@ func (d *Decoder) DecodeAmf3String(r io.Reader, decodeMarker bool) (result strin var refVal uint32 isRef, refVal, err = d.decodeReferenceInt(r) if err != nil { - return "", Error("amf3 decode: unable to decode string reference and length: %s", err) + return "", fmt.Errorf("amf3 decode: unable to decode string reference and length: %s", err) } if isRef { @@ -133,7 +134,7 @@ func (d *Decoder) DecodeAmf3String(r io.Reader, decodeMarker bool) (result strin buf := make([]byte, refVal) _, err = r.Read(buf) if err != nil { - return "", Error("amf3 decode: unable to read string: %s", err) + return "", fmt.Errorf("amf3 decode: unable to read string: %s", err) } result = string(buf) @@ -157,13 +158,13 @@ func (d *Decoder) DecodeAmf3Date(r io.Reader, decodeMarker bool) (result time.Ti var refVal uint32 isRef, refVal, err = d.decodeReferenceInt(r) if err != nil { - return result, Error("amf3 decode: unable to decode date reference and length: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode date reference and length: %s", err) } if isRef { res, ok := d.objectRefs[refVal].(time.Time) if ok != true { - return result, Error("amf3 decode: unable to extract time from date object references") + return result, fmt.Errorf("amf3 decode: unable to extract time from date object references") } return res, err @@ -172,7 +173,7 @@ func (d *Decoder) DecodeAmf3Date(r io.Reader, decodeMarker bool) (result time.Ti var u64 float64 err = binary.Read(r, binary.BigEndian, &u64) if err != nil { - return result, Error("amf3 decode: unable to read double: %s", err) + return result, fmt.Errorf("amf3 decode: unable to read double: %s", err) } result = time.Unix(int64(u64/1000), 0).UTC() @@ -196,7 +197,7 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array, var refVal uint32 isRef, refVal, err = d.decodeReferenceInt(r) if err != nil { - return result, Error("amf3 decode: unable to decode array reference and length: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode array reference and length: %s", err) } if isRef { @@ -204,7 +205,7 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array, res, ok := d.objectRefs[objRefId].(Array) if ok != true { - return result, Error("amf3 decode: unable to extract array from object references") + return result, fmt.Errorf("amf3 decode: unable to extract array from object references") } return res, err @@ -213,17 +214,17 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array, var key string key, err = d.DecodeAmf3String(r, false) if err != nil { - return result, Error("amf3 decode: unable to read key for array: %s", err) + return result, fmt.Errorf("amf3 decode: unable to read key for array: %s", err) } if key != "" { - return result, Error("amf3 decode: array key is not empty, can't handle associative array") + return result, fmt.Errorf("amf3 decode: array key is not empty, can't handle associative array") } for i := uint32(0); i < refVal; i++ { tmp, err := d.DecodeAmf3(r) if err != nil { - return result, Error("amf3 decode: array element could not be decoded: %s", err) + return result, fmt.Errorf("amf3 decode: array element could not be decoded: %s", err) } result = append(result, tmp) } @@ -243,7 +244,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter // decode the initial u29 isRef, refVal, err := d.decodeReferenceInt(r) if err != nil { - return nil, Error("amf3 decode: unable to decode object reference and length: %s", err) + return nil, fmt.Errorf("amf3 decode: unable to decode object reference and length: %s", err) } // if this is a object reference only, grab it and return it @@ -272,7 +273,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter var cls string cls, err = d.DecodeAmf3String(r, false) if err != nil { - return result, Error("amf3 decode: unable to read trait type for object: %s", err) + return result, fmt.Errorf("amf3 decode: unable to read trait type for object: %s", err) } trait.Type = cls @@ -281,7 +282,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter for i := uint32(0); i < propLength; i++ { tmp, err := d.DecodeAmf3String(r, false) if err != nil { - return result, Error("amf3 decode: unable to read trait property for object: %s", err) + return result, fmt.Errorf("amf3 decode: unable to read trait property for object: %s", err) } trait.Properties = append(trait.Properties, tmp) } @@ -299,17 +300,17 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter case "DSA": // AsyncMessageExt result, err = d.decodeAsyncMessageExt(r) if err != nil { - return result, Error("amf3 decode: unable to decode dsa: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode dsa: %s", err) } case "DSK": // AcknowledgeMessageExt result, err = d.decodeAcknowledgeMessageExt(r) if err != nil { - return result, Error("amf3 decode: unable to decode dsk: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode dsk: %s", err) } case "flex.messaging.io.ArrayCollection": result, err = d.decodeArrayCollection(r) if err != nil { - return result, Error("amf3 decode: unable to decode ac: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode ac: %s", err) } // store an extra reference to array collection container @@ -320,10 +321,10 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter if ok { result, err = fn(d, r) if err != nil { - return result, Error("amf3 decode: unable to call external decoder for type %s: %s", trait.Type, err) + return result, fmt.Errorf("amf3 decode: unable to call external decoder for type %s: %s", trait.Type, err) } } else { - return result, Error("amf3 decode: unable to decode external type %s, no handler", trait.Type) + return result, fmt.Errorf("amf3 decode: unable to decode external type %s, no handler", trait.Type) } } @@ -341,7 +342,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter for _, key = range trait.Properties { val, err = d.DecodeAmf3(r) if err != nil { - return result, Error("amf3 decode: unable to decode object property: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode object property: %s", err) } obj[key] = val @@ -353,14 +354,14 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter for { key, err = d.DecodeAmf3String(r, false) if err != nil { - return result, Error("amf3 decode: unable to decode dynamic key: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode dynamic key: %s", err) } if key == "" { break } val, err = d.DecodeAmf3(r) if err != nil { - return result, Error("amf3 decode: unable to decode dynamic value: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode dynamic value: %s", err) } obj[key] = val @@ -385,7 +386,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, } if (marker != AMF3_XMLDOC_MARKER) && (marker != AMF3_XMLSTRING_MARKER) { - return "", Error("decode assert marker failed: expected %v or %v, got %v", AMF3_XMLDOC_MARKER, AMF3_XMLSTRING_MARKER, marker) + return "", fmt.Errorf("decode assert marker failed: expected %v or %v, got %v", AMF3_XMLDOC_MARKER, AMF3_XMLSTRING_MARKER, marker) } } @@ -393,7 +394,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, var refVal uint32 isRef, refVal, err = d.decodeReferenceInt(r) if err != nil { - return "", Error("amf3 decode: unable to decode xml reference and length: %s", err) + return "", fmt.Errorf("amf3 decode: unable to decode xml reference and length: %s", err) } if isRef { @@ -401,7 +402,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, buf := d.objectRefs[refVal] result, ok = buf.(string) if ok != true { - return "", Error("amf3 decode: cannot coerce object reference into xml string") + return "", fmt.Errorf("amf3 decode: cannot coerce object reference into xml string") } return @@ -410,7 +411,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, buf := make([]byte, refVal) _, err = r.Read(buf) if err != nil { - return "", Error("amf3 decode: unable to read xml string: %s", err) + return "", fmt.Errorf("amf3 decode: unable to read xml string: %s", err) } result = string(buf) @@ -435,14 +436,14 @@ func (d *Decoder) DecodeAmf3ByteArray(r io.Reader, decodeMarker bool) (result [] var refVal uint32 isRef, refVal, err = d.decodeReferenceInt(r) if err != nil { - return result, Error("amf3 decode: unable to decode byte array reference and length: %s", err) + return result, fmt.Errorf("amf3 decode: unable to decode byte array reference and length: %s", err) } if isRef { var ok bool result, ok = d.objectRefs[refVal].([]byte) if ok != true { - return result, Error("amf3 decode: unable to convert object ref to bytes") + return result, fmt.Errorf("amf3 decode: unable to convert object ref to bytes") } return @@ -451,7 +452,7 @@ func (d *Decoder) DecodeAmf3ByteArray(r io.Reader, decodeMarker bool) (result [] result = make([]byte, refVal) _, err = r.Read(result) if err != nil { - return result, Error("amf3 decode: unable to read bytearray: %s", err) + return result, fmt.Errorf("amf3 decode: unable to read bytearray: %s", err) } d.objectRefs = append(d.objectRefs, result) @@ -486,7 +487,7 @@ func (d *Decoder) decodeU29(r io.Reader) (result uint32, err error) { func (d *Decoder) decodeReferenceInt(r io.Reader) (isRef bool, refVal uint32, err error) { u29, err := d.decodeU29(r) if err != nil { - return false, 0, Error("amf3 decode: unable to decode reference int: %s", err) + return false, 0, fmt.Errorf("amf3 decode: unable to decode reference int: %s", err) } isRef = u29&0x01 == 0 diff --git a/protocol/amf/decoder_amf3_external.go b/protocol/amf/decoder_amf3_external.go index 35f4b7de..a4e237b0 100755 --- a/protocol/amf/decoder_amf3_external.go +++ b/protocol/amf/decoder_amf3_external.go @@ -13,7 +13,7 @@ func (d *Decoder) decodeAbstractMessage(r io.Reader) (result Object, err error) if err = d.decodeExternal(r, &result, []string{"body", "clientId", "destination", "headers", "messageId", "timeStamp", "timeToLive"}, []string{"clientIdBytes", "messageIdBytes"}); err != nil { - return result, Error("unable to decode abstract external: %s", err) + return result, fmt.Errorf("unable to decode abstract external: %s", err) } return @@ -26,11 +26,11 @@ func (d *Decoder) decodeAsyncMessageExt(r io.Reader) (result Object, err error) func (d *Decoder) decodeAsyncMessage(r io.Reader) (result Object, err error) { result, err = d.decodeAbstractMessage(r) if err != nil { - return result, Error("unable to decode abstract for async: %s", err) + return result, fmt.Errorf("unable to decode abstract for async: %s", err) } if err = d.decodeExternal(r, &result, []string{"correlationId", "correlationIdBytes"}); err != nil { - return result, Error("unable to decode async external: %s", err) + return result, fmt.Errorf("unable to decode async external: %s", err) } return @@ -43,11 +43,11 @@ func (d *Decoder) decodeAcknowledgeMessageExt(r io.Reader) (result Object, err e func (d *Decoder) decodeAcknowledgeMessage(r io.Reader) (result Object, err error) { result, err = d.decodeAsyncMessage(r) if err != nil { - return result, Error("unable to decode async for ack: %s", err) + return result, fmt.Errorf("unable to decode async for ack: %s", err) } if err = d.decodeExternal(r, &result); err != nil { - return result, Error("unable to decode ack external: %s", err) + return result, fmt.Errorf("unable to decode ack external: %s", err) } return @@ -57,7 +57,7 @@ func (d *Decoder) decodeAcknowledgeMessage(r io.Reader) (result Object, err erro func (d *Decoder) decodeArrayCollection(r io.Reader) (interface{}, error) { result, err := d.DecodeAmf3(r) if err != nil { - return result, Error("cannot decode child of array collection: %s", err) + return result, fmt.Errorf("cannot decode child of array collection: %s", err) } return result, nil @@ -70,7 +70,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string flagSet, err = readFlags(r) if err != nil { - return Error("unable to read flags: %s", err) + return fmt.Errorf("unable to read flags: %s", err) } for i, flags := range flagSet { @@ -87,7 +87,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string if (flags & flagBit) != 0 { tmp, err := d.DecodeAmf3(r) if err != nil { - return Error("unable to decode external field %s %d %d (%#v): %s", field, i, p, flagSet, err) + return fmt.Errorf("unable to decode external field %s %d %d (%#v): %s", field, i, p, flagSet, err) } (*obj)[field] = tmp } @@ -99,7 +99,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string field := fmt.Sprintf("extra_%d_%d", i, j) tmp, err := d.DecodeAmf3(r) if err != nil { - return Error("unable to decode post-external field %d %d (%#v): %s", i, j, flagSet, err) + return fmt.Errorf("unable to decode post-external field %d %d (%#v): %s", i, j, flagSet, err) } (*obj)[field] = tmp } @@ -114,7 +114,7 @@ func readFlags(r io.Reader) (result []uint8, err error) { for { flag, err := ReadByte(r) if err != nil { - return result, Error("unable to read flags: %s", err) + return result, fmt.Errorf("unable to read flags: %s", err) } result = append(result, flag) diff --git a/protocol/amf/decoder_amf3_test.go b/protocol/amf/decoder_amf3_test.go index d633f3ae..fd70660e 100755 --- a/protocol/amf/decoder_amf3_test.go +++ b/protocol/amf/decoder_amf3_test.go @@ -186,7 +186,7 @@ func TestDecodeAmf3Array(t *testing.T) { for i, v := range expect { if got[i] != v { - t.Error("expected array element %d to be %v, got %v", i, v, got[i]) + t.Errorf("expected array element %d to be %v, got %v", i, v, got[i]) } } } @@ -211,10 +211,10 @@ func TestDecodeAmf3Object(t *testing.T) { } if to["foo"] != "bar" { - t.Error("expected foo to be bar, got: %+v", to["foo"]) + t.Errorf("expected foo to be bar, got: %+v", to["foo"]) } if to["baz"] != nil { - t.Error("expected baz to be nil, got: %+v", to["baz"]) + t.Errorf("expected baz to be nil, got: %+v", to["baz"]) } } diff --git a/protocol/amf/encoder_amf0.go b/protocol/amf/encoder_amf0.go index 20b4c41e..7648e954 100755 --- a/protocol/amf/encoder_amf0.go +++ b/protocol/amf/encoder_amf0.go @@ -2,6 +2,7 @@ package amf import ( "encoding/binary" + "fmt" "io" "reflect" ) @@ -43,16 +44,16 @@ func (e *Encoder) EncodeAmf0(w io.Writer, val interface{}) (int, error) { case reflect.Map: obj, ok := val.(Object) if ok != true { - return 0, Error("encode amf0: unable to create object from map") + return 0, fmt.Errorf("encode amf0: unable to create object from map") } return e.EncodeAmf0Object(w, obj, true) } if _, ok := val.(TypedObject); ok { - return 0, Error("encode amf0: unsupported type typed object") + return 0, fmt.Errorf("encode amf0: unsupported type typed object") } - return 0, Error("encode amf0: unsupported type %s", v.Type()) + return 0, fmt.Errorf("encode amf0: unsupported type %s", v.Type()) } // marker: 1 byte 0x00 @@ -117,13 +118,13 @@ func (e *Encoder) EncodeAmf0String(w io.Writer, val string, encodeMarker bool) ( length := uint16(len(val)) err = binary.Write(w, binary.BigEndian, length) if err != nil { - return n, Error("encode amf0: unable to encode string length: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode string length: %s", err) } n += 2 m, err = w.Write([]byte(val)) if err != nil { - return n, Error("encode amf0: unable to encode string value: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode string value: %s", err) } n += m @@ -146,26 +147,26 @@ func (e *Encoder) EncodeAmf0Object(w io.Writer, val Object, encodeMarker bool) ( for k, v := range val { m, err = e.EncodeAmf0String(w, k, false) if err != nil { - return n, Error("encode amf0: unable to encode object key: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode object key: %s", err) } n += m m, err = e.EncodeAmf0(w, v) if err != nil { - return n, Error("encode amf0: unable to encode object value: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode object value: %s", err) } n += m } m, err = e.EncodeAmf0String(w, "", false) if err != nil { - return n, Error("encode amf0: unable to encode object empty string: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode object empty string: %s", err) } n += m err = WriteMarker(w, AMF0_OBJECT_END_MARKER) if err != nil { - return n, Error("encode amf0: unable to object end marker: %s", err) + return n, fmt.Errorf("encode amf0: unable to object end marker: %s", err) } n += 1 @@ -216,13 +217,13 @@ func (e *Encoder) EncodeAmf0EcmaArray(w io.Writer, val Object, encodeMarker bool length := uint32(len(val)) err = binary.Write(w, binary.BigEndian, length) if err != nil { - return n, Error("encode amf0: unable to encode ecma array length: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode ecma array length: %s", err) } n += 4 m, err = e.EncodeAmf0Object(w, val, false) if err != nil { - return n, Error("encode amf0: unable to encode ecma array object: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode ecma array object: %s", err) } n += m @@ -245,14 +246,14 @@ func (e *Encoder) EncodeAmf0StrictArray(w io.Writer, val Array, encodeMarker boo length := uint32(len(val)) err = binary.Write(w, binary.BigEndian, length) if err != nil { - return n, Error("encode amf0: unable to encode strict array length: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode strict array length: %s", err) } n += 4 for _, v := range val { m, err = e.EncodeAmf0(w, v) if err != nil { - return n, Error("encode amf0: unable to encode strict array element: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode strict array element: %s", err) } n += m } @@ -276,13 +277,13 @@ func (e *Encoder) EncodeAmf0LongString(w io.Writer, val string, encodeMarker boo length := uint32(len(val)) err = binary.Write(w, binary.BigEndian, length) if err != nil { - return n, Error("encode amf0: unable to encode long string length: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode long string length: %s", err) } n += 4 m, err = w.Write([]byte(val)) if err != nil { - return n, Error("encode amf0: unable to encode long string value: %s", err) + return n, fmt.Errorf("encode amf0: unable to encode long string value: %s", err) } n += m diff --git a/protocol/amf/encoder_amf3.go b/protocol/amf/encoder_amf3.go index 9565c46e..a760f77b 100755 --- a/protocol/amf/encoder_amf3.go +++ b/protocol/amf/encoder_amf3.go @@ -2,6 +2,7 @@ package amf import ( "encoding/binary" + "fmt" "io" "reflect" "sort" @@ -59,7 +60,7 @@ func (e *Encoder) EncodeAmf3(w io.Writer, val interface{}) (int, error) { case reflect.Map: obj, ok := val.(Object) if ok != true { - return 0, Error("encode amf3: unable to create object from map") + return 0, fmt.Errorf("encode amf3: unable to create object from map") } to := *new(TypedObject) @@ -76,7 +77,7 @@ func (e *Encoder) EncodeAmf3(w io.Writer, val interface{}) (int, error) { return e.EncodeAmf3Object(w, to, true) } - return 0, Error("encode amf3: unsupported type %s", v.Type()) + return 0, fmt.Errorf("encode amf3: unsupported type %s", v.Type()) } // marker: 1 byte 0x00 @@ -204,14 +205,14 @@ func (e *Encoder) EncodeAmf3Date(w io.Writer, val time.Time, encodeMarker bool) } if err = WriteMarker(w, 0x01); err != nil { - return n, Error("amf3 encode: cannot encode u29 for array: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode u29 for array: %s", err) } n += 1 u64 := float64(val.Unix()) * 1000.0 err = binary.Write(w, binary.BigEndian, &u64) if err != nil { - return n, Error("amf3 encode: unable to write date double: %s", err) + return n, fmt.Errorf("amf3 encode: unable to write date double: %s", err) } n += 8 @@ -237,20 +238,20 @@ func (e *Encoder) EncodeAmf3Array(w io.Writer, val Array, encodeMarker bool) (n m, err = e.encodeAmf3Uint29(w, u29) if err != nil { - return n, Error("amf3 encode: cannot encode u29 for array: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode u29 for array: %s", err) } n += m m, err = e.encodeAmf3Utf8(w, "") if err != nil { - return n, Error("amf3 encode: cannot encode empty string for array: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode empty string for array: %s", err) } n += m for _, v := range val { m, err := e.EncodeAmf3(w, v) if err != nil { - return n, Error("amf3 encode: cannot encode array element: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode array element: %s", err) } n += m } @@ -294,32 +295,32 @@ func (e *Encoder) EncodeAmf3Object(w io.Writer, val TypedObject, encodeMarker bo m, err = e.encodeAmf3Uint29(w, u29) if err != nil { - return n, Error("amf3 encode: cannot encode trait header for object: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode trait header for object: %s", err) } n += m m, err = e.encodeAmf3Utf8(w, trait.Type) if err != nil { - return n, Error("amf3 encode: cannot encode trait type for object: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode trait type for object: %s", err) } n += m for _, prop := range trait.Properties { m, err = e.encodeAmf3Utf8(w, prop) if err != nil { - return n, Error("amf3 encode: cannot encode trait property for object: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode trait property for object: %s", err) } n += m } if trait.Externalizable { - return n, Error("amf3 encode: cannot encode externalizable object") + return n, fmt.Errorf("amf3 encode: cannot encode externalizable object") } for _, prop := range trait.Properties { m, err = e.EncodeAmf3(w, val.Object[prop]) if err != nil { - return n, Error("amf3 encode: cannot encode sealed object value: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode sealed object value: %s", err) } n += m } @@ -337,20 +338,20 @@ func (e *Encoder) EncodeAmf3Object(w io.Writer, val TypedObject, encodeMarker bo if foundProp != true { m, err = e.encodeAmf3Utf8(w, k) if err != nil { - return n, Error("amf3 encode: cannot encode dynamic object property key: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode dynamic object property key: %s", err) } n += m m, err = e.EncodeAmf3(w, v) if err != nil { - return n, Error("amf3 encode: cannot encode dynamic object value: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode dynamic object value: %s", err) } n += m } m, err = e.encodeAmf3Utf8(w, "") if err != nil { - return n, Error("amf3 encode: cannot encode dynamic object ending marker string: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode dynamic object ending marker string: %s", err) } n += m } @@ -378,13 +379,13 @@ func (e *Encoder) EncodeAmf3ByteArray(w io.Writer, val []byte, encodeMarker bool m, err = e.encodeAmf3Uint29(w, u29) if err != nil { - return n, Error("amf3 encode: cannot encode u29 for bytearray: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode u29 for bytearray: %s", err) } n += m m, err = w.Write(val) if err != nil { - return n, Error("encode amf3: unable to encode bytearray value: %s", err) + return n, fmt.Errorf("encode amf3: unable to encode bytearray value: %s", err) } n += m @@ -398,13 +399,13 @@ func (e *Encoder) encodeAmf3Utf8(w io.Writer, val string) (n int, err error) { var m int m, err = e.encodeAmf3Uint29(w, u29) if err != nil { - return n, Error("amf3 encode: cannot encode u29 for string: %s", err) + return n, fmt.Errorf("amf3 encode: cannot encode u29 for string: %s", err) } n += m m, err = w.Write([]byte(val)) if err != nil { - return n, Error("encode amf3: unable to encode string value: %s", err) + return n, fmt.Errorf("encode amf3: unable to encode string value: %s", err) } n += m @@ -424,7 +425,7 @@ func (e *Encoder) encodeAmf3Uint29(w io.Writer, val uint32) (n int, err error) { } else if val <= 0x1FFFFFFF { n, err = w.Write([]byte{byte(val>>22 | 0x80), byte(val>>15&0x7F | 0x80), byte(val>>8&0x7F | 0x80), byte(val)}) } else { - return n, Error("amf3 encode: cannot encode u29 with value %d (out of range)", val) + return n, fmt.Errorf("amf3 encode: cannot encode u29 with value %d (out of range)", val) } return diff --git a/protocol/amf/metadata.go b/protocol/amf/metadata.go index 722984d8..327b439a 100755 --- a/protocol/amf/metadata.go +++ b/protocol/amf/metadata.go @@ -3,7 +3,8 @@ package amf import ( "bytes" "fmt" - "log" + + log "github.com/sirupsen/logrus" ) const ( diff --git a/protocol/amf/util.go b/protocol/amf/util.go index c94e2aa9..35bdc1d8 100755 --- a/protocol/amf/util.go +++ b/protocol/amf/util.go @@ -2,7 +2,6 @@ package amf import ( "encoding/json" - "errors" "fmt" "io" ) @@ -18,17 +17,13 @@ func DumpBytes(label string, buf []byte, size int) { func Dump(label string, val interface{}) error { json, err := json.MarshalIndent(val, "", " ") if err != nil { - return Error("Error dumping %s: %s", label, err) + return fmt.Errorf("Error dumping %s: %s", label, err) } fmt.Printf("Dumping %s:\n%s\n", label, json) return nil } -func Error(f string, v ...interface{}) error { - return errors.New(fmt.Sprintf(f, v...)) -} - func WriteByte(w io.Writer, b byte) (err error) { bytes := make([]byte, 1) bytes[0] = b @@ -85,7 +80,7 @@ func AssertMarker(r io.Reader, checkMarker bool, m byte) error { } if marker != m { - return Error("decode assert marker failed: expected %v got %v", m, marker) + return fmt.Errorf("decode assert marker failed: expected %v got %v", m, marker) } return nil diff --git a/protocol/httpopera/http_opera.go b/protocol/api/api.go similarity index 77% rename from protocol/httpopera/http_opera.go rename to protocol/api/api.go index 584f0e14..20cb22b9 100755 --- a/protocol/httpopera/http_opera.go +++ b/protocol/api/api.go @@ -1,9 +1,8 @@ -package httpopera +package api import ( "encoding/json" "fmt" - "log" "net" "net/http" @@ -14,6 +13,7 @@ import ( jwtmiddleware "github.com/auth0/go-jwt-middleware" "github.com/dgrijalva/jwt-go" + log "github.com/sirupsen/logrus" ) type Response struct { @@ -63,30 +63,40 @@ func NewServer(h av.Handler, rtmpAddr string) *Server { } func JWTMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if len(configure.RtmpServercfg.JWTCfg.Secret) > 0 { - var algorithm jwt.SigningMethod - if len(configure.RtmpServercfg.JWTCfg.Algorithm) > 0 { - algorithm = jwt.GetSigningMethod(configure.RtmpServercfg.JWTCfg.Algorithm) - } + isJWT := len(configure.Config.GetString("jwt.secret")) > 0 + if !isJWT { + return next + } - if algorithm == nil { - algorithm = jwt.SigningMethodHS256 - } + log.Info("Using JWT middleware") - jwtMiddleware := jwtmiddleware.New(jwtmiddleware.Options{ - Extractor: jwtmiddleware.FromFirst(jwtmiddleware.FromAuthHeader, jwtmiddleware.FromParameter("jwt")), - ValidationKeyGetter: func(token *jwt.Token) (interface{}, error) { - return []byte(configure.RtmpServercfg.Secret), nil - }, - SigningMethod: algorithm, - }) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var algorithm jwt.SigningMethod + if len(configure.Config.GetString("jwt.algorithm")) > 0 { + algorithm = jwt.GetSigningMethod(configure.Config.GetString("jwt.algorithm")) + } - jwtMiddleware.HandlerWithNext(w, r, next.ServeHTTP) - return + if algorithm == nil { + algorithm = jwt.SigningMethodHS256 } - next.ServeHTTP(w, r) + jwtMiddleware := jwtmiddleware.New(jwtmiddleware.Options{ + Extractor: jwtmiddleware.FromFirst(jwtmiddleware.FromAuthHeader, jwtmiddleware.FromParameter("jwt")), + ValidationKeyGetter: func(token *jwt.Token) (interface{}, error) { + return []byte(configure.Config.GetString("jwt.secret")), nil + }, + SigningMethod: algorithm, + ErrorHandler: func(w http.ResponseWriter, r *http.Request, err string) { + res := &Response{ + w: w, + Status: 403, + Data: err, + } + res.SendJson() + }, + }) + + jwtMiddleware.HandlerWithNext(w, r, next.ServeHTTP) }) } @@ -204,23 +214,23 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) { return } - oper := req.Form["oper"] - app := req.Form["app"] - name := req.Form["name"] - url := req.Form["url"] + oper := req.Form.Get("oper") + app := req.Form.Get("app") + name := req.Form.Get("name") + url := req.Form.Get("url") - log.Printf("control pull: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url) + log.Debugf("control pull: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url) if (len(app) <= 0) || (len(name) <= 0) || (len(url) <= 0) { res.Status = 400 res.Data = "control push parameter error, please check them." return } - remoteurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app[0] + "/" + name[0] - localurl := url[0] + remoteurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app + "/" + name + localurl := url - keyString := "pull:" + app[0] + "/" + name[0] - if oper[0] == "stop" { + keyString := "pull:" + app + "/" + name + if oper == "stop" { pullRtmprelay, found := s.session[keyString] if !found { @@ -229,27 +239,27 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) { res.Data = retString return } - log.Printf("rtmprelay stop push %s from %s", remoteurl, localurl) + log.Debugf("rtmprelay stop push %s from %s", remoteurl, localurl) pullRtmprelay.Stop() delete(s.session, keyString) - retString = fmt.Sprintf("