Skip to content

Commit

Permalink
Merge branch 'gix-corpus'
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Jun 17, 2023
2 parents aa16c8c + 8817c24 commit 5861afb
Show file tree
Hide file tree
Showing 20 changed files with 872 additions and 629 deletions.
1 change: 0 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
rustflags = [
# Rustc lints
# "-W", "warning_name"
"-A", "clippy::let_unit_value", # in 'small' builds this triggers as the `span!` macro yields `let x = ()`. No way to prevent it in macro apparently.

# Clippy lints
"-W", "clippy::cloned_instead_of_copied",
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion gitoxide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ estimate-hours = ["dep:itertools", "dep:fs-err", "dep:crossbeam-channel", "dep:s
query = ["dep:rusqlite"]
## Run algorithms on a corpus of repositories and store their results for later comparison and intelligence gathering.
## *Note that* `organize` we need for finding git repositories fast.
corpus = [ "dep:rusqlite", "dep:sysinfo", "organize", "dep:crossbeam-channel", "dep:serde_json", "dep:tracing-forest", "dep:tracing-subscriber", "dep:tracing" ]
corpus = [ "dep:rusqlite", "dep:sysinfo", "organize", "dep:crossbeam-channel", "dep:serde_json", "dep:tracing-forest", "dep:tracing-subscriber", "dep:tracing", "dep:parking_lot" ]

#! ### Mutually Exclusive Networking
#! If both are set, _blocking-client_ will take precedence, allowing `--all-features` to be used.
Expand Down Expand Up @@ -72,6 +72,7 @@ smallvec = { version = "1.10.0", optional = true }
rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }

# for 'corpus'
parking_lot = { version = "0.12.1", optional = true }
sysinfo = { version = "0.29.2", optional = true, default-features = false }
serde_json = { version = "1.0.65", optional = true }
tracing-forest = { version = "0.1.5", features = ["serde"], optional = true }
Expand Down
19 changes: 16 additions & 3 deletions gitoxide-core/src/corpus/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,25 @@ impl Engine {
.con
.query_row(
"INSERT INTO gitoxide_version (version) VALUES (?1) ON CONFLICT DO UPDATE SET version = version RETURNING id",
[&self.gitoxide_version],
[&self.state.gitoxide_version],
|r| r.get(0),
)?)
}
pub(crate) fn tasks_or_insert(&self) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
let mut out: Vec<_> = super::run::ALL.iter().map(|task| (0, task)).collect();
pub(crate) fn tasks_or_insert(
&self,
allowed_short_names: &[String],
) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
let mut out: Vec<_> = super::run::ALL
.iter()
.filter(|task| {
if allowed_short_names.is_empty() {
true
} else {
allowed_short_names.iter().any(|allowed| task.short_name == allowed)
}
})
.map(|task| (0, task))
.collect();
for (id, task) in &mut out {
*id = self.con.query_row(
"INSERT INTO task (short_name, description) VALUES (?1, ?2) ON CONFLICT DO UPDATE SET short_name = short_name, description = ?2 RETURNING id",
Expand Down
135 changes: 97 additions & 38 deletions gitoxide-core/src/corpus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,47 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

pub type ProgressItem = gix::progress::DoOrDiscard<gix::progress::prodash::tree::Item>;

pub struct State {
pub progress: ProgressItem,
pub gitoxide_version: String,
pub trace_to_progress: bool,
pub reverse_trace_lines: bool,
}

impl Engine {
/// Open the corpus DB or create it.
pub fn open_or_create(db: PathBuf, gitoxide_version: String, progress: corpus::Progress) -> anyhow::Result<Engine> {
pub fn open_or_create(db: PathBuf, state: State) -> anyhow::Result<Engine> {
let con = crate::corpus::db::create(db).context("Could not open or create database")?;
Ok(Engine {
progress,
con,
gitoxide_version,
})
Ok(Engine { con, state })
}

/// Run on the existing set of repositories we have already seen or obtain them from `path` if there is none yet.
pub fn run(&mut self, corpus_path: PathBuf, threads: Option<usize>) -> anyhow::Result<()> {
pub fn run(
&mut self,
corpus_path: PathBuf,
threads: Option<usize>,
dry_run: bool,
repo_sql_suffix: Option<String>,
allowed_task_names: Vec<String>,
) -> anyhow::Result<()> {
let tasks = self.tasks_or_insert(&allowed_task_names)?;
if tasks.is_empty() {
bail!("Cannot run without any task to perform on the repositories");
}
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
let gitoxide_id = self.gitoxide_version_id_or_insert()?;
let runner_id = self.runner_id_or_insert()?;
let repos = self.find_repos_or_insert(&corpus_path, corpus_id)?;
let tasks = self.tasks_or_insert()?;
self.perform_run(&corpus_path, gitoxide_id, runner_id, &tasks, repos, threads)
let repos = self.find_repos_or_insert(&corpus_path, corpus_id, repo_sql_suffix)?;
self.perform_run(&corpus_path, gitoxide_id, runner_id, &tasks, repos, threads, dry_run)
}

pub fn refresh(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
let repos = self.refresh_repos(&corpus_path, corpus_id)?;
self.progress.set_name("refresh repos");
self.progress.info(format!(
self.state.progress.set_name("refresh repos");
self.state.progress.info(format!(
"Added or updated {} repositories under {corpus_path:?}",
repos.len()
));
Expand All @@ -44,6 +59,7 @@ impl Engine {
}

impl Engine {
#[allow(clippy::too_many_arguments)]
fn perform_run(
&mut self,
corpus_path: &Path,
Expand All @@ -52,21 +68,42 @@ impl Engine {
tasks: &[(db::Id, &'static Task)],
mut repos: Vec<db::Repo>,
threads: Option<usize>,
dry_run: bool,
) -> anyhow::Result<()> {
let start = Instant::now();
let task_progress = &mut self.progress;
task_progress.set_name("run");
task_progress.init(Some(tasks.len()), gix::progress::count("tasks"));
let repo_progress = &mut self.state.progress;
let threads = gix::parallel::num_threads(threads);
let db_path = self.con.path().expect("opened from path on disk").to_owned();
for (task_id, task) in tasks {
'tasks_loop: for (task_id, task) in tasks {
let task_start = Instant::now();
let mut repo_progress = task_progress.add_child(format!("run '{}'", task.short_name));
let task_info = format!("run '{}'", task.short_name);
repo_progress.set_name(task_info.clone());
repo_progress.init(Some(repos.len()), gix::progress::count("repos"));

if task.execute_exclusive || threads == 1 {
if task.execute_exclusive || threads == 1 || dry_run {
if dry_run {
repo_progress.set_name("WOULD run");
for repo in &repos {
repo_progress.info(format!(
"{}",
repo.path
.strip_prefix(corpus_path)
.expect("corpus contains repo")
.display()
));
repo_progress.inc();
}
repo_progress.info(format!("with {} tasks", tasks.len()));
for (_, task) in tasks {
repo_progress.info(format!("task '{}' ({})", task.description, task.short_name))
}
break 'tasks_loop;
}
let mut run_progress = repo_progress.add_child("set later");
let (_guard, current_id) = corpus::trace::override_thread_subscriber(db_path.as_str())?;
let (_guard, current_id) = corpus::trace::override_thread_subscriber(
db_path.as_str(),
self.state.trace_to_progress.then(|| repo_progress.add_child("trace")),
self.state.reverse_trace_lines,
)?;

for repo in &repos {
if gix::interrupt::is_triggered() {
Expand All @@ -80,7 +117,7 @@ impl Engine {
.display()
));

// TODO: wait for new release to be able to provide run_id via span attributes
// TODO: wait for new release of `tracing-forest` to be able to provide run_id via span attributes
let mut run = Self::insert_run(&self.con, gitoxide_id, runner_id, *task_id, repo.id)?;
current_id.store(run.id, Ordering::SeqCst);
tracing::info_span!("run", run_id = run.id).in_scope(|| {
Expand All @@ -98,17 +135,21 @@ impl Engine {
repo_progress.show_throughput(task_start);
} else {
let counter = repo_progress.counter();
let repo_progress = gix::threading::OwnShared::new(gix::threading::Mutable::new(repo_progress));
let repo_progress = gix::threading::OwnShared::new(gix::threading::Mutable::new(
repo_progress.add_child("will be changed"),
));
gix::parallel::in_parallel_with_slice(
&mut repos,
Some(threads),
{
let shared_repo_progress = repo_progress.clone();
let db_path = db_path.clone();
move |tid| {
let mut progress = gix::threading::lock(&shared_repo_progress);
(
corpus::trace::override_thread_subscriber(db_path.as_str()),
gix::threading::lock(&shared_repo_progress).add_child(format!("{tid}")),
// threaded printing is usually spammy, and lines interleave so it's useless.
corpus::trace::override_thread_subscriber(db_path.as_str(), None, false),
progress.add_child(format!("{tid}")),
rusqlite::Connection::open(&db_path),
)
}
Expand Down Expand Up @@ -154,9 +195,9 @@ impl Engine {
gix::threading::lock(&repo_progress).show_throughput(task_start);
}

task_progress.inc();
repo_progress.inc();
}
task_progress.show_throughput(start);
repo_progress.show_throughput(start);
Ok(())
}

Expand All @@ -166,13 +207,21 @@ impl Engine {
Ok((corpus_path, corpus_id))
}

fn find_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
self.progress.set_name("query db-repos");
self.progress.init(None, gix::progress::count("repos"));
fn find_repos(
&mut self,
corpus_path: &Path,
corpus_id: db::Id,
sql_suffix: Option<&str>,
) -> anyhow::Result<Vec<db::Repo>> {
self.state.progress.set_name("query db-repos");
self.state.progress.init(None, gix::progress::count("repos"));

Ok(self
.con
.prepare("SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1")?
.prepare(&format!(
"SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1 {}",
sql_suffix.unwrap_or_default()
))?
.query_map([corpus_id], |r| {
Ok(db::Repo {
id: r.get(0)?,
Expand All @@ -182,17 +231,17 @@ impl Engine {
num_references: r.get(4)?,
})
})?
.inspect(|_| self.progress.inc())
.inspect(|_| self.state.progress.inc())
.collect::<Result<_, _>>()?)
}

fn refresh_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
let start = Instant::now();
self.progress.set_name("refresh");
self.progress.init(None, gix::progress::count("repos"));
self.state.progress.set_name("refresh");
self.state.progress.init(None, gix::progress::count("repos"));

let repos = std::thread::scope({
let progress = &mut self.progress;
let progress = &mut self.state.progress;
let con = &mut self.con;
|scope| -> anyhow::Result<_> {
let threads = std::thread::available_parallelism()
Expand Down Expand Up @@ -264,13 +313,23 @@ impl Engine {
Ok(repos)
}

fn find_repos_or_insert(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
fn find_repos_or_insert(
&mut self,
corpus_path: &Path,
corpus_id: db::Id,
sql_suffix: Option<String>,
) -> anyhow::Result<Vec<db::Repo>> {
let start = Instant::now();
let repos = self.find_repos(corpus_path, corpus_id)?;
let repos = self.find_repos(corpus_path, corpus_id, sql_suffix.as_deref())?;
if repos.is_empty() {
self.refresh_repos(corpus_path, corpus_id)
let res = self.refresh_repos(corpus_path, corpus_id);
if sql_suffix.is_some() {
self.find_repos(corpus_path, corpus_id, sql_suffix.as_deref())
} else {
res
}
} else {
self.progress.show_throughput(start);
self.state.progress.show_throughput(start);
Ok(repos)
}
}
Expand Down
Loading

0 comments on commit 5861afb

Please sign in to comment.