Skip to content

Commit

Permalink
loadshed broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Nov 4, 2024
1 parent 8f2ddcf commit 3f95444
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 58 deletions.
110 changes: 97 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ strum = { version = "0.24.1", features = ["derive"] }
tempfile = "3.5.0"
thiserror = "1.0.40"
time = { version = "0.3.15", features = ["macros", "serde-well-known"] }
tokio = { version = "1.34", features = ["full"] }
tokio = { version = "1.41", features = ["full"] }
tokio-metrics = "0.3.0"
tokio-serde = { version = "0.8", features = ["json"] }
tokio-stream = { version = "0.1.12", features = ["sync"] }
Expand Down
38 changes: 9 additions & 29 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{
time::{Duration, Instant},
};

use crate::agent::util::log_at_pow_10;
use crate::{
agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL},
api::peer::parallel_sync,
Expand Down Expand Up @@ -527,11 +528,12 @@ pub async fn handle_emptyset(
mut rx_emptysets: CorroReceiver<ChangeV1>,
mut tripwire: Tripwire,
) {
let mut buf: HashMap<ActorId, VecDeque<(Vec<RangeInclusive<Version>>, Timestamp)>> =
type EmptyQueue = VecDeque<(Vec<RangeInclusive<Version>>, Timestamp)>;
let mut buf: HashMap<ActorId, EmptyQueue> =
HashMap::new();

let mut join_set: JoinSet<
HashMap<ActorId, VecDeque<(Vec<RangeInclusive<Version>>, Timestamp)>>,
HashMap<ActorId, EmptyQueue>,
> = JoinSet::new();

loop {
Expand All @@ -551,7 +553,7 @@ pub async fn handle_emptyset(
maybe_change_src = rx_emptysets.recv() => match maybe_change_src {
Some(change) => {
if let Changeset::EmptySet { versions, ts } = change.changeset {
buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts));
buf.entry(change.actor_id).or_default().push_back((versions.clone(), ts));
} else {
warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id);
}
Expand Down Expand Up @@ -618,9 +620,9 @@ pub async fn process_emptyset(
) -> Result<(), ChangeError> {
let (versions, ts) = changes;

let mut version_iter = versions.chunks(100);
let version_iter = versions.chunks(100);

while let Some(chunk) = version_iter.next() {
for chunk in version_iter {
let mut conn = agent.pool().write_low().await?;
debug!("processing emptyset from {:?}", actor_id);
let booked = {
Expand Down Expand Up @@ -743,7 +745,7 @@ pub async fn handle_changes(
agent.config().perf.apply_queue_timeout as u64,
));

const MAX_SEEN_CACHE_LEN: usize = 10000;
const MAX_SEEN_CACHE_LEN: usize = 1000;
const KEEP_SEEN_CACHE_SIZE: usize = 1000;
let mut seen: IndexMap<_, RangeInclusiveSet<CrsqlSeq>> = IndexMap::new();

Expand Down Expand Up @@ -872,21 +874,7 @@ pub async fn handle_changes(
}
}

drop_log_count += 1;
if is_pow_10(drop_log_count) {
if drop_log_count == 1 {
warn!("dropped an old change because changes queue is full");
} else {
warn!(
"droppped {} old changes because changes queue is full",
drop_log_count
);
}
}
// reset count
if drop_log_count == 100000000 {
drop_log_count = 0;
}
log_at_pow_10("dropped old change from queue", &mut drop_log_count);
}

if let Some(mut seqs) = change.seqs().cloned() {
Expand Down Expand Up @@ -1170,11 +1158,3 @@ mod tests {
Ok(())
}
}

#[inline]
fn is_pow_10(i: u64) -> bool {
matches!(
i,
1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000
)
}
24 changes: 20 additions & 4 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,6 @@ pub async fn process_multiple_changes(
.snapshot()
};


snap.update_cleared_ts(&tx, ts)
.map_err(|source| ChangeError::Rusqlite {
source,
Expand All @@ -988,12 +987,11 @@ pub async fn process_multiple_changes(

if let Some(ts) = last_cleared {
let mut booked_writer = agent
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
booked_writer.update_cleared_ts(ts);
}


for (_, changeset, _, _) in changesets.iter() {
if let Some(ts) = changeset.ts() {
let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration();
Expand Down Expand Up @@ -1318,3 +1316,21 @@ pub fn check_buffered_meta_to_clear(

conn.prepare_cached("SELECT EXISTS(SELECT 1 FROM __corro_seq_bookkeeping WHERE site_id = ? AND version >= ? AND version <= ?)")?.query_row(params![actor_id, versions.start(), versions.end()], |row| row.get(0))
}

pub fn log_at_pow_10(msg: &str, count: &mut u64) {
if is_pow_10(*count + 1) {
warn!("{} (log count: {})", msg, count)
}
// reset count
if *count == 100000000 {
*count = 0;
}
}

#[inline]
fn is_pow_10(i: u64) -> bool {
matches!(
i,
1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000
)
}
Loading

0 comments on commit 3f95444

Please sign in to comment.