Skip to content

Commit

Permalink
Add feature to run command when closing or access-timedout
Browse files Browse the repository at this point in the history
  • Loading branch information
xtne6f committed Aug 20, 2021
1 parent dab988f commit 9ecee3d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
5 changes: 4 additions & 1 deletion Readme.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ tsmemseg - In-memory transport stream segmenter mainly for HLS

Usage:

tsmemseg [-i inittime][-t time][-a acc_timeout][-r readrate][-f fill_readrate][-s seg_num][-m max_kbytes] seg_name
tsmemseg [-i inittime][-t time][-a acc_timeout][-c cmd][-r readrate][-f fill_readrate][-s seg_num][-m max_kbytes] seg_name

-i inittime (seconds), 0<=range<=60, default=0
Initial segment duration. Segment is cut on a key (NAL-IDR) packet.
Expand All @@ -13,6 +13,9 @@ tsmemseg [-i inittime][-t time][-a acc_timeout][-r readrate][-f fill_readrate][-
-a acc_timeout (seconds), 0<=range<=600, default=10
Quit when the named-pipes of this tool have not been accessed for more than acc_timeout. 0 means no quit.

-c cmd
Run command once when this tool is closing or access-timedout. "cmd" string is passed to system() C function.

-r readrate (percent), 0 or 20<=range<=500, default=0
Read speed from standard input compared to media timestamp (PTS). 0 means unlimited.

Expand Down
44 changes: 28 additions & 16 deletions tsmemseg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
Expand Down Expand Up @@ -56,6 +57,16 @@ uint32_t GetCurrentUnixTime()
return static_cast<uint32_t>((ll - 116444736000000000) / 10000000);
}

void ClosingRunner(const char *closingCmd, HANDLE stopEvent, std::atomic_uint32_t &lastAccessTick, uint32_t accessTimeoutMsec)
{
while (accessTimeoutMsec == 0 || static_cast<uint32_t>(GetMsecTick()) - lastAccessTick < accessTimeoutMsec) {
if (WaitForSingleObject(stopEvent, 1000) != WAIT_TIMEOUT) {
break;
}
}
system(closingCmd);
}

void Worker(SEGMENT_CONTEXT *segments, std::vector<HANDLE> events, std::recursive_mutex &bufLock, std::atomic_uint32_t &lastAccessTick)
{
for (;;) {
Expand Down Expand Up @@ -154,16 +165,6 @@ void ClearSegmentsAndEvents(std::vector<SEGMENT_CONTEXT> &segments, std::vector<
}
}

bool CheckAccessTimeout(int64_t tick, const std::atomic_uint32_t *lastAccessTicks, size_t n, uint32_t accessTimeoutMsec)
{
for (size_t i = 0; i < n; ++i) {
if (static_cast<uint32_t>(tick) - lastAccessTicks[i] < accessTimeoutMsec) {
return false;
}
}
return true;
}

void WriteUint32(uint8_t *buf, uint32_t n)
{
buf[0] = static_cast<uint8_t>(n);
Expand Down Expand Up @@ -203,6 +204,7 @@ int main(int argc, char **argv)
uint32_t targetDurationMsec = 0;
uint32_t nextTargetDurationMsec = 2000;
uint32_t accessTimeoutMsec = 10000;
const char *closingCmd = "";
int readRatePerMille = -1;
int nextReadRatePerMille = 0;
size_t segNum = 8;
Expand All @@ -215,7 +217,7 @@ int main(int argc, char **argv)
c = argv[i][1];
}
if (c == 'h') {
fprintf(stderr, "Usage: tsmemseg [-i inittime][-t time][-a acc_timeout][-r readrate][-f fill_readrate][-s seg_num][-m max_kbytes] seg_name\n");
fprintf(stderr, "Usage: tsmemseg [-i inittime][-t time][-a acc_timeout][-c cmd][-r readrate][-f fill_readrate][-s seg_num][-m max_kbytes] seg_name\n");
return 2;
}
bool invalid = false;
Expand All @@ -235,6 +237,9 @@ int main(int argc, char **argv)
accessTimeoutMsec = static_cast<uint32_t>(sec * 1000);
}
}
else if (c == 'c') {
closingCmd = argv[++i];
}
else if (c == 'r' || c == 'f') {
double percent = strtod(argv[++i], nullptr);
invalid = !(0 <= percent && percent <= 1000);
Expand Down Expand Up @@ -333,16 +338,20 @@ int main(int argc, char **argv)

int64_t baseTick = GetMsecTick();
std::recursive_mutex bufLock;
std::unique_ptr<std::thread> closingRunnerThread;
std::vector<std::thread> threads;
std::atomic_uint32_t lastAccessTicks[SEGMENTS_MAX / 20];
std::atomic_uint32_t lastAccessTick(static_cast<uint32_t>(baseTick));

if (closingCmd[0]) {
closingRunnerThread.reset(new std::thread(ClosingRunner, closingCmd, events.front(), std::ref(lastAccessTick), accessTimeoutMsec));
}

// Create a thread for every 20 segments
for (size_t i = 0; i < segments.size(); i += 20) {
std::vector<HANDLE> eventsForThread;
eventsForThread.push_back(events.front());
eventsForThread.insert(eventsForThread.end(), events.begin() + 1 + i * 2, events.begin() + std::min(1 + (i + 20) * 2, events.size()));
lastAccessTicks[i / 20] = static_cast<uint32_t>(baseTick);
threads.emplace_back(Worker, segments.data() + i, std::move(eventsForThread), std::ref(bufLock), std::ref(lastAccessTicks[i / 20]));
threads.emplace_back(Worker, segments.data() + i, std::move(eventsForThread), std::ref(bufLock), std::ref(lastAccessTick));
}

// Index of the next segment to be overwritten (between 1 and "segNum")
Expand Down Expand Up @@ -377,7 +386,7 @@ int main(int argc, char **argv)
bool accessTimedout = false;
for (;;) {
int64_t nowTick = GetMsecTick();
if (accessTimeoutMsec != 0 && CheckAccessTimeout(nowTick, lastAccessTicks, threads.size(), accessTimeoutMsec)) {
if (accessTimeoutMsec != 0 && static_cast<uint32_t>(nowTick) - lastAccessTick >= accessTimeoutMsec) {
accessTimedout = true;
break;
}
Expand Down Expand Up @@ -566,14 +575,17 @@ int main(int argc, char **argv)
if (forcedSegmentationError) {
fprintf(stderr, "Warning: %u forced segmentation happened.\n", forcedSegmentationError);
}
while (accessTimeoutMsec != 0 && !CheckAccessTimeout(GetMsecTick(), lastAccessTicks, threads.size(), accessTimeoutMsec)) {
while (accessTimeoutMsec != 0 && static_cast<uint32_t>(GetMsecTick()) - lastAccessTick < accessTimeoutMsec) {
Sleep(100);
}
SetEvent(events.front());
while (!threads.empty()) {
threads.back().join();
threads.pop_back();
}
if (closingRunnerThread) {
closingRunnerThread->join();
}
ClearSegmentsAndEvents(segments, events);
return 0;
}

0 comments on commit 9ecee3d

Please sign in to comment.