Add bcs field to Checkpoint #20340

Merged: 7 commits, Nov 22, 2024
Changes from 1 commit:
Fix after feedback
stefan-mysten committed Nov 20, 2024
commit 54caca3cc5d2f0bbbc1cde83c543c05db18a6cc3
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/schema.graphql
@@ -451,7 +451,7 @@ type Checkpoint {
   """
   The Base64 serialized BCS bytes of CheckpointSummary for this checkpoint.
   """
-  checkpointSummaryBcs: Base64
+  bcs: Base64
 }
 
 type CheckpointConnection {
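
The field is renamed but its payload is unchanged: clients still receive the Base64-encoded BCS bytes of a CheckpointSummary. A minimal client-side decoding sketch (not part of this PR), assuming the base64 crate's standard engine and the sui_types location of CheckpointSummary; `payload` stands in for the string the `bcs` field returns:

use base64::{engine::general_purpose::STANDARD, Engine};
use sui_types::messages_checkpoint::CheckpointSummary;

// Reverse the field's encoding: Base64 text -> raw bytes -> BCS decode.
fn decode_checkpoint_bcs(payload: &str) -> anyhow::Result<CheckpointSummary> {
    let bytes = STANDARD.decode(payload)?;
    Ok(bcs::from_bytes::<CheckpointSummary>(&bytes)?)
}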
226 changes: 71 additions & 155 deletions crates/sui-graphql-rpc/src/types/checkpoint.rs
@@ -41,6 +41,12 @@ pub(crate) struct CheckpointId {
     pub sequence_number: Option<UInt53>,
 }
 
+/// `DataLoader` key for fetching `StoredRawCheckpoint` by its sequence number.
+#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
+struct SeqNum {
+    pub sequence_number: i64,
+}
+
 /// `DataLoader` key for fetching a `Checkpoint` by its sequence number, constrained by a consistency
 /// cursor.
 #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
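
For context, the new `SeqNum` key feeds async-graphql's `DataLoader`, which coalesces the per-field raw-checkpoint lookups issued while a page of `Checkpoint` objects resolves. A caller-side sketch (illustrative only, not from this PR), assuming the `Db`, `SeqNum`, and `Error` types in this diff and a Tokio runtime:

use async_graphql::dataloader::DataLoader;

// Two concurrent load_one calls are coalesced into a single load(&[SeqNum])
// call on Db, i.e. one database round trip serves both keys.
async fn demo(db: Db) -> Result<(), Error> {
    let loader = DataLoader::new(db, tokio::spawn);
    let (a, b) = tokio::join!(
        loader.load_one(SeqNum { sequence_number: 1 }),
        loader.load_one(SeqNum { sequence_number: 2 }),
    );
    // Each resolves to Ok(Some(StoredRawCheckpoint)) when the row exists.
    let (_a, _b) = (a?, b?);
    Ok(())
}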
@@ -64,15 +70,12 @@ pub(crate) struct Checkpoint {
     /// Representation of transaction data in the Indexer's Store. The indexer stores the
     /// transaction data and its effects together, in one table.
     pub stored: StoredCheckpoint,
-    /// Representation of the raw checkpoint data, including the summary and contents.
-    pub raw_checkpoint: Option<StoredRawCheckpoint>,
     /// The checkpoint_sequence_number at which this was viewed at.
     pub checkpoint_viewed_at: u64,
 }
 
 pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
 type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;
-type RawQuery<ST, GB> = data::Query<ST, raw_checkpoints::table, GB>;
 
 /// The cursor returned for each `Checkpoint` in a connection's page of results. The
 /// `checkpoint_viewed_at` will set the consistent upper bound for subsequent queries made on this
@@ -197,23 +200,24 @@ impl Checkpoint {
     }
 
     /// The Base64 serialized BCS bytes of CheckpointSummary for this checkpoint.
-    async fn checkpoint_summary_bcs(&self) -> Result<Option<Base64>> {
-        let checkpoint_summary = self
-            .raw_checkpoint
-            .as_ref()
-            .map(|raw_checkpoint| {
-                bcs::from_bytes::<CertifiedCheckpointSummary>(&raw_checkpoint.certified_checkpoint)
-            })
-            .transpose()
-            .map_err(|e| {
-                Error::Internal(format!("Failed to deserialize checkpoint summary: {e}"))
-            })?;
-
-        let checkpoint_summary = checkpoint_summary
-            .map(|summary| summary.into_summary_and_sequence())
-            .map(|s| s.1);
-
-        Ok(checkpoint_summary.map(|s| Base64::from(&bcs::to_bytes(&s).unwrap())))
+    async fn bcs(&self, ctx: &Context<'_>) -> Result<Option<Base64>> {
+        let DataLoader(dl) = ctx.data_unchecked();
+        let raw_checkpoint = dl
+            .load_one(SeqNum {
+                sequence_number: self.stored.sequence_number,
+            })
+            .await?;
+
+        let summary = raw_checkpoint.map(|raw_checkpoint| {
+            bcs::from_bytes::<CertifiedCheckpointSummary>(&raw_checkpoint.certified_checkpoint)
+                .unwrap()
+        });
+
+        let checkpoint_bcs = summary
+            .map(|c| c.into_summary_and_sequence().1)
+            .map(|c| bcs::to_bytes(&c).unwrap());
+
+        Ok(checkpoint_bcs.map(Base64::from))
     }
 }

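The resolver decodes and re-encodes because the raw table stores the whole CertifiedCheckpointSummary (summary plus aggregate signature) while the field promises bare CheckpointSummary bytes. The transformation in isolation, as a sketch that propagates errors instead of unwrapping, and assuming `into_summary_and_sequence` returns the pair `(sequence_number, summary)` as the `.1` access above implies:

use sui_types::messages_checkpoint::CertifiedCheckpointSummary;

// Decode the certified envelope, drop the signature wrapper, and re-encode
// only the inner summary.
fn summary_bcs(certified_bytes: &[u8]) -> Result<Vec<u8>, bcs::Error> {
    let certified: CertifiedCheckpointSummary = bcs::from_bytes(certified_bytes)?;
    let (_seq, summary) = certified.into_summary_and_sequence();
    bcs::to_bytes(&summary)
}
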
@@ -286,7 +290,6 @@ impl Checkpoint {
     /// that cursor).
     async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
         use checkpoints::dsl;
-        use raw_checkpoints::dsl as raw_dsl;
 
         let stored: Option<StoredCheckpoint> = db
             .execute(move |conn| {
@@ -304,25 +307,8 @@ impl Checkpoint {
             .await
             .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;
 
-        let raw_checkpoint: Option<StoredRawCheckpoint> = db
-            .execute(move |conn| {
-                async move {
-                    conn.first(move || {
-                        raw_dsl::raw_checkpoints
-                            .filter(raw_dsl::sequence_number.le(checkpoint_viewed_at as i64))
-                            .order_by(raw_dsl::sequence_number.desc())
-                    })
-                    .await
-                    .optional()
-                }
-                .scope_boxed()
-            })
-            .await
-            .map_err(|e| Error::Internal(format!("Failed to fetch raw checkpoint: {e}")))?;
-
         Ok(stored.map(|stored| Checkpoint {
             stored,
-            raw_checkpoint,
             checkpoint_viewed_at,
         }))
     }
@@ -365,49 +351,23 @@ impl Checkpoint {
         checkpoint_viewed_at: u64,
     ) -> Result<Connection<String, Checkpoint>, Error> {
         use checkpoints::dsl;
-        use raw_checkpoints::dsl as raw_dsl;
 
         let cursor_viewed_at = page.validate_cursor_consistency()?;
         let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
-        let page_stored = page.clone();
 
         let (prev, next, results) = db
             .execute(move |conn| {
                 async move {
-                    page_stored
-                        .paginate_query::<StoredCheckpoint, _, _, _>(
-                            conn,
-                            checkpoint_viewed_at,
-                            move || {
-                                let mut query = dsl::checkpoints.into_boxed();
-                                query = query
-                                    .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
-                                if let Some(epoch) = filter {
-                                    query = query.filter(dsl::epoch.eq(epoch as i64));
-                                }
-                                query
-                            },
-                        )
-                        .await
-                }
-                .scope_boxed()
-            })
-            .await?;
-        let results_clone = results.collect::<Vec<_>>();
-        let results = results_clone.clone();
-
-        let (_, _, results_raw) = db
-            .execute(move |conn| {
-                async move {
-                    page.paginate_query::<StoredRawCheckpoint, _, _, _>(
+                    page.paginate_query::<StoredCheckpoint, _, _, _>(
                         conn,
                         checkpoint_viewed_at,
                         move || {
-                            let mut query = raw_dsl::raw_checkpoints.into_boxed();
-                            query = query.filter(
-                                raw_dsl::sequence_number
-                                    .eq_any(results_clone.iter().map(|r| r.sequence_number)),
-                            );
+                            let mut query = dsl::checkpoints.into_boxed();
+                            query =
+                                query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
                             if let Some(epoch) = filter {
                                 query = query.filter(dsl::epoch.eq(epoch as i64));
                             }
                             query
                         },
                     )
@@ -419,13 +379,12 @@ impl Checkpoint {
 
         // The "checkpoint viewed at" sets a consistent upper bound for the nested queries.
         let mut conn = Connection::new(prev, next);
-        for (stored, raw) in results.into_iter().zip(results_raw) {
+        for stored in results {
             let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
             conn.edges.push(Edge::new(
                 cursor,
                 Checkpoint {
                     stored,
-                    raw_checkpoint: Some(raw),
                     checkpoint_viewed_at,
                 },
             ));
@@ -465,36 +424,6 @@ impl Target<Cursor> for StoredCheckpoint {
     }
 }
 
-impl Paginated<Cursor> for StoredRawCheckpoint {
-    type Source = raw_checkpoints::table;
-
-    fn filter_ge<ST, GB>(cursor: &Cursor, query: RawQuery<ST, GB>) -> RawQuery<ST, GB> {
-        query.filter(raw_checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
-    }
-
-    fn filter_le<ST, GB>(cursor: &Cursor, query: RawQuery<ST, GB>) -> RawQuery<ST, GB> {
-        query.filter(raw_checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
-    }
-
-    fn order<ST, GB>(asc: bool, query: RawQuery<ST, GB>) -> RawQuery<ST, GB> {
-        use raw_checkpoints::dsl;
-        if asc {
-            query.order(dsl::sequence_number)
-        } else {
-            query.order(dsl::sequence_number.desc())
-        }
-    }
-}
-
-impl Target<Cursor> for StoredRawCheckpoint {
-    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
-        Cursor::new(CheckpointCursor {
-            checkpoint_viewed_at,
-            sequence_number: self.sequence_number as u64,
-        })
-    }
-}
-
 impl Checkpointed for Cursor {
     fn checkpoint_viewed_at(&self) -> u64 {
         self.checkpoint_viewed_at
@@ -510,7 +439,6 @@ impl Loader<SeqNumKey> for Db {
 
     async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
         use checkpoints::dsl;
-        use raw_checkpoints::dsl as raw_dsl;
 
         let checkpoint_ids: BTreeSet<_> = keys
             .iter()
@@ -520,7 +448,6 @@ impl Loader<SeqNumKey> for Db {
                 .then_some(key.sequence_number as i64)
             })
             .collect();
-        let raw_checkpoint_ids: Vec<i64> = checkpoint_ids.iter().cloned().collect();
 
         let checkpoints: Vec<StoredCheckpoint> = self
             .execute(move |conn| {
@@ -536,39 +463,17 @@ impl Loader<SeqNumKey> for Db {
             .await
             .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;
 
-        let raw_checkpoints: Vec<StoredRawCheckpoint> = self
-            .execute(move |conn| {
-                async move {
-                    conn.results(move || {
-                        raw_dsl::raw_checkpoints.filter(
-                            raw_dsl::sequence_number.eq_any(raw_checkpoint_ids.iter().cloned()),
-                        )
-                    })
-                    .await
-                }
-                .scope_boxed()
-            })
-            .await
-            .map_err(|e| Error::Internal(format!("Failed to fetch raw checkpoints: {e}")))?;
-
         let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
             .into_iter()
             .map(|stored| (stored.sequence_number as u64, stored))
             .collect();
 
-        let raw_checkpoints: BTreeMap<_, _> = raw_checkpoints
-            .into_iter()
-            .map(|raw_checkpoint| (raw_checkpoint.sequence_number as u64, raw_checkpoint))
-            .collect();
-
         Ok(keys
             .iter()
             .filter_map(|key| {
                 let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
-                let raw_checkpoint = raw_checkpoints.get(&key.sequence_number).cloned();
                 let checkpoint = Checkpoint {
                     stored,
-                    raw_checkpoint,
                     checkpoint_viewed_at: key.checkpoint_viewed_at,
                 };

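The filter_map above leans on the Loader contract: `load` receives a deduplicated batch of keys and returns a map that simply omits keys with no match, which async-graphql then surfaces as `None` to the matching `load_one` caller. A self-contained toy illustration of that contract, using a hypothetical in-memory store rather than this PR's `Db`:

use async_graphql::dataloader::Loader;
use std::collections::HashMap;

// Hypothetical loader over an in-memory map, for illustration only.
struct InMemory(HashMap<i64, String>);

#[async_trait::async_trait]
impl Loader<i64> for InMemory {
    type Value = String;
    type Error = std::convert::Infallible;

    async fn load(&self, keys: &[i64]) -> Result<HashMap<i64, String>, Self::Error> {
        // Only keys that are found appear in the result; the rest become None.
        Ok(keys
            .iter()
            .filter_map(|k| self.0.get(k).map(|v| (*k, v.clone())))
            .collect())
    }
}
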
@@ -590,7 +495,6 @@ impl Loader<DigestKey> for Db {
 
     async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
         use checkpoints::dsl;
-        use raw_checkpoints::dsl as raw_dsl;
 
         let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();
 
@@ -607,24 +511,55 @@ impl Loader<DigestKey> for Db {
             })
             .await
             .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;
-        let chckp = checkpoints.clone();
 
-        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
+        let checkpoint_ids: BTreeMap<_, _> = checkpoints
             .into_iter()
             .map(|stored| (stored.checkpoint_digest.clone(), stored))
             .collect();
 
-        let checkpoint_ids: Vec<i64> = chckp
-            .into_iter()
-            .map(|stored| stored.sequence_number)
-            .collect();
+        Ok(keys
+            .iter()
+            .filter_map(|key| {
+                let DigestKey {
+                    digest,
+                    checkpoint_viewed_at,
+                } = *key;
+
+                let stored = checkpoint_ids.get(digest.as_slice()).cloned()?;
+                let checkpoint = Checkpoint {
+                    stored: stored.clone(),
+                    checkpoint_viewed_at,
+                };
+
+                // Filter by key's checkpoint viewed at here. Doing this in memory because it should
+                // be quite rare that this query actually filters something, but encoding it in SQL
+                // is complicated.
+                let seq_num = checkpoint.stored.sequence_number as u64;
+                (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
+            })
+            .collect())
+    }
+}
+
+#[async_trait::async_trait]
+impl Loader<SeqNum> for Db {
+    type Value = StoredRawCheckpoint;
+    type Error = Error;
+
+    async fn load(&self, keys: &[SeqNum]) -> Result<HashMap<SeqNum, StoredRawCheckpoint>, Error> {
+        use raw_checkpoints::dsl;
+
+        let checkpoint_ids = keys
+            .iter()
+            .map(|key| key.sequence_number)
+            .collect::<Vec<_>>();
+
         let raw_checkpoints: Vec<StoredRawCheckpoint> = self
             .execute(move |conn| {
                 async move {
                     conn.results(move || {
-                        raw_dsl::raw_checkpoints
-                            .filter(raw_dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
+                        dsl::raw_checkpoints
+                            .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
                    })
                     .await
                 }
@@ -635,33 +570,14 @@ impl Loader<DigestKey> for Db {
 
         let raw_checkpoints: BTreeMap<_, _> = raw_checkpoints
             .into_iter()
-            .map(|raw_checkpoint| (raw_checkpoint.sequence_number as u64, raw_checkpoint))
+            .map(|raw| (raw.sequence_number, raw))
             .collect();
 
         Ok(keys
             .iter()
             .filter_map(|key| {
-                let DigestKey {
-                    digest,
-                    checkpoint_viewed_at,
-                } = *key;
-
-                let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;
-                let raw_checkpoint = raw_checkpoints
-                    .get(&(stored.sequence_number as u64))
-                    .cloned();
-
-                let checkpoint = Checkpoint {
-                    stored: stored.clone(),
-                    raw_checkpoint,
-                    checkpoint_viewed_at,
-                };
-
-                // Filter by key's checkpoint viewed at here. Doing this in memory because it should
-                // be quite rare that this query actually filters something, but encoding it in SQL
-                // is complicated.
-                let seq_num = checkpoint.stored.sequence_number as u64;
-                (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
+                let raw = raw_checkpoints.get(&key.sequence_number).cloned()?;
+                Some((*key, raw))
             })
             .collect())
     }
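
Because the loader hands every requested sequence number to `load` in one batch, the `eq_any` filter above compiles to a single `IN` query, so one round trip covers all `bcs` fields in a request. One way to see the generated SQL with Diesel's `debug_query`, sketched against a standalone table definition (the real schema has more columns) and a Postgres backend:

use diesel::pg::Pg;
use diesel::prelude::*;

// Reduced stand-in for the indexer's raw_checkpoints table.
diesel::table! {
    raw_checkpoints (sequence_number) {
        sequence_number -> BigInt,
        certified_checkpoint -> Bytea,
    }
}

fn print_batched_sql(ids: Vec<i64>) {
    use raw_checkpoints::dsl;
    let query = dsl::raw_checkpoints.filter(dsl::sequence_number.eq_any(ids));
    // Prints roughly:
    //   SELECT ... FROM "raw_checkpoints" WHERE "sequence_number" IN ($1, $2)
    println!("{}", diesel::debug_query::<Pg, _>(&query));
}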
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/staging.graphql
@@ -451,7 +451,7 @@ type Checkpoint {
   """
   The Base64 serialized BCS bytes of CheckpointSummary for this checkpoint.
   """
-  checkpointSummaryBcs: Base64
+  bcs: Base64
 }
 
 type CheckpointConnection {