Skip to content

Commit

Permalink
Add corpus --dry-run and --task-sql-suffix and --repo-sql-suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Jun 17, 2023
1 parent 0f973ac commit 4cef57d
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 56 deletions.
19 changes: 16 additions & 3 deletions gitoxide-core/src/corpus/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,25 @@ impl Engine {
.con
.query_row(
"INSERT INTO gitoxide_version (version) VALUES (?1) ON CONFLICT DO UPDATE SET version = version RETURNING id",
[&self.gitoxide_version],
[&self.state.gitoxide_version],
|r| r.get(0),
)?)
}
pub(crate) fn tasks_or_insert(&self) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
let mut out: Vec<_> = super::run::ALL.iter().map(|task| (0, task)).collect();
pub(crate) fn tasks_or_insert(
&self,
allowed_short_names: &[String],
) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
let mut out: Vec<_> = super::run::ALL
.iter()
.filter(|task| {
if allowed_short_names.is_empty() {
true
} else {
allowed_short_names.iter().any(|allowed| task.short_name == allowed)
}
})
.map(|task| (0, task))
.collect();
for (id, task) in &mut out {
*id = self.con.query_row(
"INSERT INTO task (short_name, description) VALUES (?1, ?2) ON CONFLICT DO UPDATE SET short_name = short_name, description = ?2 RETURNING id",
Expand Down
120 changes: 82 additions & 38 deletions gitoxide-core/src/corpus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,47 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

pub type ProgressItem = gix::progress::DoOrDiscard<gix::progress::prodash::tree::Item>;

pub struct State {
pub progress: ProgressItem,
pub gitoxide_version: String,
pub trace_to_progress: bool,
pub reverse_trace_lines: bool,
}

impl Engine {
/// Open the corpus DB or create it.
pub fn open_or_create(
db: PathBuf,
gitoxide_version: String,
progress: corpus::Progress,
trace_to_progress: bool,
reverse_trace_lines: bool,
) -> anyhow::Result<Engine> {
pub fn open_or_create(db: PathBuf, state: State) -> anyhow::Result<Engine> {
let con = crate::corpus::db::create(db).context("Could not open or create database")?;
Ok(Engine {
progress,
con,
gitoxide_version,
trace_to_progress,
reverse_trace_lines,
})
Ok(Engine { con, state })
}

/// Run on the existing set of repositories we have already seen or obtain them from `path` if there is none yet.
pub fn run(&mut self, corpus_path: PathBuf, threads: Option<usize>) -> anyhow::Result<()> {
pub fn run(
&mut self,
corpus_path: PathBuf,
threads: Option<usize>,
dry_run: bool,
repo_sql_suffix: Option<String>,
allowed_task_names: Vec<String>,
) -> anyhow::Result<()> {
let tasks = self.tasks_or_insert(&allowed_task_names)?;
if tasks.is_empty() {
bail!("Cannot run without any task to perform on the repositories");
}
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
let gitoxide_id = self.gitoxide_version_id_or_insert()?;
let runner_id = self.runner_id_or_insert()?;
let repos = self.find_repos_or_insert(&corpus_path, corpus_id)?;
let tasks = self.tasks_or_insert()?;
self.perform_run(&corpus_path, gitoxide_id, runner_id, &tasks, repos, threads)
let repos = self.find_repos_or_insert(&corpus_path, corpus_id, repo_sql_suffix)?;
self.perform_run(&corpus_path, gitoxide_id, runner_id, &tasks, repos, threads, dry_run)
}

pub fn refresh(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
let repos = self.refresh_repos(&corpus_path, corpus_id)?;
self.progress.set_name("refresh repos");
self.progress.info(format!(
self.state.progress.set_name("refresh repos");
self.state.progress.info(format!(
"Added or updated {} repositories under {corpus_path:?}",
repos.len()
));
Expand All @@ -52,6 +59,7 @@ impl Engine {
}

impl Engine {
#[allow(clippy::too_many_arguments)]
fn perform_run(
&mut self,
corpus_path: &Path,
Expand All @@ -60,24 +68,42 @@ impl Engine {
tasks: &[(db::Id, &'static Task)],
mut repos: Vec<db::Repo>,
threads: Option<usize>,
dry_run: bool,
) -> anyhow::Result<()> {
let start = Instant::now();
let task_progress = &mut self.progress;
let task_progress = &mut self.state.progress;
task_progress.set_name("run");
task_progress.init(Some(tasks.len()), gix::progress::count("tasks"));
let threads = gix::parallel::num_threads(threads);
let db_path = self.con.path().expect("opened from path on disk").to_owned();
for (task_id, task) in tasks {
let task_start = Instant::now();
let mut repo_progress = task_progress.add_child(format!("run '{}'", task.short_name));
repo_progress.init(Some(repos.len()), gix::progress::count("repos"));

if task.execute_exclusive || threads == 1 {
if task.execute_exclusive || threads == 1 || dry_run {
if dry_run {
task_progress.set_name("WOULD run");
for repo in &repos {
task_progress.info(format!(
"{}",
repo.path
.strip_prefix(corpus_path)
.expect("corpus contains repo")
.display()
));
task_progress.inc();
}
task_progress.info(format!("with {} tasks", tasks.len()));
for (_, task) in tasks {
task_progress.info(format!("task '{}' ({})", task.description, task.short_name))
}
continue;
}
repo_progress.init(Some(repos.len()), gix::progress::count("repos"));
let mut run_progress = repo_progress.add_child("set later");
let (_guard, current_id) = corpus::trace::override_thread_subscriber(
db_path.as_str(),
self.trace_to_progress.then(|| task_progress.add_child("trace")),
self.reverse_trace_lines,
self.state.trace_to_progress.then(|| task_progress.add_child("trace")),
self.state.reverse_trace_lines,
)?;

for repo in &repos {
Expand Down Expand Up @@ -180,13 +206,21 @@ impl Engine {
Ok((corpus_path, corpus_id))
}

fn find_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
self.progress.set_name("query db-repos");
self.progress.init(None, gix::progress::count("repos"));
fn find_repos(
&mut self,
corpus_path: &Path,
corpus_id: db::Id,
sql_suffix: Option<&str>,
) -> anyhow::Result<Vec<db::Repo>> {
self.state.progress.set_name("query db-repos");
self.state.progress.init(None, gix::progress::count("repos"));

Ok(self
.con
.prepare("SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1")?
.prepare(&format!(
"SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1 {}",
sql_suffix.unwrap_or_default()
))?
.query_map([corpus_id], |r| {
Ok(db::Repo {
id: r.get(0)?,
Expand All @@ -196,17 +230,17 @@ impl Engine {
num_references: r.get(4)?,
})
})?
.inspect(|_| self.progress.inc())
.inspect(|_| self.state.progress.inc())
.collect::<Result<_, _>>()?)
}

fn refresh_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
let start = Instant::now();
self.progress.set_name("refresh");
self.progress.init(None, gix::progress::count("repos"));
self.state.progress.set_name("refresh");
self.state.progress.init(None, gix::progress::count("repos"));

let repos = std::thread::scope({
let progress = &mut self.progress;
let progress = &mut self.state.progress;
let con = &mut self.con;
|scope| -> anyhow::Result<_> {
let threads = std::thread::available_parallelism()
Expand Down Expand Up @@ -278,13 +312,23 @@ impl Engine {
Ok(repos)
}

fn find_repos_or_insert(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
fn find_repos_or_insert(
&mut self,
corpus_path: &Path,
corpus_id: db::Id,
sql_suffix: Option<String>,
) -> anyhow::Result<Vec<db::Repo>> {
let start = Instant::now();
let repos = self.find_repos(corpus_path, corpus_id)?;
let repos = self.find_repos(corpus_path, corpus_id, sql_suffix.as_deref())?;
if repos.is_empty() {
self.refresh_repos(corpus_path, corpus_id)
let res = self.refresh_repos(corpus_path, corpus_id);
if sql_suffix.is_some() {
self.find_repos(corpus_path, corpus_id, sql_suffix.as_deref())
} else {
res
}
} else {
self.progress.show_throughput(start);
self.state.progress.show_throughput(start);
Ok(repos)
}
}
Expand Down
8 changes: 2 additions & 6 deletions gitoxide-core/src/corpus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 0..=3;
pub(crate) type Progress = gix::progress::DoOrDiscard<gix::progress::prodash::tree::Item>;

pub struct Engine {
progress: Progress,
con: rusqlite::Connection,
gitoxide_version: String,
trace_to_progress: bool,
reverse_trace_lines: bool,
state: engine::State,
}

pub struct RunOutcome {
Expand All @@ -15,7 +11,7 @@ pub struct RunOutcome {
}

pub(crate) mod db;
pub(crate) mod engine;
pub mod engine;

/// Contains all information necessary to run a task.
pub(crate) struct Task {
Expand Down
6 changes: 3 additions & 3 deletions gitoxide-core/src/corpus/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ impl Task {
&self,
run: &mut Run,
repo: &Path,
progress: &mut corpus::Progress,
progress: &mut corpus::engine::ProgressItem,
threads: Option<usize>,
should_interrupt: &AtomicBool,
) {
Expand All @@ -26,7 +26,7 @@ pub(crate) trait Execute {
fn execute(
&self,
repo: &Path,
progress: &mut corpus::Progress,
progress: &mut corpus::engine::ProgressItem,
threads: Option<usize>,
should_interrupt: &AtomicBool,
) -> anyhow::Result<()>;
Expand All @@ -45,7 +45,7 @@ impl Execute for OpenRepo {
fn execute(
&self,
repo: &Path,
_progress: &mut corpus::Progress,
_progress: &mut corpus::engine::ProgressItem,
_threads: Option<usize>,
_should_interrupt: &AtomicBool,
) -> anyhow::Result<()> {
Expand Down
16 changes: 11 additions & 5 deletions src/plumbing/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,19 @@ pub fn main() -> Result<()> {
move |progress, _out, _err| {
let mut engine = core::corpus::Engine::open_or_create(
db,
env!("GITOXIDE_VERSION").into(),
progress,
trace,
reverse_trace_lines,
core::corpus::engine::State {
gitoxide_version: env!("GITOXIDE_VERSION").into(),
progress,
trace_to_progress: trace,
reverse_trace_lines,
},
)?;
match cmd {
crate::plumbing::options::corpus::SubCommands::Run => engine.run(path, thread_limit),
crate::plumbing::options::corpus::SubCommands::Run {
dry_run,
repo_sql_suffix,
include_task,
} => engine.run(path, thread_limit, dry_run, repo_sql_suffix, include_task),
crate::plumbing::options::corpus::SubCommands::Refresh => engine.refresh(path),
}
},
Expand Down
18 changes: 17 additions & 1 deletion src/plumbing/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,23 @@ pub mod corpus {
#[derive(Debug, clap::Subcommand)]
pub enum SubCommands {
/// Perform a corpus run on all registered repositories.
Run,
Run {
/// Don't run any task, but print all repos that would be traversed once.
///
/// Note that this will refresh repositories if necessary and store them in the database, it just won't run tasks.
#[clap(long, short = 'n')]
dry_run: bool,

/// The SQL that will be appended to the actual select statement for repositories to apply additional filtering, like `LIMIT 10`.
///
/// The string must be trusted even though the engine will only execute a single statement.
#[clap(long, short = 'r')]
repo_sql_suffix: Option<String>,

/// The short_names of the tasks to include when running.
#[clap(long, short = 't')]
include_task: Vec<String>,
},
/// Re-read all repositories under the corpus directory, and add or update them.
Refresh,
}
Expand Down

0 comments on commit 4cef57d

Please sign in to comment.