Skip to content

Commit

Permalink
Fix after feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan-mysten committed Nov 20, 2024
1 parent 11fd0e8 commit cf3d557
Showing 3 changed files with 73 additions and 157 deletions.
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/schema.graphql
Original file line number Diff line number Diff line change
@@ -451,7 +451,7 @@ type Checkpoint {
"""
The Base64 serialized BCS bytes of CheckpointSummary for this checkpoint.
"""
checkpointSummaryBcs: Base64
bcs: Base64
}

type CheckpointConnection {
226 changes: 71 additions & 155 deletions crates/sui-graphql-rpc/src/types/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -41,6 +41,12 @@ pub(crate) struct CheckpointId {
pub sequence_number: Option<UInt53>,
}

/// `DataLoader` key for fetching `StoredRawCheckpoint` by its sequence number.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct SeqNum {
pub sequence_number: i64,
}

/// `DataLoader` key for fetching a `Checkpoint` by its sequence number, constrained by a consistency
/// cursor.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
@@ -64,15 +70,12 @@ pub(crate) struct Checkpoint {
/// Representation of transaction data in the Indexer's Store. The indexer stores the
/// transaction data and its effects together, in one table.
pub stored: StoredCheckpoint,
/// Representation of the raw checkpoint data, including the summary and contents.
pub raw_checkpoint: Option<StoredRawCheckpoint>,
/// The checkpoint_sequence_number at which this was viewed at.
pub checkpoint_viewed_at: u64,
}

pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;
type RawQuery<ST, GB> = data::Query<ST, raw_checkpoints::table, GB>;

/// The cursor returned for each `Checkpoint` in a connection's page of results. The
/// `checkpoint_viewed_at` will set the consistent upper bound for subsequent queries made on this
@@ -197,23 +200,24 @@ impl Checkpoint {
}

/// The Base64 serialized BCS bytes of CheckpointSummary for this checkpoint.
async fn checkpoint_summary_bcs(&self) -> Result<Option<Base64>> {
let checkpoint_summary = self
.raw_checkpoint
.as_ref()
.map(|raw_checkpoint| {
bcs::from_bytes::<CertifiedCheckpointSummary>(&raw_checkpoint.certified_checkpoint)
async fn bcs(&self, ctx: &Context<'_>) -> Result<Option<Base64>> {
let DataLoader(dl) = ctx.data_unchecked();
let raw_checkpoint = dl
.load_one(SeqNum {
sequence_number: self.stored.sequence_number,
})
.transpose()
.map_err(|e| {
Error::Internal(format!("Failed to deserialize checkpoint summary: {e}"))
})?;
.await?;

let summary = raw_checkpoint.map(|raw_checkpoint| {
bcs::from_bytes::<CertifiedCheckpointSummary>(&raw_checkpoint.certified_checkpoint)
.unwrap()
});

let checkpoint_summary = checkpoint_summary
.map(|summary| summary.into_summary_and_sequence())
.map(|s| s.1);
let checkpoint_bcs = summary
.map(|c| c.into_summary_and_sequence().1)
.map(|c| bcs::to_bytes(&c).unwrap());

Ok(checkpoint_summary.map(|s| Base64::from(&bcs::to_bytes(&s).unwrap())))
Ok(checkpoint_bcs.map(Base64::from))
}
}

@@ -286,7 +290,6 @@ impl Checkpoint {
/// that cursor).
async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
use checkpoints::dsl;
use raw_checkpoints::dsl as raw_dsl;

let stored: Option<StoredCheckpoint> = db
.execute(move |conn| {
@@ -304,25 +307,8 @@ impl Checkpoint {
.await
.map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;

let raw_checkpoint: Option<StoredRawCheckpoint> = db
.execute(move |conn| {
async move {
conn.first(move || {
raw_dsl::raw_checkpoints
.filter(raw_dsl::sequence_number.le(checkpoint_viewed_at as i64))
.order_by(raw_dsl::sequence_number.desc())
})
.await
.optional()
}
.scope_boxed()
})
.await
.map_err(|e| Error::Internal(format!("Failed to fetch raw checkpoint: {e}")))?;

Ok(stored.map(|stored| Checkpoint {
stored,
raw_checkpoint,
checkpoint_viewed_at,
}))
}
@@ -365,49 +351,23 @@ impl Checkpoint {
checkpoint_viewed_at: u64,
) -> Result<Connection<String, Checkpoint>, Error> {
use checkpoints::dsl;
use raw_checkpoints::dsl as raw_dsl;

let cursor_viewed_at = page.validate_cursor_consistency()?;
let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
let page_stored = page.clone();

let (prev, next, results) = db
.execute(move |conn| {
async move {
page_stored
.paginate_query::<StoredCheckpoint, _, _, _>(
conn,
checkpoint_viewed_at,
move || {
let mut query = dsl::checkpoints.into_boxed();
query = query
.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
if let Some(epoch) = filter {
query = query.filter(dsl::epoch.eq(epoch as i64));
}
query
},
)
.await
}
.scope_boxed()
})
.await?;
let results_clone = results.collect::<Vec<_>>();
let results = results_clone.clone();

let (_, _, results_raw) = db
.execute(move |conn| {
async move {
page.paginate_query::<StoredRawCheckpoint, _, _, _>(
page.paginate_query::<StoredCheckpoint, _, _, _>(
conn,
checkpoint_viewed_at,
move || {
let mut query = raw_dsl::raw_checkpoints.into_boxed();
query = query.filter(
raw_dsl::sequence_number
.eq_any(results_clone.iter().map(|r| r.sequence_number)),
);
let mut query = dsl::checkpoints.into_boxed();
query =
query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
if let Some(epoch) = filter {
query = query.filter(dsl::epoch.eq(epoch as i64));
}
query
},
)
@@ -419,13 +379,12 @@ impl Checkpoint {

// The "checkpoint viewed at" sets a consistent upper bound for the nested queries.
let mut conn = Connection::new(prev, next);
for (stored, raw) in results.into_iter().zip(results_raw) {
for stored in results {
let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
conn.edges.push(Edge::new(
cursor,
Checkpoint {
stored,
raw_checkpoint: Some(raw),
checkpoint_viewed_at,
},
));
@@ -465,36 +424,6 @@ impl Target<Cursor> for StoredCheckpoint {
}
}

impl Paginated<Cursor> for StoredRawCheckpoint {
type Source = raw_checkpoints::table;

fn filter_ge<ST, GB>(cursor: &Cursor, query: RawQuery<ST, GB>) -> RawQuery<ST, GB> {
query.filter(raw_checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
}

fn filter_le<ST, GB>(cursor: &Cursor, query: RawQuery<ST, GB>) -> RawQuery<ST, GB> {
query.filter(raw_checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
}

fn order<ST, GB>(asc: bool, query: RawQuery<ST, GB>) -> RawQuery<ST, GB> {
use raw_checkpoints::dsl;
if asc {
query.order(dsl::sequence_number)
} else {
query.order(dsl::sequence_number.desc())
}
}
}

impl Target<Cursor> for StoredRawCheckpoint {
fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
Cursor::new(CheckpointCursor {
checkpoint_viewed_at,
sequence_number: self.sequence_number as u64,
})
}
}

impl Checkpointed for Cursor {
fn checkpoint_viewed_at(&self) -> u64 {
self.checkpoint_viewed_at
@@ -510,7 +439,6 @@ impl Loader<SeqNumKey> for Db {

async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
use checkpoints::dsl;
use raw_checkpoints::dsl as raw_dsl;

let checkpoint_ids: BTreeSet<_> = keys
.iter()
@@ -520,7 +448,6 @@ impl Loader<SeqNumKey> for Db {
.then_some(key.sequence_number as i64)
})
.collect();
let raw_checkpoint_ids: Vec<i64> = checkpoint_ids.iter().cloned().collect();

let checkpoints: Vec<StoredCheckpoint> = self
.execute(move |conn| {
@@ -536,39 +463,17 @@ impl Loader<SeqNumKey> for Db {
.await
.map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

let raw_checkpoints: Vec<StoredRawCheckpoint> = self
.execute(move |conn| {
async move {
conn.results(move || {
raw_dsl::raw_checkpoints.filter(
raw_dsl::sequence_number.eq_any(raw_checkpoint_ids.iter().cloned()),
)
})
.await
}
.scope_boxed()
})
.await
.map_err(|e| Error::Internal(format!("Failed to fetch raw checkpoints: {e}")))?;

let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
.into_iter()
.map(|stored| (stored.sequence_number as u64, stored))
.collect();

let raw_checkpoints: BTreeMap<_, _> = raw_checkpoints
.into_iter()
.map(|raw_checkpoint| (raw_checkpoint.sequence_number as u64, raw_checkpoint))
.collect();

Ok(keys
.iter()
.filter_map(|key| {
let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
let raw_checkpoint = raw_checkpoints.get(&key.sequence_number).cloned();
let checkpoint = Checkpoint {
stored,
raw_checkpoint,
checkpoint_viewed_at: key.checkpoint_viewed_at,
};

@@ -590,7 +495,6 @@ impl Loader<DigestKey> for Db {

async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
use checkpoints::dsl;
use raw_checkpoints::dsl as raw_dsl;

let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();

@@ -607,24 +511,55 @@ impl Loader<DigestKey> for Db {
})
.await
.map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;
let chckp = checkpoints.clone();

let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
let checkpoint_ids: BTreeMap<_, _> = checkpoints
.into_iter()
.map(|stored| (stored.checkpoint_digest.clone(), stored))
.collect();

let checkpoint_ids: Vec<i64> = chckp
.into_iter()
.map(|stored| stored.sequence_number)
.collect();
Ok(keys
.iter()
.filter_map(|key| {
let DigestKey {
digest,
checkpoint_viewed_at,
} = *key;

let stored = checkpoint_ids.get(digest.as_slice()).cloned()?;
let checkpoint = Checkpoint {
stored: stored.clone(),
checkpoint_viewed_at,
};

// Filter by key's checkpoint viewed at here. Doing this in memory because it should
// be quite rare that this query actually filters something, but encoding it in SQL
// is complicated.
let seq_num = checkpoint.stored.sequence_number as u64;
(checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
})
.collect())
}
}

#[async_trait::async_trait]
impl Loader<SeqNum> for Db {
type Value = StoredRawCheckpoint;
type Error = Error;

async fn load(&self, keys: &[SeqNum]) -> Result<HashMap<SeqNum, StoredRawCheckpoint>, Error> {
use raw_checkpoints::dsl;

let checkpoint_ids = keys
.iter()
.map(|key| key.sequence_number)
.collect::<Vec<_>>();

let raw_checkpoints: Vec<StoredRawCheckpoint> = self
.execute(move |conn| {
async move {
conn.results(move || {
raw_dsl::raw_checkpoints
.filter(raw_dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
dsl::raw_checkpoints
.filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
})
.await
}
@@ -635,33 +570,14 @@ impl Loader<DigestKey> for Db {

let raw_checkpoints: BTreeMap<_, _> = raw_checkpoints
.into_iter()
.map(|raw_checkpoint| (raw_checkpoint.sequence_number as u64, raw_checkpoint))
.map(|raw| (raw.sequence_number, raw))
.collect();

Ok(keys
.iter()
.filter_map(|key| {
let DigestKey {
digest,
checkpoint_viewed_at,
} = *key;

let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;
let raw_checkpoint = raw_checkpoints
.get(&(stored.sequence_number as u64))
.cloned();

let checkpoint = Checkpoint {
stored: stored.clone(),
raw_checkpoint,
checkpoint_viewed_at,
};

// Filter by key's checkpoint viewed at here. Doing this in memory because it should
// be quite rare that this query actually filters something, but encoding it in SQL
// is complicated.
let seq_num = checkpoint.stored.sequence_number as u64;
(checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
let raw = raw_checkpoints.get(&key.sequence_number).cloned()?;
Some((*key, raw))
})
.collect())
}
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/staging.graphql
Original file line number Diff line number Diff line change
@@ -451,7 +451,7 @@ type Checkpoint {
"""
The Base64 serialized BCS bytes of CheckpointSummary for this checkpoint.
"""
checkpointSummaryBcs: Base64
bcs: Base64
}

type CheckpointConnection {

0 comments on commit cf3d557

Please sign in to comment.