From 9b475d9d52be54234d3d77383ec69f1085581c12 Mon Sep 17 00:00:00 2001 From: Jacob Su Date: Sat, 16 Nov 2024 07:59:40 +0800 Subject: [PATCH] Let forward feature support select stream by its name. (#220) when config source stream name, the forward task will select the stream by this name to forward, when source stream name is missing, it will random select one active stream to forward. --- platform/forward.go | 18 ++++++++++++++++-- ui/src/pages/ScenarioForward.js | 11 ++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/platform/forward.go b/platform/forward.go index 1b485105..3cb82c4a 100644 --- a/platform/forward.go +++ b/platform/forward.go @@ -19,6 +19,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" "github.com/google/uuid" @@ -334,6 +335,8 @@ func (v *ForwardWorker) Start(ctx context.Context) error { type ForwardConfigure struct { // The platform name, for example, wx Platform string `json:"platform"` + // the source stream in oryx. + Stream string `json:"stream"` // The RTMP server url, for example, rtmp://localhost/live Server string `json:"server"` // The RTMP stream and secret, for example, livestream @@ -347,13 +350,14 @@ type ForwardConfigure struct { } func (v *ForwardConfigure) String() string { - return fmt.Sprintf("platform=%v, server=%v, secret=%v, enabled=%v, customed=%v, label=%v", - v.Platform, v.Server, v.Secret, v.Enabled, v.Customed, v.Label, + return fmt.Sprintf("platform=%v, stream=%v, server=%v, secret=%v, enabled=%v, customed=%v, label=%v", + v.Platform, v.Stream, v.Server, v.Secret, v.Enabled, v.Customed, v.Label, ) } func (v *ForwardConfigure) Update(u *ForwardConfigure) error { v.Platform = u.Platform + v.Stream = u.Stream v.Server = u.Server v.Secret = u.Secret v.Label = u.Label @@ -500,18 +504,28 @@ func (v *ForwardTask) Run(ctx context.Context) error { ctx = logger.WithContext(ctx) logger.Tf(ctx, "forward run task %v", v.String()) + // select active stream by stream name or random select one when stream name is empty. selectActiveStream := func() (*SrsStream, error) { streams, err := rdb.HGetAll(ctx, SRS_STREAM_ACTIVE).Result() if err != nil { return nil, errors.Wrapf(err, "hgetall %v", SRS_STREAM_ACTIVE) } + streamName := v.config.Stream + var best *SrsStream for _, v := range streams { var stream SrsStream if err := json.Unmarshal([]byte(v), &stream); err != nil { return nil, errors.Wrapf(err, "unmarshal %v", v) } + if streamName != "" { + if stream.Stream == streamName { + best = &stream + break + } + continue + } if best == nil { best = &stream diff --git a/ui/src/pages/ScenarioForward.js b/ui/src/pages/ScenarioForward.js index edb94fcf..33704fbe 100644 --- a/ui/src/pages/ScenarioForward.js +++ b/ui/src/pages/ScenarioForward.js @@ -162,7 +162,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { }, [configs, setConfigs]); // Update the forward config to server. - const updateSecrets = React.useCallback((e, action, platform, server, secret, enabled, custom, label, onSuccess) => { + const updateSecrets = React.useCallback((e, action, platform, stream, server, secret, enabled, custom, label, onSuccess) => { e.preventDefault(); if (!server) return alert(t('plat.com.addr')); if (custom && !label) return alert(t('plat.com.label')); @@ -171,7 +171,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { setSubmiting(true); axios.post('/terraform/v1/ffmpeg/forward/secret', { - action, platform, server, secret, enabled: !!enabled, custom: !!custom, label, + action, platform, stream, server, secret, enabled: !!enabled, custom: !!custom, label, }, { headers: Token.loadBearerHeader(), }).then(res => { @@ -235,6 +235,11 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { * {conf.custom ? `(${t('helper.required')})` : `(${t('helper.optional')})`} {t('plat.com.name2')} updateConfigObject({...conf, label: e.target.value})}/> + + {t('plat.com.source')} + {!conf.custom && * {t('plat.com.source')} check System-{'>'}Streams tab} + updateConfigObject({...conf, stream: e.target.value})}/> + {conf.custom ? t('plat.com.server') : t('plat.com.server2')} {!conf.custom && * {t('plat.com.server3')} {conf?.locale?.link2}, {t('plat.com.server4')}} @@ -259,7 +264,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { type="submit" disabled={submiting} onClick={(e) => { - updateSecrets(e, 'update', conf.platform, conf.server, conf.secret, !conf.enabled, conf.custom, conf.label, () => { + updateSecrets(e, 'update', conf.platform, conf.stream, conf.server, conf.secret, !conf.enabled, conf.custom, conf.label, () => { updateConfigObject({...conf, enabled: !conf.enabled}); }); }}