Skip to content

Commit 6a19342

Browse files
authored
[EventPipe] End user_events session upon continuation_stream closure (#117435)
* [EventPipe][DiagnosticServer] Add poll interface to IpcStreams * [EventPipe] Close user_events session upon continuation_stream closure * Address Feedback Revert moving IpcPollEvents to ep-ipc-pal-types header Use IpcStream base type directly in sessions Omit websocket ipc_stream_poll_func implementation Change continuation stream poll timeout to infinite Change returned PollEvent to none upon timeout * Move IpcPollEvents back to ep-ipc-pal-types header * Add relevant headers ep-stream.h for ep_ipc_stream_writer_alloc that was transitively included by ep-file.h ep-ipc-stream.h for ep_ipc_stream_poll_vcall * Address feedback Remove prefix from IpcPollEvents enums and infinite timeout define Expand connection closed detection to error case
1 parent ea4a781 commit 6a19342

File tree

12 files changed

+218
-70
lines changed

12 files changed

+218
-70
lines changed

src/native/eventpipe/ds-eventpipe-protocol.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,7 @@ eventpipe_protocol_helper_collect_tracing (
920920
payload->serialization_format,
921921
payload->rundown_keyword,
922922
payload->stackwalk_requested,
923-
payload->session_type == EP_SESSION_TYPE_IPCSTREAM ? ds_ipc_stream_get_stream_ref (stream) : NULL,
923+
ds_ipc_stream_get_stream_ref (stream),
924924
NULL,
925925
NULL,
926926
user_events_data_fd);

src/native/eventpipe/ds-ipc-pal-namedpipe.c

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ ds_ipc_poll (
213213
handles [i] = poll_handles_data [i].ipc->overlap.hEvent;
214214
if (handles [i] == INVALID_HANDLE_VALUE) {
215215
// Invalid handle, wait will fail. Signal error
216-
poll_handles_data [i].events = DS_IPC_POLL_EVENTS_ERR;
216+
poll_handles_data [i].events = IPC_POLL_EVENTS_ERR;
217217
}
218218
} else {
219219
// CLIENT
@@ -238,7 +238,7 @@ ds_ipc_poll (
238238
handles [i] = poll_handles_data [i].stream->overlap.hEvent;
239239
break;
240240
case ERROR_PIPE_NOT_CONNECTED:
241-
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
241+
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
242242
result = -1;
243243
ep_raise_error ();
244244
default:
@@ -288,7 +288,7 @@ ds_ipc_poll (
288288
// check if we abandoned something
289289
DWORD abandonedIndex = wait - WAIT_ABANDONED_0;
290290
if (abandonedIndex > 0 || abandonedIndex < (poll_handles_data_len - 1)) {
291-
poll_handles_data [abandonedIndex].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
291+
poll_handles_data [abandonedIndex].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
292292
result = -1;
293293
ep_raise_error ();
294294
} else {
@@ -325,20 +325,20 @@ ds_ipc_poll (
325325
if (!success) {
326326
DWORD error = GetLastError();
327327
if (error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) {
328-
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
328+
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
329329
} else {
330330
if (callback)
331331
callback ("Client connection error", error);
332-
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
332+
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_ERR;
333333
result = -1;
334334
ep_raise_error ();
335335
}
336336
} else {
337-
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
337+
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
338338
}
339339
} else {
340340
// SERVER
341-
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
341+
poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
342342
}
343343

344344
result = 1;
@@ -694,7 +694,7 @@ ipc_stream_read_func (
694694
DWORD error = GetLastError ();
695695
if (error == ERROR_IO_PENDING) {
696696
// if we're waiting infinitely, only make one syscall
697-
if (timeout_ms == DS_IPC_TIMEOUT_INFINITE) {
697+
if (timeout_ms == IPC_TIMEOUT_INFINITE) {
698698
DS_ENTER_BLOCKING_PAL_SECTION;
699699
success = GetOverlappedResult (
700700
ipc_stream->pipe, // pipe
@@ -765,7 +765,7 @@ ipc_stream_write_func (
765765
DWORD error = GetLastError ();
766766
if (error == ERROR_IO_PENDING) {
767767
// if we're waiting infinitely, only make one syscall
768-
if (timeout_ms == DS_IPC_TIMEOUT_INFINITE) {
768+
if (timeout_ms == IPC_TIMEOUT_INFINITE) {
769769
DS_ENTER_BLOCKING_PAL_SECTION;
770770
success = GetOverlappedResult (
771771
ipc_stream->pipe, // pipe
@@ -834,12 +834,24 @@ ipc_stream_close_func (void *object)
834834
return ds_ipc_stream_close (ipc_stream, NULL);
835835
}
836836

837+
static
838+
IpcPollEvents
839+
ipc_stream_poll_func (
840+
void *object,
841+
uint32_t timeout_ms)
842+
{
843+
EP_ASSERT (!"ipc_stream_poll_func needs to be implemented for NamedPipes");
844+
// TODO: Implement ipc_stream_poll_func for NamedPipes
845+
return IPC_POLL_EVENTS_UNKNOWN;
846+
}
847+
837848
static IpcStreamVtable ipc_stream_vtable = {
838849
ipc_stream_free_func,
839850
ipc_stream_read_func,
840851
ipc_stream_write_func,
841852
ipc_stream_flush_func,
842-
ipc_stream_close_func };
853+
ipc_stream_close_func,
854+
ipc_stream_poll_func };
843855

844856
static
845857
DiagnosticsIpcStream *

src/native/eventpipe/ds-ipc-pal-socket.c

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ ipc_socket_connect (
586586
// the server hasn't called `accept`, so no need to check for timeout or connect error.
587587

588588
#if defined(DS_IPC_PAL_AF_INET) || defined(DS_IPC_PAL_AF_INET6)
589-
if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
589+
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
590590
// Set socket to none blocking.
591591
ipc_socket_set_blocking (s, false);
592592
}
@@ -599,7 +599,7 @@ ipc_socket_connect (
599599
DS_EXIT_BLOCKING_PAL_SECTION;
600600

601601
#if defined(DS_IPC_PAL_AF_INET) || defined(DS_IPC_PAL_AF_INET6)
602-
if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
602+
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
603603
if (result_connect == DS_IPC_SOCKET_ERROR) {
604604
if (ipc_get_last_error () == DS_IPC_SOCKET_ERROR_WOULDBLOCK) {
605605
ds_ipc_pollfd_t pfd;
@@ -625,7 +625,7 @@ ipc_socket_connect (
625625
}
626626
}
627627

628-
if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
628+
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
629629
// Reset socket to blocking.
630630
int last_error = ipc_get_last_error ();
631631
ipc_socket_set_blocking (s, true);
@@ -1144,15 +1144,15 @@ ds_ipc_poll (
11441144
// check for hangup first because a closed socket
11451145
// will technically meet the requirements for POLLIN
11461146
// i.e., a call to recv/read won't block
1147-
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
1147+
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_HANGUP;
11481148
} else if ((poll_fds [i].revents & (POLLERR|POLLNVAL))) {
11491149
if (callback)
11501150
callback ("Poll error", (uint32_t)poll_fds [i].revents);
1151-
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
1151+
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_ERR;
11521152
} else if (poll_fds [i].revents & (POLLIN|POLLPRI)) {
1153-
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
1153+
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
11541154
} else {
1155-
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_UNKNOWN;
1155+
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_UNKNOWN;
11561156
if (callback)
11571157
callback ("unknown poll response", (uint32_t)poll_fds [i].revents);
11581158
}
@@ -1399,7 +1399,7 @@ ipc_stream_read_func (
13991399
DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object;
14001400
ssize_t total_bytes_read = 0;
14011401

1402-
if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
1402+
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
14031403
ds_ipc_pollfd_t pfd;
14041404
pfd.fd = ipc_stream->client_socket;
14051405
pfd.events = POLLIN;
@@ -1443,7 +1443,7 @@ ipc_stream_write_func (
14431443
DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object;
14441444
ssize_t total_bytes_written = 0;
14451445

1446-
if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) {
1446+
if (timeout_ms != IPC_TIMEOUT_INFINITE) {
14471447
ds_ipc_pollfd_t pfd;
14481448
pfd.fd = ipc_stream->client_socket;
14491449
pfd.events = POLLOUT;
@@ -1487,12 +1487,24 @@ ipc_stream_close_func (void *object)
14871487
return ds_ipc_stream_close (ipc_stream, NULL);
14881488
}
14891489

1490+
static
1491+
IpcPollEvents
1492+
ipc_stream_poll_func (
1493+
void *object,
1494+
uint32_t timeout_ms)
1495+
{
1496+
EP_ASSERT (object != NULL);
1497+
DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object;
1498+
return ds_ipc_stream_poll (ipc_stream, timeout_ms);
1499+
}
1500+
14901501
static IpcStreamVtable ipc_stream_vtable = {
14911502
ipc_stream_free_func,
14921503
ipc_stream_read_func,
14931504
ipc_stream_write_func,
14941505
ipc_stream_flush_func,
1495-
ipc_stream_close_func };
1506+
ipc_stream_close_func,
1507+
ipc_stream_poll_func };
14961508

14971509
static
14981510
DiagnosticsIpcStream *
@@ -1666,6 +1678,44 @@ ds_ipc_stream_to_string (
16661678
return (result > 0 && result < (int32_t)buffer_len) ? result : 0;
16671679
}
16681680

1681+
IpcPollEvents
1682+
ds_ipc_stream_poll (
1683+
DiagnosticsIpcStream *ipc_stream,
1684+
uint32_t timeout_ms)
1685+
{
1686+
EP_ASSERT (ipc_stream != NULL);
1687+
1688+
if (ipc_stream->client_socket == DS_IPC_INVALID_SOCKET)
1689+
return IPC_POLL_EVENTS_HANGUP;
1690+
1691+
ds_ipc_pollfd_t pfd;
1692+
pfd.fd = ipc_stream->client_socket;
1693+
pfd.events = POLLIN | POLLPRI | POLLOUT;
1694+
1695+
int result_poll;
1696+
result_poll = ipc_poll_fds (&pfd, 1, timeout_ms);
1697+
1698+
if (result_poll < 0)
1699+
return IPC_POLL_EVENTS_ERR;
1700+
1701+
if (result_poll == 0)
1702+
return IPC_POLL_EVENTS_NONE;
1703+
1704+
if (pfd.revents == 0)
1705+
return IPC_POLL_EVENTS_NONE;
1706+
1707+
if (pfd.revents & POLLHUP)
1708+
return IPC_POLL_EVENTS_HANGUP;
1709+
1710+
if (pfd.revents & (POLLERR | POLLNVAL))
1711+
return IPC_POLL_EVENTS_ERR;
1712+
1713+
if (pfd.revents & (POLLIN | POLLPRI | POLLOUT))
1714+
return IPC_POLL_EVENTS_SIGNALED;
1715+
1716+
return IPC_POLL_EVENTS_UNKNOWN;
1717+
}
1718+
16691719
#endif /* ENABLE_PERFTRACING */
16701720

16711721
#ifndef DS_INCLUDE_SOURCE_FILES

src/native/eventpipe/ds-ipc-pal-types.h

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,12 @@ typedef struct _DiagnosticsIpcStream DiagnosticsIpcStream;
2222
* Diagnostics IPC PAL Enums.
2323
*/
2424

25-
typedef enum {
26-
DS_IPC_POLL_EVENTS_NONE = 0x00, // no events
27-
DS_IPC_POLL_EVENTS_SIGNALED = 0x01, // ready for use
28-
DS_IPC_POLL_EVENTS_HANGUP = 0x02, // connection remotely closed
29-
DS_IPC_POLL_EVENTS_ERR = 0x04, // error
30-
DS_IPC_POLL_EVENTS_UNKNOWN = 0x80 // unknown state
31-
} DiagnosticsIpcPollEvents;
32-
3325
typedef enum {
3426
DS_IPC_CONNECTION_MODE_CONNECT,
3527
DS_IPC_CONNECTION_MODE_LISTEN
3628
} DiagnosticsIpcConnectionMode;
3729

3830
#define DS_IPC_MAX_TO_STRING_LEN 128
39-
#define DS_IPC_TIMEOUT_INFINITE (uint32_t)-1
4031

4132
#define DS_IPC_POLL_TIMEOUT_FALLOFF_FACTOR (float)1.25
4233
#define DS_IPC_POLL_TIMEOUT_MIN_MS (uint32_t)10

src/native/eventpipe/ds-ipc-pal-websocket.c

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,11 +248,11 @@ ds_ipc_poll (
248248
int client_socket = poll_handles_data [i].stream->client_socket;
249249
int pending = ds_rt_websocket_poll (client_socket);
250250
if (pending < 0){
251-
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
251+
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_ERR;
252252
return 1;
253253
}
254254
if (pending > 0){
255-
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
255+
poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED;
256256
return 1;
257257
}
258258
}
@@ -425,12 +425,24 @@ ipc_stream_close_func (void *object)
425425
return ds_ipc_stream_close (ipc_stream, NULL);
426426
}
427427

428+
static
429+
IpcPollEvents
430+
ipc_stream_poll_func (
431+
void *object,
432+
uint32_t timeout_ms)
433+
{
434+
EP_ASSERT (!"ipc_stream_poll_func needs to be implemented for WebSockets");
435+
// TODO: Implement ipc_stream_poll_func for WebSockets
436+
return IPC_POLL_EVENTS_UNKNOWN;
437+
}
438+
428439
static IpcStreamVtable ipc_stream_vtable = {
429440
ipc_stream_free_func,
430441
ipc_stream_read_func,
431442
ipc_stream_write_func,
432443
ipc_stream_flush_func,
433-
ipc_stream_close_func };
444+
ipc_stream_close_func,
445+
ipc_stream_poll_func };
434446

435447
static
436448
DiagnosticsIpcStream *

src/native/eventpipe/ds-ipc-pal.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,5 +140,10 @@ ds_ipc_stream_to_string (
140140
ep_char8_t *buffer,
141141
uint32_t buffer_len);
142142

143+
IpcPollEvents
144+
ds_ipc_stream_poll (
145+
DiagnosticsIpcStream *ipc_stream,
146+
uint32_t timeout_ms);
147+
143148
#endif /* ENABLE_PERFTRACING */
144149
#endif /* __DIAGNOSTICS_IPC_PAL_H__ */

src/native/eventpipe/ds-ipc.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ inline
122122
uint32_t
123123
ipc_stream_factory_get_next_timeout (uint32_t current_timeout_ms)
124124
{
125-
if (current_timeout_ms == DS_IPC_TIMEOUT_INFINITE)
125+
if (current_timeout_ms == IPC_TIMEOUT_INFINITE)
126126
return DS_IPC_POLL_TIMEOUT_MIN_MS;
127127
else
128128
return (current_timeout_ms >= DS_IPC_POLL_TIMEOUT_MAX_MS) ?
@@ -361,7 +361,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
361361

362362
DiagnosticsIpcStream *stream = NULL;
363363

364-
uint32_t poll_timeout_ms = DS_IPC_TIMEOUT_INFINITE;
364+
uint32_t poll_timeout_ms = IPC_TIMEOUT_INFINITE;
365365
bool connect_success = true;
366366
uint32_t poll_attempts = 0;
367367

@@ -382,7 +382,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
382382
} DN_VECTOR_PTR_FOREACH_END;
383383

384384
poll_timeout_ms = connect_success ?
385-
DS_IPC_TIMEOUT_INFINITE :
385+
IPC_TIMEOUT_INFINITE :
386386
ipc_stream_factory_get_next_timeout (poll_timeout_ms);
387387

388388
int32_t ret_val;
@@ -392,7 +392,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
392392
ipc_log_poll_handles (&ipc_poll_handles);
393393
ret_val = ds_ipc_poll (dn_vector_data_t (&ipc_poll_handles, DiagnosticsIpcPollHandle), dn_vector_size (&ipc_poll_handles), poll_timeout_ms, callback);
394394
} else {
395-
if (poll_timeout_ms == DS_IPC_TIMEOUT_INFINITE)
395+
if (poll_timeout_ms == IPC_TIMEOUT_INFINITE)
396396
poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MAX_MS;
397397
DS_LOG_DEBUG_1 ("ds_ipc_stream_factory_get_next_available_stream - Nothing to poll, sleeping using timeout: %dms.", poll_timeout_ms);
398398
ep_rt_thread_sleep ((uint64_t)poll_timeout_ms * NUM_NANOSECONDS_IN_1_MS);
@@ -406,13 +406,13 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
406406
DN_VECTOR_FOREACH_BEGIN (DiagnosticsIpcPollHandle, ipc_poll_handle, &ipc_poll_handles) {
407407
DiagnosticsPort *port = (DiagnosticsPort *)ipc_poll_handle.user_data;
408408
switch (ipc_poll_handle.events) {
409-
case DS_IPC_POLL_EVENTS_HANGUP:
409+
case IPC_POLL_EVENTS_HANGUP:
410410
EP_ASSERT (port != NULL);
411411
ds_port_reset_vcall (port, callback);
412412
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - HUP :: Poll attempt: %d, connection %d hung up. Connect is reset.", poll_attempts, connection_id);
413413
poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MIN_MS;
414414
break;
415-
case DS_IPC_POLL_EVENTS_SIGNALED:
415+
case IPC_POLL_EVENTS_SIGNALED:
416416
EP_ASSERT (port != NULL);
417417
if (!stream) { // only use first signaled stream; will get others on subsequent calls
418418
stream = ds_port_get_connected_stream_vcall (port, callback);
@@ -422,12 +422,12 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
422422
}
423423
DS_LOG_DEBUG_2 ("ds_ipc_stream_factory_get_next_available_stream - SIG :: Poll attempt: %d, connection %d signalled.", poll_attempts, connection_id);
424424
break;
425-
case DS_IPC_POLL_EVENTS_ERR:
425+
case IPC_POLL_EVENTS_ERR:
426426
ds_port_reset_vcall ((DiagnosticsPort *)ipc_poll_handle.user_data, callback);
427427
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - ERR :: Poll attempt: %d, connection %d errored. Connection is reset.", poll_attempts, connection_id);
428428
saw_error = true;
429429
break;
430-
case DS_IPC_POLL_EVENTS_NONE:
430+
case IPC_POLL_EVENTS_NONE:
431431
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - NON :: Poll attempt: %d, connection %d had no events.", poll_attempts, connection_id);
432432
break;
433433
default:
@@ -444,7 +444,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
444444
if (!stream && saw_error) {
445445
// Some errors can cause the poll to return instantly, we want to delay if we see an error to avoid
446446
// runaway CPU usage.
447-
if (poll_timeout_ms == DS_IPC_TIMEOUT_INFINITE)
447+
if (poll_timeout_ms == IPC_TIMEOUT_INFINITE)
448448
poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MAX_MS;
449449
DS_LOG_DEBUG_1 ("ds_ipc_stream_factory_get_next_available_stream - Saw error, sleeping using timeout: %dms.", poll_timeout_ms);
450450
ep_rt_thread_sleep ((uint64_t)poll_timeout_ms * NUM_NANOSECONDS_IN_1_MS);

0 commit comments

Comments
 (0)