Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions tooling/nargo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,24 @@ pub fn parse_all(file_manager: &FileManager) -> ParsedFiles {

#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
pub fn parse_all(file_manager: &FileManager) -> ParsedFiles {
let num_threads = rayon::current_num_threads();
// Collect only .nr files to process
let nr_files: Vec<_> = file_manager
.as_file_map()
.all_file_ids()
.filter(|&&file_id| {
let file_path = file_manager.path(file_id).expect("expected file to exist");
let file_extension =
file_path.extension().expect("expected all file paths to have an extension");
file_extension == "nr"
})
.copied()
.collect();

// Limit threads to the actual number of files we need to process.
let num_threads = rayon::current_num_threads().min(nr_files.len()).max(1);

let (sender, receiver) = mpsc::channel();
let iter = &Mutex::new(file_manager.as_file_map().all_file_ids());
let iter = &Mutex::new(nr_files.into_iter());

thread::scope(|scope| {
// Start worker threads
Expand All @@ -247,18 +262,10 @@ pub fn parse_all(file_manager: &FileManager) -> ParsedFiles {
.spawn_scoped(scope, move || {
loop {
// Get next file to process from the iterator.
let Some(&file_id) = iter.lock().unwrap().next() else {
let Some(file_id) = iter.lock().unwrap().next() else {
break;
};

let file_path = file_manager.path(file_id).expect("expected file to exist");
let file_extension = file_path
.extension()
.expect("expected all file paths to have an extension");
if file_extension != "nr" {
continue;
}

let parsed_file = parse_file(file_manager, file_id);

if thread_sender.send((file_id, parsed_file)).is_err() {
Expand Down
9 changes: 7 additions & 2 deletions tooling/nargo_cli/src/cli/compile_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,13 @@ fn compile_programs(
};

// Configure a thread pool with a larger stack size to prevent overflowing stack in large programs.
// Default is 2MB.
let pool = rayon::ThreadPoolBuilder::new().stack_size(4 * 1024 * 1024).build().unwrap();
// Default is 2MB. Limit threads to the number of packages we actually need to compile.
let num_threads = rayon::current_num_threads().min(binary_packages.len()).max(1);
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.stack_size(4 * 1024 * 1024)
.build()
.unwrap();
let program_results: Vec<CompilationResult<()>> =
pool.install(|| binary_packages.par_iter().map(compile_package).collect());

Expand Down
9 changes: 8 additions & 1 deletion tooling/nargo_cli/src/cli/fuzz_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,14 @@ fn list_harnesses_command(
parsed_files: &ParsedFiles,
pattern: &FunctionNameMatch,
) -> Result<(), CliError> {
let pool = rayon::ThreadPoolBuilder::new().stack_size(4 * 1024 * 1024).build().unwrap();
// Configure a thread pool with a larger stack size to prevent overflowing stack in large programs.
// Default is 2MB. Limit threads to the number of packages we actually need to compile.
let num_threads = rayon::current_num_threads().min(workspace.members.len()).max(1);
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.stack_size(4 * 1024 * 1024)
.build()
.unwrap();
let all_harnesses_by_package: Vec<(CrateName, Vec<String>)> = pool
.install(|| {
workspace.into_iter().par_bridge().map(|package| {
Expand Down
11 changes: 8 additions & 3 deletions tooling/nargo_cli/src/cli/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,15 @@ impl<'a> TestRunner<'a> {
Vec<Test<'a>>,
) = tests.into_iter().partition(|test| !test.has_arguments);

// Calculate the actual number of threads needed based on test count.
let num_threads = self.num_threads.min(iter_tests_without_arguments.len()).max(1);

let iter_tests_without_arguments = &Mutex::new(iter_tests_without_arguments.into_iter());
let iter_tests_with_arguments = &Mutex::new(iter_tests_with_arguments.into_iter());

thread::scope(|scope| {
// Start worker threads
for _ in 0..self.num_threads {
for _ in 0..num_threads {
// Clone sender so it's dropped once the thread finishes
let test_result_thread_sender = sender.clone();
let standard_tests_finished_thread_sender = standard_tests_finished_sender.clone();
Expand All @@ -398,7 +401,7 @@ impl<'a> TestRunner<'a> {
// Wait for at least half of the threads to finish processing the standard tests
while standard_tests_finished_receiver.recv().is_ok() {
standard_tests_threads_finished += 1;
if standard_tests_threads_finished >= max(1, self.num_threads / 2) {
if standard_tests_threads_finished >= max(1, num_threads / 2) {
break;
}
}
Expand Down Expand Up @@ -493,11 +496,13 @@ impl<'a> TestRunner<'a> {
let mut error = None;

let (sender, receiver) = mpsc::channel();
// Calculate the actual number of threads needed based on package count.
let num_threads = self.num_threads.min(self.workspace.members.len()).max(1);
let iter = &Mutex::new(self.workspace.into_iter());

thread::scope(|scope| {
// Start worker threads
for _ in 0..self.num_threads {
for _ in 0..num_threads {
// Clone sender so it's dropped once the thread finishes
let thread_sender = sender.clone();
thread::Builder::new()
Expand Down
Loading