Skip to content

Commit c131816

Browse files
committed
Address Feedback
Revert moving IpcPollEvents to ep-ipc-pal-types header Use IpcStream base type directly in sessions Omit websocket ipc_stream_poll_func implementation Change continuation stream poll timeout to infinite Change returned PollEvent to none upon timeout
1 parent c7fc5e6 commit c131816

File tree

13 files changed

+68
-105
lines changed

13 files changed

+68
-105
lines changed

src/native/eventpipe/ds-ipc-pal-namedpipe.c

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ ds_ipc_poll (
213213
handles [i] = poll_handles_data [i].ipc->overlap.hEvent;
214214
if (handles [i] == INVALID_HANDLE_VALUE) {
215215
// Invalid handle, wait will fail. Signal error
216-
poll_handles_data [i].events = EP_IPC_POLL_EVENTS_ERR;
216+
poll_handles_data [i].events = DS_IPC_POLL_EVENTS_ERR;
217217
}
218218
} else {
219219
// CLIENT
@@ -238,7 +238,7 @@ ds_ipc_poll (
238238
handles [i] = poll_handles_data [i].stream->overlap.hEvent;
239239
break;
240240
case ERROR_PIPE_NOT_CONNECTED:
241-
poll_handles_data [i].events = (uint8_t)EP_IPC_POLL_EVENTS_HANGUP;
241+
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
242242
result = -1;
243243
ep_raise_error ();
244244
default:
@@ -288,7 +288,7 @@ ds_ipc_poll (
288288
// check if we abandoned something
289289
DWORD abandonedIndex = wait - WAIT_ABANDONED_0;
290290
if (abandonedIndex > 0 || abandonedIndex < (poll_handles_data_len - 1)) {
291-
poll_handles_data [abandonedIndex].events = (uint8_t)EP_IPC_POLL_EVENTS_HANGUP;
291+
poll_handles_data [abandonedIndex].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
292292
result = -1;
293293
ep_raise_error ();
294294
} else {
@@ -325,20 +325,20 @@ ds_ipc_poll (
325325
if (!success) {
326326
DWORD error = GetLastError();
327327
if (error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) {
328-
poll_handles_data [index].events = (uint8_t)EP_IPC_POLL_EVENTS_HANGUP;
328+
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
329329
} else {
330330
if (callback)
331331
callback ("Client connection error", error);
332-
poll_handles_data [index].events = (uint8_t)EP_IPC_POLL_EVENTS_ERR;
332+
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
333333
result = -1;
334334
ep_raise_error ();
335335
}
336336
} else {
337-
poll_handles_data [index].events = (uint8_t)EP_IPC_POLL_EVENTS_SIGNALED;
337+
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
338338
}
339339
} else {
340340
// SERVER
341-
poll_handles_data [index].events = (uint8_t)EP_IPC_POLL_EVENTS_SIGNALED;
341+
poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
342342
}
343343

344344
result = 1;
@@ -835,13 +835,14 @@ ipc_stream_close_func (void *object)
835835
}
836836

837837
static
838-
EventPipeIpcPollEvents
838+
DiagnosticsIpcPollEvents
839839
ipc_stream_poll_func (
840840
void *object,
841841
uint32_t timeout_ms)
842842
{
843-
// Needs to be implemented.
844-
return EP_IPC_POLL_EVENTS_UNKNOWN;
843+
EP_UNREACHABLE ("ipc_stream_poll_func needs to be implemented for NamedPipes");
844+
// TODO: Implement ipc_stream_poll_func for NamedPipes
845+
return DS_IPC_POLL_EVENTS_UNKNOWN;
845846
}
846847

847848
static IpcStreamVtable ipc_stream_vtable = {

src/native/eventpipe/ds-ipc-pal-socket.c

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,15 +1146,15 @@ ds_ipc_poll (
11461146
// check for hangup first because a closed socket
11471147
// will technically meet the requirements for POLLIN
11481148
// i.e., a call to recv/read won't block
1149-
poll_handles_data [i].events = (uint8_t)EP_IPC_POLL_EVENTS_HANGUP;
1149+
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP;
11501150
} else if ((poll_fds [i].revents & (POLLERR|POLLNVAL))) {
11511151
if (callback)
11521152
callback ("Poll error", (uint32_t)poll_fds [i].revents);
1153-
poll_handles_data [i].events = (uint8_t)EP_IPC_POLL_EVENTS_ERR;
1153+
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
11541154
} else if (poll_fds [i].revents & (POLLIN|POLLPRI)) {
1155-
poll_handles_data [i].events = (uint8_t)EP_IPC_POLL_EVENTS_SIGNALED;
1155+
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
11561156
} else {
1157-
poll_handles_data [i].events = (uint8_t)EP_IPC_POLL_EVENTS_UNKNOWN;
1157+
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_UNKNOWN;
11581158
if (callback)
11591159
callback ("unknown poll response", (uint32_t)poll_fds [i].revents);
11601160
}
@@ -1490,7 +1490,7 @@ ipc_stream_close_func (void *object)
14901490
}
14911491

14921492
static
1493-
EventPipeIpcPollEvents
1493+
DiagnosticsIpcPollEvents
14941494
ipc_stream_poll_func (
14951495
void *object,
14961496
uint32_t timeout_ms)
@@ -1680,15 +1680,15 @@ ds_ipc_stream_to_string (
16801680
return (result > 0 && result < (int32_t)buffer_len) ? result : 0;
16811681
}
16821682

1683-
EventPipeIpcPollEvents
1683+
DiagnosticsIpcPollEvents
16841684
ds_ipc_stream_poll (
16851685
DiagnosticsIpcStream *ipc_stream,
16861686
uint32_t timeout_ms)
16871687
{
16881688
EP_ASSERT (ipc_stream != NULL);
16891689

16901690
if (ipc_stream->client_socket == DS_IPC_INVALID_SOCKET)
1691-
return EP_IPC_POLL_EVENTS_HANGUP;
1691+
return DS_IPC_POLL_EVENTS_HANGUP;
16921692

16931693
ds_ipc_pollfd_t pfd;
16941694
pfd.fd = ipc_stream->client_socket;
@@ -1698,24 +1698,24 @@ ds_ipc_stream_poll (
16981698
result_poll = ipc_poll_fds (&pfd, 1, timeout_ms);
16991699

17001700
if (result_poll < 0)
1701-
return EP_IPC_POLL_EVENTS_ERR;
1701+
return DS_IPC_POLL_EVENTS_ERR;
17021702

17031703
if (result_poll == 0)
1704-
return EP_IPC_POLL_EVENTS_HANGUP;
1704+
return DS_IPC_POLL_EVENTS_NONE;
17051705

17061706
if (pfd.revents == 0)
1707-
return EP_IPC_POLL_EVENTS_NONE;
1707+
return DS_IPC_POLL_EVENTS_NONE;
17081708

17091709
if (pfd.revents & POLLHUP)
1710-
return EP_IPC_POLL_EVENTS_HANGUP;
1710+
return DS_IPC_POLL_EVENTS_HANGUP;
17111711

17121712
if (pfd.revents & (POLLERR | POLLNVAL))
1713-
return EP_IPC_POLL_EVENTS_ERR;
1713+
return DS_IPC_POLL_EVENTS_ERR;
17141714

17151715
if (pfd.revents & (POLLIN | POLLPRI | POLLOUT))
1716-
return EP_IPC_POLL_EVENTS_SIGNALED;
1716+
return DS_IPC_POLL_EVENTS_SIGNALED;
17171717

1718-
return EP_IPC_POLL_EVENTS_UNKNOWN;
1718+
return DS_IPC_POLL_EVENTS_UNKNOWN;
17191719
}
17201720

17211721
#endif /* ENABLE_PERFTRACING */

src/native/eventpipe/ds-ipc-pal-types.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ typedef struct _DiagnosticsIpcStream DiagnosticsIpcStream;
2222
* Diagnostics IPC PAL Enums.
2323
*/
2424

25+
typedef enum {
26+
DS_IPC_POLL_EVENTS_NONE = 0x00, // no events
27+
DS_IPC_POLL_EVENTS_SIGNALED = 0x01, // ready for use
28+
DS_IPC_POLL_EVENTS_HANGUP = 0x02, // connection remotely closed
29+
DS_IPC_POLL_EVENTS_ERR = 0x04, // error
30+
DS_IPC_POLL_EVENTS_UNKNOWN = 0x80 // unknown state
31+
} DiagnosticsIpcPollEvents;
32+
2533
typedef enum {
2634
DS_IPC_CONNECTION_MODE_CONNECT,
2735
DS_IPC_CONNECTION_MODE_LISTEN

src/native/eventpipe/ds-ipc-pal-websocket.c

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -248,11 +248,11 @@ ds_ipc_poll (
248248
int client_socket = poll_handles_data [i].stream->client_socket;
249249
int pending = ds_rt_websocket_poll (client_socket);
250250
if (pending < 0){
251-
poll_handles_data [i].events = (uint8_t)EP_IPC_POLL_EVENTS_ERR;
251+
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR;
252252
return 1;
253253
}
254254
if (pending > 0){
255-
poll_handles_data [i].events = (uint8_t)EP_IPC_POLL_EVENTS_SIGNALED;
255+
poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED;
256256
return 1;
257257
}
258258
}
@@ -426,23 +426,14 @@ ipc_stream_close_func (void *object)
426426
}
427427

428428
static
429-
EventPipeIpcPollEvents
429+
DiagnosticsIpcPollEvents
430430
ipc_stream_poll_func (
431431
void *object,
432432
uint32_t timeout_ms)
433433
{
434-
EP_ASSERT (object != NULL);
435-
DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object;
436-
437-
// Check if the socket is still open
438-
int pending = ds_rt_websocket_poll (ipc_stream->client_socket);
439-
if (pending < 0)
440-
return EP_IPC_POLL_EVENTS_ERR;
441-
442-
if (pending > 0)
443-
return EP_IPC_POLL_EVENTS_SIGNALED;
444-
445-
return EP_IPC_POLL_EVENTS_NONE;
434+
EP_UNREACHABLE ("ipc_stream_poll_func needs to be implemented for WebSockets");
435+
// TODO: Implement ipc_stream_poll_func for WebSockets
436+
return DS_IPC_POLL_EVENTS_UNKNOWN;
446437
}
447438

448439
static IpcStreamVtable ipc_stream_vtable = {

src/native/eventpipe/ds-ipc-pal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ ds_ipc_stream_to_string (
140140
ep_char8_t *buffer,
141141
uint32_t buffer_len);
142142

143-
EventPipeIpcPollEvents
143+
DiagnosticsIpcPollEvents
144144
ds_ipc_stream_poll (
145145
DiagnosticsIpcStream *ipc_stream,
146146
uint32_t timeout_ms);

src/native/eventpipe/ds-ipc.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,13 +406,13 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
406406
DN_VECTOR_FOREACH_BEGIN (DiagnosticsIpcPollHandle, ipc_poll_handle, &ipc_poll_handles) {
407407
DiagnosticsPort *port = (DiagnosticsPort *)ipc_poll_handle.user_data;
408408
switch (ipc_poll_handle.events) {
409-
case EP_IPC_POLL_EVENTS_HANGUP:
409+
case DS_IPC_POLL_EVENTS_HANGUP:
410410
EP_ASSERT (port != NULL);
411411
ds_port_reset_vcall (port, callback);
412412
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - HUP :: Poll attempt: %d, connection %d hung up. Connect is reset.", poll_attempts, connection_id);
413413
poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MIN_MS;
414414
break;
415-
case EP_IPC_POLL_EVENTS_SIGNALED:
415+
case DS_IPC_POLL_EVENTS_SIGNALED:
416416
EP_ASSERT (port != NULL);
417417
if (!stream) { // only use first signaled stream; will get others on subsequent calls
418418
stream = ds_port_get_connected_stream_vcall (port, callback);
@@ -422,12 +422,12 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call
422422
}
423423
DS_LOG_DEBUG_2 ("ds_ipc_stream_factory_get_next_available_stream - SIG :: Poll attempt: %d, connection %d signalled.", poll_attempts, connection_id);
424424
break;
425-
case EP_IPC_POLL_EVENTS_ERR:
425+
case DS_IPC_POLL_EVENTS_ERR:
426426
ds_port_reset_vcall ((DiagnosticsPort *)ipc_poll_handle.user_data, callback);
427427
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - ERR :: Poll attempt: %d, connection %d errored. Connection is reset.", poll_attempts, connection_id);
428428
saw_error = true;
429429
break;
430-
case EP_IPC_POLL_EVENTS_NONE:
430+
case DS_IPC_POLL_EVENTS_NONE:
431431
DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - NON :: Poll attempt: %d, connection %d had no events.", poll_attempts, connection_id);
432432
break;
433433
default:

src/native/eventpipe/ep-ipc-pal-types.h

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,5 @@
1111

1212
#include "ep-ipc-pal-types-forward.h"
1313

14-
/*
15-
* EventPipe IPC PAL
16-
*/
17-
18-
typedef enum {
19-
EP_IPC_POLL_EVENTS_NONE = 0x00, // no events
20-
EP_IPC_POLL_EVENTS_SIGNALED = 0x01, // ready for use
21-
EP_IPC_POLL_EVENTS_HANGUP = 0x02, // connection remotely closed
22-
EP_IPC_POLL_EVENTS_ERR = 0x04, // error
23-
EP_IPC_POLL_EVENTS_UNKNOWN = 0x80 // unknown state
24-
} EventPipeIpcPollEvents;
25-
26-
#define EP_IPC_POLL_TIMEOUT_MIN_MS (uint32_t)10
27-
2814
#endif /* ENABLE_PERFTRACING */
2915
#endif /* __EVENTPIPE_IPC_PAL_TYPES_H__ */

src/native/eventpipe/ep-ipc-stream.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ typedef bool (*IpcStreamReadFunc)(void *object, uint8_t *buffer, uint32_t bytes_
2121
typedef bool (*IpcStreamWriteFunc)(void *object, const uint8_t *buffer, uint32_t bytes_to_write, uint32_t *bytes_written, uint32_t timeout_ms);
2222
typedef bool (*IpcStreamFlushFunc)(void *object);
2323
typedef bool (*IpcStreamCloseFunc)(void *object);
24-
typedef EventPipeIpcPollEvents (*IpcStreamPollFunc)(void *object, uint32_t timeout_ms);
24+
typedef DiagnosticsIpcPollEvents (*IpcStreamPollFunc)(void *object, uint32_t timeout_ms);
2525

2626
struct _IpcStreamVtable {
2727
IpcStreamFreeFunc free_func;
@@ -79,7 +79,7 @@ ep_ipc_stream_flush_vcall (IpcStream *ipc_stream);
7979
bool
8080
ep_ipc_stream_close_vcall (IpcStream *ipc_stream);
8181

82-
EventPipeIpcPollEvents
82+
DiagnosticsIpcPollEvents
8383
ep_ipc_stream_poll_vcall (IpcStream *ipc_stream, uint32_t timeout_ms);
8484

8585
#endif /* ENABLE_PERFTRACING */

src/native/eventpipe/ep-session.c

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ session_tracepoint_write_event (
7676
ep_rt_thread_handle_t event_thread,
7777
EventPipeStackContents *stack);
7878

79+
static
80+
bool
81+
session_is_stream_connection_closed (IpcStream *stream);
82+
7983
/*
8084
* EventPipeSession.
8185
*/
@@ -123,15 +127,15 @@ EP_RT_DEFINE_THREAD_FUNC (streaming_thread)
123127
ep_rt_thread_sleep (timeout_ns);
124128
}
125129
} else if (session->session_type == EP_SESSION_TYPE_USEREVENTS) {
126-
// User events session, write all user events tracepoints to the file.
130+
// In a user events session we only monitor the stream to stop the session if it closes.
127131
while (ep_session_get_streaming_enabled (session)) {
128-
EP_ASSERT (session->continuation_stream != NULL);
129-
if (ep_ipc_continuation_stream_connection_closed (session->continuation_stream)) {
132+
EP_ASSERT (session->stream != NULL);
133+
if (session_is_stream_connection_closed (session->stream)) {
130134
success = false;
131135
break;
132136
}
133137

134-
// Wait until it's time to sample again.
138+
// Wait until it's time to poll again.
135139
const uint32_t timeout_ns = 100000000; // 100 msec.
136140
ep_rt_thread_sleep (timeout_ns);
137141
}
@@ -219,6 +223,15 @@ session_disable_streaming_thread (EventPipeSession *session)
219223
ep_rt_wait_event_free (rt_thread_shutdown_event);
220224
}
221225

226+
static
227+
bool
228+
session_is_stream_connection_closed (IpcStream *stream)
229+
{
230+
EP_ASSERT (stream != NULL);
231+
DiagnosticsIpcPollEvents poll_event = ep_ipc_stream_poll_vcall (stream, DS_IPC_TIMEOUT_INFINITE);
232+
return poll_event == DS_IPC_POLL_EVENTS_HANGUP;
233+
}
234+
222235
/*
223236
* session_user_events_tracepoints_init
224237
*
@@ -300,7 +313,7 @@ ep_session_alloc (
300313
instance->synchronous_callback = sync_callback;
301314
instance->callback_additional_data = callback_additional_data;
302315
instance->user_events_data_fd = -1;
303-
instance->continuation_stream = NULL;
316+
instance->stream = NULL;
304317

305318
// Hard coded 10MB for now, we'll probably want to make
306319
// this configurable later.
@@ -339,7 +352,7 @@ ep_session_alloc (
339352
case EP_SESSION_TYPE_USEREVENTS:
340353
// With the user_events_data file, register tracepoints for each provider's tracepoint configurations
341354
ep_raise_error_if_nok (session_user_events_tracepoints_init (instance, user_events_data_fd));
342-
instance->continuation_stream = (IpcContinuationStream *)stream;
355+
instance->stream = stream;
343356
break;
344357

345358
default:

src/native/eventpipe/ep-session.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ struct _EventPipeSession_Internal {
7272
// The user_events_data file descriptor to register Tracepoints and write user_events to.
7373
int user_events_data_fd;
7474
// The IPC continuation stream from initializing the session through the diagnostic server
75-
IpcContinuationStream *continuation_stream;
75+
// Currently only initialized for user_events sessions.
76+
IpcStream *stream;
7677
};
7778

7879
#if !defined(EP_INLINE_GETTER_SETTER) && !defined(EP_IMPL_SESSION_GETTER_SETTER)

0 commit comments

Comments
 (0)