@@ -938,6 +938,227 @@ JL_DLLEXPORT int jl_alignment(size_t sz)
938938 return jl_gc_alignment (sz );
939939}
940940
941+ // Heartbeat mechanism for Julia's task scheduler
942+ // ---
943+ // Start a thread that does not participate in running Julia's tasks. This
944+ // thread simply sleeps until the heartbeat mechanism is enabled. When
945+ // enabled, the heartbeat thread enters a loop in which it blocks waiting
946+ // for the specified heartbeat interval. If, within that interval,
947+ // `jl_heartbeat()` is *not* called at least once, then the thread calls
948+ // `jl_print_task_backtraces(0)`.
949+
950+ #ifdef JL_HEARTBEAT_THREAD
951+
952+ #include <time.h>
953+
954+ volatile int heartbeat_enabled ;
955+ uv_sem_t heartbeat_on_sem , // jl_heartbeat_enable -> thread
956+ heartbeat_off_sem ; // thread -> jl_heartbeat_enable
957+ int heartbeat_interval_s ,
958+ n_loss_reports ,
959+ reset_reporting_s ;
960+ int last_report_s , report_interval_s , n_reported ;
961+ _Atomic(int ) heartbeats ;
962+
963+ JL_DLLEXPORT void jl_print_task_backtraces (int show_done ) JL_NOTSAFEPOINT ;
964+ void jl_heartbeat_threadfun (void * arg );
965+
966+ // start the heartbeat thread with heartbeats disabled
967+ void jl_init_heartbeat (void )
968+ {
969+ uv_thread_t uvtid ;
970+ heartbeat_enabled = 0 ;
971+ uv_sem_init (& heartbeat_on_sem , 0 );
972+ uv_sem_init (& heartbeat_off_sem , 0 );
973+ uv_thread_create (& uvtid , jl_heartbeat_threadfun , NULL );
974+ uv_thread_detach (& uvtid );
975+ }
976+
977+ // enable/disable heartbeats
978+ // heartbeat_s: interval within which jl_heartbeat() must be called
979+ // n_reports: for one heartbeat loss interval, how many times to report
980+ // reset_reporting_after_s: how long to wait after a heartbeat loss
981+ // interval and a return to steady heartbeats, before resetting
982+ // reporting behavior
983+ //
984+ // When disabling heartbeats, the heartbeat thread must wake up,
985+ // find out that heartbeats are now diabled, and reset. For now, we
986+ // handle this by preventing re-enabling of heartbeats until this
987+ // completes.
988+ JL_DLLEXPORT int jl_heartbeat_enable (int heartbeat_s , int n_reports ,
989+ int reset_reporting_after_s )
990+ {
991+ if (heartbeat_s <= 0 ) {
992+ heartbeat_enabled = 0 ;
993+ heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0 ;
994+ }
995+ else {
996+ // must disable before enabling
997+ if (heartbeat_enabled ) {
998+ return -1 ;
999+ }
1000+ // heartbeat thread must be ready
1001+ if (uv_sem_trywait (& heartbeat_off_sem ) != 0 ) {
1002+ return -1 ;
1003+ }
1004+
1005+ jl_atomic_store_relaxed (& heartbeats , 0 );
1006+ heartbeat_interval_s = heartbeat_s ;
1007+ n_loss_reports = n_reports ;
1008+ reset_reporting_s = reset_reporting_after_s ;
1009+ last_report_s = 0 ;
1010+ report_interval_s = heartbeat_interval_s ;
1011+ heartbeat_enabled = 1 ;
1012+ uv_sem_post (& heartbeat_on_sem ); // wake the heartbeat thread
1013+ }
1014+ return 0 ;
1015+ }
1016+
1017+ // heartbeat
1018+ JL_DLLEXPORT void jl_heartbeat (void )
1019+ {
1020+ jl_atomic_fetch_add (& heartbeats , 1 );
1021+ }
1022+
// Sleep the calling thread for `secs` seconds plus `nsecs` nanoseconds.
//
// The two arguments are treated as one signed duration, so either may be
// negative or `nsecs` may exceed one second; the sum is normalized into a
// valid timespec before calling nanosleep(). A total of zero or less returns
// immediately. Without this normalization nanosleep() rejects the request
// and the sleep is silently skipped.
void sleep_for(int secs, int nsecs)
{
    const long long ns_per_s = 1000000000LL;
    long long total_ns = (long long)secs * ns_per_s + nsecs;
    struct timespec rqtp, rmtp;

    if (total_ns <= 0) {
        return;
    }
    rqtp.tv_sec = (time_t)(total_ns / ns_per_s);
    rqtp.tv_nsec = (long)(total_ns % ns_per_s);

    for (;;) {
        // Clear the remainder first: if nanosleep() fails without filling
        // it in, the retry below degenerates to a zero-length sleep and the
        // loop terminates instead of repeating the same request.
        rmtp.tv_sec = 0;
        rmtp.tv_nsec = 0;
        // this suspends the thread so we aren't using CPU
        if (nanosleep(&rqtp, &rmtp) == 0) {
            return;
        }
        // TODO: an interruption could be SIGTERM, in which case we should
        // shut down -- but how to find out?
        rqtp = rmtp; // resume with whatever time is left
    }
}
1041+
1042+ // check for heartbeats and maybe report loss
1043+ uint8_t check_heartbeats (uint8_t gc_state )
1044+ {
1045+ int hb = jl_atomic_exchange (& heartbeats , 0 );
1046+ uint64_t curr_s = jl_hrtime () / 1e9 ;
1047+
1048+ if (hb <= 0 ) {
1049+ // we didn't get a heartbeat in the last interval; should we report?
1050+ if (n_reported < n_loss_reports &&
1051+ curr_s - last_report_s >= report_interval_s ) {
1052+ jl_task_t * ct = jl_current_task ;
1053+ jl_ptls_t ptls = ct -> ptls ;
1054+
1055+ // exit GC-safe region to report then re-enter
1056+ jl_gc_safe_leave (ptls , gc_state );
1057+ jl_safe_printf ("==== heartbeat loss ====\n" );
1058+ jl_print_task_backtraces (0 );
1059+ gc_state = jl_gc_safe_enter (ptls );
1060+
1061+ // we've reported
1062+ n_reported ++ ;
1063+
1064+ // record the reporting time _after_ the report
1065+ last_report_s = jl_hrtime () / 1e9 ;
1066+
1067+ // double the reporting interval up to a maximum
1068+ if (report_interval_s < 60 * heartbeat_interval_s ) {
1069+ report_interval_s *= 2 ;
1070+ }
1071+ }
1072+ // no heartbeats, don't change reporting state
1073+ return gc_state ;
1074+ }
1075+ else {
1076+ // we got a heartbeat; reset the report count
1077+ n_reported = 0 ;
1078+ }
1079+
1080+ // reset the reporting interval only once we're steadily getting
1081+ // heartbeats for the requested reset interval
1082+ if (curr_s - reset_reporting_s > last_report_s ) {
1083+ report_interval_s = heartbeat_interval_s ;
1084+ }
1085+
1086+ return gc_state ;
1087+ }
1088+
1089+ // heartbeat thread function
1090+ void jl_heartbeat_threadfun (void * arg )
1091+ {
1092+ int s , ns = 1e9 - 1 , rs ;
1093+ uint64_t t0 , tchb ;
1094+
1095+ // We need a TLS because backtraces are accumulated into ptls->bt_size
1096+ // and ptls->bt_data, so we need to call jl_adopt_thread().
1097+ jl_adopt_thread ();
1098+ jl_task_t * ct = jl_current_task ;
1099+ jl_ptls_t ptls = ct -> ptls ;
1100+
1101+ // Don't hold up GC, this thread doesn't participate.
1102+ uint8_t gc_state = jl_gc_safe_enter (ptls );
1103+
1104+ for (;;) {
1105+ if (!heartbeat_enabled ) {
1106+ // post the off semaphore to indicate we're ready to enable
1107+ uv_sem_post (& heartbeat_off_sem );
1108+
1109+ // sleep the thread here; this semaphore is posted in
1110+ // jl_heartbeat_enable()
1111+ uv_sem_wait (& heartbeat_on_sem );
1112+
1113+ // Set the sleep duration.
1114+ s = heartbeat_interval_s - 1 ;
1115+ ns = 1e9 - 1 ;
1116+ continue ;
1117+ }
1118+
1119+ // heartbeat is enabled; sleep, waiting for the desired interval
1120+ sleep_for (s , ns );
1121+
1122+ // if heartbeats were turned off while we were sleeping, reset
1123+ if (!heartbeat_enabled ) {
1124+ continue ;
1125+ }
1126+
1127+ // check if any heartbeats have happened, report as appropriate
1128+ t0 = jl_hrtime ();
1129+ gc_state = check_heartbeats (gc_state );
1130+ tchb = jl_hrtime () - t0 ;
1131+
1132+ // adjust the next sleep duration based on how long the heartbeat
1133+ // check took
1134+ rs = 1 ;
1135+ while (tchb > 1e9 ) {
1136+ rs ++ ;
1137+ tchb -= 1e9 ;
1138+ }
1139+ s = heartbeat_interval_s - rs ;
1140+ ns = 1e9 - tchb ;
1141+ }
1142+ }
1143+
1144+ #else // !JL_HEARTBEAT_THREAD
1145+
// Heartbeat support is compiled out (JL_HEARTBEAT_THREAD is not defined):
// there is no thread to start, so initialization does nothing.
void jl_init_heartbeat(void)
{
    return;
}
1149+
1150+ JL_DLLEXPORT int jl_heartbeat_enable (int heartbeat_s , int n_reports ,
1151+ int reset_reporting_after_s )
1152+ {
1153+ return -1 ;
1154+ }
1155+
1156+ JL_DLLEXPORT void jl_heartbeat (void )
1157+ {
1158+ }
1159+
1160+ #endif // JL_HEARTBEAT_THREAD
1161+
9411162#ifdef __cplusplus
9421163}
9431164#endif
0 commit comments