Skip to content

Commit

Permalink
Optimize pipe writes
Browse files Browse the repository at this point in the history
  • Loading branch information
xtne6f committed Oct 15, 2021
1 parent 0b7f080 commit b20413a
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions tsmemseg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#else
#include <errno.h>
#include <signal.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
Expand Down Expand Up @@ -232,9 +233,17 @@ void Worker(std::vector<SEGMENT_CONTEXT> &segments, CManualResetEvent &stopEvent
if (pipe.fd >= 0) {
lastAccessTick = static_cast<uint32_t>(tick);
pipe.written = 0;

lock_recursive_mutex lock(bufLock);
pipe.connected = true;
{
lock_recursive_mutex lock(bufLock);
pipe.connected = true;
}
#if defined(F_GETPIPE_SZ) && defined(F_SETPIPE_SZ)
int pipeBufSize = fcntl(pipe.fd, F_GETPIPE_SZ);
if (pipeBufSize > 0 && pipeBufSize < static_cast<int>(it->buf.size() / 2)) {
// Buffer is too small, expand up to 5 times.
fcntl(pipe.fd, F_SETPIPE_SZ, std::min(static_cast<int>(it->buf.size()), pipeBufSize * 5));
}
#endif
}
}
connected = connected || pipe.connected;
Expand All @@ -245,6 +254,9 @@ void Worker(std::vector<SEGMENT_CONTEXT> &segments, CManualResetEvent &stopEvent

while (connected) {
connected = false;
fd_set wfd;
FD_ZERO(&wfd);
int maxfd = -1;
for (auto it = segments.begin(); it != segments.end(); ++it) {
SEGMENT_PIPE_CONTEXT &pipe = it->pipes[0];
if (pipe.connected) {
Expand All @@ -255,6 +267,10 @@ void Worker(std::vector<SEGMENT_CONTEXT> &segments, CManualResetEvent &stopEvent
}
if (pipe.written < it->buf.size() && n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
connected = true;
maxfd = std::max(maxfd, pipe.fd);
if (maxfd < FD_SETSIZE) {
FD_SET(pipe.fd, &wfd);
}
}
else {
close(pipe.fd);
Expand All @@ -264,9 +280,23 @@ void Worker(std::vector<SEGMENT_CONTEXT> &segments, CManualResetEvent &stopEvent
}
}
}
// Sleep a little
if (GetMsecTick() >= tick || stopEvent.WaitOne(std::chrono::milliseconds(1))) {
break;
if (connected) {
if (maxfd < FD_SETSIZE) {
// Wait for writable
timeval tv = {};
tv.tv_usec = static_cast<long>(std::max<int64_t>(tick - GetMsecTick(), 0) * 1000);
if (tv.tv_usec <= 0 || tv.tv_usec >= 1000000 ||
select(maxfd + 1, &wfd, nullptr, nullptr, &tv) < 0 ||
stopEvent.WaitOne(std::chrono::milliseconds(0))) {
break;
}
}
else {
// Sleep a little
if (GetMsecTick() >= tick || stopEvent.WaitOne(std::chrono::milliseconds(1))) {
break;
}
}
}
}
if (stopEvent.WaitOne(std::chrono::milliseconds(std::max<int64_t>(tick - GetMsecTick(), 1)))) {
Expand Down

0 comments on commit b20413a

Please sign in to comment.