Skip to content
This repository has been archived by the owner on Nov 9, 2022. It is now read-only.

Commit

Permalink
moved to a more generic flow control approach to model network buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
sslivins committed Sep 28, 2016
1 parent 0610564 commit 1a9e529
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 9 deletions.
3 changes: 3 additions & 0 deletions libftl/ftl_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#define NACK_RTT_AVG_SECONDS 5
#define MAX_STATUS_MESSAGE_QUEUED 10
#define MAX_FRAME_SIZE_ELEMENTS 64 //must be a minimum of 3
#define MAX_XMIT_LEVEL_IN_MS 50 //allows a maximum burst size of 100ms at the target bitrate

#ifndef _WIN32
typdef SOCKET int
Expand Down Expand Up @@ -122,6 +123,7 @@ typedef struct {
BOOL nack_slots_initalized;
int producer;
int consumer;
uint16_t xmit_seq_num;
nack_slot_t *nack_slots[NACK_RB_SIZE];
struct timeval stats;
frame_size_t frames[MAX_FRAME_SIZE_ELEMENTS];
Expand Down Expand Up @@ -176,6 +178,7 @@ typedef struct {
uint32_t channel_id;
char *key;
char hmacBuffer[512];
int target_bitrate_kbps;
#ifdef _WIN32
HANDLE connection_thread_handle;
DWORD connection_thread_id;
Expand Down
143 changes: 134 additions & 9 deletions libftl/media.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ ftl_status_t media_init(ftl_stream_configuration_private_t *ftl) {
ftl_status_t status = FTL_SUCCESS;
int i, idx;

//TODO: this must be passed in, here for testing atm
ftl->target_bitrate_kbps = 8000;

//Create a socket
if ((media->media_socket = socket(AF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET)
{
Expand Down Expand Up @@ -74,7 +77,7 @@ ftl_status_t media_init(ftl_stream_configuration_private_t *ftl) {
}

#ifdef _WIN32
if ((comp->send_frame_sem = CreateSemaphore(NULL, 0, MAX_FRAME_SIZE_ELEMENTS - 1, NULL)) == NULL) {
if ((comp->send_frame_sem = CreateSemaphore(NULL, 0, 1000, NULL)) == NULL) {
#else
comp->send_frame_sem
#endif
Expand Down Expand Up @@ -219,12 +222,14 @@ ftl_status_t media_send_video(ftl_stream_configuration_private_t *ftl, uint8_t *

nalu_type = data[0] & 0x1F;

/*
frame_size_t *frame = &mc->frames[mc->frame_write_idx];
if (frame->first_sn < 0) {
frame->first_sn = mc->seq_num;
gettimeofday(&frame->tv, NULL);
}
*/

while (remaining > 0) {
uint16_t sn = mc->seq_num;
Expand All @@ -239,8 +244,8 @@ ftl_status_t media_send_video(ftl_stream_configuration_private_t *ftl, uint8_t *
consumed += payload_size;
data += payload_size;

frame->total_bytes += pkt_len;
frame->total_packets++;
// frame->total_bytes += pkt_len;
// frame->total_packets++;

/*if all data has been consumed set marker bit*/
if (remaining <= 0 && end_of_frame) {
Expand Down Expand Up @@ -269,6 +274,7 @@ ftl_status_t media_send_video(ftl_stream_configuration_private_t *ftl, uint8_t *
enqueue_status_msg(ftl, &status);
}

/*
if (end_of_frame) {
mc->frame_write_idx = (mc->frame_write_idx + 1) % MAX_FRAME_SIZE_ELEMENTS;
if (mc->frame_write_idx == mc->frame_read_idx) {
Expand Down Expand Up @@ -298,7 +304,7 @@ ftl_status_t media_send_video(ftl_stream_configuration_private_t *ftl, uint8_t *
//TODO: do non-windows
#endif
}

*/
return FTL_SUCCESS;
}

Expand Down Expand Up @@ -327,7 +333,7 @@ static int _nack_init(ftl_media_component_common_t *media) {
}

media->nack_slots_initalized = TRUE;
media->seq_num = 0; //TODO: should start at a random value
media->seq_num = media->xmit_seq_num = 0; //TODO: should start at a random value

return FTL_SUCCESS;
}
Expand Down Expand Up @@ -375,10 +381,12 @@ static uint8_t* _media_get_empty_packet(ftl_stream_configuration_private_t *ftl,
return NULL;
}

/*
while (((mc->producer + 1) % NACK_RB_SIZE) == mc->consumer) {
FTL_LOG(FTL_LOG_ERROR, "[%d] ring buffer is full...trying again\n", ssrc);
Sleep(5);
}
*/

/*map sequence number to slot*/

Expand Down Expand Up @@ -458,6 +466,12 @@ static int _media_queue_packet(ftl_stream_configuration_private_t *ftl, uint32_t

_unlock_slot(slot);

#ifdef _WIN32
ReleaseSemaphore(mc->send_frame_sem, 1, NULL);
#else
//TODO: do non-windows
#endif

return 0;
}

Expand All @@ -484,7 +498,7 @@ static int _nack_resend_packet(ftl_stream_configuration_private_t *ftl, uint32_t
int req_delay = 0;
struct timeval delta, now;
gettimeofday(&now, NULL);
timeval_subtract(&delta, &now, &slot->insert_time);
timeval_subtract(&delta, &now, &slot->xmit_time);
req_delay = (int)timeval_to_ms(&delta);

tx_len = _media_send_slot(ftl, slot, TRUE);
Expand Down Expand Up @@ -615,7 +629,6 @@ static void *recv_thread(void *data)
return -1;
}
#endif
//os_set_thread_name("ftl-stream: recv_thread");

while (media->recv_thread_running) {

Expand Down Expand Up @@ -681,7 +694,7 @@ static void *recv_thread(void *data)
return 0;
}

/*handles rtcp packets from ingest including lost packet retransmission requests (nack)*/
#if 0
#ifdef _WIN32
static DWORD WINAPI send_thread(LPVOID data)
#else
Expand Down Expand Up @@ -793,4 +806,116 @@ static void *send_thread(void *data)
FTL_LOG(FTL_LOG_INFO, "Exited Send Thread\n");

return 0;
}
}
#endif

#ifdef _WIN32
static DWORD WINAPI send_thread(LPVOID data)
#else
static void *send_thread(void *data)
#endif
{
ftl_stream_configuration_private_t *ftl = (ftl_stream_configuration_private_t *)data;
ftl_media_config_t *media = &ftl->media;
ftl_media_component_common_t *video = &ftl->video.media_component;
int ret;
nack_slot_t *slot;
int sn;

int first_packet = 1;
int bytes_per_ms = (((float)(ftl->target_bitrate_kbps * 1000 / 8)) * 1.1) / 1000;

int transmit_level;
struct timeval start_tv, stop_tv, delta_tv;
struct timeval profile_start, profile_stop, profile_delta;
int pkt_xmit_delay_min = 1000, pkt_xmit_delay_max = 0, xmit_delay_delta;
int xmit_delay_avg, xmit_delay_total = 0, xmit_delay_samples = 0;

#ifdef _WIN32
if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL)) {
FTL_LOG(FTL_LOG_WARN, "Failed to set recv_thread priority to THREAD_PRIORITY_TIME_CRITICAL\n");
}
#endif

transmit_level = 5 * bytes_per_ms;

while (1) {

#ifdef _WIN32
//WaitForSingleObject(video->send_frame_sem, INFINITE);
#else
sem_pend()
#endif
if (!media->send_thread_running) {
break;
}

#if 0
int pkts_queued;
if (video->seq_num >= video->xmit_seq_num) {
pkts_queued = video->seq_num - video->xmit_seq_num;
}
else {
pkts_queued = 65535 - video->xmit_seq_num + video->seq_num + 1;
}

if (pkts_queued > 20) {
FTL_LOG(FTL_LOG_INFO, "There are %d packets queued\n", pkts_queued);
}
#endif

while (transmit_level > 0 && video->xmit_seq_num != video->seq_num) {
sn = video->xmit_seq_num;

slot = video->nack_slots[video->xmit_seq_num % NACK_RB_SIZE];
_lock_slot(slot);
transmit_level -= _media_send_slot(ftl, slot, FALSE);
_unlock_slot(slot);

timeval_subtract(&profile_delta, &slot->xmit_time, &slot->insert_time);
xmit_delay_delta = timeval_to_ms(&delta_tv);

if (xmit_delay_delta > pkt_xmit_delay_max) {
pkt_xmit_delay_max = xmit_delay_delta;
}
else if (xmit_delay_delta < pkt_xmit_delay_min) {
pkt_xmit_delay_min = xmit_delay_delta;
}

xmit_delay_total += xmit_delay_delta;
xmit_delay_samples++;

video->xmit_seq_num++;
}

if (xmit_delay_samples > 15000) {
FTL_LOG(FTL_LOG_INFO, "Average transmit delay was %d ms (max: %d, min: %d)\n", xmit_delay_total/xmit_delay_samples, pkt_xmit_delay_max, pkt_xmit_delay_min);
pkt_xmit_delay_min = 1000;
pkt_xmit_delay_max = 0;
xmit_delay_total = 0;
xmit_delay_samples = 0;
}

if (transmit_level <= 0) {
Sleep(1500/bytes_per_ms + 1);
}

gettimeofday(&stop_tv, NULL);
if (!first_packet) {
timeval_subtract(&delta_tv, &stop_tv, &start_tv);
transmit_level += timeval_to_ms(&delta_tv) * bytes_per_ms;

if (transmit_level > (MAX_XMIT_LEVEL_IN_MS * bytes_per_ms)) {
transmit_level = MAX_XMIT_LEVEL_IN_MS * bytes_per_ms;
}
}
else {
first_packet = 0;
}

start_tv = stop_tv;
}

FTL_LOG(FTL_LOG_INFO, "Exited Send Thread\n");
return 0;
}

0 comments on commit 1a9e529

Please sign in to comment.