Skip to content

Commit

Permalink
run tasks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Jun 15, 2023
1 parent 36a3229 commit cfd8e88
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 40 deletions.
13 changes: 7 additions & 6 deletions gitoxide-core/src/corpus/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub fn create(path: impl AsRef<std::path::Path>) -> anyhow::Result<rusqlite::Con
},
_ => {}
}
con.execute_batch("PRAGMA synchronous = OFF;")?;
con.execute_batch("PRAGMA synchronous = OFF; PRAGMA journal_mode = WAL; PRAGMA wal_checkpoint(FULL); ")?;
con.execute_batch(
r#"
CREATE TABLE if not exists runner(
Expand Down Expand Up @@ -115,7 +115,8 @@ pub fn create(path: impl AsRef<std::path::Path>) -> anyhow::Result<rusqlite::Con
r#"
CREATE TABLE if not exists task(
id integer PRIMARY KEY,
name text UNIQUE -- the unique name of the task, which is considered its id and which is immutable once run once
short_name UNIQUE, -- the unique and permanent identifier for the task
description text UNIQUE -- the descriptive name of the task, it can be changed at will
)
"#,
)?;
Expand All @@ -142,7 +143,7 @@ pub fn create(path: impl AsRef<std::path::Path>) -> anyhow::Result<rusqlite::Con
}

/// Utilities
impl<P> Engine<P> {
impl Engine {
pub(crate) fn runner_id_or_insert(&self) -> anyhow::Result<Id> {
let sys =
sysinfo::System::new_with_specifics(RefreshKind::new().with_cpu(CpuRefreshKind::new().with_frequency()));
Expand Down Expand Up @@ -177,10 +178,10 @@ impl<P> Engine<P> {
}
pub(crate) fn tasks_or_insert(&self) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
let mut out: Vec<_> = super::run::ALL.iter().map(|task| (0, task)).collect();
for (id, task) in out.iter_mut() {
for (id, task) in &mut out {
*id = self.con.query_row(
"INSERT INTO task (name) VALUES (?1) ON CONFLICT DO UPDATE SET name = name RETURNING id",
[task.name],
"INSERT INTO task (short_name, description) VALUES (?1, ?2) ON CONFLICT DO UPDATE SET short_name = short_name, description = ?2 RETURNING id",
[task.short_name, task.description],
|r| r.get(0),
)?;
}
Expand Down
98 changes: 76 additions & 22 deletions gitoxide-core/src/corpus/engine.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use super::db;
use crate::corpus;
use crate::corpus::{Engine, Task};
use crate::organize::find_git_repository_workdirs;
use anyhow::{bail, Context};
use bytesize::ByteSize;
use gix::Progress;
use rusqlite::params;
use std::path::{Path, PathBuf};
use std::time::Instant;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

impl<P> Engine<P>
where
P: gix::Progress,
{
impl Engine {
/// Open the corpus DB or create it.
pub fn open_or_create(db: PathBuf, gitoxide_version: String, progress: P) -> anyhow::Result<Engine<P>> {
pub fn open_or_create(db: PathBuf, gitoxide_version: String, progress: corpus::Progress) -> anyhow::Result<Engine> {
let con = crate::corpus::db::create(db).context("Could not open or create database")?;
Ok(Engine {
progress,
Expand All @@ -23,13 +22,13 @@ where
}

/// Run on the existing set of repositories we have already seen, or obtain them from `corpus_path` if there are none yet.
pub fn run(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
pub fn run(&mut self, corpus_path: PathBuf, threads: Option<usize>) -> anyhow::Result<()> {
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
let gitoxide_id = self.gitoxide_version_id_or_insert()?;
let runner_id = self.runner_id_or_insert()?;
let repos = self.find_repos_or_insert(&corpus_path, corpus_id)?;
let tasks = self.tasks_or_insert()?;
self.perform_run(gitoxide_id, runner_id, &tasks, &repos)
self.perform_run(&corpus_path, gitoxide_id, runner_id, &tasks, repos, threads)
}

pub fn refresh(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
Expand All @@ -44,42 +43,97 @@ where
}
}

impl<P> Engine<P>
where
P: gix::Progress,
{
impl Engine {
fn perform_run(
&mut self,
corpus_path: &Path,
gitoxide_id: db::Id,
runner_id: db::Id,
tasks: &[(db::Id, &'static Task)],
repos: &[db::Repo],
mut repos: Vec<db::Repo>,
threads: Option<usize>,
) -> anyhow::Result<()> {
let start = Instant::now();
let task_progress = &mut self.progress;
task_progress.set_name("run");
task_progress.init(Some(tasks.len()), gix::progress::count("tasks"));
let threads = gix::parallel::num_threads(threads);
for (task_id, task) in tasks {
let task_start = Instant::now();
let mut run_progress = task_progress.add_child(format!("run '{}'", task.name));
run_progress.init(Some(repos.len()), gix::progress::count("repos"));
let mut repo_progress = task_progress.add_child(format!("run '{}'", task.short_name));
repo_progress.init(Some(repos.len()), gix::progress::count("repos"));

if task.execute_exclusive {
for repo in repos {
if task.execute_exclusive || threads == 1 {
let mut run_progress = repo_progress.add_child("set later");
for repo in &repos {
if gix::interrupt::is_triggered() {
bail!("interrupted by user");
}
run_progress.set_name(format!(
"{}",
repo.path
.strip_prefix(corpus_path)
.expect("corpus contains repo")
.display()
));
let mut run = Self::insert_run(&self.con, gitoxide_id, runner_id, *task_id, repo.id)?;
task.perform(&mut run, &repo.path);
task.perform(
&mut run,
&repo.path,
&mut run_progress,
Some(threads),
&gix::interrupt::IS_INTERRUPTED,
);
Self::update_run(&self.con, run)?;
run_progress.inc();
repo_progress.inc();
}
repo_progress.show_throughput(task_start);
} else {
// gix::parallel::in_parallel_with_slice()
todo!("shared")
let counter = repo_progress.counter();
let repo_progress = gix::threading::OwnShared::new(gix::threading::Mutable::new(repo_progress));
gix::parallel::in_parallel_with_slice(
&mut repos,
Some(threads),
{
let shared_repo_progress = repo_progress.clone();
let path = self.con.path().expect("opened from path on disk").to_owned();
move |tid| {
(
gix::threading::lock(&shared_repo_progress).add_child(format!("{tid}")),
rusqlite::Connection::open(&path),
)
}
},
|repo, (progress, con), _threads_left, should_interrupt| -> anyhow::Result<()> {
progress.set_name(format!(
"{}",
repo.path
.strip_prefix(corpus_path)
.expect("corpus contains repo")
.display()
));
let con = match con {
Ok(con) => con,
Err(err) => {
progress.fail(format!("{err:#?}"));
should_interrupt.store(true, Ordering::SeqCst);
return Ok(());
}
};
let mut run = Self::insert_run(con, gitoxide_id, runner_id, *task_id, repo.id)?;
task.perform(&mut run, &repo.path, progress, Some(1), should_interrupt);
Self::update_run(con, run)?;
if let Some(counter) = counter.as_ref() {
counter.fetch_add(1, Ordering::SeqCst);
}
Ok(())
},
|| (!gix::interrupt::is_triggered()).then(|| Duration::from_millis(100)),
std::convert::identity,
)?;
gix::threading::lock(&repo_progress).show_throughput(task_start);
}

run_progress.show_throughput(task_start);
task_progress.inc();
}
task_progress.show_throughput(start);
Expand Down
47 changes: 36 additions & 11 deletions gitoxide-core/src/corpus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 0..=3;
pub(crate) type Progress = gix::progress::DoOrDiscard<gix::progress::prodash::tree::Item>;

pub struct Engine<P> {
progress: P,
pub struct Engine {
progress: Progress,
con: rusqlite::Connection,
gitoxide_version: String,
}
Expand All @@ -20,7 +21,9 @@ pub(crate) struct Task {
///
/// However, if it is changed it will be treated as a new kind of task entirely and won't compare
/// to previous runs of the task.
name: &'static str,
short_name: &'static str,
/// Explain in greater detail what the task is doing.
description: &'static str,
/// `true` if the task cannot be run in parallel as it needs all resources by itself.
execute_exclusive: bool,
/// The actual implementation
Expand All @@ -36,13 +39,22 @@ pub(crate) struct Run {
}

pub(crate) mod run {
use crate::corpus;
use crate::corpus::{Run, Task};
use std::path::Path;
use std::sync::atomic::AtomicBool;

impl Task {
pub fn perform(&self, run: &mut Run, repo: &Path) {
pub fn perform(
&self,
run: &mut Run,
repo: &Path,
progress: &mut corpus::Progress,
threads: Option<usize>,
should_interrupt: &AtomicBool,
) {
let start = std::time::Instant::now();
if let Err(err) = self.execute.execute(repo) {
if let Err(err) = self.execute.execute(repo, progress, threads, should_interrupt) {
run.error = Some(format!("{err:#?}"))
}
run.duration = start.elapsed();
Expand All @@ -52,20 +64,33 @@ pub(crate) mod run {
/// Note that once runs have been recorded, the implementation must not change anymore to keep it comparable.
/// If changes have to be made, rather change the name of the owning task to start a new kind of task.
pub(crate) trait Execute {
fn execute(&self, repo: &Path) -> anyhow::Result<()>;
fn execute(
&self,
repo: &Path,
progress: &mut corpus::Progress,
threads: Option<usize>,
should_interrupt: &AtomicBool,
) -> anyhow::Result<()>;
}

pub(crate) static ALL: &'static [Task] = &[Task {
name: "open repository (isolated)",
execute_exclusive: true, // TODO: false
pub(crate) static ALL: &[Task] = &[Task {
short_name: "OPNR",
description: "open repository (isolated)",
execute_exclusive: false,
execute: &OpenRepo,
}];

struct OpenRepo;

impl Execute for OpenRepo {
fn execute(&self, repo: &Path) -> anyhow::Result<()> {
gix::open_opts(&repo, gix::open::Options::isolated())?;
fn execute(
&self,
repo: &Path,
_progress: &mut corpus::Progress,
_threads: Option<usize>,
_should_interrupt: &AtomicBool,
) -> anyhow::Result<()> {
gix::open_opts(repo, gix::open::Options::isolated())?;
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/plumbing/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub fn main() -> Result<()> {
move |progress, _out, _err| {
let mut engine = core::corpus::Engine::open_or_create(db, env!("GITOXIDE_VERSION").into(), progress)?;
match cmd {
crate::plumbing::options::corpus::SubCommands::Run => engine.run(path),
crate::plumbing::options::corpus::SubCommands::Run => engine.run(path, thread_limit),
crate::plumbing::options::corpus::SubCommands::Refresh => engine.refresh(path),
}
},
Expand Down

0 comments on commit cfd8e88

Please sign in to comment.