|
pub use ::futures_core::{Stream, ready};


+/// # Stream of an async process with a queue
+///
+/// `queue(|mut q| async move { ... })` creates a queue for `T` values
+/// and an async process that pushes items to the queue; together they
+/// work as a stream yielding all the items asynchronously.
+///
+/// <br>
+///
+/// _**note**_ : It's recommended to `use ohkami::utils::stream` and
+/// call this as **`stream::queue()`**, rather than importing `queue()`
+/// directly.
+///
+/// <br>
| 16 | +/// |
+/// ---
+/// *example.rs*
+/// ```no_run
+/// use ohkami::prelude::*;
+/// use ohkami::utils::{StreamExt, stream};
+/// use tokio::time::sleep;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let mut qs = stream::queue(|mut q| async move {
+///         for i in 1..=5 {
+///             sleep(std::time::Duration::from_secs(1)).await;
+///             q.push(format!("Hello, I'm message#{i}!"));
+///         }
+///
+///         sleep(std::time::Duration::from_secs(1)).await;
+///
+///         q.push("done".to_string());
+///     });
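+///
+///     // a sketch of consuming the stream (via `StreamExt::next`, as
+///     // imported above): each item arrives as soon as it is pushed
+///     while let Some(message) = qs.next().await {
+///         println!("{message}");
+///     }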
+/// }
+/// ```
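+///
+/// Note that `queue` spawns nothing by itself: the returned stream owns
+/// the provided future (see the signature below) and drives it as the
+/// stream is polled.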
+///
+/// <br>
+///
+/// ---
+/// *openai.rs*
+/// ```ignore
+/// use ohkami::prelude::*;
+/// use ohkami::Memory;
+/// use ohkami::typed::DataStream;
+/// use ohkami::utils::{StreamExt, stream};
+///
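+/// // `UserMessage`, `ChatCompletions`, `ChatMessage`, `Role`, `Error` and
+/// // `models::ChatCompletionChunk` are application-defined items omitted here.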
+/// pub async fn relay_chat_completion(
+///     api_key: Memory<'_, &'static str>,
+///     UserMessage(message): UserMessage,
+/// ) -> Result<DataStream<String, Error>, Error> {
+///     let mut gpt_response = reqwest::Client::new()
+///         .post("https://api.openai.com/v1/chat/completions")
+///         .bearer_auth(*api_key)
+///         .json(&ChatCompletions {
+///             model: "gpt-4o",
+///             stream: true,
+///             messages: vec![
+///                 ChatMessage {
+///                     role: Role::user,
+///                     content: message,
+///                 }
+///             ],
+///         })
+///         .send().await?
+///         .bytes_stream();
+///
+///     Ok(DataStream::from_stream(stream::queue(|mut q| async move {
+///         let mut push_line = |mut line: String| {
+///             // strip the `\n\n` that terminates each SSE event
+///             if line.ends_with("\n\n") {
+///                 line.truncate(line.len() - 2);
+///             }
+///
+///             #[cfg(debug_assertions)] {
+///                 if line != "[DONE]" {
+///                     let chunk: models::ChatCompletionChunk
+///                         = serde_json::from_str(&line).unwrap();
+///                     let content = chunk
+///                         .choices[0]
+///                         .delta
+///                         .content.as_deref().unwrap_or("");
+///                     print!("{content}");
+///                     std::io::Write::flush(&mut std::io::stdout()).ok();
+///                 } else {
+///                     println!()
+///                 }
+///             }
+///
+///             q.push(Ok(line));
+///         };
| 91 | +/// |
+///         // an SSE event may be split across chunks; `remaining` buffers
+///         // a partial event until the rest of it arrives
+///         let mut remaining = String::new();
+///         while let Some(Ok(raw_chunk)) = gpt_response.next().await {
+///             for line in std::str::from_utf8(&raw_chunk).unwrap()
+///                 .split_inclusive("\n\n")
+///             {
+///                 if let Some(data) = line.strip_prefix("data: ") {
+///                     if data.ends_with("\n\n") {
+///                         push_line(data.to_string())
+///                     } else {
+///                         remaining = data.into()
+///                     }
+///                 } else {
+///                     #[cfg(debug_assertions)] {
+///                         assert!(line.ends_with("\n\n"))
+///                     }
+///                     push_line(std::mem::take(&mut remaining) + line)
+///                 }
+///             }
+///         }
+///     })))
+/// }
+/// ```
pub fn queue<T, F, Fut>(f: F) -> stream::QueueStream<F, T, Fut>
where
    F: FnOnce(stream::Queue<T>) -> Fut,
|
|