Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay add search lfs_chunk function #666

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 52 additions & 3 deletions aries/src/service/relay_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use clap::Parser;
use common::config::Config;
use gemini::ztm::hub::{LocalHub, ZTMUserPermit, ZTMCA};
use gemini::ztm::send_get_request_to_peer_by_tunnel;
use gemini::{LFSInfo, LFSInfoPostBody, Node, RelayGetParams, RelayResultRes, RepoInfo};
use gemini::{
LFSChunk, LFSInfo, LFSInfoPostBody, LFSInfoRes, Node, RelayGetParams, RelayResultRes, RepoInfo,
};
use jupiter::context::Context;
use tower::ServiceBuilder;
use tower_http::cors::{Any, CorsLayer};
Expand Down Expand Up @@ -92,7 +94,8 @@ pub fn routers() -> Router<AppState> {
.route("/repo_list", get(repo_list))
.route("/test/send", get(send_message))
.route("/lfs_share", post(lfs_share))
.route("/lfs_list", get(lfs_list));
.route("/lfs_list", get(lfs_list))
.route("/lfs_chunk", get(lfs_chunk));

Router::new()
.merge(router)
Expand Down Expand Up @@ -230,6 +233,11 @@ pub async fn lfs_list(
Query(_query): Query<RelayGetParams>,
state: State<AppState>,
) -> Result<Json<Vec<LFSInfo>>, (StatusCode, String)> {
let lfs_info_list_result = lfs_list_handler(state).await;
Ok(Json(lfs_info_list_result))
}

async fn lfs_list_handler(state: State<AppState>) -> Vec<LFSInfo> {
let storage = state.context.services.ztm_storage.clone();
let lfs_info_list: Vec<LFSInfo> = storage
.get_all_lfs_info()
Expand All @@ -254,7 +262,7 @@ pub async fn lfs_list(
}
lfs_info_list_result.push(lfs.clone());
}
Ok(Json(lfs_info_list_result))
lfs_info_list_result
}

async fn send_message(
Expand Down Expand Up @@ -288,6 +296,47 @@ async fn send_message(
Ok(Json(result))
}

pub async fn lfs_chunk(
Query(query): Query<RelayGetParams>,
state: State<AppState>,
) -> Result<Json<LFSInfoRes>, (StatusCode, String)> {
if query.file_hash.is_none() {
return Err((StatusCode::BAD_REQUEST, "not enough paras".to_string()));
}
let file_hash = query.file_hash.unwrap().clone();

let lfs_object = state
.context
.services
.lfs_db_storage
.get_lfs_object(file_hash.clone())
.await
.unwrap();

if lfs_object.is_none() {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"lfs chunk info not found".to_string(),
));
}

let mut lfs_object_chunks: Vec<LFSChunk> = state
.context
.services
.lfs_db_storage
.get_lfs_relations(file_hash)
.await
.unwrap()
.iter()
.map(|x| x.clone().into())
.collect();

let mut lfs_info_res: LFSInfoRes = lfs_object.unwrap().into();
lfs_info_res.chunks.append(&mut lfs_object_chunks);

Ok(Json(lfs_info_res))
}

async fn loop_running(context: Context) {
let mut interval = tokio::time::interval(Duration::from_secs(60));

Expand Down
51 changes: 50 additions & 1 deletion gemini/src/lfs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::HashSet;

use reqwest::Client;
use reqwest::{get, Client};

use crate::{
util::handle_response, ztm::get_or_create_remote_mega_tunnel, LFSInfo, LFSInfoPostBody,
LFSInfoRes,
};

/// share lfs
Expand Down Expand Up @@ -62,6 +63,44 @@ pub async fn share_lfs(
}
}

/// get lfs chunks info
///
/// ## paras
/// - `bootstrap_node`: bootstrap_node
/// - `file_hash`: file_hash
///
/// for example
/// ```
/// {
/// "bootstrap_node":"https://gitmono.org/relay",
/// "file_hash":"52c90a86cb034b7a1c4beb79304fa76bd0a6cbb7b168c3a935076c714bd1c6b6",
///}
/// ```
/// This method will send a GET request to the relay to get lfs chunks info
///
pub async fn get_lfs_chunks_info(bootstrap_node: String, file_hash: String) -> Option<LFSInfoRes> {
let url = format!(
"{}/api/v1/lfs_chunk?file_hash={}",
bootstrap_node, file_hash
);
let lfs_info: LFSInfoRes = match get(url.clone()).await {
Ok(response) => {
if !response.status().is_success() {
println!("Get lfs chuncks info failed {}", url);
return None;
}
let body = response.text().await.unwrap();
let lfs_info: LFSInfoRes = serde_json::from_str(&body).unwrap();
lfs_info
}
Err(_) => {
println!("Get lfs chuncks info failed {}", url);
return None;
}
};
Some(lfs_info)
}

/// create lfs download local ports
///
/// ## Paras
Expand Down Expand Up @@ -254,4 +293,14 @@ mod tests {
// println!("Check LFS SHA256 failed");
// }
// }

// #[tokio::test]
// async fn test_get_chunk_info() {
// let res = super::get_lfs_chunks_info(
// "http://localhost:8001".to_string(),
// "52c90a86cb034b7a1c4beb79304fa76bd0a6cbb7b168c3a935076c714bd1c6b6".to_string(),
// )
// .await;
// println!("{:?}", res);
// }
}
36 changes: 35 additions & 1 deletion gemini/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

use callisto::{ztm_lfs_info, ztm_node, ztm_repo_info};
use callisto::{lfs_objects, lfs_split_relations, ztm_lfs_info, ztm_node, ztm_repo_info};
use chrono::Utc;
use common::utils::generate_id;
use serde::{Deserialize, Serialize};
Expand All @@ -22,6 +22,7 @@ pub struct RelayGetParams {
pub agent_name: Option<String>,
pub service_name: Option<String>,
pub service_port: Option<i32>,
pub file_hash: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -267,3 +268,36 @@ impl From<LFSInfoPostBody> for LFSInfo {
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LFSInfoRes {
pub oid: String,
pub size: i64,
pub chunks: Vec<LFSChunk>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LFSChunk {
pub sub_oid: String,
pub offset: i64,
pub size: i64,
}

impl From<lfs_objects::Model> for LFSInfoRes {
fn from(lfs: lfs_objects::Model) -> Self {
LFSInfoRes {
oid: lfs.oid,
size: lfs.size,
chunks: vec![],
}
}
}

impl From<lfs_split_relations::Model> for LFSChunk {
fn from(chunk: lfs_split_relations::Model) -> Self {
LFSChunk {
sub_oid: chunk.sub_oid,
offset: chunk.offset,
size: chunk.size,
}
}
}
Loading