Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/native/eventpipe/ds-eventpipe-protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ eventpipe_protocol_helper_collect_tracing (
payload->serialization_format,
payload->rundown_keyword,
payload->stackwalk_requested,
payload->session_type == EP_SESSION_TYPE_IPCSTREAM ? ds_ipc_stream_get_stream_ref (stream) : NULL,
ds_ipc_stream_get_stream_ref (stream),
NULL,
NULL,
user_events_data_fd);
Expand Down
32 changes: 22 additions & 10 deletions src/native/eventpipe/ds-ipc-pal-namedpipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ ds_ipc_poll (
handles [i] = poll_handles_data [i].ipc->overlap.hEvent;
if (handles [i] == INVALID_HANDLE_VALUE) {
// Invalid handle, wait will fail. Signal error
poll_handles_data [i].events = DS_IPC_POLL_EVENTS_ERR;
poll_handles_data [i].events = IPC_POLL_EVENTS_ERR;
}
} else {
// CLIENT
Expand All @@ -238,7 +238,7 @@ ds_ipc_poll (
handles [i] = poll_handles_data [i].stream->overlap.hEvent;
break;
case ERROR_PIPE_NOT_CONNECTED:
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
result = -1;
ep_raise_error ();
default:
Expand Down Expand Up @@ -288,7 +288,7 @@ ds_ipc_poll (
// check if we abandoned something
DWORD abandonedIndex = wait - WAIT_ABANDONED_0;
if (abandonedIndex > 0 || abandonedIndex < (poll_handles_data_len - 1)) {
poll_handles_data [abandonedIndex].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
poll_handles_data [abandonedIndex].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
result = -1;
ep_raise_error ();
} else {
Expand Down Expand Up @@ -325,20 +325,20 @@ ds_ipc_poll (
if (!success) {
DWORD error = GetLastError();
if (error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) {
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
} else {
if (callback)
callback ("Client connection error", error);
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_ERR;
result = -1;
ep_raise_error ();
}
} else {
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
}
} else {
// SERVER
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
}

result = 1;
Expand Down Expand Up @@ -694,7 +694,7 @@ ipc_stream_read_func (
DWORD error = GetLastError ();
if (error == ERROR_IO_PENDING) {
// if we're waiting infinitely, only make one syscall
if (timeout_ms == DS_IPC_TIMEOUT_INFINITE) {
if (timeout_ms == IPC_TIMEOUT_INFINITE) {
DS_ENTER_BLOCKING_PAL_SECTION;
success = GetOverlappedResult (
ipc_stream->pipe, // pipe
Expand Down Expand Up @@ -765,7 +765,7 @@ ipc_stream_write_func (
DWORD error = GetLastError ();
if (error == ERROR_IO_PENDING) {
// if we're waiting infinitely, only make one syscall
if (timeout_ms == DS_IPC_TIMEOUT_INFINITE) {
if (timeout_ms == IPC_TIMEOUT_INFINITE) {
DS_ENTER_BLOCKING_PAL_SECTION;
success = GetOverlappedResult (
ipc_stream->pipe, // pipe
Expand Down Expand Up @@ -834,12 +834,24 @@ ipc_stream_close_func (void *object)
return ds_ipc_stream_close (ipc_stream, NULL);
}

static
IpcPollEvents
ipc_stream_poll_func (
void *object,
uint32_t timeout_ms)
{
EP_ASSERT (!"ipc_stream_poll_func needs to be implemented for NamedPipes");
// TODO: Implement ipc_stream_poll_func for NamedPipes
return IPC_POLL_EVENTS_UNKNOWN;
}

static IpcStreamVtable ipc_stream_vtable = {
ipc_stream_free_func,
ipc_stream_read_func,
ipc_stream_write_func,
ipc_stream_flush_func,
ipc_stream_close_func };
ipc_stream_close_func,
ipc_stream_poll_func };

static
DiagnosticsIpcStream *
Expand Down
70 changes: 60 additions & 10 deletions src/native/eventpipe/ds-ipc-pal-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ ipc_socket_connect (
// the server hasn't called `accept`, so no need to check for timeout or connect error.

#if defined(DS_IPC_PAL_AF_INET) || defined(DS_IPC_PAL_AF_INET6)
if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
// Set socket to none blocking.
ipc_socket_set_blocking (s, false);
}
Expand All @@ -601,7 +601,7 @@ ipc_socket_connect (
DS_EXIT_BLOCKING_PAL_SECTION;

#if defined(DS_IPC_PAL_AF_INET) || defined(DS_IPC_PAL_AF_INET6)
if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
if (result_connect == DS_IPC_SOCKET_ERROR) {
if (ipc_get_last_error () == DS_IPC_SOCKET_ERROR_WOULDBLOCK) {
ds_ipc_pollfd_t pfd;
Expand All @@ -627,7 +627,7 @@ ipc_socket_connect (
}
}

if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
// Reset socket to blocking.
int last_error = ipc_get_last_error ();
ipc_socket_set_blocking (s, true);
Expand Down Expand Up @@ -1146,15 +1146,15 @@ ds_ipc_poll (
// check for hangup first because a closed socket
// will technically meet the requirements for POLLIN
// i.e., a call to recv/read won't block
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
} else if ((poll_fds [i].revents & (POLLERR|POLLNVAL))) {
if (callback)
callback ("Poll error", (uint32_t)poll_fds [i].revents);
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_ERR;
} else if (poll_fds [i].revents & (POLLIN|POLLPRI)) {
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
} else {
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_UNKNOWN;
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_UNKNOWN;
if (callback)
callback ("unknown poll response", (uint32_t)poll_fds [i].revents);
}
Expand Down Expand Up @@ -1401,7 +1401,7 @@ ipc_stream_read_func (
DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object;
ssize_t total_bytes_read = 0;

if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
ds_ipc_pollfd_t pfd;
pfd.fd = ipc_stream->client_socket;
pfd.events = POLLIN;
Expand Down Expand Up @@ -1445,7 +1445,7 @@ ipc_stream_write_func (
DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object;
ssize_t total_bytes_written = 0;

if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
ds_ipc_pollfd_t pfd;
pfd.fd = ipc_stream->client_socket;
pfd.events = POLLOUT;
Expand Down Expand Up @@ -1489,12 +1489,24 @@ ipc_stream_close_func (void *object)
return ds_ipc_stream_close (ipc_stream, NULL);
}

static
IpcPollEvents
ipc_stream_poll_func (
void *object,
uint32_t timeout_ms)
{
EP_ASSERT (object != NULL);
DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object;
return ds_ipc_stream_poll (ipc_stream, timeout_ms);
}

static IpcStreamVtable ipc_stream_vtable = {
ipc_stream_free_func,
ipc_stream_read_func,
ipc_stream_write_func,
ipc_stream_flush_func,
ipc_stream_close_func };
ipc_stream_close_func,
ipc_stream_poll_func };

static
DiagnosticsIpcStream *
Expand Down Expand Up @@ -1668,6 +1680,44 @@ ds_ipc_stream_to_string (
return (result > 0 && result < (int32_t)buffer_len) ? result : 0;
}

IpcPollEvents
ds_ipc_stream_poll (
DiagnosticsIpcStream *ipc_stream,
uint32_t timeout_ms)
{
EP_ASSERT (ipc_stream != NULL);

if (ipc_stream->client_socket == DS_IPC_INVALID_SOCKET)
return IPC_POLL_EVENTS_HANGUP;

ds_ipc_pollfd_t pfd;
pfd.fd = ipc_stream->client_socket;
pfd.events = POLLIN | POLLPRI | POLLOUT;

int result_poll;
result_poll = ipc_poll_fds (&pfd, 1, timeout_ms);

if (result_poll < 0)
return IPC_POLL_EVENTS_ERR;

if (result_poll == 0)
return IPC_POLL_EVENTS_NONE;

if (pfd.revents == 0)
return IPC_POLL_EVENTS_NONE;

if (pfd.revents & POLLHUP)
return IPC_POLL_EVENTS_HANGUP;

if (pfd.revents & (POLLERR | POLLNVAL))
return IPC_POLL_EVENTS_ERR;

if (pfd.revents & (POLLIN | POLLPRI | POLLOUT))
return IPC_POLL_EVENTS_SIGNALED;

return IPC_POLL_EVENTS_UNKNOWN;
}

#endif /* ENABLE_PERFTRACING */

#ifndef DS_INCLUDE_SOURCE_FILES
Expand Down
9 changes: 0 additions & 9 deletions src/native/eventpipe/ds-ipc-pal-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,12 @@ typedef struct _DiagnosticsIpcStream DiagnosticsIpcStream;
* Diagnostics IPC PAL Enums.
*/

typedef enum {
DS_IPC_POLL_EVENTS_NONE = 0x00, // no events
DS_IPC_POLL_EVENTS_SIGNALED = 0x01, // ready for use
DS_IPC_POLL_EVENTS_HANGUP = 0x02, // connection remotely closed
DS_IPC_POLL_EVENTS_ERR = 0x04, // error
DS_IPC_POLL_EVENTS_UNKNOWN = 0x80 // unknown state
} DiagnosticsIpcPollEvents;

typedef enum {
DS_IPC_CONNECTION_MODE_CONNECT,
DS_IPC_CONNECTION_MODE_LISTEN
} DiagnosticsIpcConnectionMode;

#define DS_IPC_MAX_TO_STRING_LEN 128
#define DS_IPC_TIMEOUT_INFINITE (uint32_t)-1

#define DS_IPC_POLL_TIMEOUT_FALLOFF_FACTOR (float)1.25
#define DS_IPC_POLL_TIMEOUT_MIN_MS (uint32_t)10
Expand Down
18 changes: 15 additions & 3 deletions src/native/eventpipe/ds-ipc-pal-websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,11 @@ ds_ipc_poll (
int client_socket = poll_handles_data [i].stream->client_socket;
int pending = ds_rt_websocket_poll (client_socket);
if (pending < 0){
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_ERR;
return 1;
}
if (pending > 0){
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
return 1;
}
}
Expand Down Expand Up @@ -425,12 +425,24 @@ ipc_stream_close_func (void *object)
return ds_ipc_stream_close (ipc_stream, NULL);
}

static
IpcPollEvents
ipc_stream_poll_func (
void *object,
uint32_t timeout_ms)
{
EP_ASSERT (!"ipc_stream_poll_func needs to be implemented for WebSockets");
// TODO: Implement ipc_stream_poll_func for WebSockets
return IPC_POLL_EVENTS_UNKNOWN;
}

static IpcStreamVtable ipc_stream_vtable = {
ipc_stream_free_func,
ipc_stream_read_func,
ipc_stream_write_func,
ipc_stream_flush_func,
ipc_stream_close_func };
ipc_stream_close_func,
ipc_stream_poll_func };

static
DiagnosticsIpcStream *
Expand Down
5 changes: 5 additions & 0 deletions src/native/eventpipe/ds-ipc-pal.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,10 @@ ds_ipc_stream_to_string (
ep_char8_t *buffer,
uint32_t buffer_len);

IpcPollEvents
ds_ipc_stream_poll (
DiagnosticsIpcStream *ipc_stream,
uint32_t timeout_ms);

#endif /* ENABLE_PERFTRACING */
#endif /* __DIAGNOSTICS_IPC_PAL_H__ */
18 changes: 9 additions & 9 deletions src/native/eventpipe/ds-ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ inline
uint32_t
ipc_stream_factory_get_next_timeout (uint32_t current_timeout_ms)
{
if (current_timeout_ms == DS_IPC_TIMEOUT_INFINITE)
if (current_timeout_ms == IPC_TIMEOUT_INFINITE)
return DS_IPC_POLL_TIMEOUT_MIN_MS;
else
return (current_timeout_ms >= DS_IPC_POLL_TIMEOUT_MAX_MS) ?
Expand Down Expand Up @@ -361,7 +361,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call

DiagnosticsIpcStream *stream = NULL;

uint32_t poll_timeout_ms = DS_IPC_TIMEOUT_INFINITE;
uint32_t poll_timeout_ms = IPC_TIMEOUT_INFINITE;
bool connect_success = true;
uint32_t poll_attempts = 0;

Expand All @@ -382,7 +382,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
} DN_VECTOR_PTR_FOREACH_END;

poll_timeout_ms = connect_success ?
DS_IPC_TIMEOUT_INFINITE :
IPC_TIMEOUT_INFINITE :
ipc_stream_factory_get_next_timeout (poll_timeout_ms);

int32_t ret_val;
Expand All @@ -392,7 +392,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
ipc_log_poll_handles (&ipc_poll_handles);
ret_val = ds_ipc_poll (dn_vector_data_t (&ipc_poll_handles, DiagnosticsIpcPollHandle), dn_vector_size (&ipc_poll_handles), poll_timeout_ms, callback);
} else {
if (poll_timeout_ms == DS_IPC_TIMEOUT_INFINITE)
if (poll_timeout_ms == IPC_TIMEOUT_INFINITE)
poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MAX_MS;
DS_LOG_DEBUG_1 ("ds_ipc_stream_factory_get_next_available_stream - Nothing to poll, sleeping using timeout: %dms.", poll_timeout_ms);
ep_rt_thread_sleep ((uint64_t)poll_timeout_ms * NUM_NANOSECONDS_IN_1_MS);
Expand All @@ -406,13 +406,13 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
DN_VECTOR_FOREACH_BEGIN (DiagnosticsIpcPollHandle, ipc_poll_handle, &ipc_poll_handles) {
DiagnosticsPort *port = (DiagnosticsPort *)ipc_poll_handle.user_data;
switch (ipc_poll_handle.events) {
case DS_IPC_POLL_EVENTS_HANGUP:
case IPC_POLL_EVENTS_HANGUP:
EP_ASSERT (port != NULL);
ds_port_reset_vcall (port, callback);
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - HUP :: Poll attempt: %d, connection %d hung up. Connect is reset.", poll_attempts, connection_id);
poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MIN_MS;
break;
case DS_IPC_POLL_EVENTS_SIGNALED:
case IPC_POLL_EVENTS_SIGNALED:
EP_ASSERT (port != NULL);
if (!stream) { // only use first signaled stream; will get others on subsequent calls
stream = ds_port_get_connected_stream_vcall (port, callback);
Expand All @@ -422,12 +422,12 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
}
DS_LOG_DEBUG_2 ("ds_ipc_stream_factory_get_next_available_stream - SIG :: Poll attempt: %d, connection %d signalled.", poll_attempts, connection_id);
break;
case DS_IPC_POLL_EVENTS_ERR:
case IPC_POLL_EVENTS_ERR:
ds_port_reset_vcall ((DiagnosticsPort *)ipc_poll_handle.user_data, callback);
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - ERR :: Poll attempt: %d, connection %d errored. Connection is reset.", poll_attempts, connection_id);
saw_error = true;
break;
case DS_IPC_POLL_EVENTS_NONE:
case IPC_POLL_EVENTS_NONE:
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - NON :: Poll attempt: %d, connection %d had no events.", poll_attempts, connection_id);
break;
default:
Expand All @@ -444,7 +444,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
if (!stream && saw_error) {
// Some errors can cause the poll to return instantly, we want to delay if we see an error to avoid
// runaway CPU usage.
if (poll_timeout_ms == DS_IPC_TIMEOUT_INFINITE)
if (poll_timeout_ms == IPC_TIMEOUT_INFINITE)
poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MAX_MS;
DS_LOG_DEBUG_1 ("ds_ipc_stream_factory_get_next_available_stream - Saw error, sleeping using timeout: %dms.", poll_timeout_ms);
ep_rt_thread_sleep ((uint64_t)poll_timeout_ms * NUM_NANOSECONDS_IN_1_MS);
Expand Down
Loading
Loading