improve performance: using stream db query in encode and tokio spawn in query #408

Merged: 4 commits, May 31, 2024
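The change swaps SeaORM's buffering `.all()` calls for `.stream()` in the pack-encoding path and parallelizes batched saves with `tokio::spawn`. As a minimal sketch of the before/after query shapes (assuming a placeholder entity module `demo_entity`, standing in for the repo's `callisto` entities; not code from this PR):

use futures::StreamExt; // for .next() on the row stream
use sea_orm::{DatabaseConnection, DbErr, EntityTrait};

// Old shape: every row is decoded and buffered before the call returns.
async fn load_eager(conn: &DatabaseConnection) -> Result<Vec<demo_entity::Model>, DbErr> {
    demo_entity::Entity::find().all(conn).await
}

// New shape: rows are decoded lazily, so peak memory stays bounded
// and downstream work can start before the query finishes.
async fn load_streaming(conn: &DatabaseConnection) -> Result<u64, DbErr> {
    let mut rows = demo_entity::Entity::find().stream(conn).await?;
    let mut n = 0;
    while let Some(row) = rows.next().await {
        let _model = row?; // each item is Result<Model, DbErr>
        n += 1;
    }
    Ok(n)
}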
111 changes: 71 additions & 40 deletions ceres/src/pack/import_repo.rs
@@ -1,19 +1,20 @@
 use std::{
     collections::{HashMap, HashSet},
     str::FromStr,
-    sync::{atomic::{AtomicUsize, Ordering}, mpsc::Receiver},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        mpsc::Receiver,
+    },
 };
 
 use async_trait::async_trait;
+use futures::{future::join_all, StreamExt};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 
 use callisto::raw_blob;
 use common::errors::MegaError;
-use jupiter::{
-    context::Context,
-    storage::{batch_query_by_columns, GitStorageProvider},
-};
+use jupiter::{context::Context, storage::GitStorageProvider};
 use mercury::{
     errors::GitError,
     internal::{
@@ -50,14 +51,21 @@ impl PackHandler for ImportRepo {
 
     async fn save_entry(&self, receiver: Receiver<Entry>) -> Result<(), GitError> {
         let storage = self.context.services.git_db_storage.clone();
-        let mut entry_list = Vec::new();
+        let mut entry_list = vec![];
+        let mut join_tasks = vec![];
         for entry in receiver {
             entry_list.push(entry);
             if entry_list.len() >= 1000 {
-                storage.save_entry(&self.repo, entry_list).await.unwrap();
-                entry_list = Vec::new();
+                let stg_clone = storage.clone();
+                let repo_clone = self.repo.clone();
+                let handle = tokio::spawn(async move {
+                    stg_clone.save_entry(&repo_clone, entry_list).await.unwrap();
+                });
+                join_tasks.push(handle);
+                entry_list = vec![];
             }
         }
+        join_all(join_tasks).await;
         storage.save_entry(&self.repo, entry_list).await.unwrap();
         Ok(())
     }
@@ -74,43 +82,65 @@ impl PackHandler for ImportRepo {
 
         let repo = self.repo.clone();
         tokio::spawn(async move {
-            let commits = storage.get_commits_by_repo_id(&repo).await.unwrap();
-            for m in commits.into_iter() {
-                let c: Commit = m.into();
-                let entry: Entry = c.into();
-                entry_tx.send(entry).await.unwrap();
+            let mut commit_stream = storage.get_commits_by_repo_id(&repo).await.unwrap();
+
+            while let Some(model) = commit_stream.next().await {
+                match model {
+                    Ok(m) => {
+                        let c: Commit = m.into();
+                        let entry = c.into();
+                        entry_tx.send(entry).await.unwrap();
+                    }
+                    Err(err) => eprintln!("Error: {:?}", err),
+                }
             }
+            tracing::info!("send commits end");
 
-            let trees: Vec<callisto::git_tree::Model> =
-                storage.get_trees_by_repo_id(&repo).await.unwrap();
-            for m in trees.into_iter() {
-                let c: Tree = m.into();
-                let entry: Entry = c.into();
-                entry_tx.send(entry).await.unwrap();
+            let mut tree_stream = storage.get_trees_by_repo_id(&repo).await.unwrap();
+            while let Some(model) = tree_stream.next().await {
+                match model {
+                    Ok(m) => {
+                        let t: Tree = m.into();
+                        let entry = t.into();
+                        entry_tx.send(entry).await.unwrap();
+                    }
+                    Err(err) => eprintln!("Error: {:?}", err),
+                }
             }
+            tracing::info!("send trees end");
 
-            let bids: Vec<String> = storage
-                .get_blobs_by_repo_id(&repo)
-                .await
-                .unwrap()
-                .into_iter()
-                .map(|b| b.blob_id)
-                .collect();
-            let raw_blobs = batch_query_by_columns::<raw_blob::Entity, raw_blob::Column>(
-                storage.get_connection(),
-                raw_blob::Column::Sha1,
-                bids,
-                None,
-                None,
-            )
-            .await
-            .unwrap();
-            for m in raw_blobs {
-                // todo handle storage type
-                let c: Blob = m.into();
-                let entry: Entry = c.into();
-                entry_tx.send(entry).await.unwrap();
+            let mut bid_stream = storage.get_blobs_by_repo_id(&repo).await.unwrap();
+            let mut bids = vec![];
+            while let Some(model) = bid_stream.next().await {
+                match model {
+                    Ok(m) => bids.push(m.blob_id),
+                    Err(err) => eprintln!("Error: {:?}", err),
+                }
             }
+
+            let mut blob_handler = vec![];
+            for chunk in bids.chunks(10000) {
+                let stg_clone = storage.clone();
+                let sender_clone = entry_tx.clone();
+                let chunk_clone = chunk.to_vec();
+                let handler = tokio::spawn(async move {
+                    let mut blob_stream = stg_clone.get_raw_blobs(chunk_clone).await.unwrap();
+                    while let Some(model) = blob_stream.next().await {
+                        match model {
+                            Ok(m) => {
+                                // todo handle storage type
+                                let b: Blob = m.into();
+                                let entry: Entry = b.into();
+                                sender_clone.send(entry).await.unwrap();
+                            }
+                            Err(err) => eprintln!("Error: {:?}", err),
+                        }
+                    }
+                });
+                blob_handler.push(handler);
+            }
+            join_all(blob_handler).await;
+            tracing::info!("send blobs end");
 
             let tags = storage.get_tags_by_repo_id(&repo).await.unwrap();
             for m in tags.into_iter() {
@@ -119,6 +149,7 @@ impl PackHandler for ImportRepo {
                 entry_tx.send(entry).await.unwrap();
             }
             drop(entry_tx);
+            tracing::info!("sending all object end...");
         });
 
         Ok(ReceiverStream::new(stream_rx))
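For orientation, here is a self-contained sketch of the producer/consumer shape `full_pack` relies on: a spawned task sends entries into a bounded `tokio::sync::mpsc` channel and the caller consumes a `ReceiverStream`. The `Entry` type and the buffer size below are placeholders, not values from this repo:

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug)]
struct Entry(u64);

fn spawn_producer() -> ReceiverStream<Entry> {
    // A bounded channel gives backpressure: the producer pauses whenever
    // the consumer falls behind, so memory use stays flat.
    let (tx, rx) = mpsc::channel(1024);
    tokio::spawn(async move {
        for i in 0..10_000u64 {
            if tx.send(Entry(i)).await.is_err() {
                break; // receiver was dropped; stop producing
            }
        }
        // tx is dropped here, which cleanly terminates the stream,
        // just as the diff does explicitly with drop(entry_tx).
    });
    ReceiverStream::new(rx)
}

#[tokio::main]
async fn main() {
    use futures::StreamExt;
    let mut entries = spawn_producer();
    let mut count = 0u64;
    while let Some(_entry) = entries.next().await {
        count += 1;
    }
    println!("received {count} entries");
}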
67 changes: 39 additions & 28 deletions jupiter/src/storage/git_db_storage.rs
@@ -1,14 +1,15 @@
 use std::sync::{Arc, Mutex};
 
 use async_trait::async_trait;
+use futures::Stream;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use sea_orm::{
-    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter,
-    Set,
+    ActiveModelTrait, ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel,
+    QueryFilter, Set,
 };
 use sea_orm::{PaginatorTrait, QueryOrder};
 
-use callisto::{git_blob, git_commit, git_repo, git_tag, git_tree, import_refs};
+use callisto::{git_blob, git_commit, git_repo, git_tag, git_tree, import_refs, raw_blob};
 use common::config::StorageConfig;
 use common::errors::MegaError;
 use mercury::internal::object::GitObjectModel;
@@ -22,7 +23,7 @@ use crate::{
     storage::GitStorageProvider,
 };
 
-use super::batch_save_model;
+use crate::storage::batch_save_model;
 
 #[derive(Clone)]
 pub struct GitDbStorage {
@@ -232,24 +233,25 @@ impl GitDbStorage {
             .unwrap())
     }
 
-    pub async fn get_commits_by_repo_id(
-        &self,
-        repo: &Repo,
-    ) -> Result<Vec<git_commit::Model>, MegaError> {
-        Ok(git_commit::Entity::find()
+    pub async fn get_commits_by_repo_id<'a>(
+        &'a self,
+        repo: &'a Repo,
+    ) -> Result<impl Stream<Item = Result<git_commit::Model, DbErr>> + 'a + Send, MegaError> {
+        let stream = git_commit::Entity::find()
             .filter(git_commit::Column::RepoId.eq(repo.repo_id))
-            .all(self.get_connection())
+            .stream(self.get_connection())
             .await
-            .unwrap())
+            .unwrap();
+        Ok(stream)
     }
 
-    pub async fn get_trees_by_repo_id(
-        &self,
-        repo: &Repo,
-    ) -> Result<Vec<git_tree::Model>, MegaError> {
+    pub async fn get_trees_by_repo_id<'a>(
+        &'a self,
+        repo: &'a Repo,
+    ) -> Result<impl Stream<Item = Result<git_tree::Model, DbErr>> + 'a + Send, MegaError> {
         Ok(git_tree::Entity::find()
             .filter(git_tree::Column::RepoId.eq(repo.repo_id))
-            .all(self.get_connection())
+            .stream(self.get_connection())
             .await
             .unwrap())
     }
@@ -280,13 +282,24 @@ impl GitDbStorage {
             .unwrap())
     }
 
-    pub async fn get_blobs_by_repo_id(
-        &self,
-        repo: &Repo,
-    ) -> Result<Vec<git_blob::Model>, MegaError> {
+    pub async fn get_blobs_by_repo_id<'a>(
+        &'a self,
+        repo: &'a Repo,
+    ) -> Result<impl Stream<Item = Result<git_blob::Model, DbErr>> + 'a + Send, MegaError> {
         Ok(git_blob::Entity::find()
             .filter(git_blob::Column::RepoId.eq(repo.repo_id))
-            .all(self.get_connection())
+            .stream(self.get_connection())
             .await
             .unwrap())
     }
+
+    pub async fn get_raw_blobs(
+        &self,
+        hashes: Vec<String>,
+    ) -> Result<impl Stream<Item = Result<raw_blob::Model, DbErr>> + '_ + Send, MegaError> {
+        Ok(raw_blob::Entity::find()
+            .filter(raw_blob::Column::Sha1.is_in(hashes))
+            .stream(self.get_connection())
+            .await
+            .unwrap())
+    }
@@ -312,21 +325,19 @@
             .await
             .unwrap();
 
-        let bids: Vec<String> = self
-            .get_blobs_by_repo_id(repo)
+        let b_count = git_blob::Entity::find()
+            .filter(git_blob::Column::RepoId.eq(repo.repo_id))
+            .count(self.get_connection())
             .await
-            .unwrap()
-            .into_iter()
-            .map(|b| b.blob_id)
-            .collect();
+            .unwrap();
 
         let tag_count = git_tag::Entity::find()
             .filter(git_tag::Column::RepoId.eq(repo.repo_id))
             .count(self.get_connection())
             .await
             .unwrap();
 
-        (c_count + t_count + bids.len() as u64 + tag_count)
+        (c_count + t_count + b_count + tag_count)
             .try_into()
             .unwrap()
     }
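A note on the new signatures above: the returned `impl Stream` borrows the database connection, which is why `get_commits_by_repo_id` and friends now carry an explicit `'a` (or `'_`) lifetime tying the stream to `&self`; callers must keep the storage handle alive until the stream is drained. A simplified sketch of that borrowing shape, with plain in-memory data standing in for the database:

use futures::{stream, Stream, StreamExt};

struct Storage {
    rows: Vec<String>,
}

impl Storage {
    // Mirrors the shape of get_commits_by_repo_id<'a>(&'a self, ...)
    //   -> impl Stream<Item = ...> + 'a
    fn rows_stream<'a>(&'a self) -> impl Stream<Item = &'a str> + 'a {
        stream::iter(self.rows.iter().map(|s| s.as_str()))
    }
}

#[tokio::main]
async fn main() {
    let storage = Storage {
        rows: vec!["commit-1".into(), "commit-2".into()],
    };
    let mut rows = storage.rows_stream(); // borrows `storage`
    while let Some(row) = rows.next().await {
        println!("{row}");
    }
    // `storage` may be dropped only after the stream is finished.
}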
7 changes: 0 additions & 7 deletions jupiter/src/storage/mega_storage.rs
@@ -290,13 +290,6 @@ impl MegaStorage {
             .unwrap())
     }
 
-    pub async fn get_commits(&self) -> Result<Vec<mega_commit::Model>, MegaError> {
-        Ok(mega_commit::Entity::find()
-            .all(self.get_connection())
-            .await
-            .unwrap())
-    }
-
     pub async fn get_tree_by_hash(
         &self,
         hash: &str,
32 changes: 1 addition & 31 deletions jupiter/src/storage/mod.rs
@@ -7,10 +7,7 @@ pub mod mega_storage;
 use async_trait::async_trait;
 
 use common::errors::MegaError;
-use sea_orm::{
-    sea_query::OnConflict, ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection,
-    EntityTrait, QueryFilter,
-};
+use sea_orm::{sea_query::OnConflict, ActiveModelTrait, ConnectionTrait, EntityTrait};
 use venus::{
     import_repo::import_refs::{RefCommand, Refs},
     import_repo::repo::Repo,
@@ -96,30 +93,3 @@
     futures::future::join_all(results).await;
     Ok(())
 }
-
-#[allow(unused)]
-pub async fn batch_query_by_columns<T, C>(
-    connection: &DatabaseConnection,
-    column: C,
-    ids: Vec<String>,
-    filter_column: Option<C>,
-    value: Option<String>,
-) -> Result<Vec<T::Model>, MegaError>
-where
-    T: EntityTrait,
-    C: ColumnTrait,
-{
-    let mut result = Vec::<T::Model>::new();
-    for chunk in ids.chunks(1000) {
-        let query_builder = T::find().filter(column.is_in(chunk));
-
-        // Conditionally add the filter based on the value parameter
-        let query_builder = match value {
-            Some(ref v) => query_builder.filter(filter_column.unwrap().eq(v)),
-            None => query_builder,
-        };
-
-        result.extend(query_builder.all(connection).await?);
-    }
-    Ok(result)
-}
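With `batch_query_by_columns` gone, its only caller now chunks blob IDs itself (`bids.chunks(10000)` in `import_repo.rs`) before handing each chunk to the streaming `get_raw_blobs`. Chunking is still necessary because SQL backends cap the bind parameters one `IN (...)` list may carry (Postgres's wire protocol allows at most 65535, and some drivers far fewer). A tiny sketch of the batching arithmetic, with an illustrative ID format:

fn main() {
    // 25,000 hypothetical blob IDs split into chunks of 10,000.
    let ids: Vec<String> = (0..25_000).map(|i| format!("sha{i:05}")).collect();
    let batches: Vec<&[String]> = ids.chunks(10_000).collect();
    assert_eq!(batches.len(), 3); // 10k + 10k + 5k rows per IN-query
    println!("{} queries instead of one oversized statement", batches.len());
}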