Skip to content

Commit

Permalink
Merge pull request #479 from Ivanbeethoven/main
Browse files Browse the repository at this point in the history
[scorpio]: Add Readonly dictionary FUSE with server test. unstable.v1
  • Loading branch information
genedna authored Jul 24, 2024
2 parents d3e9493 + 37d62d5 commit 6a14e28
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 30 deletions.
18 changes: 16 additions & 2 deletions scorpio/src/dicfuse/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fn default_stat64(inode:u64) -> stat64 {
mtimensec: 0, // Default nanoseconds of last modification time
ctimensec: 0, // Default nanoseconds of last status change time
mode: 0o444, // Default file mode (r--r--r--)
//mode: 0o555, // Default file mode (r-xr-xr-x)
nlink: 1, // Default number of hard links
uid: 1000, // Default user ID
gid: 1000, // Default group ID
Expand All @@ -24,8 +25,8 @@ fn default_stat64(inode:u64) -> stat64 {
};
t.into()
}
#[allow(unused)]
pub fn default_entry(inode:u64) -> Entry {

pub fn default_file_entry(inode:u64) -> Entry {
Entry{
inode,
generation: 0,
Expand All @@ -35,6 +36,19 @@ pub fn default_entry(inode:u64) -> Entry {
entry_timeout: Duration::from_secs(u64::MAX),
} // Return a default Entry instance
}

pub fn default_dic_entry(inode:u64) -> Entry {
let mut d = default_stat64(inode);
d.st_mode = 0o555;
Entry{
inode,
generation: 0,
attr: d,
attr_flags: 0,
attr_timeout: Duration::from_secs(u64::MAX),
entry_timeout: Duration::from_secs(u64::MAX),
} // Return a default Dictionary Entry instance
}
// pub struct stat64 {
// pub st_dev: ::dev_t, // Device ID of the device containing the file
// pub st_ino: ::ino64_t, // Inode number of the file
Expand Down
98 changes: 91 additions & 7 deletions scorpio/src/dicfuse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ mod fuse;

use std::{sync::Arc, time::Duration};
use std::io::Result;
use fuse::default_entry;
use fuse_backend_rs::{abi::fuse_abi::FsOptions, api::filesystem::{Context, Entry, FileSystem}};
use tokio::task::JoinHandle;

use store::DictionaryStore;
use store::{DictionaryStore, IntoEntry};

struct Dicfuse{
store: Arc<DictionaryStore>,
Expand Down Expand Up @@ -50,8 +49,11 @@ impl FileSystem for Dicfuse{

fn lookup(&self, ctx: &Context, parent: Self::Inode, name: &std::ffi::CStr) -> Result<Entry> {
let store = self.store.clone();
let pitem = store.find_path(parent).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?;
Ok(Entry::default())
let mut ppath = store.find_path(parent).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?;
let pitem = store.get_inode(parent)?;
ppath.push(name.to_string_lossy().into_owned());
let chil = store.get_by_path(&ppath.to_string())?;
Ok(chil.into_entry())
}


Expand All @@ -71,9 +73,8 @@ impl FileSystem for Dicfuse{
) -> std::io::Result<(libc::stat64, std::time::Duration)> {
let store = self.store.clone();
let i = store.find_path(inode).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?;
let entry = default_entry(inode);
let entry = fuse::default_file_entry(inode);
Ok((entry.attr,Duration::from_secs(u64::MAX)))

}

fn setattr(
Expand Down Expand Up @@ -286,4 +287,87 @@ impl FileSystem for Dicfuse{
}


}
}

#[cfg(test)]
mod tests {
use std::{io, path::Path, sync::Arc,thread};

use fuse_backend_rs::{api::server::Server, transport::{FuseChannel, FuseSession}};
use signal_hook::{consts::TERM_SIGNALS, iterator::Signals};

use super::Dicfuse;


pub struct DicFuseServer {
server: Arc<Server<Arc<Dicfuse>>>,
ch: FuseChannel,
}
impl DicFuseServer {
pub fn svc_loop(&mut self) -> Result<(),io::Error> {
let _ebadf = std::io::Error::from_raw_os_error(libc::EBADF);
println!("entering server loop");
loop {
if let Some((reader, writer)) = self
.ch
.get_request()
.map_err(|_| std::io::Error::from_raw_os_error(libc::EINVAL))?
{
if let Err(e) = self
.server
.handle_message(reader, writer.into(), None, None)
{
match e {
fuse_backend_rs::Error::EncodeMessage(_ebadf) => {
break;
}
_ => {
print!("Handling fuse message failed");
continue;
}
}
}
} else {
print!("fuse server exits");
break;
}
}
Ok(())
}


}

#[test]
fn test_svc_loop_success() {
let dicfuse = Arc::new(Dicfuse::new());
// Create fuse session
let mut se = FuseSession::new(Path::new(&"/home/luxian/ccode/mega/dictest"), "dic", "", true).unwrap();
se.mount().unwrap();
let ch: FuseChannel = se.new_channel().unwrap();
let server = Arc::new(Server::new(dicfuse.clone()));
let mut dicfuse_server = DicFuseServer { server, ch };

// Mock the behavior of get_request to simulate a successful request
// This would require implementing a mock or a stub for FuseChannel
// For the sake of this example, we will assume it is done correctly


// Spawn server thread
let handle = thread::spawn(move || {
let _ = dicfuse_server.svc_loop();
});
// Wait for termination signal
let mut signals = Signals::new(TERM_SIGNALS).unwrap();
if let Some(_sig) = signals.forever().next() {
//pass
}
// Unmount and wake up
se.umount().unwrap();
se.wake().unwrap();
// Join server thread
let _ = handle.join();
}


}
1 change: 1 addition & 0 deletions scorpio/src/dicfuse/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ impl Display for GPath{
write!(f, "{}", self.path.join("/"))
}
}

108 changes: 87 additions & 21 deletions scorpio/src/dicfuse/store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

use fuse_backend_rs::api::filesystem::Entry;
/// Read only file system for obtaining and displaying monorepo directory information
use reqwest::Client;
// Import Response explicitly
Expand All @@ -12,9 +13,10 @@ use radix_trie::{self, TrieCommon};
use std::sync::{Arc,Mutex};


use super::fuse;
use super::model::GPath;
const MEGA_TREE_URL: &str = "localhost:8000";//TODO: make it configable

const UNKNOW_INODE: u64 = 0; // illegal inode number;

#[derive(Serialize, Deserialize, Debug)]
pub struct Item {
Expand All @@ -23,35 +25,66 @@ pub struct Item {
content_type: String,
}
#[allow(unused)]
struct DicItem{
pub struct DicItem{
inode:u64,
name:GPath,
content_type: ContentType,
content_type: Mutex<ContentType>,
children:Mutex<HashMap<String, Arc<DicItem>>>,
parent:u64,
}

#[allow(unused)]
#[derive(PartialEq)]
enum ContentType {
File,
Dictionary,
Dictionary(bool),// if this dictionary is loaded.
}
#[allow(unused)]
impl DicItem {
pub fn new(inode:u64, item:Item) -> Self {
pub fn new(inode:u64,parent:u64, item:Item) -> Self {
DicItem {
inode,
name: item.name.into(), // Assuming GPath can be created from String
content_type: match item.content_type.as_str() {
"file" => ContentType::File,
"directory" => ContentType::Dictionary,
"file" => ContentType::File.into(),
"directory" => ContentType::Dictionary(false).into(),
_ => panic!("Unknown content type"),
},
children: Mutex::new(HashMap::new()),
parent,
}
}
//get the total path
pub fn get_path(&self) -> String {
self.name.to_string()
}
//get the file or dic name . aka tail name.
pub fn get_name(&self) -> String {
self.name.name()
}
// add a children item
pub fn push_children(&self,children:Arc<DicItem>){
self.children.lock().unwrap().insert(children.get_path(), children);
}
// get the inode
pub fn get_inode(&self)-> u64{
self.inode
}
}

pub trait IntoEntry {
fn into_entry(self) -> Entry;
}

impl IntoEntry for Arc<DicItem> {
fn into_entry(self) -> Entry {
match *self.content_type.lock().unwrap() {
ContentType::File => fuse::default_file_entry(self.inode),
ContentType::Dictionary(_) => fuse::default_dic_entry(self.inode),
}
}
}

#[derive(Serialize, Deserialize, Debug,Default)]
struct ApiResponse {
req_result: bool,
Expand Down Expand Up @@ -79,7 +112,7 @@ async fn fetch_tree(path: &str) -> Result<ApiResponse, Box<dyn Error>> {

#[allow(unused)]
pub struct DictionaryStore {
inodes: Arc<Mutex<HashMap<u64, DicItem>>>,
inodes: Arc<Mutex<HashMap<u64, Arc<DicItem>>>>,
next_inode: AtomicU64,
queue: Arc<Mutex<VecDeque<u64>>>,
radix_trie: Arc<Mutex<radix_trie::Trie<String, u64>>>,
Expand All @@ -89,40 +122,62 @@ pub struct DictionaryStore {
#[allow(unused)]
impl DictionaryStore {
pub fn new() -> Self {
DictionaryStore {
next_inode: AtomicU64::new(1),
let mut init = DictionaryStore {
next_inode: AtomicU64::new(2),
inodes: Arc::new(Mutex::new(HashMap::new())),
radix_trie: Arc::new(Mutex::new(radix_trie::Trie::new())),
queue: Arc::new(Mutex::new(VecDeque::new())),
}
};
let root_item = DicItem{
inode: 1,
name: GPath::new(),
content_type: ContentType::Dictionary(false).into(),
children: Mutex::new(HashMap::new()),
parent: UNKNOW_INODE, // root dictory has no parent
};
init
}
fn update_inode(&self,item:Item){
fn update_inode(&self,pitem:Option<Arc<DicItem>>,item:Item){
self.next_inode.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let alloc_inode = self.next_inode.load(std::sync::atomic::Ordering::Relaxed);
self.radix_trie.lock().unwrap().insert(item.path.clone(), alloc_inode);
self.inodes.lock().unwrap().insert(alloc_inode, DicItem::new(alloc_inode, item));

if let Some(parent) = pitem{
let newitem = Arc::new(DicItem::new(alloc_inode, parent.get_inode(),item));
parent.push_children(newitem.clone());
self.inodes.lock().unwrap().insert(alloc_inode, newitem);
}else{
self.inodes.lock().unwrap().insert(alloc_inode, Arc::new(DicItem::new(alloc_inode, UNKNOW_INODE,item)));
}

self.queue.lock().unwrap().push_back(alloc_inode);
}
pub fn import(&self){
const ROOT_DIR: &str ="/";
let mut queue = VecDeque::new();
let items: Vec<Item> = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(ROOT_DIR)).unwrap().collect();//todo: can't tokio
for it in items{
self.update_inode(it);
self.update_inode(None,it);
}
while !queue.is_empty() {//BFS to look up all dictionary
let one_inode = queue.pop_back().unwrap();
let mut new_items = Vec::new();
{
let inodes_lock = self.inodes.lock().unwrap();
let it = inodes_lock.get(&one_inode).unwrap();
if it.content_type == ContentType::Dictionary{
let path = it.get_path();
new_items = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(&path)).unwrap().collect();
if let ContentType::Dictionary(load) = *it.content_type.lock().unwrap(){
if !load{
let path = it.get_path();
new_items = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(&path)).unwrap().collect();
}

}
}
for newit in new_items {
self.update_inode(newit); // Await the update_inode call
let mut pc = it.clone();
for newit in new_items {
self.update_inode(Some(pc.clone()),newit); // Await the update_inode call
}
let mut content_type = pc.content_type.lock().unwrap();
*content_type = ContentType::Dictionary(true);
}
new_items = Vec::new();
}
Expand All @@ -133,7 +188,18 @@ impl DictionaryStore {
pub fn find_path(&self,inode :u64)-> Option<GPath>{
self.inodes.lock().unwrap().get(&inode).map(|item| item.name.clone())
}

pub fn get_inode(&self,inode: u64) -> Result<Arc<DicItem>, io::Error> {
match self.inodes.lock().unwrap().get(&inode) {
Some(item) => Ok(item.clone()),
None=>Err(io::Error::new(io::ErrorKind::NotFound, "inode not found"))
}

}
pub fn get_by_path(&self, path: &str) -> Result<Arc<DicItem>, io::Error> {
let binding = self.radix_trie.lock().unwrap();
let inode = binding.get(path).ok_or(io::Error::new(io::ErrorKind::NotFound, "path not found"))?;
self.get_inode(*inode)
}
fn find_children(&self,parent: u64) -> Result<DicItem,io::Error>{
let path = self.inodes.lock().unwrap().get(&parent).map(|item| item.name.clone());
if let Some(parent_path) = path{
Expand Down

1 comment on commit 6a14e28

@vercel
Copy link

@vercel vercel bot commented on 6a14e28 Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

mega – ./

mega-gitmono.vercel.app
mega-git-main-gitmono.vercel.app
www.gitmega.dev
gitmega.dev

Please sign in to comment.