Skip to content

Commit

Permalink
🌊 Enable restricting max bandwidth of incoming streams (#80)
Browse files Browse the repository at this point in the history
Fixes #42

This change introduces the `FTL_MAX_ALLOWED_BITS_PER_SECOND` configuration, providing a means of restricting the maximum bandwidth any given stream is allowed to consume before it is kicked off the server.

There are a few other changes in this payload as well:
- `FtlControlConnection::FtlResponseCode` enum is introduced (from [ftl-sdk](https://github.com/microsoft/ftl-sdk/blob/d0c8469f66806b5ea738d607f7d2b000af8b1129/libftl/ftl_private.h#L365-L385)), and is now referenced instead of hard-coded string values throughout `FtlControlConnection`.
- Better locking mechanisms around `NetworkSocketConnectionTransport`'s stopping procedure to prevent getting into a deadlocked state if the server is trying to stop a stream around the same time a client is.

An issue #79 was also discovered where pending writes to a `NetworkSocketConnectionTransport` aren't flushed when the connection is stopped, preventing us from communicating the response code to the client before closing the connection.

Worth noting as well is that [OBS behavior](https://github.com/obsproject/obs-studio/blob/92a7c12909556d8b64e1ea68a80a255b46d672cf/plugins/obs-outputs/ftl-stream.c#L888-L892) when a stream connection is terminated (even with a valid error response) is to reconnect every ten seconds, regardless of the error received. The net user result is OBS transparently trying to reconnect in a loop despite the server kicking them off for excessive bandwidth use.
  • Loading branch information
haydenmc authored Feb 15, 2021
1 parent 3c36edc commit 526aca4
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 27 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ Configuration is achieved through environment variables.
| `FTL_ORCHESTRATOR_REGION_CODE` | String value, default: `global` | This is a string value used by the Orchestrator to group regional nodes together to more effectively distribute video traffic. |
| `FTL_SERVICE_CONNECTION` | `DUMMY`: (default) Dummy service connection <br />`GLIMESH`: Glimesh service connection <br />`REST`: REST service connection ([docs](docs/REST_SERVICE.md)) | This configuration value determines which service FTL should plug into for operations such as stream key retrieval. |
| `FTL_SERVICE_METADATAREPORTINTERVALMS` | Time in milliseconds | Defaults to `4000`, controls how often FTL stream metadata will be reported to the service. |
| `FTL_SERVICE_DUMMY_HMAC_KEY` | String, default: `aBcDeFgHiJkLmNoPqRsTuVwXyZ123456` | Key all FTL clients must use if service connection is `DUMMY`. The HMAC key is the part after the dash in a stream key.` |
| `FTL_MAX_ALLOWED_BITS_PER_SECOND` | Integer bits per second | Defaults to `0` (disabled), FTL connections that exceed the bandwidth specified here will be stopped.<br />**Note that this is a strictly enforced maximum** based on a rolling average; consider providing some buffer size for encoder spikes above the configured average. |
| `FTL_SERVICE_DUMMY_HMAC_KEY` | String, default: `aBcDeFgHiJkLmNoPqRsTuVwXyZ123456` | Key all FTL clients must use if service connection is `DUMMY`. The HMAC key is the part after the dash in a stream key. |
| `FTL_SERVICE_DUMMY_PREVIEWIMAGEPATH` | `/path/to/directory` | The path where preview images of ingested streams will be stored if service connection is `DUMMY`. Defaults to `~/.ftl/previews` |
| `FTL_SERVICE_GLIMESH_HOSTNAME` | Hostname value (ex. `localhost`, `glimesh.tv`) | This is the hostname the Glimesh service connection will attempt to reach. |
| `FTL_SERVICE_GLIMESH_PORT` | Port number, `1`-`65535`. | This is the port used to communicate with the Glimesh service via HTTP/HTTPS. |
Expand Down
11 changes: 11 additions & 0 deletions src/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ void Configuration::Load()
serviceConnectionMetadataReportIntervalMs = std::stoi(varVal);
}

// FTL_MAX_ALLOWED_BITS_PER_SECOND -> MaxAllowedBitsPerSecond
if (char* varVal = std::getenv("FTL_MAX_ALLOWED_BITS_PER_SECOND"))
{
maxAllowedBitsPerSecond = std::stoi(varVal);
}

// FTL_SERVICE_DUMMY_HMAC_KEY -> DummyHmacKey
if (char* varVal = std::getenv("FTL_SERVICE_DUMMY_HMAC_KEY"))
{
Expand Down Expand Up @@ -258,6 +264,11 @@ uint16_t Configuration::GetServiceConnectionMetadataReportIntervalMs()
return serviceConnectionMetadataReportIntervalMs;
}

uint32_t Configuration::GetMaxAllowedBitsPerSecond()
{
return maxAllowedBitsPerSecond;
}

std::string Configuration::GetGlimeshServiceHostname()
{
return glimeshServiceHostname;
Expand Down
2 changes: 2 additions & 0 deletions src/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Configuration
std::string GetOrchestratorRegionCode();
ServiceConnectionKind GetServiceConnectionKind();
uint16_t GetServiceConnectionMetadataReportIntervalMs();
uint32_t GetMaxAllowedBitsPerSecond();

// Dummy Service Connection Values
std::vector<std::byte> GetDummyHmacKey();
Expand Down Expand Up @@ -73,6 +74,7 @@ class Configuration
std::string orchestratorRegionCode = "global";
ServiceConnectionKind serviceConnectionKind = ServiceConnectionKind::DummyServiceConnection;
uint16_t serviceConnectionMetadataReportIntervalMs = 4000;
uint32_t maxAllowedBitsPerSecond = 0;

// Dummy Service Connection Backing Stores
// "aBcDeFgHiJkLmNoPqRsTuVwXyZ123456"
Expand Down
42 changes: 26 additions & 16 deletions src/ConnectionTransports/NetworkSocketConnectionTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Result<void> NetworkSocketConnectionTransport::StartAsync()

void NetworkSocketConnectionTransport::Stop(bool noBlock)
{
std::unique_lock stoppingLock(stoppingMutex);
if (!isStopping && !isStopped)
{
// Looks like this connection hasn't stopped yet.
Expand All @@ -109,6 +110,7 @@ void NetworkSocketConnectionTransport::Stop(bool noBlock)
// Wait for the connection thread (only if it has actually started)
if (!noBlock && connectionThreadEndedFuture.valid())
{
stoppingLock.unlock(); // Unlock so the connection thread can finish stopping
connectionThreadEndedFuture.wait();
}
}
Expand All @@ -117,13 +119,15 @@ void NetworkSocketConnectionTransport::Stop(bool noBlock)
// We're already stopping - just wait for the connnection thread to end.
if (!noBlock && connectionThreadEndedFuture.valid())
{
stoppingLock.unlock(); // Unlock so the connection thread can finish stopping
connectionThreadEndedFuture.wait();
}
}
}

void NetworkSocketConnectionTransport::Write(const std::vector<std::byte>& bytes)
{
std::unique_lock stoppingLock(stoppingMutex);
if (!isStopping && !isStopped)
{
std::lock_guard<std::mutex> lock(writeMutex);
Expand All @@ -150,6 +154,7 @@ void NetworkSocketConnectionTransport::Write(const std::vector<std::byte>& bytes
"Write returned {} result, expected {} bytes.",
writeResult,
bytes.size());
stoppingLock.unlock(); // Unlock so we can start stopping :)
closeConnection();
}
}
Expand Down Expand Up @@ -414,30 +419,35 @@ Result<void> NetworkSocketConnectionTransport::sendData(const std::vector<std::b

void NetworkSocketConnectionTransport::closeConnection()
{
if (!isStopping)
bool fireCallback = false;
{
// We haven't been asked to stop, so the connection was closed for another reason.
isStopping = true;
if (socketHandle != 0)
std::lock_guard lock(stoppingMutex);
if (!isStopping)
{
shutdown(socketHandle, SHUT_RDWR);
close(socketHandle);
// We haven't been asked to stop, so the connection was closed for another reason.
isStopping = true;
if (socketHandle != 0)
{
shutdown(socketHandle, SHUT_RDWR);
close(socketHandle);
}

// We only callback when we haven't been explicitly told to stop (to avoid feedback loops)
fireCallback = true;
}

// We only callback when we haven't been explicitly told to stop (to avoid feedback loops)
if (onConnectionClosed)
if (!isStopped)
{
onConnectionClosed();
// Once we reach this point, we know the socket has finished closing.
// Close our write pipes
close(writePipeFds[0]);
close(writePipeFds[1]);
isStopped = true;
}
}

if (!isStopped)
if (fireCallback && onConnectionClosed)
{
// Once we reach this point, we know the socket has finished closing.
// Close our write pipes
close(writePipeFds[0]);
close(writePipeFds[1]);
isStopped = true;
onConnectionClosed();
}
}
#pragma endregion Private methods
5 changes: 3 additions & 2 deletions src/ConnectionTransports/NetworkSocketConnectionTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ class NetworkSocketConnectionTransport : public ConnectionTransport
const NetworkSocketConnectionKind connectionKind;
const int socketHandle = 0;
std::optional<sockaddr_in> targetAddr = std::nullopt;
std::atomic<bool> isStopping { false }; // Indicates that the socket has been requested to close
std::atomic<bool> isStopped { false }; // Indicates that the socket has finished closing
std::mutex stoppingMutex;
bool isStopping = false; // Indicates that the socket has been requested to close
bool isStopped = false; // Indicates that the socket has finished closing
std::thread connectionThread;
std::future<void> connectionThreadEndedFuture;
std::mutex writeMutex;
Expand Down
16 changes: 11 additions & 5 deletions src/FtlControlConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ Result<void> FtlControlConnection::StartAsync()
return transport->StartAsync();
}

void FtlControlConnection::Stop()
void FtlControlConnection::Stop(FtlResponseCode responseCode)
{
// BUG: Right now, the transport will halt the connection before these bytes
// can make it out the door.
// https://github.com/Glimesh/janus-ftl-plugin/issues/79
writeToTransport(fmt::format("{}\n", responseCode));

// Stop the transport, but don't fire OnConnectionClosed
transport->Stop();
}
Expand Down Expand Up @@ -174,7 +179,7 @@ void FtlControlConnection::processHmacCommand()
hmacPayload = Util::GenerateRandomBinaryPayload(HMAC_PAYLOAD_SIZE);
std::string hmacString = Util::ByteArrayToHexString(
reinterpret_cast<std::byte*>(&hmacPayload[0]), hmacPayload.size());
writeToTransport(fmt::format("200 {}\n", hmacString));
writeToTransport(fmt::format("{} {}\n", FtlResponseCode::FTL_INGEST_RESP_OK, hmacString));
}

void FtlControlConnection::processConnectCommand(const std::string& command)
Expand Down Expand Up @@ -241,7 +246,7 @@ void FtlControlConnection::processConnectCommand(const std::string& command)
{
isAuthenticated = true;
channelId = requestedChannelId;
writeToTransport("200\n");
writeToTransport(fmt::format("{}\n", FtlResponseCode::FTL_INGEST_RESP_OK));
std::string addrStr = transport->GetAddr().has_value() ?
Util::AddrToString(transport->GetAddr().value().sin_addr) : "UNKNOWN";
spdlog::info("{} authenticated as Channel {} successfully.", addrStr,
Expand Down Expand Up @@ -438,12 +443,13 @@ void FtlControlConnection::processDotCommand()
uint16_t mediaPort = mediaPortResult.Value;
isStreaming = true;
spdlog::info("Assigned Channel {} media port {}", channelId, mediaPort);
writeToTransport(fmt::format("200 hi. Use UDP port {}\n", mediaPort));
writeToTransport(fmt::format("{} hi. Use UDP port {}\n", FtlResponseCode::FTL_INGEST_RESP_OK,
mediaPort));
}

void FtlControlConnection::processPingCommand()
{
// TODO: Rate limit this.
writeToTransport("201\n");
writeToTransport(fmt::format("{}\n", FtlResponseCode::FTL_INGEST_RESP_PING));
}
#pragma endregion Private functions
25 changes: 24 additions & 1 deletion src/FtlControlConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,29 @@ class FtlControlConnection
using StartMediaPortCallback = std::function<Result<uint16_t>(
FtlControlConnection&, ftl_channel_id_t, MediaMetadata, in_addr)>;
using ConnectionClosedCallback = std::function<void(FtlControlConnection&)>;
enum FtlResponseCode
{
// See ftl-sdk/ftl_private.h
FTL_INGEST_RESP_UNKNOWN = 0,
FTL_INGEST_RESP_OK = 200,
FTL_INGEST_RESP_PING = 201,
FTL_INGEST_RESP_BAD_REQUEST = 400,
FTL_INGEST_RESP_UNAUTHORIZED = 401,
FTL_INGEST_RESP_OLD_VERSION = 402,
FTL_INGEST_RESP_AUDIO_SSRC_COLLISION = 403,
FTL_INGEST_RESP_VIDEO_SSRC_COLLISION = 404,
FTL_INGEST_RESP_INVALID_STREAM_KEY = 405,
FTL_INGEST_RESP_CHANNEL_IN_USE = 406,
FTL_INGEST_RESP_REGION_UNSUPPORTED = 407,
FTL_INGEST_RESP_NO_MEDIA_TIMEOUT = 408,
FTL_INGEST_RESP_GAME_BLOCKED = 409,
FTL_INGEST_RESP_SERVER_TERMINATE = 410,
FTL_INGEST_RESP_INTERNAL_SERVER_ERROR = 500,
FTL_INGEST_RESP_INTERNAL_MEMORY_ERROR = 900,
FTL_INGEST_RESP_INTERNAL_COMMAND_ERROR = 901,
FTL_INGEST_RESP_INTERNAL_SOCKET_CLOSED = 902,
FTL_INGEST_RESP_INTERNAL_SOCKET_TIMEOUT = 903,
};

/* Constructor/Destructor */
FtlControlConnection(
Expand All @@ -45,7 +68,7 @@ class FtlControlConnection

/* Public functions */
Result<void> StartAsync();
void Stop();
void Stop(FtlResponseCode responseCode = FtlResponseCode::FTL_INGEST_RESP_SERVER_TERMINATE);

private:
/* Constants */
Expand Down
34 changes: 32 additions & 2 deletions src/JanusFtl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ int JanusFtl::Init(janus_callbacks* callback, const char* config_path)

configuration = std::make_unique<Configuration>();
configuration->Load();
maxAllowedBitsPerSecond = configuration->GetMaxAllowedBitsPerSecond();
metadataReportIntervalMs = configuration->GetServiceConnectionMetadataReportIntervalMs();

initPreviewGenerators();
Expand Down Expand Up @@ -353,7 +354,15 @@ Result<ftl_stream_id_t> JanusFtl::ftlServerStreamStarted(ftl_channel_id_t channe
if (streams.count(channelId) > 0)
{
const ActiveStream& activeStream = streams[channelId];
ftlServer->StopStream(activeStream.ChannelId, activeStream.StreamId);
spdlog::info("Existing Stream {} exists for Channel {} - stopping...",
activeStream.StreamId, channelId);
Result<void> stopResult = ftlServer->StopStream(activeStream.ChannelId,
activeStream.StreamId);
if (stopResult.IsError)
{
spdlog::error("Received error attempting to stop Channel {} / Stream {}: {}",
activeStream.ChannelId, activeStream.StreamId, stopResult.ErrorMessage);
}
endStream(activeStream.ChannelId, activeStream.StreamId, lock);
}

Expand Down Expand Up @@ -544,7 +553,7 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise)

// Quickly gather data from active streams while under lock (defer reporting to avoid
// holding up other threads)
std::shared_lock lock(streamDataMutex);
std::unique_lock lock(streamDataMutex);
std::list<std::pair<std::pair<ftl_channel_id_t, ftl_stream_id_t>,
std::pair<FtlStream::FtlStreamStats, FtlStream::FtlKeyframe>>> statsAndKeyframes =
ftlServer->GetAllStatsAndKeyframes();
Expand All @@ -553,15 +562,36 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise)
for (const auto& streamInfo : statsAndKeyframes)
{
const ftl_channel_id_t& channelId = streamInfo.first.first;
const ftl_stream_id_t& streamId = streamInfo.first.second;
const FtlStream::FtlStreamStats& stats = streamInfo.second.first;
if (streams.count(channelId) <= 0)
{
continue;
}

// Has this stream exceeded the maximum allowed bandwidth?
if ((maxAllowedBitsPerSecond > 0) &&
(stats.RollingAverageBitrateBps > maxAllowedBitsPerSecond))
{
spdlog::info("Channel {} / Stream {} is averaging {}bps, exceeding the limit of "
"{}bps. Stopping the stream...", channelId, streamId,
stats.RollingAverageBitrateBps, maxAllowedBitsPerSecond);
Result<void> stopResult = ftlServer->StopStream(channelId, streamId);
if (stopResult.IsError)
{
spdlog::error("Received error attempting to stop Channel {} / Stream {}: {}",
channelId, streamId, stopResult.ErrorMessage);
}
endStream(channelId, streamId, lock);
continue;
}

metadataByChannel.try_emplace(channelId, streams.at(channelId).Metadata);
viewersByChannel.try_emplace(channelId, streams.at(channelId).ViewerSessions.size());
}
lock.unlock();

// Now coalesce all of the stream data and report it to the ServiceConnection
for (const auto& streamInfo : statsAndKeyframes)
{
const ftl_channel_id_t& channelId = streamInfo.first.first;
Expand Down
1 change: 1 addition & 0 deletions src/JanusFtl.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class JanusFtl
std::shared_ptr<FtlConnection> orchestrationClient;
std::shared_ptr<ServiceConnection> serviceConnection;
std::unordered_map<VideoCodecKind, std::unique_ptr<PreviewGenerator>> previewGenerators;
uint32_t maxAllowedBitsPerSecond = 0;
uint32_t metadataReportIntervalMs = 0;
uint16_t minMediaPort = 9000; // TODO: Migrate to Configuration
uint16_t maxMediaPort = 10000; // TODO: Migrate to Configuration
Expand Down

0 comments on commit 526aca4

Please sign in to comment.