@@ -64,23 +64,10 @@ pub(crate) struct Spawner {
64
64
shared : Arc < Shared > ,
65
65
}
66
66
67
- /// A remote scheduler entry.
68
- ///
69
- /// These are filled in by remote threads sending instructions to the scheduler.
70
- enum RemoteMsg {
71
- /// A remote thread wants to spawn a task.
72
- Schedule ( task:: Notified < Arc < Shared > > ) ,
73
- }
74
-
75
- // Safety: Used correctly, the task header is "thread safe". Ultimately the task
76
- // is owned by the current thread executor, for which this instruction is being
77
- // sent.
78
- unsafe impl Send for RemoteMsg { }
79
-
80
67
/// Scheduler state shared between threads.
81
68
struct Shared {
82
69
/// Remote run queue. None if the `Runtime` has been dropped.
83
- queue : Mutex < Option < VecDeque < RemoteMsg > > > ,
70
+ queue : Mutex < Option < VecDeque < task :: Notified < Arc < Shared > > > > > ,
84
71
85
72
/// Collection of all active tasks spawned onto this executor.
86
73
owned : OwnedTasks < Arc < Shared > > ,
@@ -251,12 +238,8 @@ impl Drop for BasicScheduler {
251
238
// Using `Option::take` to replace the shared queue with `None`.
252
239
// We already shut down every task, so we just need to drop the task.
253
240
if let Some ( remote_queue) = remote_queue {
254
- for entry in remote_queue {
255
- match entry {
256
- RemoteMsg :: Schedule ( task) => {
257
- drop ( task) ;
258
- }
259
- }
241
+ for task in remote_queue {
242
+ drop ( task) ;
260
243
}
261
244
}
262
245
@@ -396,7 +379,7 @@ impl Spawner {
396
379
handle
397
380
}
398
381
399
- fn pop ( & self ) -> Option < RemoteMsg > {
382
+ fn pop ( & self ) -> Option < task :: Notified < Arc < Shared > > > {
400
383
match self . shared . queue . lock ( ) . as_mut ( ) {
401
384
Some ( queue) => queue. pop_front ( ) ,
402
385
None => None ,
@@ -470,7 +453,7 @@ impl Schedule for Arc<Shared> {
470
453
// don't need to do anything with the notification in that case.
471
454
let mut guard = self . queue . lock ( ) ;
472
455
if let Some ( queue) = guard. as_mut ( ) {
473
- queue. push_back ( RemoteMsg :: Schedule ( task) ) ;
456
+ queue. push_back ( task) ;
474
457
drop ( guard) ;
475
458
self . unpark . unpark ( ) ;
476
459
}
@@ -528,17 +511,12 @@ impl CoreGuard<'_> {
528
511
core. tick = core. tick . wrapping_add ( 1 ) ;
529
512
530
513
let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
531
- core. spawner
532
- . pop ( )
533
- . or_else ( || core. tasks . pop_front ( ) . map ( RemoteMsg :: Schedule ) )
514
+ core. spawner . pop ( ) . or_else ( || core. tasks . pop_front ( ) )
534
515
} else {
535
- core. tasks
536
- . pop_front ( )
537
- . map ( RemoteMsg :: Schedule )
538
- . or_else ( || core. spawner . pop ( ) )
516
+ core. tasks . pop_front ( ) . or_else ( || core. spawner . pop ( ) )
539
517
} ;
540
518
541
- let entry = match entry {
519
+ let task = match entry {
542
520
Some ( entry) => entry,
543
521
None => {
544
522
core = context. park ( core) ;
@@ -548,17 +526,13 @@ impl CoreGuard<'_> {
548
526
}
549
527
} ;
550
528
551
- match entry {
552
- RemoteMsg :: Schedule ( task) => {
553
- let task = context. spawner . shared . owned . assert_owner ( task) ;
529
+ let task = context. spawner . shared . owned . assert_owner ( task) ;
554
530
555
- let ( c, _) = context. run_task ( core, || {
556
- task. run ( ) ;
557
- } ) ;
531
+ let ( c, _) = context. run_task ( core, || {
532
+ task. run ( ) ;
533
+ } ) ;
558
534
559
- core = c;
560
- }
561
- }
535
+ core = c;
562
536
}
563
537
564
538
// Yield to the driver, this drives the timer and pulls any
0 commit comments