Skip to content

Commit

Permalink
fix clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Oct 25, 2024
1 parent 108477e commit bdeee40
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 29 deletions.
6 changes: 3 additions & 3 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct MatcherCreated {

const SUB_EVENT_CHANNEL_CAP: usize = 512;

impl Manager<MatcherHandle> for SubsManager {
impl Manager<MatcherHandle, IndexSet<Vec<u8>>> for SubsManager {
fn trait_type(&self) -> String {
"subs".to_string()
}
Expand Down Expand Up @@ -270,7 +270,7 @@ struct InnerMatcherHandle {
type MatchCandidates = IndexMap<TableName, IndexSet<Vec<u8>>>;

#[async_trait]
impl Handle for MatcherHandle {
impl Handle<IndexSet<Vec<u8>>> for MatcherHandle {
type CandidateMatcher = MatchCandidates;

fn id(&self) -> Uuid {
Expand Down Expand Up @@ -1433,7 +1433,7 @@ impl Matcher {
for (table, pks) in candidates {
let pks = pks
.iter()
.map(|pk,| unpack_columns(pk))
.map(|pk| unpack_columns(pk))
.collect::<Result<Vec<Vec<SqliteValueRef>>, _>>()?;

let tmp_table_name = format!("temp_{table}");
Expand Down
89 changes: 63 additions & 26 deletions crates/corro-types/src/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use compact_str::CompactString;
use corro_api_types::sqlite::ChangeType;
use corro_api_types::{Change, ColumnName, SqliteValue, SqliteValueRef, TableName};
use corro_base_types::CrsqlDbVersion;
use indexmap::IndexMap;
use indexmap::{IndexMap, IndexSet};
use metrics::{counter, histogram};
use parking_lot::RwLock;
use rusqlite::Connection;
Expand All @@ -26,16 +26,39 @@ use uuid::Uuid;
#[derive(Debug, Default, Clone)]
pub struct UpdatesManager(Arc<RwLock<InnerUpdatesManager>>);

pub trait Manager<H: Handle> {
#[allow(clippy::len_without_is_empty)]
pub trait HasLen {
fn len(&self) -> usize;
}

impl<K, V> HasLen for IndexMap<K, V> {
fn len(&self) -> usize {
self.len()
}
}

impl<T> HasLen for IndexSet<T> {
fn len(&self) -> usize {
self.len()
}
}

pub trait Manager<H: Handle<T>, T: HasLen> {
fn trait_type(&self) -> String;
fn get(&self, id: &Uuid) -> Option<H>;
fn remove(&self, id: &Uuid) -> Option<H>;
fn get_handles(&self) -> BTreeMap<Uuid, H>;
}

impl<T> HasLen for Vec<T> {
fn len(&self) -> usize {
self.len()
}
}

#[async_trait]
pub trait Handle {
type CandidateMatcher: Send;
pub trait Handle<T: HasLen> {
type CandidateMatcher: IntoIterator<Item = (TableName, T)> + Clone + Send;

fn id(&self) -> Uuid;
fn hash(&self) -> &str;
Expand All @@ -50,7 +73,7 @@ pub trait Handle {
async fn cleanup(&self);
}

impl Manager<UpdateHandle> for UpdatesManager {
impl Manager<UpdateHandle, IndexMap<Vec<u8>, i64>> for UpdatesManager {
fn trait_type(&self) -> String {
"updates".to_string()
}
Expand Down Expand Up @@ -86,7 +109,7 @@ pub enum TypedNotifyEvent<T> {
}

#[async_trait]
impl Handle for UpdateHandle {
impl Handle<IndexMap<Vec<u8>, i64>> for UpdateHandle {
type CandidateMatcher = MatchClCandidates;

fn id(&self) -> Uuid {
Expand Down Expand Up @@ -231,8 +254,9 @@ pub struct InnerUpdateHandle {

impl UpdateHandle {
pub fn id(&self) -> Uuid {
return self.inner.id;
self.inner.id
}

pub fn create(
id: Uuid,
tbl_name: &str,
Expand Down Expand Up @@ -286,7 +310,7 @@ fn handle_candidates(
for (_, pks) in candidates {
let pks = pks
.iter()
.map(|(pk, cl)| unpack_columns(pk).and_then(|x| Ok((x, cl.clone()))))
.map(|(pk, cl)| unpack_columns(pk).map(|x| (x, *cl)))
.collect::<Result<Vec<(Vec<SqliteValueRef>, i64)>, _>>()?;

for (pk, cl) in pks {
Expand Down Expand Up @@ -391,11 +415,14 @@ async fn cmd_loop(
debug!(id = %id, "update loop is done");
}

pub fn match_changes<H: Handle + Send + 'static>(
manager: &impl Manager<H>,
pub fn match_changes<H, T>(
manager: &impl Manager<H, T>,
changes: &[Change],
db_version: CrsqlDbVersion,
) {
) where
H: Handle<T> + Send + 'static,
T: HasLen + Clone + 'static,
{
let trait_type = manager.trait_type();
trace!(
%db_version,
Expand All @@ -422,14 +449,17 @@ pub fn match_changes<H: Handle + Send + 'static>(
}

// metrics...
// for (table, pks) in candidates.iter() {
// counter!(format!("corro.{trait_type}.changes.matched.count"), "table" => table.to_string())
// .increment(pks.len() as u64);
// }
for (table, pks) in candidates.clone() {
counter!(format!("corro.{trait_type}.changes.matched.count"), "table" => table.to_string())
.increment(pks.len() as u64);
}

trace!(sub_id = %id, %db_version, "found {match_count} candidates");

if let Err(e) = handle.changes_tx().try_send((candidates, db_version)) {
if let Err(e) = handle
.changes_tx()
.try_send((candidates.clone(), db_version))
{
error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}");
match e {
mpsc::error::TrySendError::Full(item) => {
Expand All @@ -452,11 +482,15 @@ pub fn match_changes<H: Handle + Send + 'static>(
}
}

pub fn match_changes_from_db_version<H: Handle + Send + 'static>(
manager: &impl Manager<H>,
pub fn match_changes_from_db_version<H, T>(
manager: &impl Manager<H, T>,
conn: &Connection,
db_version: CrsqlDbVersion,
) -> rusqlite::Result<()> {
) -> rusqlite::Result<()>
where
H: Handle<T> + Send + 'static,
T: HasLen + 'static,
{
let handles = manager.get_handles();
if handles.is_empty() {
return Ok(());
Expand Down Expand Up @@ -506,15 +540,18 @@ pub fn match_changes_from_db_version<H: Handle + Send + 'static>(
for (id, (candidates, handle)) in candidates {
let mut match_count = 0;

// for (table, pks) in candidates.iter() {
// let count = pks.len();
// match_count += count;
// counter!(format!("corro.{trait_type}.changes.matched.count"), "sql_hash" => handle.hash().to_string(), "table" => table.to_string()).increment(count as u64);
// }
for (table, pks) in candidates.clone() {
let count = pks.len();
match_count += count;
counter!(format!("corro.{trait_type}.changes.matched.count"), "sql_hash" => handle.hash().to_string(), "table" => table.to_string()).increment(count as u64);
}

trace!(sub_id = %id, %db_version, "found {match_count} candidates");

if let Err(e) = handle.changes_tx().try_send((candidates, db_version)) {
if let Err(e) = handle
.changes_tx()
.try_send((candidates.clone(), db_version))
{
error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}");
match e {
mpsc::error::TrySendError::Full(item) => {
Expand All @@ -525,7 +562,7 @@ pub fn match_changes_from_db_version<H: Handle + Send + 'static>(
});
}
mpsc::error::TrySendError::Closed(_) => {
if let Some(handle) = manager.remove(&id) {
if let Some(handle) = manager.remove(id) {
tokio::spawn(async move {
handle.cleanup().await;
});
Expand Down

0 comments on commit bdeee40

Please sign in to comment.