# Some cometindex robustness improvements (#5095)
## Describe your changes

This tweaks the indexing logic in cometindex to add some extra
robustness against indexing the same block twice. The current design
generates batches of events and sends them to a logical thread per app
view: each batch is read from the database only once, avoiding
duplicate fetching work in every thread, while still allowing parallel
processing. One potential race condition, which may explain some of the
duplicate-processing oddities we've seen, is that each thread would
process an entire batch whenever the next height it needed to index
fell anywhere inside that batch. If the threads stay in sync this is
fine, but they can get desynced: for example, an app view crashes
because of a bug or some unusual data, and we patch the bug and restart
pindexer; or cometindex dies after some indices have committed a batch
but others have not, which can easily happen because some indices are
faster than others. In those cases, a thread could process some events
twice. This change adds logic to truncate the events in each thread so
that only the blocks that particular thread still needs are indexed.
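The truncation idea can be sketched in miniature (a hypothetical, stripped-down `EventBatch` that tracks only its height range, not the events themselves):

```rust
/// Hypothetical, stripped-down model of the PR's `EventBatch`: it tracks
/// only the height range, not the events themselves.
#[derive(Clone, Debug)]
struct EventBatch {
    first_height: u64,
    last_height: u64,
}

impl EventBatch {
    fn new(first_height: u64, last_height: u64) -> Self {
        Self { first_height, last_height }
    }

    /// Move the start of the batch forward; moving it backward is a no-op.
    fn start_later(&mut self, new_start: u64) {
        self.first_height = new_start.max(self.first_height);
    }

    /// True once truncation has pushed the start past the end.
    fn empty(&self) -> bool {
        self.first_height > self.last_height
    }
}

fn main() {
    // A batch covering blocks 8..=12 arrives at a thread that has already
    // indexed through block 10: it should only re-index blocks 11 and 12.
    let mut batch = EventBatch::new(8, 12);
    batch.start_later(11);
    assert_eq!(batch.first_height, 11);
    assert!(!batch.empty());

    // A thread already past the whole batch sees it as empty and skips it.
    let mut seen = EventBatch::new(8, 12);
    seen.start_later(13);
    assert!(seen.empty());
}
```

The real struct in the diff additionally carries the per-block events and derives the height range from them.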

I also did a pass over the rest of the indexing logic, and added a
comment on a particularly tricky section, justifying its correctness.

CI should be sufficient to test this. We shouldn't observe any
difference in behavior.


## Checklist before requesting a review

- [x] I have added guiding text to explain how a reviewer should test
these changes.

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are no consensus-breaking changes, for the following reason:

  > indexing only
cronokirby authored Feb 17, 2025
1 parent 259b498 commit eb79827
Showing 4 changed files with 77 additions and 20 deletions.
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/dex_ex/mod.rs

```diff
@@ -1425,7 +1425,7 @@ impl AppView for Component
         let mut charts = HashMap::new();
         let mut snapshots = HashMap::new();
         let mut last_time = None;
-        for block in batch.by_height.iter() {
+        for block in batch.events_by_block() {
             let mut events = Events::extract(&block)?;
             let time = events
                 .time
```
51 changes: 47 additions & 4 deletions crates/util/cometindex/src/index.rs

```diff
@@ -19,17 +19,60 @@ pub struct BlockEvents {
 
 #[derive(Clone, Debug)]
 pub struct EventBatch {
-    pub first_height: u64,
-    pub last_height: u64,
+    first_height: u64,
+    last_height: u64,
     /// The batch of events, ordered by increasing height.
     ///
    /// The heights are guaranteed to be increasing, and to be contiguous.
-    pub by_height: Arc<Vec<BlockEvents>>,
+    by_height: Arc<Vec<BlockEvents>>,
 }
 
 impl EventBatch {
+    /// Create a new [`EventBatch`].
+    pub fn new(block_events: Vec<BlockEvents>) -> Self {
+        Self {
+            first_height: block_events.first().map(|x| x.height).unwrap_or_default(),
+            last_height: block_events.last().map(|x| x.height).unwrap_or_default(),
+            by_height: Arc::new(block_events),
+        }
+    }
+
+    pub(crate) fn first_height(&self) -> u64 {
+        self.first_height
+    }
+
+    pub(crate) fn last_height(&self) -> u64 {
+        self.last_height
+    }
+
+    /// Check if this batch has no blocks in it.
+    ///
+    /// Most commonly, this is the result when [`start_later`] is called with a height
+    /// past that inside the batch.
+    pub fn empty(&self) -> bool {
+        self.first_height > self.last_height
+    }
+
+    /// Modify this batch to start at a greater height.
+    ///
+    /// This will have no effect if the new start height is *before* the current start height.
+    pub fn start_later(&mut self, new_start: u64) {
+        self.first_height = new_start.max(self.first_height);
+    }
+
+    pub fn events_by_block(&self) -> impl Iterator<Item = &'_ BlockEvents> {
+        // Assuming the first height is past the first height in our vec,
+        // we need to skip the difference.
+        let skip = self
+            .by_height
+            .first()
+            .map(|x| self.first_height.saturating_sub(x.height) as usize)
+            .unwrap_or_default();
+        self.by_height.iter().skip(skip)
+    }
+
     pub fn events(&self) -> impl Iterator<Item = &'_ ContextualizedEvent> {
-        self.by_height.iter().flat_map(|x| x.events.iter())
+        self.events_by_block().flat_map(|x| x.events.iter())
     }
 }
```
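The `skip` computation in `events_by_block` leans on the struct's documented contiguity invariant; a standalone sketch (hypothetical `skip_count` helper, with `BlockEvents` reduced to its height) of how the offset maps a truncated start height to a vector index:

```rust
/// Stand-in for cometindex's `BlockEvents`: just a height.
struct BlockEvents {
    height: u64,
}

/// Mirrors the `skip` computation in `events_by_block`. Because the heights
/// in `by_height` are contiguous, `first_height - by_height[0].height` is
/// exactly the number of leading blocks to drop; `saturating_sub` keeps the
/// skip at zero when the batch was never truncated.
fn skip_count(by_height: &[BlockEvents], first_height: u64) -> usize {
    by_height
        .first()
        .map(|x| first_height.saturating_sub(x.height) as usize)
        .unwrap_or_default()
}

fn main() {
    let blocks: Vec<BlockEvents> = (8..=12).map(|height| BlockEvents { height }).collect();
    // Untruncated: iterate from the first block.
    assert_eq!(skip_count(&blocks, 8), 0);
    // Truncated to start at 11: drop blocks 8, 9, and 10.
    assert_eq!(skip_count(&blocks, 11), 3);
    let remaining: Vec<u64> = blocks
        .iter()
        .skip(skip_count(&blocks, 11))
        .map(|b| b.height)
        .collect();
    assert_eq!(remaining, vec![11, 12]);
}
```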
17 changes: 10 additions & 7 deletions crates/util/cometindex/src/indexer.rs

```diff
@@ -51,24 +51,27 @@ async fn catchup(
     } else {
         tracing::info!(?name, "already initialized");
     }
-    while let Some(events) = rx.recv().await {
+    while let Some(mut events) = rx.recv().await {
         let mut dbtx = state_cp.begin_transaction().await?;
-        let last_height = events.last_height;
-        if index_height >= Height::from(last_height) {
+        // We only ever want to index events past our current height.
+        // We might receive a batch with more events because other indices are behind us.
+        events.start_later(index_height.next().into());
+        if events.empty() {
             tracing::info!(
-                first = events.first_height,
-                last = events.last_height,
+                first = events.first_height(),
+                last = events.last_height(),
                 index_name = &name,
                 "skipping batch"
             );
             continue;
         }
         tracing::info!(
-            first = events.first_height,
-            last = events.last_height,
+            first = events.first_height(),
+            last = events.last_height(),
             index_name = &name,
             "indexing batch"
         );
+        let last_height = events.last_height();
         index.index_batch(&mut dbtx, events).await?;
         tracing::debug!(index_name = &name, "committing batch");
         IndexingState::update_index_height(&mut dbtx, &name, Height::from(last_height))
```
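The loop's skip-or-truncate behavior can be simulated without a database (hypothetical `catchup` harness; batches are bare `(first, last)` height ranges, and committing a batch advances the thread's index height):

```rust
/// Hypothetical harness for the catchup loop: batches are bare
/// `(first, last)` height ranges, and `index_height` plays the role of
/// the per-app-view height stored in the database. Returns the ranges
/// this thread actually indexes.
fn catchup(mut index_height: u64, batches: &[(u64, u64)]) -> Vec<(u64, u64)> {
    let mut indexed = Vec::new();
    for &(first, last) in batches {
        // Mirrors `events.start_later(index_height.next().into())`.
        let first = first.max(index_height + 1);
        // Mirrors the `events.empty()` check: nothing left, skip the batch.
        if first > last {
            continue;
        }
        indexed.push((first, last));
        // Committing the batch advances this thread's index height.
        index_height = last;
    }
    indexed
}

fn main() {
    // A thread restarted at height 10 receives a batch it partially indexed
    // before the crash: only blocks 11..=12 are processed again.
    assert_eq!(catchup(10, &[(8, 12), (13, 16)]), vec![(11, 12), (13, 16)]);
    // A thread already past every batch indexes nothing.
    assert_eq!(catchup(20, &[(8, 12), (13, 16)]), Vec::<(u64, u64)>::new());
}
```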
27 changes: 19 additions & 8 deletions crates/util/cometindex/src/indexer/indexing_state.rs

```diff
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, sync::Arc};
+use std::collections::HashMap;
 
 use futures::TryStreamExt;
 use sqlx::{postgres::PgPoolOptions, PgPool, Postgres, Transaction};
@@ -58,6 +58,12 @@ impl From<u64> for Height {
     }
 }
 
+impl From<Height> for u64 {
+    fn from(value: Height) -> Self {
+        value.0
+    }
+}
+
 impl TryFrom<i64> for Height {
     type Error = anyhow::Error;
 
@@ -259,28 +265,33 @@ ORDER BY
             assert!(e.block_height >= height);
             if e.block_height > height {
                 by_height.push(current_batch);
+                height = e.block_height;
                 current_batch = BlockEvents {
                     height,
                     events: Vec::with_capacity(WORKING_CAPACITY),
                 };
-                height = e.block_height;
             }
             current_batch.events.push(e);
         }
         // Flush the current block, and create empty ones for the remaining heights.
+        //
+        // This is the correct behavior *assuming* that the caller has already checked
+        // that the raw events database has indexed all the blocks up to and including
+        // the provided last height. In that case, imagine if there were never any events
+        // at all. In that case, what we would need to do is to push empty blocks
+        // starting from `first` and up to and including `last`.
+        //
+        // Usually, there are events every block, so this code just serves to push
+        // the final block.
         while height <= last.0 {
             by_height.push(current_batch);
+            height += 1;
             current_batch = BlockEvents {
                 height,
                 events: Vec::new(),
             };
-            height += 1;
         }
-        Ok(EventBatch {
-            first_height: first.0,
-            last_height: last.0,
-            by_height: Arc::new(by_height),
-        })
+        Ok(EventBatch::new(by_height))
     }
 
     pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {
```
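The reordered `height` assignment in the padding loop can be checked in isolation (a simplified sketch with a hypothetical `pad_heights` helper; a block is reduced to its bare height):

```rust
/// Simplified sketch of the flush-and-pad loop above (hypothetical
/// `pad_heights`; a block is reduced to its bare height). After the last
/// block containing events, empty blocks are pushed so the batch covers
/// every height up to and including `last`.
fn pad_heights(first_unflushed: u64, last: u64) -> Vec<u64> {
    let mut by_height = Vec::new();
    let mut height = first_unflushed;
    let mut current_height = height; // stands in for `current_batch`
    while height <= last {
        by_height.push(current_height);
        // Bump `height` *before* constructing the next empty block, so each
        // padded block records its own height rather than its predecessor's.
        height += 1;
        current_height = height;
    }
    by_height
}

fn main() {
    // Events ended at height 5 but the raw database has blocks through 8:
    // block 5 is flushed, then empty blocks 6, 7, 8 are appended.
    assert_eq!(pad_heights(5, 8), vec![5, 6, 7, 8]);
    // No padding needed when the current block is already the last one.
    assert_eq!(pad_heights(8, 8), vec![8]);
}
```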
