Skip to content

[scorpio]: Add Readonly dictionary FUSE with server test. unstable.v1 #479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions scorpio/src/dicfuse/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fn default_stat64(inode:u64) -> stat64 {
mtimensec: 0, // Default nanoseconds of last modification time
ctimensec: 0, // Default nanoseconds of last status change time
mode: 0o444, // Default file mode (r--r--r--)
//mode: 0o555, // Default file mode (r-xr-xr-x)
nlink: 1, // Default number of hard links
uid: 1000, // Default user ID
gid: 1000, // Default group ID
Expand All @@ -24,8 +25,8 @@ fn default_stat64(inode:u64) -> stat64 {
};
t.into()
}
#[allow(unused)]
pub fn default_entry(inode:u64) -> Entry {

pub fn default_file_entry(inode:u64) -> Entry {
Entry{
inode,
generation: 0,
Expand All @@ -35,6 +36,19 @@ pub fn default_entry(inode:u64) -> Entry {
entry_timeout: Duration::from_secs(u64::MAX),
} // Return a default Entry instance
}

pub fn default_dic_entry(inode:u64) -> Entry {
let mut d = default_stat64(inode);
d.st_mode = 0o555;
Entry{
inode,
generation: 0,
attr: d,
attr_flags: 0,
attr_timeout: Duration::from_secs(u64::MAX),
entry_timeout: Duration::from_secs(u64::MAX),
} // Return a default Dictionary Entry instance
}
// pub struct stat64 {
// pub st_dev: ::dev_t, // Device ID of the device containing the file
// pub st_ino: ::ino64_t, // Inode number of the file
Expand Down
98 changes: 91 additions & 7 deletions scorpio/src/dicfuse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ mod fuse;

use std::{sync::Arc, time::Duration};
use std::io::Result;
use fuse::default_entry;
use fuse_backend_rs::{abi::fuse_abi::FsOptions, api::filesystem::{Context, Entry, FileSystem}};
use tokio::task::JoinHandle;

use store::DictionaryStore;
use store::{DictionaryStore, IntoEntry};

struct Dicfuse{
store: Arc<DictionaryStore>,
Expand Down Expand Up @@ -50,8 +49,11 @@ impl FileSystem for Dicfuse{

fn lookup(&self, ctx: &Context, parent: Self::Inode, name: &std::ffi::CStr) -> Result<Entry> {
let store = self.store.clone();
let pitem = store.find_path(parent).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?;
Ok(Entry::default())
let mut ppath = store.find_path(parent).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?;
let pitem = store.get_inode(parent)?;
ppath.push(name.to_string_lossy().into_owned());
let chil = store.get_by_path(&ppath.to_string())?;
Ok(chil.into_entry())
}


Expand All @@ -71,9 +73,8 @@ impl FileSystem for Dicfuse{
) -> std::io::Result<(libc::stat64, std::time::Duration)> {
let store = self.store.clone();
let i = store.find_path(inode).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?;
let entry = default_entry(inode);
let entry = fuse::default_file_entry(inode);
Ok((entry.attr,Duration::from_secs(u64::MAX)))

}

fn setattr(
Expand Down Expand Up @@ -286,4 +287,87 @@ impl FileSystem for Dicfuse{
}


}
}

#[cfg(test)]
mod tests {
use std::{io, path::Path, sync::Arc,thread};

use fuse_backend_rs::{api::server::Server, transport::{FuseChannel, FuseSession}};
use signal_hook::{consts::TERM_SIGNALS, iterator::Signals};

use super::Dicfuse;


pub struct DicFuseServer {
server: Arc<Server<Arc<Dicfuse>>>,
ch: FuseChannel,
}
impl DicFuseServer {
pub fn svc_loop(&mut self) -> Result<(),io::Error> {
let _ebadf = std::io::Error::from_raw_os_error(libc::EBADF);
println!("entering server loop");
loop {
if let Some((reader, writer)) = self
.ch
.get_request()
.map_err(|_| std::io::Error::from_raw_os_error(libc::EINVAL))?
{
if let Err(e) = self
.server
.handle_message(reader, writer.into(), None, None)
{
match e {
fuse_backend_rs::Error::EncodeMessage(_ebadf) => {
break;
}
_ => {
print!("Handling fuse message failed");
continue;
}
}
}
} else {
print!("fuse server exits");
break;
}
}
Ok(())
}


}

#[test]
fn test_svc_loop_success() {
let dicfuse = Arc::new(Dicfuse::new());
// Create fuse session
let mut se = FuseSession::new(Path::new(&"/home/luxian/ccode/mega/dictest"), "dic", "", true).unwrap();
se.mount().unwrap();
let ch: FuseChannel = se.new_channel().unwrap();
let server = Arc::new(Server::new(dicfuse.clone()));
let mut dicfuse_server = DicFuseServer { server, ch };

// Mock the behavior of get_request to simulate a successful request
// This would require implementing a mock or a stub for FuseChannel
// For the sake of this example, we will assume it is done correctly


// Spawn server thread
let handle = thread::spawn(move || {
let _ = dicfuse_server.svc_loop();
});
// Wait for termination signal
let mut signals = Signals::new(TERM_SIGNALS).unwrap();
if let Some(_sig) = signals.forever().next() {
//pass
}
// Unmount and wake up
se.umount().unwrap();
se.wake().unwrap();
// Join server thread
let _ = handle.join();
}


}
1 change: 1 addition & 0 deletions scorpio/src/dicfuse/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ impl Display for GPath{
write!(f, "{}", self.path.join("/"))
}
}

108 changes: 87 additions & 21 deletions scorpio/src/dicfuse/store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

use fuse_backend_rs::api::filesystem::Entry;
/// Read only file system for obtaining and displaying monorepo directory information
use reqwest::Client;
// Import Response explicitly
Expand All @@ -12,9 +13,10 @@ use radix_trie::{self, TrieCommon};
use std::sync::{Arc,Mutex};


use super::fuse;
use super::model::GPath;
const MEGA_TREE_URL: &str = "localhost:8000";//TODO: make it configable

const UNKNOW_INODE: u64 = 0; // illegal inode number;

#[derive(Serialize, Deserialize, Debug)]
pub struct Item {
Expand All @@ -23,35 +25,66 @@ pub struct Item {
content_type: String,
}
#[allow(unused)]
struct DicItem{
pub struct DicItem{
inode:u64,
name:GPath,
content_type: ContentType,
content_type: Mutex<ContentType>,
children:Mutex<HashMap<String, Arc<DicItem>>>,
parent:u64,
}

#[allow(unused)]
#[derive(PartialEq)]
enum ContentType {
File,
Dictionary,
Dictionary(bool),// if this dictionary is loaded.
}
#[allow(unused)]
impl DicItem {
pub fn new(inode:u64, item:Item) -> Self {
pub fn new(inode:u64,parent:u64, item:Item) -> Self {
DicItem {
inode,
name: item.name.into(), // Assuming GPath can be created from String
content_type: match item.content_type.as_str() {
"file" => ContentType::File,
"directory" => ContentType::Dictionary,
"file" => ContentType::File.into(),
"directory" => ContentType::Dictionary(false).into(),
_ => panic!("Unknown content type"),
},
children: Mutex::new(HashMap::new()),
parent,
}
}
//get the total path
pub fn get_path(&self) -> String {
self.name.to_string()
}
//get the file or dic name . aka tail name.
pub fn get_name(&self) -> String {
self.name.name()
}
// add a children item
pub fn push_children(&self,children:Arc<DicItem>){
self.children.lock().unwrap().insert(children.get_path(), children);
}
// get the inode
pub fn get_inode(&self)-> u64{
self.inode
}
}

pub trait IntoEntry {
fn into_entry(self) -> Entry;
}

impl IntoEntry for Arc<DicItem> {
fn into_entry(self) -> Entry {
match *self.content_type.lock().unwrap() {
ContentType::File => fuse::default_file_entry(self.inode),
ContentType::Dictionary(_) => fuse::default_dic_entry(self.inode),
}
}
}

#[derive(Serialize, Deserialize, Debug,Default)]
struct ApiResponse {
req_result: bool,
Expand Down Expand Up @@ -79,7 +112,7 @@ async fn fetch_tree(path: &str) -> Result<ApiResponse, Box<dyn Error>> {

#[allow(unused)]
pub struct DictionaryStore {
inodes: Arc<Mutex<HashMap<u64, DicItem>>>,
inodes: Arc<Mutex<HashMap<u64, Arc<DicItem>>>>,
next_inode: AtomicU64,
queue: Arc<Mutex<VecDeque<u64>>>,
radix_trie: Arc<Mutex<radix_trie::Trie<String, u64>>>,
Expand All @@ -89,40 +122,62 @@ pub struct DictionaryStore {
#[allow(unused)]
impl DictionaryStore {
pub fn new() -> Self {
DictionaryStore {
next_inode: AtomicU64::new(1),
let mut init = DictionaryStore {
next_inode: AtomicU64::new(2),
inodes: Arc::new(Mutex::new(HashMap::new())),
radix_trie: Arc::new(Mutex::new(radix_trie::Trie::new())),
queue: Arc::new(Mutex::new(VecDeque::new())),
}
};
let root_item = DicItem{
inode: 1,
name: GPath::new(),
content_type: ContentType::Dictionary(false).into(),
children: Mutex::new(HashMap::new()),
parent: UNKNOW_INODE, // root dictory has no parent
};
init
}
fn update_inode(&self,item:Item){
fn update_inode(&self,pitem:Option<Arc<DicItem>>,item:Item){
self.next_inode.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let alloc_inode = self.next_inode.load(std::sync::atomic::Ordering::Relaxed);
self.radix_trie.lock().unwrap().insert(item.path.clone(), alloc_inode);
self.inodes.lock().unwrap().insert(alloc_inode, DicItem::new(alloc_inode, item));

if let Some(parent) = pitem{
let newitem = Arc::new(DicItem::new(alloc_inode, parent.get_inode(),item));
parent.push_children(newitem.clone());
self.inodes.lock().unwrap().insert(alloc_inode, newitem);
}else{
self.inodes.lock().unwrap().insert(alloc_inode, Arc::new(DicItem::new(alloc_inode, UNKNOW_INODE,item)));
}

self.queue.lock().unwrap().push_back(alloc_inode);
}
pub fn import(&self){
const ROOT_DIR: &str ="/";
let mut queue = VecDeque::new();
let items: Vec<Item> = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(ROOT_DIR)).unwrap().collect();//todo: can't tokio
for it in items{
self.update_inode(it);
self.update_inode(None,it);
}
while !queue.is_empty() {//BFS to look up all dictionary
let one_inode = queue.pop_back().unwrap();
let mut new_items = Vec::new();
{
let inodes_lock = self.inodes.lock().unwrap();
let it = inodes_lock.get(&one_inode).unwrap();
if it.content_type == ContentType::Dictionary{
let path = it.get_path();
new_items = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(&path)).unwrap().collect();
if let ContentType::Dictionary(load) = *it.content_type.lock().unwrap(){
if !load{
let path = it.get_path();
new_items = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(&path)).unwrap().collect();
}

}
}
for newit in new_items {
self.update_inode(newit); // Await the update_inode call
let mut pc = it.clone();
for newit in new_items {
self.update_inode(Some(pc.clone()),newit); // Await the update_inode call
}
let mut content_type = pc.content_type.lock().unwrap();
*content_type = ContentType::Dictionary(true);
}
new_items = Vec::new();
}
Expand All @@ -133,7 +188,18 @@ impl DictionaryStore {
pub fn find_path(&self,inode :u64)-> Option<GPath>{
self.inodes.lock().unwrap().get(&inode).map(|item| item.name.clone())
}

pub fn get_inode(&self,inode: u64) -> Result<Arc<DicItem>, io::Error> {
match self.inodes.lock().unwrap().get(&inode) {
Some(item) => Ok(item.clone()),
None=>Err(io::Error::new(io::ErrorKind::NotFound, "inode not found"))
}

}
pub fn get_by_path(&self, path: &str) -> Result<Arc<DicItem>, io::Error> {
let binding = self.radix_trie.lock().unwrap();
let inode = binding.get(path).ok_or(io::Error::new(io::ErrorKind::NotFound, "path not found"))?;
self.get_inode(*inode)
}
fn find_children(&self,parent: u64) -> Result<DicItem,io::Error>{
let path = self.inodes.lock().unwrap().get(&parent).map(|item| item.name.clone());
if let Some(parent_path) = path{
Expand Down
Loading