Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions crates/rspack_fs/src/watcher/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
};

use super::{EventAggregateHandler, EventHandler, FsEvent, FsEventKind};
use super::{EventAggregateHandler, EventHandler, FsEventKind};
use crate::watcher::EventBatch;

type ThreadSafetyReceiver<T> = ThreadSafety<UnboundedReceiver<T>>;
type ThreadSafety<T> = Arc<Mutex<T>>;
Expand All @@ -32,7 +33,7 @@ impl FilesData {
/// deleted files, and coordinates the event handling logic.
pub struct Executor {
aggregate_timeout: u32,
rx: ThreadSafety<UnboundedReceiver<FsEvent>>,
rx: ThreadSafetyReceiver<EventBatch>,
files_data: ThreadSafety<FilesData>,
exec_aggregate_tx: UnboundedSender<ExecAggregateEvent>,
exec_aggregate_rx: ThreadSafetyReceiver<ExecAggregateEvent>,
Expand All @@ -59,13 +60,13 @@ enum ExecAggregateEvent {
}

enum ExecEvent {
Execute(FsEvent),
Execute(EventBatch),
Close,
}

impl Executor {
/// Create a new `WatcherExecutor` with the given receiver and optional aggregate timeout.
pub fn new(rx: UnboundedReceiver<FsEvent>, aggregate_timeout: Option<u32>) -> Self {
pub fn new(rx: UnboundedReceiver<EventBatch>, aggregate_timeout: Option<u32>) -> Self {
let (exec_aggregate_tx, exec_aggregate_rx) = mpsc::unbounded_channel::<ExecAggregateEvent>();
let (exec_tx, exec_rx) = mpsc::unbounded_channel::<ExecEvent>();

Expand Down Expand Up @@ -134,24 +135,27 @@ impl Executor {
let aggregate_running = Arc::clone(&self.aggregate_running);

let future = async move {
while let Some(event) = rx.lock().await.recv().await {
let path = event.path.to_string_lossy().to_string();
match event.kind {
FsEventKind::Change => {
files_data.lock().await.changed.insert(path);
}
FsEventKind::Remove => {
files_data.lock().await.deleted.insert(path);
}
FsEventKind::Create => {
files_data.lock().await.changed.insert(path);
while let Some(events) = rx.lock().await.recv().await {
for event in &events {
let path = event.path.to_string_lossy().to_string();
match event.kind {
FsEventKind::Change => {
files_data.lock().await.changed.insert(path);
}
FsEventKind::Remove => {
files_data.lock().await.deleted.insert(path);
}
FsEventKind::Create => {
files_data.lock().await.changed.insert(path);
}
}
};
}

if !paused.load(Ordering::Relaxed) && !aggregate_running.load(Ordering::Relaxed) {
let _ = exec_aggregate_tx.send(ExecAggregateEvent::Execute);
}
let _ = exec_tx.send(ExecEvent::Execute(event));

let _ = exec_tx.send(ExecEvent::Execute(events));
}

let _ = exec_aggregate_tx.send(ExecAggregateEvent::Close);
Expand Down Expand Up @@ -194,19 +198,22 @@ fn create_execute_task(
exec_rx: ThreadSafetyReceiver<ExecEvent>,
) -> tokio::task::JoinHandle<()> {
let future = async move {
while let Some(event) = exec_rx.lock().await.recv().await {
match event {
ExecEvent::Execute(event) => {
let path = event.path.to_string_lossy().to_string();
match event.kind {
super::FsEventKind::Change | super::FsEventKind::Create => {
if event_handler.on_change(path).is_err() {
break;
while let Some(exec_event) = exec_rx.lock().await.recv().await {
match exec_event {
ExecEvent::Execute(batch_events) => {
for event in batch_events {
// Handle each event based on its kind
let path = event.path.to_string_lossy().to_string();
match event.kind {
super::FsEventKind::Change | super::FsEventKind::Create => {
if event_handler.on_change(path).is_err() {
break;
}
}
}
super::FsEventKind::Remove => {
if event_handler.on_delete(path).is_err() {
break;
super::FsEventKind::Remove => {
if event_handler.on_delete(path).is_err() {
break;
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/rspack_fs/src/watcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub(crate) struct FsEvent {
pub kind: FsEventKind,
}

pub(crate) type EventBatch = Vec<FsEvent>;

/// `EventAggregateHandler` is a trait for handling aggregated file system events.
/// It provides methods to handle changes and deletions of files, as well as errors.
/// Implementors of this trait can define custom behavior for these events.
Expand Down
21 changes: 11 additions & 10 deletions crates/rspack_fs/src/watcher/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use std::{ops::Deref, sync::Arc};
use tokio::sync::mpsc::UnboundedSender;

use super::{FsEvent, FsEventKind, PathManager};
use crate::watcher::EventBatch;

// Scanner will scann the path whether it is exist or not in disk on initialization
pub struct Scanner {
path_manager: Arc<PathManager>,
tx: Option<UnboundedSender<FsEvent>>,
tx: Option<UnboundedSender<EventBatch>>,
}

impl Scanner {
/// Creates a new `Scanner` that will send events to the provided sender when paths are scanned.
pub fn new(tx: UnboundedSender<FsEvent>, path_manager: Arc<PathManager>) -> Self {
pub fn new(tx: UnboundedSender<EventBatch>, path_manager: Arc<PathManager>) -> Self {
Self {
path_manager,
tx: Some(tx),
Expand All @@ -33,10 +34,10 @@ impl Scanner {
&& let Some(tx) = &_tx
{
// If the file does not exist, send a delete event
let _ = tx.send(FsEvent {
let _ = tx.send(vec![FsEvent {
path: filepath.clone(),
kind: FsEventKind::Remove,
});
}]);
}
}
});
Expand All @@ -50,10 +51,10 @@ impl Scanner {
&& let Some(tx) = &_tx
{
// If the directory does not exist, send a delete event
let _ = tx.send(FsEvent {
let _ = tx.send(vec![FsEvent {
path: dirpath.clone(),
kind: FsEventKind::Remove,
});
}]);
}
}
});
Expand Down Expand Up @@ -110,13 +111,13 @@ mod tests {
let collected_events = collector.await.unwrap();
assert_eq!(collected_events.len(), 2);

assert!(collected_events.contains(&FsEvent {
assert!(collected_events.contains(&vec![FsEvent {
path: ArcPath::from(current_dir.join("___test_file.txt")),
kind: FsEventKind::Remove
}));
assert!(collected_events.contains(&FsEvent {
}]));
assert!(collected_events.contains(&vec![FsEvent {
path: ArcPath::from(current_dir.join("___test_dir/a/b/c")),
kind: FsEventKind::Remove,
}));
}]));
}
}
27 changes: 16 additions & 11 deletions crates/rspack_fs/src/watcher/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rspack_paths::ArcPath;
use tokio::sync::mpsc::UnboundedSender;

use super::{FsEvent, FsEventKind};
use crate::watcher::paths::PathManager;
use crate::watcher::{EventBatch, paths::PathManager};
/// `DependencyFinder` provides references to sets of files, directories, and missing paths,
/// allowing efficient lookup and dependency resolution for a given path.
///
Expand Down Expand Up @@ -91,12 +91,12 @@ pub struct Trigger {
/// Shared reference to the path register, which tracks watched files/directories/missing.
path_manager: Arc<PathManager>,
/// Sender for communicating file system events to the watcher executor.
tx: UnboundedSender<FsEvent>,
tx: UnboundedSender<EventBatch>,
}

impl Trigger {
/// Create a new `Trigger` with the given path register and event sender.
pub fn new(path_manager: Arc<PathManager>, tx: UnboundedSender<FsEvent>) -> Self {
pub fn new(path_manager: Arc<PathManager>, tx: UnboundedSender<EventBatch>) -> Self {
Self { path_manager, tx }
}

Expand All @@ -116,9 +116,7 @@ impl Trigger {
pub fn on_event(&self, path: &ArcPath, kind: FsEventKind) {
let finder = self.finder();
let associated_event = finder.find_associated_event(path, kind);
for (path, kind) in associated_event {
self.trigger_event(path, kind);
}
self.trigger_events(associated_event);
}

/// Helper to construct a `DependencyFinder` for the current path register state.
Expand All @@ -136,11 +134,18 @@ impl Trigger {
}
}

/// Sends a file system event for the given path and event kind.
/// Ignores any error if the receiver has been dropped.
fn trigger_event(&self, path: ArcPath, kind: FsEventKind) {
let event = FsEvent { path, kind };
_ = self.tx.send(event);
/// Sends a group of file system events for the given path and event kind.
/// If the event is successfully sent, it returns true; otherwise, it returns false.
fn trigger_events(&self, events: Vec<(ArcPath, FsEventKind)>) -> bool {
self
.tx
.send(
events
.into_iter()
.map(|(path, kind)| FsEvent { path, kind })
.collect(),
)
.is_ok()
}
}
#[cfg(test)]
Expand Down
Loading