
Conversation

@mdrokz
Contributor

@mdrokz mdrokz commented Oct 30, 2025

Changes

  • implement parallel execution using tokio::spawn
  • stream logs and maintain their execution order
  • handle docker api rate limiting through semaphore lock
  • add subdirectories to prevent race conditions for resource access
  • refactor and standardise docker command execution by moving common methods into container.rs
  • add integration tests & performance benchmarks

Review

  • made sure all acceptance criteria are met
  • all tests are passing and performance benchmarks show improvements for parallel execution


@gemini-code-assist

Summary of Changes

Hello @mdrokz, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a major performance enhancement by enabling parallel execution for stages in the CI/CD pipeline. It allows independent steps within a stage to run simultaneously, drastically cutting down execution times. The implementation includes careful handling of log output to maintain readability, mechanisms to prevent resource conflicts between parallel tasks, and strategies to manage external API interactions like Docker image pulls. Comprehensive testing and benchmarking confirm the functionality and performance benefits of this new feature.

Highlights

  • Parallel Execution for Stages: Implemented the ability to run multiple steps within a stage concurrently using tokio::spawn, significantly reducing overall pipeline execution time for independent tasks (see the sketch after this list).
  • Log Streaming and Ordering: Developed a mechanism to stream logs from parallel tasks and buffer them, ensuring that the final output maintains the original definition order of the steps, rather than the order of completion.
  • Docker API Rate Limiting Handling: Introduced a semaphore lock to manage Docker image pulls, preventing concurrent pulls of the same image and mitigating potential Docker Hub API rate limiting issues.
  • Resource Access and Race Condition Prevention: Each parallel step now operates within its own isolated temporary subdirectory, mounted as /forge-shared, to prevent race conditions when multiple steps attempt to access or modify shared filesystem resources.
  • Code Refactoring and Modularity: Extracted common Docker command execution and container management functionalities into a new src/container.rs module, improving code organization and maintainability.
  • Integration Tests and Performance Benchmarks: Added new integration tests to validate parallel execution behavior, including fail-fast scenarios, log ordering, and validation rules. Performance benchmarks were also introduced to quantify the speed improvements of parallel execution over sequential.
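
For illustration, here is a minimal, self-contained sketch of the three mechanisms above: one tokio::spawn per step, a Semaphore gating the (here simulated) image pulls, and a buffer indexed by definition order so logs print deterministically. Step, run_step, and the limit of 3 permits are invented for this sketch rather than taken from the PR, which additionally drains its tasks through FuturesUnordered.

use std::sync::Arc;
use tokio::sync::Semaphore;

struct Step {
    name: String,
    command: String,
}

async fn run_step(step: Step, pull_gate: Arc<Semaphore>) -> Vec<String> {
    // Only a few tasks may "pull" concurrently, mitigating registry rate limits.
    {
        let _permit = pull_gate.acquire().await.expect("semaphore closed");
        // the docker image pull would happen while the permit is held
    }
    // Output is buffered per step instead of printed as it arrives.
    vec![format!("[{}] ran: {}", step.name, step.command)]
}

#[tokio::main]
async fn main() {
    let steps = vec![
        Step { name: "Task 1".into(), command: "sleep 1".into() },
        Step { name: "Task 2".into(), command: "sleep 1".into() },
        Step { name: "Task 3".into(), command: "sleep 1".into() },
    ];
    let pull_gate = Arc::new(Semaphore::new(3));

    // Spawn every step, remembering its definition index.
    let handles: Vec<_> = steps
        .into_iter()
        .enumerate()
        .map(|(i, step)| (i, tokio::spawn(run_step(step, pull_gate.clone()))))
        .collect();

    // Fill slots keyed by definition index, then print in that order,
    // regardless of which task finished first.
    let mut buffered: Vec<Option<Vec<String>>> = vec![None; handles.len()];
    for (i, handle) in handles {
        buffered[i] = Some(handle.await.expect("task panicked"));
    }
    for line in buffered.into_iter().flatten().flatten() {
        println!("{line}");
    }
}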
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize your Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@codescene-delta-analysis codescene-delta-analysis bot left a comment


Code Health Improved (1 file improved in Code Health)

Gates Failed

  • New code is healthy (2 new files with code health below 10.00)
  • Enforce critical code health rules (1 file with Bumpy Road Ahead)
  • Enforce advisory code health rules (3 files with Overall Code Complexity, Excess Number of Function Arguments, Large Method, Code Duplication)

Gates Passed
3 Quality Gates Passed

See analysis details in CodeScene

Reason for failure

New code is healthy:
  • container.rs: 3 rules, Code Health 8.82
  • parallel_benchmark.rs: 1 rule, Code Health 9.16
Enforce critical code health rules:
  • container.rs: 1 critical rule, Code Health 8.82
Enforce advisory code health rules:
  • container.rs: 2 advisory rules, Code Health 8.82
  • parallel_benchmark.rs: 1 advisory rule, Code Health 9.16
  • integration_tests.rs: 1 advisory rule, Code Health 10.00 → 9.39

View Improvements
  • main.rs: Code Health 6.79 → 7.59 (Complex Method, Bumpy Road Ahead, Overall Code Complexity)

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Comment on lines +171 to +252
pub async fn prepare_container(
    docker: &Docker,
    step: &Step,
    cache_config: &CacheConfig,
    temp_dir: &Path,
    verbose: bool,
) -> Result<ContainerSetup, Box<dyn std::error::Error + Send + Sync>> {
    let image = if step.image.is_empty() {
        "alpine:latest"
    } else {
        &step.image
    };

    // Pull the image if needed
    pull_image(docker, image).await?;

    // Create a unique container name
    let container_name = format!("forge-{}", uuid::Uuid::new_v4());

    // Prepare environment variables
    let env: Vec<String> = step.env.iter().map(|(k, v)| format!("{k}={v}")).collect();

    let step_name = if step.name.is_empty() {
        "unnamed step"
    } else {
        &step.name
    };

    if verbose {
        println!("{}", format!("Running step: {step_name}").yellow().bold());
        println!(" Command: {command}", command = step.command);
        println!(" Image: {image}");
        if !step.working_dir.is_empty() {
            println!(" Working directory: {dir}", dir = step.working_dir);
        }
        if !step.env.is_empty() {
            println!(" Environment variables:");
            for (k, v) in &step.env {
                println!(" {k}={v}");
            }
        }
    }

    // Setup volume mounts
    let mut mounts = vec![];
    let shared_mount = Mount {
        target: Some("/forge-shared".to_string()),
        source: Some(temp_dir.to_string_lossy().to_string()),
        typ: Some(MountTypeEnum::BIND),
        ..Default::default()
    };
    mounts.push(shared_mount);

    let host_config = HostConfig {
        auto_remove: Some(false),
        mounts: Some(mounts),
        ..Default::default()
    };

    // Build command with cache support
    let command = build_command_with_cache(&step.command, cache_config, verbose);

    let config = Config {
        image: Some(image.to_string()),
        cmd: Some(vec!["/bin/sh".to_string(), "-c".to_string(), command]),
        env: Some(env),
        working_dir: if step.working_dir.is_empty() {
            None
        } else {
            Some(step.working_dir.clone())
        },
        host_config: Some(host_config),
        ..Default::default()
    };

    Ok(ContainerSetup {
        name: container_name,
        config,
        image: image.to_string(),
        step_name: step_name.to_string(),
    })
}


❌ New issue: Bumpy Road Ahead
prepare_container has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is 2 blocks per function
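
One hypothetical way to clear this flag (not code from the PR) is to hoist the nested verbose block out of prepare_container into a helper, leaving a single level of branching; Step and the colored-output calls are the PR's own.

// Hypothetical helper: printing the verbose banner outside
// prepare_container removes one nested-conditional block.
fn print_step_banner(step: &Step, step_name: &str, image: &str) {
    println!("{}", format!("Running step: {step_name}").yellow().bold());
    println!(" Command: {command}", command = step.command);
    println!(" Image: {image}");
    if !step.working_dir.is_empty() {
        println!(" Working directory: {dir}", dir = step.working_dir);
    }
    if !step.env.is_empty() {
        println!(" Environment variables:");
        for (k, v) in &step.env {
            println!(" {k}={v}");
        }
    }
}

// In prepare_container, the block collapses to:
// if verbose { print_step_banner(step, step_name, image); }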


Comment on lines +467 to +488
pub async fn cleanup_containers(docker: &Docker, container_ids: &Arc<Mutex<Vec<String>>>) {
    let ids = container_ids.lock().await;

    // First, stop all containers (may still be running in fail-fast scenarios)
    for id in ids.iter() {
        // Stop the container (None = use the daemon's default timeout)
        if let Err(e) = docker.stop_container(id, None).await {
            // Container might already be stopped, that's okay
            if !e.to_string().contains("304") {
                // 304 = Not Modified (already stopped)
                eprintln!("Warning: Failed to stop container {}: {}", id, e);
            }
        }
    }

    // Then remove all containers
    for id in ids.iter() {
        if let Err(e) = docker.remove_container(id, None).await {
            eprintln!("Warning: Failed to remove container {}: {}", id, e);
        }
    }
}


❌ New issue: Bumpy Road Ahead
cleanup_containers has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is 2 blocks per function
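
A hypothetical flattening of the stop loop, using a match with a guard so the already-stopped case becomes one arm instead of a nested if; docker and the 304 check come from the quoted code.

for id in ids.iter() {
    match docker.stop_container(id, None).await {
        Ok(_) => {}
        // 304 = Not Modified: the container was already stopped
        Err(e) if e.to_string().contains("304") => {}
        Err(e) => eprintln!("Warning: Failed to stop container {id}: {e}"),
    }
}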


@@ -0,0 +1,488 @@
//! Container management utilities for Forge


❌ New issue: Overall Code Complexity
This module has a mean cyclomatic complexity of 4.25 across 8 functions. The mean complexity threshold is 4


Comment on lines +171 to +252

❌ New issue: Excess Number of Function Arguments
prepare_container has 5 arguments, max arguments = 4
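
A hypothetical parameter struct that would bring the call under the 4-argument threshold; the name PrepareArgs and the split between docker and the rest are illustrative choices, not the PR's code.

// Hypothetical grouping of prepare_container's step-related arguments.
pub struct PrepareArgs<'a> {
    pub step: &'a Step,
    pub cache_config: &'a CacheConfig,
    pub temp_dir: &'a Path,
    pub verbose: bool,
}

pub async fn prepare_container(
    docker: &Docker,
    args: PrepareArgs<'_>,
) -> Result<ContainerSetup, Box<dyn std::error::Error + Send + Sync>> {
    // body unchanged, reading step, cache_config, temp_dir, verbose from args
    todo!()
}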


Comment on lines +130 to +221
fn benchmark_5_tasks_1s(c: &mut Criterion) {
    let binary_path = get_forge_binary();
    if !binary_path.exists() {
        panic!("Binary not found. Please run 'cargo build --release' first");
    }

    let dir = tempdir().unwrap();

    // Sequential version
    let seq_config = r#"
version: "1.0"
stages:
  - name: sequential
    parallel: false
    steps:
      - name: Task 1
        image: alpine:latest
        command: sleep 1
      - name: Task 2
        image: alpine:latest
        command: sleep 1
      - name: Task 3
        image: alpine:latest
        command: sleep 1
      - name: Task 4
        image: alpine:latest
        command: sleep 1
      - name: Task 5
        image: alpine:latest
        command: sleep 1
"#;

    let seq_path = dir.path().join("sequential_5.yaml");
    File::create(&seq_path)
        .unwrap()
        .write_all(seq_config.as_bytes())
        .unwrap();

    // Parallel version
    let par_config = r#"
version: "1.0"
stages:
  - name: parallel
    parallel: true
    steps:
      - name: Task 1
        image: alpine:latest
        command: sleep 1
      - name: Task 2
        image: alpine:latest
        command: sleep 1
      - name: Task 3
        image: alpine:latest
        command: sleep 1
      - name: Task 4
        image: alpine:latest
        command: sleep 1
      - name: Task 5
        image: alpine:latest
        command: sleep 1
"#;

    let par_path = dir.path().join("parallel_5.yaml");
    File::create(&par_path)
        .unwrap()
        .write_all(par_config.as_bytes())
        .unwrap();

    let mut group = c.benchmark_group("5_tasks_1s_each");
    // Reduce sample size for faster benchmarks
    group.sample_size(10);
    // Increase measurement time to account for Docker overhead
    group.measurement_time(std::time::Duration::from_secs(120));

    group.bench_with_input(
        BenchmarkId::new("sequential", "5x1s"),
        &seq_path,
        |b, path| {
            b.iter(|| run_forge(black_box(&binary_path), black_box(path.to_str().unwrap())));
        },
    );

    group.bench_with_input(
        BenchmarkId::new("parallel", "5x1s"),
        &par_path,
        |b, path| {
            b.iter(|| run_forge(black_box(&binary_path), black_box(path.to_str().unwrap())));
        },
    );

    group.finish();
}


❌ New issue: Large Method
benchmark_5_tasks_1s has 79 lines, threshold = 70
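
The sequential and parallel configs differ only in the stage name and the parallel flag, so a hypothetical helper like the following (write_sleep_config is an invented name, not the PR's code) would cut the method well under the 70-line threshold.

use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};

// Hypothetical helper: generates the n-task sleep config used by both
// benchmark variants and writes it into the temp dir.
fn write_sleep_config(dir: &Path, file: &str, parallel: bool, tasks: usize) -> PathBuf {
    let stage = if parallel { "parallel" } else { "sequential" };
    let mut yaml = format!(
        "version: \"1.0\"\nstages:\n  - name: {stage}\n    parallel: {parallel}\n    steps:\n"
    );
    for i in 1..=tasks {
        yaml.push_str(&format!(
            "      - name: Task {i}\n        image: alpine:latest\n        command: sleep 1\n"
        ));
    }
    let path = dir.join(file);
    File::create(&path).unwrap().write_all(yaml.as_bytes()).unwrap();
    path
}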


Comment on lines +225 to +314
fn test_parallel_is_faster_than_sequential() {
    let binary_path = get_forge_binary();
    if !binary_path.exists() {
        println!(
            "Binary not found at {:?}. Building in release mode...",
            binary_path
        );
        let status = Command::new("cargo")
            .args(&["build", "--release"])
            .status()
            .expect("Failed to build");
        assert!(status.success(), "Build failed");
    }

    let dir = tempdir().unwrap();

    // Sequential: 3 tasks × 2 seconds = 6 seconds
    let seq_config = r#"
version: "1.0"
stages:
  - name: sequential
    parallel: false
    steps:
      - name: Task 1
        image: alpine:latest
        command: sleep 2
      - name: Task 2
        image: alpine:latest
        command: sleep 2
      - name: Task 3
        image: alpine:latest
        command: sleep 2
"#;

    let seq_path = dir.path().join("seq_perf_test.yaml");
    File::create(&seq_path)
        .unwrap()
        .write_all(seq_config.as_bytes())
        .unwrap();

    // Parallel: 3 tasks running simultaneously = ~2 seconds
    let par_config = r#"
version: "1.0"
stages:
  - name: parallel
    parallel: true
    steps:
      - name: Task 1
        image: alpine:latest
        command: sleep 2
      - name: Task 2
        image: alpine:latest
        command: sleep 2
      - name: Task 3
        image: alpine:latest
        command: sleep 2
"#;

    let par_path = dir.path().join("par_perf_test.yaml");
    File::create(&par_path)
        .unwrap()
        .write_all(par_config.as_bytes())
        .unwrap();

    println!("Running sequential execution...");
    let seq_time = run_forge(&binary_path, seq_path.to_str().unwrap());
    println!("Sequential took: {:?}", seq_time);

    println!("Running parallel execution...");
    let par_time = run_forge(&binary_path, par_path.to_str().unwrap());
    println!("Parallel took: {:?}", par_time);

    let speedup = seq_time.as_secs_f64() / par_time.as_secs_f64();
    println!("Speedup: {:.2}x", speedup);

    // Parallel should be faster (allow 1.5x minimum due to Docker overhead)
    // Ideally it should be ~3x for 3 tasks, but Docker operations add overhead
    assert!(
        speedup >= 1.5,
        "Parallel execution should be at least 1.5x faster. Got: {:.2}x",
        speedup
    );

    // Generate report
    println!("\n=== Performance Report ===");
    println!("Sequential: {:?}", seq_time);
    println!("Parallel: {:?}", par_time);
    println!("Speedup: {:.2}x", speedup);
    println!("Improvement: {:.1}%", (speedup - 1.0) * 100.0);
}


❌ New issue: Large Method
test_parallel_is_faster_than_sequential has 75 lines, threshold = 70


Comment on lines +301 to +330
fn test_validation_rejects_single_step_parallel() {
    let dir = tempdir().unwrap();

    let config = r#"
version: "1.0"
stages:
  - name: invalid-parallel
    parallel: true
    steps:
      - name: Only One
        image: alpine:latest
        command: echo "test"
"#;

    let config_path = create_test_config(dir.path(), "invalid-single.yaml", config);

    let result = run_forge_cli(&["validate", "--file", config_path.to_str().unwrap()]);

    // Should fail validation
    assert!(
        result.is_err(),
        "Expected validation to fail for single step parallel"
    );

    let error = result.err().unwrap();
    assert!(
        error.contains("at least 2 steps") || error.contains("only 1 step"),
        "Should mention step count requirement"
    );
}


❌ New issue: Code Duplication
The module contains 4 functions with similar structure: test_validation_rejects_duplicate_step_names,test_validation_rejects_parallel_with_dependencies,test_validation_rejects_single_step_parallel,test_validation_rejects_unnamed_steps
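
One hypothetical way to collapse the four near-identical tests is a shared assertion helper; assert_validation_fails is an invented name, while create_test_config and run_forge_cli are the test module's own helpers.

// Hypothetical shared helper for the four validation-failure tests.
fn assert_validation_fails(config: &str, file: &str, expected_fragments: &[&str]) {
    let dir = tempdir().unwrap();
    let path = create_test_config(dir.path(), file, config);
    let result = run_forge_cli(&["validate", "--file", path.to_str().unwrap()]);
    assert!(result.is_err(), "Expected validation to fail for {file}");
    let error = result.err().unwrap();
    assert!(
        expected_fragments.iter().any(|f| error.contains(f)),
        "Error should mention one of {expected_fragments:?}, got: {error}"
    );
}

// Each test then reduces to one call, e.g.:
// assert_validation_fails(config, "invalid-single.yaml", &["at least 2 steps", "only 1 step"]);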



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces parallel execution for pipeline stages using tokio, handles Docker API rate limiting, and refactors Docker command execution. It includes new integration tests and performance benchmarks demonstrating improvements with parallel execution. The code changes involve modifications to Cargo.lock, Cargo.toml, README.md, docs/configuration.md, and the addition of new files: benches/parallel_benchmark.rs, examples/parallel-execution.yaml, and src/container.rs. I have provided comments to address potential issues related to validation and error handling in parallel stages.

}

// Validate parallel stages before running
validate_parallel_stages(&config)?;


high

It's important to validate parallel stages before connecting to Docker to prevent resource allocation if the configuration is invalid. This fail-fast approach can save time and resources.
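
A sketch of the suggested ordering, assuming bollard's Docker::connect_with_local_defaults is the connection call used by main:

// Fail fast on invalid configs before any Docker resources are touched.
validate_parallel_stages(&config)?;
let docker = Docker::connect_with_local_defaults()?;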

Comment on lines +957 to +958
run_stage_parallel(&docker, &stage.steps, verbose, &config.cache, &temp_dir)
    .await?;


medium

Consider adding a more robust error handling mechanism within run_stage_parallel to catch and propagate errors from individual tasks, ensuring that the pipeline fails gracefully and provides informative error messages.
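
A minimal, self-contained sketch of fail-fast error propagation over spawned tasks, using FuturesUnordered as the later review notes the PR does; the failing-step simulation here is illustrative, not the PR's code.

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() -> Result<(), String> {
    let mut tasks: FuturesUnordered<_> = (0..3)
        .map(|i| {
            tokio::spawn(async move {
                if i == 1 {
                    Err(format!("step {i} failed"))
                } else {
                    Ok(i)
                }
            })
        })
        .collect();

    // Surface panics (JoinError) and step errors as soon as tasks complete.
    while let Some(joined) = tasks.next().await {
        let step_result = joined.map_err(|e| format!("task panicked: {e}"))?;
        let i = step_result?;
        println!("step {i} succeeded");
    }
    Ok(())
}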

}

// Validate parallel stages
validate_parallel_stages(&config)?;


medium

The parallel stages configuration is validated twice. Consider refactoring to avoid the duplication.

@mdrokz
Contributor Author

mdrokz commented Oct 30, 2025

Hey @0xReLogic, can you review the PR? I'll resolve the merge conflicts soon.

Owner

@0xReLogic 0xReLogic left a comment


Hi @mdrokz
I've tested your PR thoroughly. The parallel execution implementation works correctly and all tests pass.

Test Results:

  • All 11 integration tests passed
  • Parallel execution verified with 3 concurrent tasks
  • Log ordering maintains definition order
  • Isolated temp directories prevent race conditions
  • Docker rate limiting works via Semaphore
  • Fail-fast behavior stops on first error
  • Sequential execution still works

Implementation Quality:

  • Clean async implementation with tokio::spawn and FuturesUnordered
  • Smart log buffering maintains order during parallel execution
  • Good resource isolation with per-step temp directories
  • Comprehensive validation catches invalid configurations
  • Extensive test coverage with 9 new integration tests
  • Well-organized refactoring into container.rs

Required Fixes Before Merge

1. Clippy Errors (Blocker)

Six clippy errors must be fixed:

cargo clippy -- -D warnings

Issues:

  • Dead code: image field in ContainerSetup is never read
  • Type complexity (3x): Arc<Mutex<Vec<Option<(String, Vec<LogEntry>)>>>> is too complex
  • Manual flatten: Loop can be simplified with .flatten()
  • Too many arguments: run_step_parallel has 9 parameters (max 7)

Fixes:

  • Remove unused image field or add #[allow(dead_code)]
  • Create type alias: type LogBuffer = Arc<Mutex<Vec<Option<(String, Vec<LogEntry>)>>>>;
  • Use .flatten() in log printing loop
  • Group parameters into a struct like ParallelContext (see the sketch below)
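
A sketch of the suggested fixes; ParallelContext's exact fields are guesses at what run_step_parallel's nine parameters group into, while LogEntry, Docker, and CacheConfig are the PR's own types.

use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;

// The type alias shrinks the triply-nested generic that clippy flags.
type LogBuffer = Arc<Mutex<Vec<Option<(String, Vec<LogEntry>)>>>>;

// Grouping related parameters drops run_step_parallel below the
// 7-argument limit; field names here are illustrative.
struct ParallelContext<'a> {
    docker: &'a Docker,
    cache_config: &'a CacheConfig,
    temp_dir: &'a Path,
    verbose: bool,
    logs: LogBuffer,
}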

2. Merge Conflicts

Confirmed conflict in src/main.rs with main branch. Recent main changes:

  • Performance monitoring (Timer struct and timing code)
  • Documentation updates

To resolve:

git checkout feat/implement_parallel_execution
git pull origin main
# Resolve conflicts in src/main.rs
# Keep both: your parallel execution + main's performance monitoring

3. Documentation

Add comments explaining:

  • Why FuturesUnordered instead of join_all
  • How isolated temp directories work with cache system
  • Semaphore usage for Docker rate limiting

Next Steps

  1. Fix clippy errors
  2. Resolve merge conflicts
  3. Verify tests still pass
  4. Reply when ready for re-review

The implementation is solid. Once these fixes are done, this is ready to merge.



Development

Successfully merging this pull request may close these issues.

Implement parallel execution for stages
