Skip to content

Commit

Permalink
Merge pull request #46 from shigde/refactor-fed-connection
Browse files Browse the repository at this point in the history
Refactor fed connection
  • Loading branch information
EnricoSchw authored Sep 29, 2024
2 parents 4c68550 + 6ed1ebf commit 7d1d8e6
Show file tree
Hide file tree
Showing 36 changed files with 653 additions and 465 deletions.
20 changes: 15 additions & 5 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@
host = "0.0.0.0"
port = 8080

# HTTPS
https = true
crt = "./tls/localhost.crt"
key = "./tls/localhost.key"

[log]
# INFO,WARN,ERROR,DEBUG
level = "DEBUG"
##logfile = "stdout"
logfile = "./mon/dev/log/sfu.log"
logfile = "stdout"
#logfile = "./mon/dev/log/sfu.log"

[store]
# currently only Sqlite3 supported
name = "sqlite3"
dataSource = "shig.db"
loadFixtures = true

[security]
# set list of domains allowed to request api
Expand All @@ -30,7 +36,7 @@ endpoint = "/metrics" # Endpoint where the Prometheus metrics are delivered
port = 8091

[telemetry]
enable = true
enable = false

[rtp]
# Setup ice server for turn and stun
Expand Down Expand Up @@ -65,13 +71,17 @@ iceServer = [{ urls = ["stun:stun.l.google.com:19302"] }]
enable = true
# The domain of your public api
domain = "stream.localhost:8080"
https = false
https = true
release = "v0"
instanceUsername = "shig"
serverName = "shig"
private = false
registerToken = "this-token-must-be-changed-in-public"

[[federation.trustedInstance]]
actor = "http://remote-stream.localhost:8070/federation/accounts/shig"
actor = "https://remote.localhost:8070/federation/accounts/shig"
name = "shig"

[web]
enable = true
dir = "./web"
56 changes: 37 additions & 19 deletions internal/activitypub/models/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,55 @@ type Actor struct {

func NewInstanceActor(instanceUrl *url.URL, name string) (*Actor, error) {
actorIri := instance.BuildAccountIri(instanceUrl, name)
now := time.Now()
privateKey, publicKey, err := crypto.GenerateKeys()
if err != nil {
return nil, fmt.Errorf("generation key pair")
}
return &Actor{
ActorType: "Application",
PublicKey: string(publicKey),
PrivateKey: sql.NullString{String: string(privateKey), Valid: true},
ActorIri: actorIri.String(),
FollowingIri: instance.BuildFollowingIri(actorIri).String(),
FollowersIri: instance.BuildFollowersIri(actorIri).String(),
InboxIri: instance.BuildInboxIri(actorIri).String(),
OutboxIri: instance.BuildOutboxIri(actorIri).String(),
SharedInboxIri: instance.BuildSharedInboxIri(instanceUrl).String(),
DisabledAt: sql.NullTime{},
RemoteCreatedAt: now,
PreferredUsername: name,
}, nil
return newActor(Application, actorIri, instanceUrl, name, publicKey, privateKey), nil
}

func NewTrustedInstanceActor(actorIri *url.URL, name string) (*Actor, error) {
instanceUrl, err := url.Parse(fmt.Sprintf("%s://%s", actorIri.Scheme, actorIri.Host))
if err != nil {
return nil, fmt.Errorf("trusted instance actor instanceUrl url")
}
return newActor(Application, actorIri, instanceUrl, name, make([]byte, 0), make([]byte, 0)), nil
}

func NewPersonActor(instanceUrl *url.URL, name string) (*Actor, error) {
actorIri := instance.BuildAccountIri(instanceUrl, name)
privateKey, publicKey, err := crypto.GenerateKeys()
if err != nil {
return nil, fmt.Errorf("generation key pair")
}
return newActor(Person, actorIri, instanceUrl, name, publicKey, privateKey), nil
}

func NewChannelActor(instanceUrl *url.URL, name string) (*Actor, error) {
actorIri := instance.BuildAccountIri(instanceUrl, name)
privateKey, publicKey, err := crypto.GenerateKeys()
if err != nil {
return nil, fmt.Errorf("generation key pair")
}
return newActor(Group, actorIri, instanceUrl, name, publicKey, privateKey), nil
}

func newActor(actorType ActorType, actorIri *url.URL, instanceUrl *url.URL, name string, publicKey []byte, privateKey []byte) *Actor {
now := time.Now()

var privateKeyStr = sql.NullString{}
if len(privateKey) > 0 {
privateKeyStr = sql.NullString{String: string(privateKey), Valid: true}
}
var publicKeyStr = ""
if len(publicKey) > 0 {
publicKeyStr = string(publicKey)
}

return &Actor{
ActorType: "Application",
PublicKey: "",
PrivateKey: sql.NullString{},
ActorType: actorType.String(),
PublicKey: publicKeyStr,
PrivateKey: privateKeyStr,
ActorIri: actorIri.String(),
FollowingIri: instance.BuildFollowingIri(actorIri).String(),
FollowersIri: instance.BuildFollowersIri(actorIri).String(),
Expand All @@ -80,7 +98,7 @@ func NewTrustedInstanceActor(actorIri *url.URL, name string) (*Actor, error) {
DisabledAt: sql.NullTime{},
RemoteCreatedAt: now,
PreferredUsername: name,
}, nil
}
}

func (s *Actor) GetActorIri() *url.URL {
Expand Down
26 changes: 17 additions & 9 deletions internal/auth/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,27 @@ import (
"github.com/shigde/sfu/internal/activitypub/models"
)

func CreateInstanceAccount(actorId string, actor *models.Actor) *Account {
md5Uuid := CreateShigInstanceId(actorId)
func CreateInstanceAccount(userId string, actor *models.Actor) *Account {
md5Uuid := CreateInstanceUuid(userId)
return &Account{
User: actorId,
UUID: md5Uuid.String(),
ActorId: actor.ID,
Actor: actor,
User: userId,
UUID: md5Uuid.String(),
// ActorId: actor.ID,
Actor: actor,
}
}

func CreateShigInstanceId(_ string) uuid.UUID {
actorId := "test-this-out"
nameByte := []byte(actorId)
func CreateInstanceUuid(userId string) uuid.UUID {
nameByte := []byte(userId)
md5String := fmt.Sprintf("%x", md5.Sum(nameByte))
return uuid.MustParse(md5String)
}

func CreateAccount(actorId string, actor *models.Actor, uuidStr string) *Account {
return &Account{
User: actorId,
UUID: uuidStr,
ActorId: actor.ID,
Actor: actor,
}
}
9 changes: 3 additions & 6 deletions internal/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/shigde/sfu/internal/metric"
"github.com/shigde/sfu/internal/rtp"
"github.com/shigde/sfu/internal/sfu"
"github.com/shigde/sfu/internal/storage"
"github.com/shigde/sfu/internal/telemetry"
"github.com/spf13/viper"
)
Expand All @@ -31,12 +32,8 @@ func ParseConfig(file string, env *sfu.Environment) (*sfu.Config, error) {
return nil, fmt.Errorf("loading config file: %w", err)
}

if config.StorageConfig.Name != "sqlite3" {
return nil, fmt.Errorf("store.name currently supportes only Sqlite3")
}

if len(config.StorageConfig.DataSource) == 0 {
return nil, fmt.Errorf("store.dataSource should not be empty")
if err := storage.ValidateStorageConfig(config.StorageConfig); err != nil {
return nil, err
}

if len(config.LogConfig.Logfile) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions internal/lobby/clients/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ func (a *ApiClient) Login() (*authentication.Token, error) {
}

func (a *ApiClient) PostWhepOffer(offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
requestUrl := fmt.Sprintf("%s/fed/space/%s/stream/%s/whep", a.url, a.spaceId, a.streamId)
requestUrl := fmt.Sprintf("%s/space/%s/stream/%s/whep", a.url, a.spaceId, a.streamId)
return a.doOfferRequest(requestUrl, offer)
}

func (a *ApiClient) PostWhipOffer(offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
requestUrl := fmt.Sprintf("%s/fed/space/%s/stream/%s/whip", a.url, a.spaceId, a.streamId)
requestUrl := fmt.Sprintf("%s/space/%s/stream/%s/whip", a.url, a.spaceId, a.streamId)
return a.doOfferRequest(requestUrl, offer)
}

Expand Down
35 changes: 8 additions & 27 deletions internal/lobby/federation/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,44 +46,25 @@ func NewConnector(
}
}

func (c *Connector) BuildIngress() (*commands.OfferIngress, error) {
slog.Debug("lobby.HostController. connect to live stream host instance", "instanceId", c.host.instanceId)
func (c *Connector) Login() error {
slog.Debug("federation connector login to live stream host instance", "instanceId", c.host.instanceId)
if _, err := c.api.Login(); err != nil {
return nil, fmt.Errorf("login to remote host: %w", err)
return fmt.Errorf("login to remote host: %w", err)
}
return nil
}

func (c *Connector) BuildIngress() (*commands.OfferIngress, error) {
slog.Debug("federation connector build ingress for live stream host instance", "instanceId", c.host.instanceId)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := commands.NewOfferIngress(ctx, c.api, c.host.instanceId, sessions.BidirectionalSignalChannel)

return cmd, nil
//
// request := lobby2.newLobbyRequest(ctx, instanceId)
// data := lobby2.newHostGetPipeOfferData()
// request.data = data
// go c.lobby.runRequest(request)
//
// var answer *webrtc.SessionDescription
// select {
// case <-c.ctx.Done():
// return lobby2.errSessionAlreadyClosed
// case err := <-request.err:
// return fmt.Errorf("requesting pipe offer: %w", err)
// case res := <-data.response:
// var err error
// if answer, err = c.hostApi.PostHostPipeOffer(c.settings.space, c.settings.stream, res.offer); err != nil {
// return fmt.Errorf("remote host answer request: %w", err)
// }
// }
// return c.onHostPipeAnswerResponse(answer, instanceId)
}

func (c *Connector) BuildEgress() (*commands.OfferEgress, error) {
slog.Debug("lobby.HostController. connect to live stream host instance", "instanceId", c.host.instanceId)
if _, err := c.api.Login(); err != nil {
return nil, fmt.Errorf("login to remote host: %w", err)
}

slog.Debug("federation connector build egress for live stream host instance", "instanceId", c.host.instanceId)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := commands.NewOfferEgress(ctx, c.api, c.host.instanceId, sessions.BidirectionalSignalChannel)
Expand Down
2 changes: 1 addition & 1 deletion internal/lobby/federation/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type host struct {
func newHost(actorIri url.URL, token string) *host {
preferredName := preferredNameFromActor(actorIri)
actorId := fmt.Sprintf("%s@%s", preferredName, actorIri.Host)
instanceId := auth.CreateShigInstanceId(actorId)
instanceId := auth.CreateInstanceUuid(actorId)
return &host{
actorIri: actorIri,
actorId: actorId,
Expand Down
2 changes: 1 addition & 1 deletion internal/lobby/federation/host_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewHostSettings(host hostx, homeInstanceActor *url.URL, token string) *host
// name := "[email protected]"
preferredName := "shig"
actorId := fmt.Sprintf("%s@%s", preferredName, homeInstanceActor.Host)
instanceId := auth.CreateShigInstanceId(actorId)
instanceId := auth.CreateInstanceUuid(actorId)
return &hostSettings{
instanceId: instanceId,
isHost: isHost,
Expand Down
45 changes: 29 additions & 16 deletions internal/lobby/lobby.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func newLobby(entity *LobbyEntity, rtp sessions.RtpEngine, homeActorIri *url.URL

connector: connector,
}
// session handling should be sequentiell to avoid races conditions in whole group state
// session handling should be sequentiell to avoid race conditions in whole group state
go func(l *lobby, sessionCreator <-chan sessions.Item, sessionGarbage chan sessions.Item, cmdRunner <-chan command) {
for {
select {
Expand Down Expand Up @@ -114,11 +114,13 @@ func newLobby(entity *LobbyEntity, rtp sessions.RtpEngine, homeActorIri *url.URL
}
}(lobObj, creator, garbage, runner)

if !connector.IsThisInstanceLiveSteamHost() {
go func(l *lobby) {
l.connectToLiveStreamHostInstance()
}(lobObj)
}
// if this local instance not the owner of the stream, the lobby will relay all tracks to the remote host of the
// stream
//if !connector.IsThisInstanceLiveSteamHost() {
// go func(l *lobby) {
// l.connectToLiveStreamHostInstance()
// }(lobObj)
//}

return lobObj
}
Expand Down Expand Up @@ -169,22 +171,19 @@ func (l *lobby) handle(cmd command) {
}

func (l *lobby) connectToLiveStreamHostInstance() {
if ok := l.newSession(l.connector.GetInstanceId(), sessions.InstanceSession); !ok {
slog.Error("no session found for instance connection")
// First login to remote owner instance of this live stream
if err := l.connector.Login(); err != nil {
slog.Error("logging in to remote shig instance")
return
}

cmdIngress, err := l.connector.BuildIngress()
if err != nil {
slog.Error("build Ingress connection", "err", err)
return
}
l.runCommand(cmdIngress)
if err = cmdIngress.WaitForDone(); err != nil {
slog.Error("run ingress build connection cmd", "err", err)
// After logged in we build a local session for this connection
if ok := l.newSession(l.connector.GetInstanceId(), sessions.InstanceSession); !ok {
slog.Error("no session connected for instance connection")
return
}

// We open a local Egress endpoint and connecting to a remote Ingress Endpoint
cmdEgress, err := l.connector.BuildEgress()
if err != nil {
slog.Error("build Egress connection", "err", err)
Expand All @@ -195,4 +194,18 @@ func (l *lobby) connectToLiveStreamHostInstance() {
slog.Error("run egress build connection cmd", "err", err)
return
}

// At least we open a local Ingress endpoint and connection to a remote Egress endpoint
cmdIngress, err := l.connector.BuildIngress()
if err != nil {
slog.Error("build Ingress connection", "err", err)
return
}
l.runCommand(cmdIngress)
if err = cmdIngress.WaitForDone(); err != nil {
slog.Error("run ingress build connection cmd", "err", err)
return
}

// Connection established!!
}
2 changes: 1 addition & 1 deletion internal/lobby/lobby_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type LobbyEntity struct {
LiveStreamId uuid.UUID `json:"streamId"`
LiveStreamId uuid.UUID `json:"streamId" gorm:"not null;index;unique;"`
UUID uuid.UUID `json:"-"`
Space string
IsRunning bool `json:"isLobbyRunning"`
Expand Down
2 changes: 1 addition & 1 deletion internal/lobby/lobby_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *LobbyManager) NewIngressResource(ctx context.Context, lobbyId uuid.UUID
return nil, fmt.Errorf("creating new session failes")
}

cmd := commands.NewCreateIngress(ctx, user, offer, sessions.SilentSignalChannel)
cmd := commands.NewCreateIngress(ctx, user, offer, sessions.UnidirectionalSignalChannel)
lobbyObj.runCommand(cmd)

select {
Expand Down
Loading

0 comments on commit 7d1d8e6

Please sign in to comment.