Skip to content

Commit

Permalink
Let forward feature support select stream by its name. (#220)
Browse files Browse the repository at this point in the history
when config source stream name, the forward task will select the stream
by this name to forward, when source stream name is missing, it will
random select one active stream to forward.
  • Loading branch information
suzp1984 authored Nov 15, 2024
1 parent 11cce62 commit 9b475d9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
18 changes: 16 additions & 2 deletions platform/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ossrs/go-oryx-lib/errors"
ohttp "github.com/ossrs/go-oryx-lib/http"
"github.com/ossrs/go-oryx-lib/logger"

// Use v8 because we use Go 1.16+, while v9 requires Go 1.18+
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
Expand Down Expand Up @@ -334,6 +335,8 @@ func (v *ForwardWorker) Start(ctx context.Context) error {
type ForwardConfigure struct {
// The platform name, for example, wx
Platform string `json:"platform"`
// the source stream in oryx.
Stream string `json:"stream"`
// The RTMP server url, for example, rtmp://localhost/live
Server string `json:"server"`
// The RTMP stream and secret, for example, livestream
Expand All @@ -347,13 +350,14 @@ type ForwardConfigure struct {
}

func (v *ForwardConfigure) String() string {
return fmt.Sprintf("platform=%v, server=%v, secret=%v, enabled=%v, customed=%v, label=%v",
v.Platform, v.Server, v.Secret, v.Enabled, v.Customed, v.Label,
return fmt.Sprintf("platform=%v, stream=%v, server=%v, secret=%v, enabled=%v, customed=%v, label=%v",
v.Platform, v.Stream, v.Server, v.Secret, v.Enabled, v.Customed, v.Label,
)
}

func (v *ForwardConfigure) Update(u *ForwardConfigure) error {
v.Platform = u.Platform
v.Stream = u.Stream
v.Server = u.Server
v.Secret = u.Secret
v.Label = u.Label
Expand Down Expand Up @@ -500,18 +504,28 @@ func (v *ForwardTask) Run(ctx context.Context) error {
ctx = logger.WithContext(ctx)
logger.Tf(ctx, "forward run task %v", v.String())

// select active stream by stream name or random select one when stream name is empty.
selectActiveStream := func() (*SrsStream, error) {
streams, err := rdb.HGetAll(ctx, SRS_STREAM_ACTIVE).Result()
if err != nil {
return nil, errors.Wrapf(err, "hgetall %v", SRS_STREAM_ACTIVE)
}

streamName := v.config.Stream

var best *SrsStream
for _, v := range streams {
var stream SrsStream
if err := json.Unmarshal([]byte(v), &stream); err != nil {
return nil, errors.Wrapf(err, "unmarshal %v", v)
}
if streamName != "" {
if stream.Stream == streamName {
best = &stream
break
}
continue
}

if best == nil {
best = &stream
Expand Down
11 changes: 8 additions & 3 deletions ui/src/pages/ScenarioForward.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) {
}, [configs, setConfigs]);

// Update the forward config to server.
const updateSecrets = React.useCallback((e, action, platform, server, secret, enabled, custom, label, onSuccess) => {
const updateSecrets = React.useCallback((e, action, platform, stream, server, secret, enabled, custom, label, onSuccess) => {
e.preventDefault();
if (!server) return alert(t('plat.com.addr'));
if (custom && !label) return alert(t('plat.com.label'));
Expand All @@ -171,7 +171,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) {
setSubmiting(true);

axios.post('/terraform/v1/ffmpeg/forward/secret', {
action, platform, server, secret, enabled: !!enabled, custom: !!custom, label,
action, platform, stream, server, secret, enabled: !!enabled, custom: !!custom, label,
}, {
headers: Token.loadBearerHeader(),
}).then(res => {
Expand Down Expand Up @@ -235,6 +235,11 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) {
<Form.Text> * {conf.custom ? `(${t('helper.required')})` : `(${t('helper.optional')})`} {t('plat.com.name2')}</Form.Text>
<Form.Control as="input" defaultValue={conf.label} onChange={(e) => updateConfigObject({...conf, label: e.target.value})}/>
</Form.Group>
<Form.Group className="mb-3">
<Form.Label>{t('plat.com.source')}</Form.Label>
{!conf.custom && <Form.Text> * {t('plat.com.source')} check System-{'>'}Streams tab</Form.Text>}
<Form.Control as="input" defaultValue={conf.stream} onChange={(e) => updateConfigObject({...conf, stream: e.target.value})}/>
</Form.Group>
<Form.Group className="mb-3">
<Form.Label>{conf.custom ? t('plat.com.server') : t('plat.com.server2')}</Form.Label>
{!conf.custom && <Form.Text> * {t('plat.com.server3')} <a href={conf?.locale?.link} target='_blank' rel='noreferrer'>{conf?.locale?.link2}</a>, {t('plat.com.server4')}</Form.Text>}
Expand All @@ -259,7 +264,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) {
type="submit"
disabled={submiting}
onClick={(e) => {
updateSecrets(e, 'update', conf.platform, conf.server, conf.secret, !conf.enabled, conf.custom, conf.label, () => {
updateSecrets(e, 'update', conf.platform, conf.stream, conf.server, conf.secret, !conf.enabled, conf.custom, conf.label, () => {
updateConfigObject({...conf, enabled: !conf.enabled});
});
}}
Expand Down

0 comments on commit 9b475d9

Please sign in to comment.