209209 no_crate_inject,
210210 attr( deny( warnings, rust_2018_idioms, single_use_lifetimes) , allow( dead_code) )
211211) ) ]
212- #![ warn( unsafe_code) ]
213212#![ warn( missing_docs) ]
214213#![ warn( rust_2018_idioms, single_use_lifetimes, unreachable_pub) ]
215214#![ warn( clippy:: all) ]
216- #![ feature( gen_future , generator_trait) ]
215+ #![ feature( generator_trait) ]
217216
218217#[ doc( inline) ]
219218pub use futures_async_stream_macro:: for_await;
@@ -230,13 +229,289 @@ pub use futures_async_stream_macro::async_try_stream;
230229#[ doc( inline) ]
231230pub use futures_async_stream_macro:: async_try_stream_block;
232231
232+ use core:: { cell:: Cell , ptr:: NonNull , task:: Context } ;
233+
234+ thread_local ! {
235+ static TLS_CX : Cell <Option <NonNull <Context <' static >>>> = Cell :: new( None ) ;
236+ }
237+
238+ struct SetOnDrop ( Option < NonNull < Context < ' static > > > ) ;
239+
240+ impl Drop for SetOnDrop {
241+ fn drop ( & mut self ) {
242+ TLS_CX . with ( |tls_cx| {
243+ tls_cx. set ( self . 0 . take ( ) ) ;
244+ } ) ;
245+ }
246+ }
247+
248+ // Safety: the returned guard must drop before `cx` is dropped and before
249+ // any previous guard is dropped.
250+ unsafe fn set_task_context ( cx : & mut Context < ' _ > ) -> SetOnDrop {
251+ // transmute the context's lifetime to 'static so we can store it.
252+ let cx = core:: mem:: transmute :: < & mut Context < ' _ > , & mut Context < ' static > > ( cx) ;
253+ let old_cx = TLS_CX . with ( |tls_cx| tls_cx. replace ( Some ( NonNull :: from ( cx) ) ) ) ;
254+ SetOnDrop ( old_cx)
255+ }
256+
233257// Not public API.
234258#[ doc( hidden) ]
235- pub mod stream;
259+ pub mod future {
260+ use core:: {
261+ future:: Future ,
262+ ops:: { Generator , GeneratorState } ,
263+ pin:: Pin ,
264+ task:: { Context , Poll } ,
265+ } ;
266+ use pin_project:: pin_project;
267+
268+ use super :: { set_task_context, SetOnDrop , TLS_CX } ;
269+
270+ // =================================================================================================
271+ // GenFuture
272+
273+ /// Wrap a generator in a future.
274+ ///
275+ /// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
276+ /// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
277+ #[ doc( hidden) ]
278+ pub fn from_generator < T : Generator < Yield = ( ) > > ( x : T ) -> impl Future < Output = T :: Return > {
279+ GenFuture ( x)
280+ }
281+
282+ /// A wrapper around generators used to implement `Future` for `async`/`await` code.
283+ #[ pin_project]
284+ struct GenFuture < T > ( #[ pin] T ) ;
285+
286+ #[ doc( hidden) ]
287+ impl < T > Future for GenFuture < T >
288+ where
289+ T : Generator < Yield = ( ) > ,
290+ {
291+ type Output = T :: Return ;
292+
293+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
294+ let this = self . project ( ) ;
295+ let _guard = unsafe { set_task_context ( cx) } ;
296+ match this. 0 . resume ( ) {
297+ GeneratorState :: Yielded ( ( ) ) => Poll :: Pending ,
298+ GeneratorState :: Complete ( x) => Poll :: Ready ( x) ,
299+ }
300+ }
301+ }
302+
303+ // =================================================================================================
304+ // Poll
305+
306+ /// Polls a future in the current thread-local task waker.
307+ #[ doc( hidden) ]
308+ pub fn poll_with_tls_context < F > ( f : Pin < & mut F > ) -> Poll < F :: Output >
309+ where
310+ F : Future ,
311+ {
312+ let cx_ptr = TLS_CX . with ( |tls_cx| {
313+ // Clear the entry so that nested `get_task_waker` calls
314+ // will fail or set their own value.
315+ tls_cx. replace ( None )
316+ } ) ;
317+ let _reset = SetOnDrop ( cx_ptr) ;
318+
319+ let mut cx_ptr = cx_ptr. expect ( "TLS Context not set." ) ;
320+
321+ // Safety: we've ensured exclusive access to the context by
322+ // removing the pointer from TLS, only to be replaced once
323+ // we're done with it.
324+ //
325+ // The pointer that was inserted came from an `&mut Context<'_>`,
326+ // so it is safe to treat as mutable.
327+ unsafe { F :: poll ( f, cx_ptr. as_mut ( ) ) }
328+ }
329+ }
330+
331+ // Not public API.
332+ #[ doc( hidden) ]
333+ pub mod stream {
334+ use core:: {
335+ future:: Future ,
336+ marker:: PhantomData ,
337+ ops:: { Generator , GeneratorState } ,
338+ pin:: Pin ,
339+ task:: { Context , Poll } ,
340+ } ;
341+ use futures_core:: stream:: Stream ;
342+ use pin_project:: pin_project;
343+
344+ use super :: { set_task_context, SetOnDrop , TLS_CX } ;
345+
346+ // =================================================================================================
347+ // GenStream
348+
349+ /// Wrap a generator in a stream.
350+ ///
351+ /// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
352+ /// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
353+ #[ doc( hidden) ]
354+ pub fn from_generator < G , T > ( gen : G ) -> impl Stream < Item = T >
355+ where
356+ G : Generator < Yield = Poll < T > , Return = ( ) > ,
357+ {
358+ GenStream { gen, _phantom : PhantomData }
359+ }
360+
361+ /// A wrapper around generators used to implement `Stream` for `async`/`await` code.
362+ #[ pin_project]
363+ struct GenStream < G , T > {
364+ #[ pin]
365+ gen : G ,
366+ _phantom : PhantomData < T > ,
367+ }
368+
369+ impl < G , T > Stream for GenStream < G , T >
370+ where
371+ G : Generator < Yield = Poll < T > , Return = ( ) > ,
372+ {
373+ type Item = T ;
374+
375+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
376+ let this = self . project ( ) ;
377+ let _guard = unsafe { set_task_context ( cx) } ;
378+ match this. gen . resume ( ) {
379+ GeneratorState :: Yielded ( x) => x. map ( Some ) ,
380+ GeneratorState :: Complete ( ( ) ) => Poll :: Ready ( None ) ,
381+ }
382+ }
383+ }
384+
385+ // =================================================================================================
386+ // Poll
387+
388+ /// Polls a future in the current thread-local task waker.
389+ #[ doc( hidden) ]
390+ pub fn poll_next_with_tls_context < S > ( s : Pin < & mut S > ) -> Poll < Option < S :: Item > >
391+ where
392+ S : Stream ,
393+ {
394+ let cx_ptr = TLS_CX . with ( |tls_cx| {
395+ // Clear the entry so that nested `get_task_waker` calls
396+ // will fail or set their own value.
397+ tls_cx. replace ( None )
398+ } ) ;
399+ let _reset = SetOnDrop ( cx_ptr) ;
400+
401+ let mut cx_ptr = cx_ptr. expect ( "TLS Context not set." ) ;
402+
403+ // Safety: we've ensured exclusive access to the context by
404+ // removing the pointer from TLS, only to be replaced once
405+ // we're done with it.
406+ //
407+ // The pointer that was inserted came from an `&mut Context<'_>`,
408+ // so it is safe to treat as mutable.
409+ unsafe { S :: poll_next ( s, cx_ptr. as_mut ( ) ) }
410+ }
411+
412+ // =================================================================================================
413+ // Next
414+
415+ // This is equivalent to the `futures::stream::StreamExt::next` method.
416+ // But we want to make this crate dependency as small as possible, so we define our `next` function.
417+ #[ doc( hidden) ]
418+ pub fn next < S > ( stream : & mut S ) -> impl Future < Output = Option < S :: Item > > + ' _
419+ where
420+ S : Stream + Unpin ,
421+ {
422+ Next { stream }
423+ }
424+
425+ struct Next < ' a , S > {
426+ stream : & ' a mut S ,
427+ }
428+
429+ impl < S > Future for Next < ' _ , S >
430+ where
431+ S : Stream + Unpin ,
432+ {
433+ type Output = Option < S :: Item > ;
434+
435+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
436+ Pin :: new ( & mut self . stream ) . poll_next ( cx)
437+ }
438+ }
439+ }
236440
237441// Not public API.
238442#[ doc( hidden) ]
239- pub mod try_stream;
443+ pub mod try_stream {
444+ use core:: {
445+ marker:: PhantomData ,
446+ ops:: { Generator , GeneratorState } ,
447+ pin:: Pin ,
448+ task:: { Context , Poll } ,
449+ } ;
450+ use futures_core:: stream:: { FusedStream , Stream } ;
451+ use pin_project:: pin_project;
452+
453+ use super :: set_task_context;
454+
455+ // =================================================================================================
456+ // GenStream
457+
458+ /// Wrap a generator in a stream.
459+ ///
460+ /// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
461+ /// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
462+ #[ doc( hidden) ]
463+ pub fn from_generator < G , T , E > (
464+ gen : G ,
465+ ) -> impl Stream < Item = Result < T , E > > + FusedStream < Item = Result < T , E > >
466+ where
467+ G : Generator < Yield = Poll < T > , Return = Result < ( ) , E > > ,
468+ {
469+ GenTryStream { gen, done : false , _phantom : PhantomData }
470+ }
471+
472+ /// A wrapper around generators used to implement `Stream` for `async`/`await` code.
473+ #[ pin_project]
474+ struct GenTryStream < G , T , E > {
475+ #[ pin]
476+ gen : G ,
477+ done : bool ,
478+ _phantom : PhantomData < ( T , E ) > ,
479+ }
480+
481+ impl < G , T , E > Stream for GenTryStream < G , T , E >
482+ where
483+ G : Generator < Yield = Poll < T > , Return = Result < ( ) , E > > ,
484+ {
485+ type Item = Result < T , E > ;
486+
487+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
488+ if self . done {
489+ return Poll :: Ready ( None ) ;
490+ }
491+
492+ let this = self . project ( ) ;
493+ let _guard = unsafe { set_task_context ( cx) } ;
494+ let res = match this. gen . resume ( ) {
495+ GeneratorState :: Yielded ( x) => x. map ( |x| Some ( Ok ( x) ) ) ,
496+ GeneratorState :: Complete ( Err ( e) ) => Poll :: Ready ( Some ( Err ( e) ) ) ,
497+ GeneratorState :: Complete ( Ok ( ( ) ) ) => Poll :: Ready ( None ) ,
498+ } ;
499+ if let Poll :: Ready ( Some ( Err ( _) ) ) | Poll :: Ready ( None ) = & res {
500+ * this. done = true ;
501+ }
502+ res
503+ }
504+ }
505+
506+ impl < G , T , E > FusedStream for GenTryStream < G , T , E >
507+ where
508+ G : Generator < Yield = Poll < T > , Return = Result < ( ) , E > > ,
509+ {
510+ fn is_terminated ( & self ) -> bool {
511+ self . done
512+ }
513+ }
514+ }
240515
241516// Not public API.
242517#[ doc( hidden) ]
0 commit comments