Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize ztm tunnel code #541

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aries/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn main() {
ztm_agent.clone().start_ztm_agent();
thread::sleep(time::Duration::from_secs(3));
run_ztm_client(
"http://34.84.172.121/relay".to_string(),
"http://gitmono.org/relay".to_string(),
config.clone(),
peer_id,
ztm_agent,
Expand Down
98 changes: 11 additions & 87 deletions gemini/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use common::model::CommonResult;
use jupiter::context::Context;

use crate::{
util::{get_available_port, get_short_peer_id, repo_alias_to_identifier},
ztm::{agent::share_repo, create_tunnel, send_get_request_to_peer_by_tunnel},
util::repo_alias_to_identifier,
ztm::{
agent::share_repo, get_or_create_remote_mega_tunnel, send_get_request_to_peer_by_tunnel,
},
RepoInfo,
};

Expand Down Expand Up @@ -54,53 +56,11 @@ pub async fn repo_provide(
Ok("success".to_string())
}

pub async fn repo_folk(
ztm_agent_port: u16,
identifier: String,
local_port: u16,
) -> Result<String, String> {
let remote_peer_id = match get_peer_id_from_identifier(identifier.clone()) {
Ok(p) => p,
Err(e) => return Err(e),
};
let remote_port = match get_remote_port_from_identifier(identifier.clone()) {
Ok(p) => p,
Err(e) => return Err(e),
};
let git_path = match get_git_path_from_identifier(identifier) {
Ok(p) => p,
Err(e) => return Err(e),
};

let (peer_id, _) = vault::init();
let bound_name = format!(
"{}_{}",
get_short_peer_id(peer_id),
get_short_peer_id(remote_peer_id.clone())
);
match create_tunnel(
ztm_agent_port,
remote_peer_id,
local_port,
remote_port,
bound_name,
)
.await
{
Ok(_) => (),
Err(e) => return Err(e),
}

let msg = format!("git clone http://localhost:{local_port}/{git_path}");
Ok(msg)
}

pub async fn repo_folk_alias(ztm_agent_port: u16, identifier: String) -> Result<String, String> {
let remote_peer_id = match get_peer_id_from_identifier(identifier.clone()) {
Ok(p) => p,
Err(e) => return Err(e),
};
let remote_port = 8000;
let alias = match get_alias_from_identifier(identifier) {
Ok(p) => p,
Err(e) => return Err(e),
Expand All @@ -111,28 +71,14 @@ pub async fn repo_folk_alias(ztm_agent_port: u16, identifier: String) -> Result<
Err(e) => return Err(e),
};

let peer_id = vault::get_peerid();
let bound_name = format!(
"{}_{}",
get_short_peer_id(peer_id),
get_short_peer_id(remote_peer_id.clone())
);
let local_port = match get_available_port() {
Ok(p) => p,
Err(e) => return Err(e),
let local_port = get_or_create_remote_mega_tunnel(ztm_agent_port, remote_peer_id).await;

let local_port = match local_port {
Ok(local_port) => local_port,
Err(e) => {
return Err(e);
}
};
match create_tunnel(
ztm_agent_port,
remote_peer_id,
local_port,
remote_port,
bound_name,
)
.await
{
Ok(_) => (),
Err(e) => return Err(e),
}

let msg = format!("http://localhost:{local_port}{path}.git");
Ok(msg)
Expand All @@ -147,28 +93,6 @@ pub fn get_peer_id_from_identifier(identifier: String) -> Result<String, String>
return Ok(words.get(2).unwrap().to_string());
}

pub fn get_remote_port_from_identifier(identifier: String) -> Result<u16, String> {
// p2p://mrJ46F8gd2sa2Dx3iCYf6DauJ2WpAaepus7PwyZVebgD/8000/third-part/mega_143.git
let words: Vec<&str> = identifier.split('/').collect();
if words.len() <= 3 {
return Err("invalid identifier".to_string());
}
match words.get(3).unwrap().parse::<u16>() {
Ok(number) => Ok(number),
Err(e) => Err(e.to_string()),
}
}

pub fn get_git_path_from_identifier(identifier: String) -> Result<String, String> {
// p2p://mrJ46F8gd2sa2Dx3iCYf6DauJ2WpAaepus7PwyZVebgD/8000/third-part/mega_143.git
let words: Vec<&str> = identifier.split('/').collect();
if words.len() <= 4 {
return Err("invalid identifier".to_string());
}
let path = words[4..].join("/");
Ok(path)
}

pub fn get_alias_from_identifier(identifier: String) -> Result<String, String> {
// p2p://wGg2inNE22LY1eHttDB63znw2MnsK8CPXeG2nfhpXs5a/serde_python
let words: Vec<&str> = identifier.split('/').collect();
Expand Down
2 changes: 1 addition & 1 deletion gemini/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod nostr;
pub mod util;
pub mod ztm;

const ZTM_APP_PROVIDER: &str = "mega";


#[derive(Deserialize, Debug)]
pub struct RelayGetParams {
Expand Down
8 changes: 8 additions & 0 deletions gemini/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,11 @@ pub fn repo_path_to_identifier(http_port: u16, repo_path: String) -> String {
let (peer_id, _) = vault::init();
format!("p2p://{}/{http_port}{repo_path}.git", peer_id.clone())
}

pub fn get_ztm_app_tunnel_bound_name(remote_peer_id: String) -> String {
format!(
"{}_{}",
get_short_peer_id(vault::get_peerid()),
get_short_peer_id(remote_peer_id)
)
}
86 changes: 64 additions & 22 deletions gemini/src/ztm/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ use common::config::Config;
use reqwest::{header::CONTENT_TYPE, Client};
use serde::{Deserialize, Serialize};

use crate::{RepoInfo, ZTM_APP_PROVIDER};
use crate::{
ztm::{ZTM_APP_NAME, ZTM_APP_PROVIDER},
RepoInfo,
};

use super::{handle_response, hub::ZTMUserPermit};
use super::{handle_response, hub::ZTMUserPermit, MESH_NAME};

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ZTMMesh {
Expand Down Expand Up @@ -55,6 +58,20 @@ pub struct ZTMPortReq {
pub struct ZTMPortService {
pub service: String,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ZTMAppInbound {
pub protocol: String,
pub name: String,
pub listens: Vec<ZTMAppInboundListen>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ZTMAppInboundListen {
pub ip: String,
pub port: u16,
}

#[async_trait]
pub trait ZTMAgent {
async fn connect_ztm_hub(&self, permit: ZTMUserPermit) -> Result<ZTMMesh, String>;
Expand Down Expand Up @@ -90,6 +107,14 @@ pub trait ZTMAgent {
port: u16,
) -> Result<String, String>;

async fn get_ztm_app_tunnel_inbound_port(
&self,
ep_id: String,
provider: String,
app_name: String,
bound_name: String,
) -> Option<u16>;

async fn create_ztm_app_tunnel_outbound(
&self,
ep_id: String,
Expand Down Expand Up @@ -120,8 +145,6 @@ impl LocalZTMAgent {
}
}

const MESH_NAME: &str = "relay_mesh";

#[async_trait]
impl ZTMAgent for LocalZTMAgent {
async fn connect_ztm_hub(&self, permit: ZTMUserPermit) -> Result<ZTMMesh, String> {
Expand Down Expand Up @@ -335,6 +358,42 @@ impl ZTMAgent for LocalZTMAgent {
Ok(response_text)
}

async fn get_ztm_app_tunnel_inbound_port(
&self,
ep_id: String,
provider: String,
app_name: String,
bound_name: String,
) -> Option<u16> {
//GET /api/meshes/{mesh.name}/apps/${provider}/${name}/api/endpoints/{ep}/inbound/{proto}/{name}
let agent_port = self.agent_port;
let agent_address = format!("http://127.0.0.1:{agent_port}");
let url = format!(
"{agent_address}/api/meshes/{MESH_NAME}/apps/{provider}/{app_name}/api/endpoints/{ep_id}/inbound"
);
tracing::debug!("get_ztm_app_tunnel_inbound url: {}", url);
let client = Client::new();
let request_result = client.get(url).send().await;
let response_text = match handle_response(request_result).await {
Ok(s) => s,
Err(e) => {
tracing::error!("get_ztm_app_tunnel_inbound error: {}", e);
return None;
}
};
let ztm_app_inbounds: Vec<ZTMAppInbound> =
match serde_json::from_str(response_text.as_str()) {
Ok(inbounds) => inbounds,
Err(e) => {
tracing::error!("get_ztm_app_tunnel_inbound error: {}", e);
return None;
}
};
let ztm_app_inbound = ztm_app_inbounds.iter().find(|x| x.name == bound_name)?;
let listen = ztm_app_inbound.listens.first()?;
Some(listen.port)
}

async fn create_ztm_app_tunnel_outbound(
&self,
ep_id: String,
Expand Down Expand Up @@ -410,7 +469,7 @@ pub async fn run_ztm_client(
.start_ztm_app(
mesh.clone().agent.id,
ZTM_APP_PROVIDER.to_string(),
"tunnel".to_string(),
ZTM_APP_NAME.to_string(),
)
.await
{
Expand All @@ -422,23 +481,6 @@ pub async fn run_ztm_client(
}
tracing::info!("start tunnel app successfully");

//test send msg
// sleep(Duration::from_secs(5));
// match send_get_request_to_peer_by_tunnel(
// agent.agent_port,
// "nX7NRgitx7wUwAiJXxAVcec4iAoV8YwbzUn8FqwfFR4J".to_string(),
// "api/v1/ztm/repo_provide".to_string(),
// )
// .await
// {
// Ok(s) => {
// tracing::info!("send_get_request_to_peer_by_tunnel successfully:{}", s);
// }
// Err(e) => {
// tracing::error!(e);
// }
// };

// ping relay
let peer_id_clone = peer_id.clone();
let bootstrap_node_clone = bootstrap_node.clone();
Expand Down
Loading
Loading