diff --git a/src/projects/config/items/virtual_hosts/applications/publishers/srt_publisher.h b/src/projects/config/items/virtual_hosts/applications/publishers/srt_publisher.h index 119e5fc15..4c38c0d1a 100644 --- a/src/projects/config/items/virtual_hosts/applications/publishers/srt_publisher.h +++ b/src/projects/config/items/virtual_hosts/applications/publishers/srt_publisher.h @@ -20,12 +20,13 @@ namespace cfg { struct SrtPublisher : public Publisher { + public: PublisherType GetType() const override { return PublisherType::Srt; } }; } // namespace pub - } // namespace app - } // namespace vhost + } // namespace app + } // namespace vhost } // namespace cfg diff --git a/src/projects/publishers/srt/srt_playlist.cpp b/src/projects/publishers/srt/srt_playlist.cpp new file mode 100644 index 000000000..0e87211f1 --- /dev/null +++ b/src/projects/publishers/srt/srt_playlist.cpp @@ -0,0 +1,144 @@ +//============================================================================== +// +// OvenMediaEngine +// +// Created by Hyunjun Jang +// Copyright (c) 2025 AirenSoft. All rights reserved. +// +//============================================================================== +#include "srt_playlist.h" + +#include +#include +#include + +#include "srt_private.h" + +#define SRT_STREAM_DESC \ + _stream_info->GetApplicationName(), _stream_info->GetName().CStr(), _stream_info->GetId() + +#define logap(format, ...) logtp("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__) +#define logad(format, ...) logtd("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__) +#define logas(format, ...) logts("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__) + +#define logai(format, ...) logti("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__) +#define logaw(format, ...) logtw("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__) +#define logae(format, ...) logte("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__) +#define logac(format, ...) logtc("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__) + +namespace pub +{ + SrtPlaylist::SrtPlaylist( + const std::shared_ptr &stream_info, + const std::shared_ptr &playlist_info, + const std::shared_ptr &sink) + : _stream_info(stream_info), + _playlist_info(playlist_info), + _sink(sink) + { + _packetizer = std::make_shared(); + } + + void SrtPlaylist::AddTrack(const std::shared_ptr &track) + { + // Duplicated tracks will be ignored by the packetizer + _packetizer->AddTrack(track); + _track_map[track->GetId()] = track; + } + + void SrtPlaylist::AddTracks(const std::vector> &tracks) + { + for (const auto &track : tracks) + { + // Duplicated tracks will be ignored by the packetizer + _packetizer->AddTrack(track); + _track_map[track->GetId()] = track; + } + } + + bool SrtPlaylist::Start() + { + _packetizer->AddSink(GetSharedPtrAs()); + + return _packetizer->Start(); + } + + bool SrtPlaylist::Stop() + { + return _packetizer->Stop(); + } + + void SrtPlaylist::EnqueuePacket(const std::shared_ptr &media_packet) + { +#if DEBUG + if (_track_map.find(media_packet->GetTrackId()) == _track_map.end()) + { + logte("The track is not found in the playlist map"); + OV_ASSERT2(false); + } +#endif // DEBUG + + _packetizer->AppendFrame(media_packet); + } + + void SrtPlaylist::SendData(const std::vector> &packets) + { + if (_sink == nullptr) + { + return; + } + + auto self = GetSharedPtrAs(); + + for (auto &packet : packets) + { + auto size = _data_to_send->GetLength(); + const auto &data = packet->GetData(); + + // Broadcast if the data size exceeds the SRT's payload length + if ((size + data->GetLength()) > SRT_LIVE_DEF_PLSIZE) + { + _sink->OnSrtPlaylistData(self, _data_to_send); + _data_to_send = data->Clone(); + } + else + { + _data_to_send->Append(packet->GetData()); + } + } + } + + void SrtPlaylist::OnPsi(const std::vector> &tracks, const std::vector> &psi_packets) + { + std::shared_ptr psi_data = std::make_shared(); + + // Concatenate PSI packets + for (const auto &packet : psi_packets) + { + psi_data->Append(packet->GetData()); + } + + logap("OnPsi - %zu packets (total %zu bytes)", psi_packets.size(), psi_data->GetLength()); + + _psi_data = std::move(psi_data); + + SendData(psi_packets); + } + + void SrtPlaylist::OnFrame(const std::shared_ptr &media_packet, const std::vector> &pes_packets) + { +#if DEBUG + // Since adding up the total packet size is costly, it is calculated only in debug mode + size_t total_packet_size = 0; + + for (const auto &packet : pes_packets) + { + total_packet_size += packet->GetData()->GetLength(); + } + + logap("OnFrame - %zu packets (total %zu bytes)", pes_packets.size(), total_packet_size); +#endif // DEBUG + + SendData(pes_packets); + } +} // namespace pub diff --git a/src/projects/publishers/srt/srt_playlist.h b/src/projects/publishers/srt/srt_playlist.h new file mode 100644 index 000000000..1c3973c13 --- /dev/null +++ b/src/projects/publishers/srt/srt_playlist.h @@ -0,0 +1,97 @@ +//============================================================================== +// +// OvenMediaEngine +// +// Created by Hyunjun Jang +// Copyright (c) 2025 AirenSoft. All rights reserved. +// +//============================================================================== +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace pub +{ + class SrtPlaylist; + + class SrtPlaylistSink + { + public: + virtual ~SrtPlaylistSink() = default; + + virtual void OnSrtPlaylistData( + const std::shared_ptr &playlist, + const std::shared_ptr &data) = 0; + }; + + struct SrtData + { + SrtData( + const std::shared_ptr &playlist, + const std::shared_ptr &data) + : playlist(playlist), + data(data) + { + } + + // The playlist that this data belongs to + std::shared_ptr playlist; + + // The data to send + std::shared_ptr data; + }; + + // SrtPlaylist IS NOT thread safe, so it should be used with a lock if needed + class SrtPlaylist : public mpegts::PacketizerSink, public ov::EnableSharedFromThis + { + public: + SrtPlaylist( + const std::shared_ptr &stream_info, + const std::shared_ptr &playlist_info, + const std::shared_ptr &sink); + + void AddTrack(const std::shared_ptr &track); + void AddTracks(const std::vector> &tracks); + + bool Start(); + bool Stop(); + + void EnqueuePacket(const std::shared_ptr &media_packet); + + //-------------------------------------------------------------------- + // Implementation of mpegts::PacketizerSink + //-------------------------------------------------------------------- + // Do not need to lock _packetizer_mutex inside OnPsi() because it will be called only once when the packetizer starts + void OnPsi(const std::vector> &tracks, const std::vector> &psi_packets) override; + // Do not need to lock _packetizer_mutex inside OnFrame() because it's called after acquiring the lock in EnqueuePacket() + // (It's called in the thread that calls EnqueuePacket()) + void OnFrame(const std::shared_ptr &media_packet, const std::vector> &pes_packets) override; + //-------------------------------------------------------------------- + + const std::shared_ptr &GetPsiData() const + { + return _psi_data; + } + + private: + void SendData(const std::vector> &packets); + + private: + std::shared_ptr _stream_info; + std::shared_ptr _playlist_info; + + std::map> _track_map; + + std::shared_ptr _packetizer; + + std::shared_ptr _sink; + + std::shared_ptr _psi_data; + std::shared_ptr _data_to_send = std::make_shared(); + }; +} // namespace pub diff --git a/src/projects/publishers/srt/srt_publisher.cpp b/src/projects/publishers/srt/srt_publisher.cpp index 65d5e92f2..66339227d 100644 --- a/src/projects/publishers/srt/srt_publisher.cpp +++ b/src/projects/publishers/srt/srt_publisher.cpp @@ -272,16 +272,17 @@ namespace pub } else { - // {vhost}/{app}/{stream} format + // {vhost}/{app}/{stream}[/{playlist}] format auto parts = stream_path.Split("/"); + auto part_count = parts.size(); - if (parts.size() != 3) + if ((part_count != 3) && (part_count != 4)) { - logte("The streamid for SRT must be in the following format: {vhost}/{app}/{stream}, but [%s]", stream_path.CStr()); + logte("The streamid for SRT must be in the following format: {vhost}/{app}/{stream}[/{playlist}], but [%s]", stream_path.CStr()); return nullptr; } - // Convert to srt://{vhost}/{app}/{stream} + // Convert to srt://{vhost}/{app}/{stream}[/{playlist}] stream_path.Prepend("srt://"); is_vhost = true; @@ -298,14 +299,8 @@ namespace pub } else { - if (streamid.IsEmpty()) - { - logte("The streamid for SRT must be in one of the following formats: srt://{host}[:{port}]/{app}/{stream}[?{query}={value}] or {vhost}/{app}/{stream}, but [%s]", stream_path.CStr()); - } - else - { - logte("The streamid for SRT must be in one of the following formats: srt://{host}[:{port}]/{app}/{stream}[?{query}={value}] or {vhost}/{app}/{stream}, but [%s] (streamid: [%s])", stream_path.CStr(), streamid.CStr()); - } + auto extra_log = streamid.IsEmpty() ? "" : ov::String::FormatString(" (streamid: [%s])", streamid.CStr()); + logte("The streamid for SRT must be in one of the following formats: srt://{host}[:{port}]/{app}/{stream}[/{playlist}][?{query}={value}] or {vhost}/{app}/{stream}[/{playlist}], but [%s]%s", stream_path.CStr(), extra_log.CStr()); } return final_url; @@ -414,18 +409,38 @@ namespace pub return; } - auto stream = application->GetStream(final_url->Stream()); + auto stream = application->GetStreamAs(final_url->Stream()); if (stream == nullptr) + { + stream = std::dynamic_pointer_cast(PullStream(final_url, vhost_app_name, final_url->Host(), final_url->Stream())); + } + + if(stream == nullptr) { logte("Could not find stream: %s", final_url->Stream().CStr()); + remote->Close(); + return; + } + + auto playlist_name = final_url->File(); + std::shared_ptr srt_playlist = nullptr; + + if (playlist_name.IsEmpty()) + { + playlist_name = DEFAULT_SRT_PLAYLIST_NAME; + } - ::srt_setrejectreason(remote->GetNativeHandle(), 1404); + srt_playlist = stream->GetSrtPlaylist(playlist_name); + + if (srt_playlist == nullptr) + { + logte("Could not find playlist: %s", final_url->File().CStr()); remote->Close(); return; } - auto session = SrtSession::Create(application, stream, remote->GetNativeHandle(), remote); + auto session = SrtSession::Create(application, stream, remote->GetNativeHandle(), remote, srt_playlist); { std::unique_lock lock(_session_map_mutex); diff --git a/src/projects/publishers/srt/srt_session.cpp b/src/projects/publishers/srt/srt_session.cpp index e0a768695..a422a05c8 100644 --- a/src/projects/publishers/srt/srt_session.cpp +++ b/src/projects/publishers/srt/srt_session.cpp @@ -31,10 +31,11 @@ namespace pub std::shared_ptr SrtSession::Create(const std::shared_ptr &application, const std::shared_ptr &stream, uint32_t session_id, - const std::shared_ptr &connector) + const std::shared_ptr &connector, + const std::shared_ptr &srt_playlist) { auto session_info = info::Session(*std::static_pointer_cast(stream), session_id); - auto session = std::make_shared(session_info, application, stream, connector); + auto session = std::make_shared(session_info, application, stream, connector, srt_playlist); if (session->Start() == false) { @@ -47,10 +48,12 @@ namespace pub SrtSession::SrtSession(const info::Session &session_info, const std::shared_ptr &application, const std::shared_ptr &stream, - const std::shared_ptr &connector) + const std::shared_ptr &connector, + const std::shared_ptr &srt_playlist) : Session(session_info, application, stream) { _connector = connector; + _srt_playlist = srt_playlist; MonitorInstance->OnSessionConnected(*GetStream(), PublisherType::Srt); } @@ -89,40 +92,48 @@ namespace pub return Session::Stop(); } + std::shared_ptr SrtSession::ToSrtData(const std::any &packet) + { + std::shared_ptr srt_data; + + try + { + srt_data = std::any_cast>(packet); + } + catch (const std::bad_any_cast &e) + { + logad("An incorrect type of packet was input from the stream. (%s)", e.what()); + + OV_ASSERT2(false); + return nullptr; + } + + return (srt_data->playlist == _srt_playlist) ? srt_data : nullptr; + } + void SrtSession::SendOutgoingData(const std::any &packet) { + auto srt_data = ToSrtData(packet); + + if (srt_data == nullptr) + { + // The packet is not for this session + return; + } + std::shared_ptr mpegts_data; if (_need_to_send_psi) { _need_to_send_psi = false; - - auto stream = std::dynamic_pointer_cast(GetStream()); - - if (stream != nullptr) - { - _connector->Send(stream->GetPsiData()); - } - else - { - OV_ASSERT2(false); - } + _connector->Send(srt_data->playlist->GetPsiData()); } - try - { - mpegts_data = std::any_cast>(packet); + mpegts_data = srt_data->data; - if (mpegts_data == nullptr) - { - OV_ASSERT2("SrtStream::BroadcastPacket never sends a null packet"); - return; - } - } - catch (const std::bad_any_cast &e) + if (mpegts_data == nullptr) { - logad("An incorrect type of packet was input from the stream. (%s)", e.what()); - OV_ASSERT2(false); + OV_ASSERT2("SrtStream::BroadcastPacket never sends a null packet"); return; } diff --git a/src/projects/publishers/srt/srt_session.h b/src/projects/publishers/srt/srt_session.h index c0bdd5ba8..913ea1c16 100644 --- a/src/projects/publishers/srt/srt_session.h +++ b/src/projects/publishers/srt/srt_session.h @@ -12,6 +12,8 @@ #include #include +#include "./srt_playlist.h" + namespace pub { class SrtSession final : public Session @@ -20,12 +22,14 @@ namespace pub static std::shared_ptr Create(const std::shared_ptr &application, const std::shared_ptr &stream, uint32_t srt_session_id, - const std::shared_ptr &connector); + const std::shared_ptr &connector, + const std::shared_ptr &srt_playlist); SrtSession(const info::Session &session_info, const std::shared_ptr &application, const std::shared_ptr &stream, - const std::shared_ptr &connector); + const std::shared_ptr &connector, + const std::shared_ptr &srt_playlist); ~SrtSession() override final; //-------------------------------------------------------------------- @@ -64,9 +68,13 @@ namespace pub private: ov::String GetAppStreamName() const; + std::shared_ptr ToSrtData(const std::any &packet); + private: std::shared_ptr _connector; + std::shared_ptr _srt_playlist; + bool _need_to_send_psi = true; bool _is_keyframe_sent = false; diff --git a/src/projects/publishers/srt/srt_stream.cpp b/src/projects/publishers/srt/srt_stream.cpp index 591fc0490..1052671f9 100644 --- a/src/projects/publishers/srt/srt_stream.cpp +++ b/src/projects/publishers/srt/srt_stream.cpp @@ -12,6 +12,7 @@ #include "base/publisher/application.h" #include "base/publisher/stream.h" +#include "srt_playlist.h" #include "srt_private.h" #include "srt_session.h" @@ -48,256 +49,407 @@ namespace pub logad("SrtStream has been terminated finally"); } -#define SRT_SET_TRACK(from, to, supported, message, ...) \ - if (to == nullptr) \ - { \ - if (supported) \ - { \ - to = from; \ - } \ - else \ - { \ - logai("SrtStream - " message, ##__VA_ARGS__); \ - } \ - } - - bool SrtStream::Start() + std::shared_ptr SrtStream::PrepareDefaultPlaylist() { - if (GetState() != Stream::State::CREATED) - { - return false; - } - - // If this stream is from OriginMapStore, don't register it to OriginMapStore again. - if (IsFromOriginMapStore() == false) - { - auto result = ocst::Orchestrator::GetInstance()->RegisterStreamToOriginMapStore(GetApplicationInfo().GetVHostAppName(), GetName()); - - if (result == CommonErrorCode::ERROR) - { - logaw("Failed to register stream to origin map store"); - return false; - } - } + auto playlist_name = DEFAULT_SRT_PLAYLIST_NAME; + auto playlist = Stream::GetPlaylist(playlist_name); - if (CreateStreamWorker(_worker_count) == false) + if (playlist != nullptr) { - return false; + // The playlist is already created + logaw("The playlist %s is already created", playlist_name); + OV_ASSERT2(false); + return playlist; } - logad("The SRT stream has been started"); - - auto packetizer = std::make_shared(); - + // Pick the first track of each media type std::shared_ptr first_video_track = nullptr; std::shared_ptr first_audio_track = nullptr; std::shared_ptr first_data_track = nullptr; - for (const auto &[id, track] : GetTracks()) + for (const auto &[id, track] : GetSupportedTracks(GetTracks())) { - switch (track->GetMediaType()) + const auto media_type = track->GetMediaType(); + + switch (media_type) { case cmn::MediaType::Video: - SRT_SET_TRACK(track, first_video_track, - mpegts::Packetizer::IsSupportedCodec(track->GetCodecId()), - "Ignore unsupported video codec (%s)", StringFromMediaCodecId(track->GetCodecId()).CStr()); + first_video_track = (first_video_track == nullptr) ? track : first_video_track; break; case cmn::MediaType::Audio: - SRT_SET_TRACK(track, first_audio_track, - mpegts::Packetizer::IsSupportedCodec(track->GetCodecId()), - "Ignore unsupported audio codec (%s)", StringFromMediaCodecId(track->GetCodecId()).CStr()); + first_audio_track = (first_audio_track == nullptr) ? track : first_audio_track; break; case cmn::MediaType::Data: - SRT_SET_TRACK(track, first_data_track, true, ); + first_data_track = (first_data_track == nullptr) ? track : first_data_track; break; default: - logad("SrtStream - Ignore unsupported media type(%s)", GetMediaTypeString(track->GetMediaType()).CStr()); - continue; + logad("SrtStream - Ignore unsupported media type: %s", GetMediaTypeString(track->GetMediaType()).CStr()); + break; } } if ((first_video_track == nullptr) && (first_audio_track == nullptr)) { logaw("There is no track to create SRT stream"); - return false; + return nullptr; } - bool result = ((first_video_track != nullptr) ? packetizer->AddTrack(first_video_track) : true) && - ((first_audio_track != nullptr) ? packetizer->AddTrack(first_audio_track) : true) && - ((first_data_track != nullptr) ? packetizer->AddTrack(first_data_track) : true); + auto new_playlist = std::make_shared(playlist_name, playlist_name); + auto rendition = std::make_shared( + "default", + (first_video_track != nullptr) ? first_video_track->GetVariantName() : "", + (first_audio_track != nullptr) ? first_audio_track->GetVariantName() : ""); - if (result == false) - { - logae("Failed to add track to packetizer"); - return false; - } + new_playlist->AddRendition(rendition); + + AddPlaylist(new_playlist); - _psi_data->Clear(); - _data_to_send->Clear(); + return new_playlist; + } + + bool SrtStream::IsSupportedTrack(const std::shared_ptr &track) const + { + auto media_type = track->GetMediaType(); - if (packetizer->AddSink(GetSharedPtrAs())) + if ((media_type == cmn::MediaType::Video) || (media_type == cmn::MediaType::Audio)) { - std::unique_lock lock(_packetizer_mutex); - _packetizer = packetizer; + auto codec_id = track->GetCodecId(); + + if (mpegts::Packetizer::IsSupportedCodec(codec_id)) + { + return true; + } + else + { + logai("Ignore unsupported %s codec (%s)", + StringFromMediaType(media_type).CStr(), + StringFromMediaCodecId(codec_id).CStr()); + } + } + else if (media_type == cmn::MediaType::Data) + { + return true; } else { - logae("Could not initialize packetizer for SRT Stream"); - return false; + logai("Ignore unsupported media type: %s", StringFromMediaType(media_type).CStr()); } - return packetizer->Start() && Stream::Start(); + return false; } - bool SrtStream::Stop() + std::shared_ptr SrtStream::GetSrtPlaylistInternal(const ov::String &file_name) { - if (GetState() != Stream::State::STARTED) + auto item = _srt_playlist_map_by_file_name.find(file_name); + if (item == _srt_playlist_map_by_file_name.end()) { - return false; + return nullptr; } - logad("The SRT stream has been stopped"); + return item->second; + } + + std::shared_ptr SrtStream::GetSrtPlaylist(const ov::String &file_name) + { + std::shared_lock lock(_srt_playlist_map_mutex); + + return GetSrtPlaylistInternal(file_name); + } + + std::map> SrtStream::GetSupportedTracks(const std::map> &track_map) const + { + return ov::maputils::Filter( + track_map, + std::bind(&SrtStream::IsSupportedTrack, this, std::placeholders::_2)); + } + + std::vector> SrtStream::GetSupportedTracks(const std::vector> &tracks) const + { + std::vector> supported_tracks; - auto linked_input_stream = GetLinkedInputStream(); + std::copy_if( + tracks.begin(), tracks.end(), + std::back_inserter(supported_tracks), + std::bind(&SrtStream::IsSupportedTrack, this, std::placeholders::_1)); - if ((linked_input_stream != nullptr) && (linked_input_stream->IsFromOriginMapStore() == false)) + return supported_tracks; + } + + std::vector> SrtStream::GetSupportedTracks(const std::shared_ptr &group) const + { + return (group != nullptr) ? GetSupportedTracks(group->GetTracks()) : std::vector>(); + } + + void SrtStream::AddSupportedTrack(const std::shared_ptr &track, std::map> &to) + { + auto media_type = track->GetMediaType(); + + if ((media_type == cmn::MediaType::Video) || (media_type == cmn::MediaType::Audio)) { - // Unregister stream if OriginMapStore is enabled - auto result = ocst::Orchestrator::GetInstance()->UnregisterStreamFromOriginMapStore(GetApplicationInfo().GetVHostAppName(), GetName()); + auto codec_id = track->GetCodecId(); - if (result == CommonErrorCode::ERROR) + if (mpegts::Packetizer::IsSupportedCodec(codec_id)) + { + to[track->GetVariantName()] = track; + } + else { - logaw("Failed to unregister stream from origin map store"); - return false; + logai("SrtStream - Ignore unsupported %s codec (%s)", + StringFromMediaType(media_type).CStr(), + StringFromMediaCodecId(codec_id).CStr()); } } - - std::shared_ptr packetizer; - + else if (media_type == cmn::MediaType::Data) { - std::unique_lock lock(_packetizer_mutex); - packetizer = std::move(_packetizer); + to[track->GetVariantName()] = track; } - - OV_ASSERT2(packetizer != nullptr); - - if (packetizer != nullptr) + else { - packetizer->Stop(); - packetizer.reset(); + logai("SrtStream - Ignore unsupported media type: %s", StringFromMediaType(media_type).CStr()); } + } - return Stream::Stop(); + void SrtStream::PrepareForTrack( + const std::shared_ptr &track, + std::map> &psi_data_map, + std::map> &data_to_send_map) + { + psi_data_map[track->GetId()] = std::make_shared(); + data_to_send_map[track->GetId()] = std::make_shared(); } - void SrtStream::EnqueuePacket(const std::shared_ptr &media_packet) + bool SrtStream::Start() { - if (GetState() != Stream::State::STARTED) + std::unique_lock lock(_srt_playlist_map_mutex); + + if (GetState() != Stream::State::CREATED) { - return; + return false; } - std::unique_lock lock(_packetizer_mutex); - - if (_packetizer == nullptr) + if (CreateStreamWorker(_worker_count) == false) { - OV_ASSERT2(false); -#if DEBUG - logaw("Packetizer is not initialized"); -#endif // DEBUG - return; + return false; } - _packetizer->AppendFrame(media_packet); - } + auto config = GetApplication()->GetConfig(); + auto srt_config = config.GetPublishers().GetSrtPublisher(); - void SrtStream::SendVideoFrame(const std::shared_ptr &media_packet) - { - EnqueuePacket(media_packet); - } + std::map> data_tracks; - void SrtStream::SendAudioFrame(const std::shared_ptr &media_packet) - { - EnqueuePacket(media_packet); - } + PrepareDefaultPlaylist(); - void SrtStream::SendDataFrame(const std::shared_ptr &media_packet) - { - EnqueuePacket(media_packet); - } + data_tracks = ov::maputils::Filter(GetSupportedTracks(GetTracks()), [](int32_t track_id, const std::shared_ptr &track) { + return track->GetMediaType() == cmn::MediaType::Data; + }); - void SrtStream::BroadcastIfReady(const std::vector> &packets) - { - std::vector> data_list; - size_t total_data_size = 0; + auto suceeded = true; + for (const auto &[file_name, playlist] : GetPlaylists()) { - for (auto &packet : packets) + bool is_default_playlist = (file_name == DEFAULT_SRT_PLAYLIST_NAME); + auto &rendition_list = playlist->GetRenditionList(); + + auto srt_playlist = GetSrtPlaylistInternal(file_name); + + if (srt_playlist != nullptr) { - auto size = _data_to_send->GetLength(); - const auto &data = packet->GetData(); + // The playlist is already created + continue; + } - if ((size + data->GetLength()) > SRT_LIVE_DEF_PLSIZE) - { - total_data_size += _data_to_send->GetLength(); - data_list.push_back(std::move(_data_to_send)); + srt_playlist = std::make_shared( + GetSharedPtrAs(), + playlist, + GetSharedPtrAs()); + + _srt_playlist_map_by_file_name[file_name] = srt_playlist; - _data_to_send = data->Clone(); + auto first_supported_rendition_found = false; + + for (const auto &rendition : rendition_list) + { + if (first_supported_rendition_found == false) + { + auto video_variant_name = rendition->GetVideoVariantName(); + auto audio_variant_name = rendition->GetAudioVariantName(); + + auto video_media_track_group = (video_variant_name.IsEmpty() == false) ? GetMediaTrackGroup(video_variant_name) : nullptr; + auto audio_media_track_group = (audio_variant_name.IsEmpty() == false) ? GetMediaTrackGroup(audio_variant_name) : nullptr; + + if ((video_variant_name.IsEmpty() == false) && (video_media_track_group == nullptr)) + { + logaw("%s video is excluded from the %s rendition in %s playlist because there is no video track.", + video_variant_name.CStr(), rendition->GetName().CStr(), playlist->GetFileName().CStr()); + video_variant_name.Clear(); + } + + if ((audio_variant_name.IsEmpty() == false) && (audio_media_track_group == nullptr)) + { + logaw("%s audio is excluded from the %s rendition in %s playlist because there is no audio track.", + audio_variant_name.CStr(), rendition->GetName().CStr(), playlist->GetFileName().CStr()); + audio_variant_name.Clear(); + } + + if (video_variant_name.IsEmpty() && audio_variant_name.IsEmpty()) + { + logaw("Invalid rendition %s. The variant name video(%s) audio(%s) is not found in the track list", + rendition->GetName().CStr(), video_variant_name.CStr(), audio_variant_name.CStr()); + continue; + } + + auto video_tracks = GetSupportedTracks(video_media_track_group); + auto audio_tracks = GetSupportedTracks(audio_media_track_group); + + if (video_tracks.empty() && audio_tracks.empty()) + { + logaw("Could not add variants (video: %s, audio: %s) because there is no supported codec.", + video_variant_name.CStr(), audio_variant_name.CStr()); + continue; + } + + for (const auto &track : video_tracks) + { + _srt_playlist_map_by_track_id[track->GetId()].push_back(srt_playlist); + } + srt_playlist->AddTracks(video_tracks); + + for (const auto &track : audio_tracks) + { + _srt_playlist_map_by_track_id[track->GetId()].push_back(srt_playlist); + } + srt_playlist->AddTracks(audio_tracks); + + if (is_default_playlist) + { + // Add data tracks to the default playlist + for (const auto &[id, track] : data_tracks) + { + _srt_playlist_map_by_track_id[track->GetId()].push_back(srt_playlist); + srt_playlist->AddTrack(track); + } + } + + first_supported_rendition_found = true; + + logai("A SRT playist %s has been created (with variant: %s, %s)", + file_name.CStr(), + video_variant_name.CStr(), + audio_variant_name.CStr()); } else { - _data_to_send->Append(packet->GetData()); + logaw("Rendition %s is ignored - SRT stream supports only one rendition per playlist", rendition->GetName().CStr()); } } + + suceeded = suceeded && srt_playlist->Start(); + + if (suceeded == false) + { + logae("Could not start the SRT playlist: %s", file_name.CStr()); + break; + } } - for (const auto &data : data_list) + auto result = Stream::Start(); + + if (result) { - BroadcastPacket(std::make_any>(data)); + logai("The SRT stream has been started"); + } + else + { + logae("Failed to start the SRT stream"); } - MonitorInstance->IncreaseBytesOut( - *GetSharedPtrAs(), - PublisherType::Srt, - total_data_size * GetSessionCount()); + return result; } - void SrtStream::OnPsi(const std::vector> &tracks, const std::vector> &psi_packets) + bool SrtStream::Stop() { - std::shared_ptr psi_data = std::make_shared(); + std::unique_lock lock(_srt_playlist_map_mutex); + + if (GetState() != Stream::State::STARTED) + { + return false; + } - for (const auto &packet : psi_packets) + for (const auto &[file_name, playlist] : _srt_playlist_map_by_file_name) { - psi_data->Append(packet->GetData()); + playlist->Stop(); } - logap("OnPsi - %zu packets (total %zu bytes)", psi_packets.size(), psi_data->GetLength()); + auto result = Stream::Stop(); + if (result) { - std::unique_lock lock(_psi_data_mutex); - _psi_data = std::move(psi_data); + logai("The SRT stream has been stopped"); + } + else + { + logae("Failed to stop the SRT stream"); } - BroadcastIfReady(psi_packets); + return result; } - void SrtStream::OnFrame(const std::shared_ptr &media_packet, const std::vector> &pes_packets) + void SrtStream::EnqueuePacket(const std::shared_ptr &media_packet) { -#if DEBUG - // Since adding up the total packet size is costly, it is calculated only in debug mode - size_t total_packet_size = 0; + std::shared_lock lock(_srt_playlist_map_mutex); - for (const auto &packet : pes_packets) + if (GetState() != Stream::State::STARTED) { - total_packet_size += packet->GetData()->GetLength(); + return; } - logap("OnFrame - %zu packets (total %zu bytes)", pes_packets.size(), total_packet_size); -#endif // DEBUG + auto track_id = media_packet->GetTrackId(); + + auto srt_playlists_iterator = _srt_playlist_map_by_track_id.find(track_id); + + if (srt_playlists_iterator == _srt_playlist_map_by_track_id.end()) + { + // It may have been filtered out because it is an unsupported codec + return; + } + + auto &srt_playlists = srt_playlists_iterator->second; + + for (const auto &srt_playlist : srt_playlists) + { + srt_playlist->EnqueuePacket(media_packet); + } + } + + void SrtStream::SendVideoFrame(const std::shared_ptr &media_packet) + { + EnqueuePacket(media_packet); + } + + void SrtStream::SendAudioFrame(const std::shared_ptr &media_packet) + { + EnqueuePacket(media_packet); + } - BroadcastIfReady(pes_packets); + void SrtStream::SendDataFrame(const std::shared_ptr &media_packet) + { + EnqueuePacket(media_packet); + } + + void SrtStream::OnSrtPlaylistData( + const std::shared_ptr &playlist, + const std::shared_ptr &data) + { + auto srt_data = std::make_shared(playlist, data); + + BroadcastPacket(std::make_any>(srt_data)); + + MonitorInstance->IncreaseBytesOut( + *GetSharedPtrAs(), + PublisherType::Srt, + data->GetLength() * GetSessionCount()); } } // namespace pub diff --git a/src/projects/publishers/srt/srt_stream.h b/src/projects/publishers/srt/srt_stream.h index 8aadf34a3..ce9db2faa 100644 --- a/src/projects/publishers/srt/srt_stream.h +++ b/src/projects/publishers/srt/srt_stream.h @@ -10,13 +10,15 @@ #include #include -#include +#include "./srt_playlist.h" #include "monitoring/monitoring.h" +#define DEFAULT_SRT_PLAYLIST_NAME "playlist" + namespace pub { - class SrtStream final : public Stream, public mpegts::PacketizerSink + class SrtStream final : public Stream, public SrtPlaylistSink { public: static std::shared_ptr Create(const std::shared_ptr application, @@ -27,6 +29,8 @@ namespace pub uint32_t worker_count); ~SrtStream() override final; + std::shared_ptr GetSrtPlaylist(const ov::String &file_name); + //-------------------------------------------------------------------- // Overriding of Stream //-------------------------------------------------------------------- @@ -36,35 +40,46 @@ namespace pub //-------------------------------------------------------------------- //-------------------------------------------------------------------- - // Implementation of mpegts::PacketizerSink + // Implementation of SrtPlaylistSink //-------------------------------------------------------------------- - void OnPsi(const std::vector> &tracks, const std::vector> &psi_packets) override; - void OnFrame(const std::shared_ptr &media_packet, const std::vector> &pes_packets) override; + void OnSrtPlaylistData( + const std::shared_ptr &playlist, + const std::shared_ptr &data) override; //-------------------------------------------------------------------- - std::shared_ptr GetPsiData() const - { - std::shared_lock lock(_psi_data_mutex); - - return _psi_data; - } - private: bool Start() override; bool Stop() override; - void SetTrack(const std::shared_ptr &from, std::shared_ptr *to); + bool IsSupportedTrack(const std::shared_ptr &track) const; + + std::shared_ptr GetSrtPlaylistInternal(const ov::String &file_name); + + std::map> GetSupportedTracks(const std::map> &track_map) const; + std::vector> GetSupportedTracks(const std::vector> &tracks) const; + std::vector> GetSupportedTracks(const std::shared_ptr &group) const; + + std::shared_ptr PrepareDefaultPlaylist(); + // std::shared_ptr CreatePlaylist( + // const cfg::vhost::app::oprf::Playlist &playlist_config, + // const TrackMapSet &track_map_set); + void AddSupportedTrack(const std::shared_ptr &track, std::map> &to); + + void PrepareForTrack( + const std::shared_ptr &track, + std::map> &psi_data_map, + std::map> &data_to_send_map); void EnqueuePacket(const std::shared_ptr &media_packet); void BroadcastIfReady(const std::vector> &packets); + private: uint32_t _worker_count = 0; - std::shared_mutex _packetizer_mutex; - std::shared_ptr _packetizer; - - mutable std::shared_mutex _psi_data_mutex; - std::shared_ptr _psi_data = std::make_shared(); - std::shared_ptr _data_to_send = std::make_shared(); + std::shared_mutex _srt_playlist_map_mutex; + // key: track id, value: list of playlist that uses the track + std::map>> _srt_playlist_map_by_track_id; + // key: playlist file name, value: playlist + std::map> _srt_playlist_map_by_file_name; }; } // namespace pub