Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/platform/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ struct AVHWFramesContext;
// Forward declarations of boost classes to avoid having to include boost headers
// here, which results in issues with Windows.h and WinSock2.h include order.
namespace boost {
namespace asio {
namespace ip {
class address;
} // namespace ip
} // namespace asio
namespace filesystem {
class path;
}
Expand Down Expand Up @@ -335,6 +340,23 @@ void streaming_will_stop();
bool restart_supported();
bool restart();

struct batched_send_info_t {
const char *buffer;
size_t block_size;
size_t block_count;

std::uintptr_t native_socket;
boost::asio::ip::address &target_address;
uint16_t target_port;
};
bool send_batch(batched_send_info_t &send_info);

enum class qos_data_type_e : int {
audio,
video
};
std::unique_ptr<deinit_t> enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type);

input_t input();
void move_mouse(input_t &input, int deltaX, int deltaY);
void abs_mouse(input_t &input, const touch_port_t &touch_port, float x, float y);
Expand Down
211 changes: 211 additions & 0 deletions src/platform/linux/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <dlfcn.h>
#include <fcntl.h>
#include <ifaddrs.h>
#include <netinet/udp.h>
#include <pwd.h>
#include <unistd.h>

Expand All @@ -14,6 +15,7 @@
#include "src/main.h"
#include "src/platform/common.h"

#include <boost/asio/ip/address.hpp>
#include <boost/process.hpp>

#ifdef __GNUC__
Expand Down Expand Up @@ -175,6 +177,215 @@ bool restart() {
return false;
}

bool send_batch(batched_send_info_t &send_info) {
auto sockfd = (int)send_info.native_socket;

// Convert the target address into a sockaddr
struct sockaddr_in saddr_v4 = {};
struct sockaddr_in6 saddr_v6 = {};
struct sockaddr *addr;
socklen_t addr_len;
if(send_info.target_address.is_v6()) {
auto address_v6 = send_info.target_address.to_v6();

saddr_v6.sin6_family = AF_INET6;
saddr_v6.sin6_port = htons(send_info.target_port);
saddr_v6.sin6_scope_id = address_v6.scope_id();

auto addr_bytes = address_v6.to_bytes();
memcpy(&saddr_v6.sin6_addr, addr_bytes.data(), sizeof(saddr_v6.sin6_addr));

addr = (struct sockaddr *)&saddr_v6;
addr_len = sizeof(saddr_v6);
}
else {
auto address_v4 = send_info.target_address.to_v4();

saddr_v4.sin_family = AF_INET;
saddr_v4.sin_port = htons(send_info.target_port);

auto addr_bytes = address_v4.to_bytes();
memcpy(&saddr_v4.sin_addr, addr_bytes.data(), sizeof(saddr_v4.sin_addr));

addr = (struct sockaddr *)&saddr_v4;
addr_len = sizeof(saddr_v4);
}

#ifdef UDP_SEGMENT
{
struct msghdr msg = {};
struct iovec iov = {};
union {
char buf[CMSG_SPACE(sizeof(uint16_t))];
struct cmsghdr alignment;
} cmbuf;

// UDP GSO on Linux currently only supports sending 64K or 64 segments at a time
size_t seg_index = 0;
const size_t seg_max = 65536 / 1500;
while(seg_index < send_info.block_count) {
iov.iov_base = (void *)&send_info.buffer[seg_index * send_info.block_size];
iov.iov_len = send_info.block_size * std::min(send_info.block_count - seg_index, seg_max);

msg.msg_name = addr;
msg.msg_namelen = addr_len;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

// We should not use GSO if the data is <= one full block size
if(iov.iov_len > send_info.block_size) {
msg.msg_control = cmbuf.buf;
msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));

// Enable GSO to perform segmentation of our buffer for us
auto cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t *)CMSG_DATA(cm)) = send_info.block_size;
}
else {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
}

// This will fail if GSO is not available, so we will fall back to non-GSO if
// it's the first sendmsg() call. On subsequent calls, we will treat errors as
// actual failures and return to the caller.
auto bytes_sent = sendmsg(sockfd, &msg, 0);
if(bytes_sent < 0) {
// If there's no send buffer space, wait for some to be available
if(errno == EAGAIN) {
struct pollfd pfd;

pfd.fd = sockfd;
pfd.events = POLLOUT;

if(poll(&pfd, 1, -1) != 1) {
BOOST_LOG(warning) << "poll() failed: "sv << errno;
break;
}

// Try to send again
continue;
}

break;
}

seg_index += bytes_sent / send_info.block_size;
}

// If we sent something, return the status and don't fall back to the non-GSO path.
if(seg_index != 0) {
return seg_index >= send_info.block_count;
}
}
#endif

{
// If GSO is not supported, use sendmmsg() instead.
struct mmsghdr msgs[send_info.block_count];
struct iovec iovs[send_info.block_count];
for(size_t i = 0; i < send_info.block_count; i++) {
iovs[i] = {};
iovs[i].iov_base = (void *)&send_info.buffer[i * send_info.block_size];
iovs[i].iov_len = send_info.block_size;

msgs[i] = {};
msgs[i].msg_hdr.msg_name = addr;
msgs[i].msg_hdr.msg_namelen = addr_len;
msgs[i].msg_hdr.msg_iov = &iovs[i];
msgs[i].msg_hdr.msg_iovlen = 1;
}

// Call sendmmsg() until all messages are sent
size_t blocks_sent = 0;
while(blocks_sent < send_info.block_count) {
int msgs_sent = sendmmsg(sockfd, &msgs[blocks_sent], send_info.block_count - blocks_sent, 0);
if(msgs_sent < 0) {
// If there's no send buffer space, wait for some to be available
if(errno == EAGAIN) {
struct pollfd pfd;

pfd.fd = sockfd;
pfd.events = POLLOUT;

if(poll(&pfd, 1, -1) != 1) {
BOOST_LOG(warning) << "poll() failed: "sv << errno;
break;
}

// Try to send again
continue;
}

BOOST_LOG(warning) << "sendmmsg() failed: "sv << errno;
return false;
}

blocks_sent += msgs_sent;
}

return true;
}
}

class qos_t : public deinit_t {
public:
qos_t(int sockfd, int level, int option) : sockfd(sockfd), level(level), option(option) {}

virtual ~qos_t() {
int reset_val = -1;
if(setsockopt(sockfd, level, option, &reset_val, sizeof(reset_val)) < 0) {
BOOST_LOG(warning) << "Failed to reset IP TOS: "sv << errno;
}
}

private:
int sockfd;
int level;
int option;
};

std::unique_ptr<deinit_t> enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type) {
int sockfd = (int)native_socket;

int level;
int option;
if(address.is_v6()) {
level = SOL_IPV6;
option = IPV6_TCLASS;
}
else {
level = SOL_IP;
option = IP_TOS;
}

// The specific DSCP values here are chosen to be consistent with Windows
int dscp;
switch(data_type) {
case qos_data_type_e::video:
dscp = 40;
break;
case qos_data_type_e::audio:
dscp = 56;
break;
default:
BOOST_LOG(error) << "Unknown traffic type: "sv << (int)data_type;
return nullptr;
}

// Shift to put the DSCP value in the correct position in the TOS field
dscp <<= 2;

if(setsockopt(sockfd, level, option, &dscp, sizeof(dscp)) < 0) {
return nullptr;
}

return std::make_unique<qos_t>(sockfd, level, option);
}

namespace source {
enum source_e : std::size_t {
#ifdef SUNSHINE_BUILD_CUDA
Expand Down
12 changes: 12 additions & 0 deletions src/platform/macos/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ bool restart() {
return false;
}

bool send_batch(batched_send_info_t &send_info) {
// Fall back to unbatched send calls
return false;
}

std::unique_ptr<deinit_t> enable_socket_qos(uintptr_t native_socket, boost::asio::ip::address &address, uint16_t port, qos_data_type_e data_type) {
// Unimplemented
//
// NB: When implementing, remember to consider that some routes can drop DSCP-tagged packets completely!
return nullptr;
}

} // namespace platf

namespace dyn {
Expand Down
Loading