From fb42aafcd5ae40393b0eca2a454157314f4d30c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=8B=8F=E5=90=91=E5=A4=9C?=
 <46275354+fu050409@users.noreply.github.com>
Date: Sun, 29 Sep 2024 10:45:14 +0800
Subject: [PATCH] feat(client): support listen and subscribe (#77)

---
 .changes/client-subscribe.md         |  5 +++++
 Cargo.lock                           |  2 +-
 crates/oblivion/Cargo.toml           |  6 +++---
 crates/oblivion/src/models/client.rs | 24 +++++++++++++++++++++++-
 4 files changed, 32 insertions(+), 5 deletions(-)
 create mode 100644 .changes/client-subscribe.md

diff --git a/.changes/client-subscribe.md b/.changes/client-subscribe.md
new file mode 100644
index 0000000..b673088
--- /dev/null
+++ b/.changes/client-subscribe.md
@@ -0,0 +1,5 @@
+---
+"oblivion": patch:feat
+---
+
+Support listen and subscribe in Oblivion client.
diff --git a/Cargo.lock b/Cargo.lock
index a1ed303..41c309b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -688,7 +688,7 @@ dependencies = [
 
 [[package]]
 name = "oblivion-codegen"
-version = "0.3.0"
+version = "0.3.2"
 dependencies = [
  "futures",
  "proc-macro2",
diff --git a/crates/oblivion/Cargo.toml b/crates/oblivion/Cargo.toml
index 50d4172..fcb8231 100644
--- a/crates/oblivion/Cargo.toml
+++ b/crates/oblivion/Cargo.toml
@@ -2,10 +2,10 @@
 name = "oblivion"
 version = "2.2.0"
 authors = ["苏向夜 <fu050409@163.com>"]
-description = "Rust High Concurrency Implementation of Oblivion, an End-to-End Encryption Protocol Based on ECDHE Encryption Algorithm"
+description = "A fast, lightweight, and secure end-to-end encryption protocol based on ECDHE"
 license = "AGPL-3.0"
 repository = "https://github.com/noctisynth/oblivion-rust"
-
+readme = "../../README.md"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -23,7 +23,7 @@ hkdf = "0.12"
 
 # Utils
 arc-swap = "1.7.1"
-oblivion-codegen = { version = "^0.3.0", path = "../oblivion-codegen" }
+oblivion-codegen = { version = "0.3.2", path = "../oblivion-codegen" }
 proc-macro2 = { workspace = true }
 futures = { workspace = true }
 regex = "1"
diff --git a/crates/oblivion/src/models/client.rs b/crates/oblivion/src/models/client.rs
index ced7bc4..fc2023a 100644
--- a/crates/oblivion/src/models/client.rs
+++ b/crates/oblivion/src/models/client.rs
@@ -4,7 +4,11 @@ use std::sync::Arc;
 use anyhow::{Error, Result};
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
-use tokio::net::TcpStream;
+use tokio::{
+    net::TcpStream,
+    sync::mpsc::{Receiver, Sender},
+    task::JoinHandle,
+};
 
 use crate::exceptions::Exception;
 #[cfg(feature = "pyo3")]
@@ -112,6 +116,8 @@ pub struct Client {
     pub entrance: String,
     pub path: OblivionPath,
     pub session: Arc<Session>,
+    sender: Arc<Sender<Response>>,
+    receiver: Receiver<Response>,
 }
 
 impl Client {
@@ -134,13 +140,29 @@ impl Client {
 
         session.handshake(0).await?;
 
+        let (sender, receiver) = tokio::sync::mpsc::channel(1024);
         Ok(Self {
             entrance: entrance.to_string(),
             path,
             session: Arc::new(session),
+            sender: Arc::new(sender),
+            receiver,
         })
     }
 
+    pub async fn listen(&self) -> JoinHandle<()> {
+        let session = self.session.clone();
+        let sender = self.sender.clone();
+        tokio::spawn(async move {
+            let response = session.recv().await.unwrap();
+            sender.send(response).await.unwrap();
+        })
+    }
+
+    pub async fn next(&mut self) -> Option<Response> {
+        self.receiver.recv().await
+    }
+
     pub async fn send(&self, data: Vec<u8>) -> Result<()> {
         self.session.send(data).await
     }