Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework generation of the VID with the Entity wrapper #5691

Draft
wants to merge 22 commits into
base: master
Choose a base branch
from
Draft
4 changes: 3 additions & 1 deletion chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use graph::{
subgraph::{MappingError, ProofOfIndexingEvent, SharedProofOfIndexing},
trigger_processor::HostedTrigger,
},
data::store::EntityV,
prelude::{
anyhow, async_trait, BlockHash, BlockNumber, BlockState, CheapClone, RuntimeHostBuilder,
},
Expand Down Expand Up @@ -225,7 +226,8 @@ where
logger,
);

state.entity_cache.set(key, entity)?;
let vid = state.next_vid(block.number);
state.entity_cache.set(key, EntityV::new(entity, vid))?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/context/instance/hosts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> OffchainHosts<C, T> {
pub fn matches_by_address<'a>(
&'a self,
address: Option<&[u8]>,
) -> Box<dyn Iterator<Item = &T::Host> + Send + 'a> {
) -> Box<dyn Iterator<Item = &'a T::Host> + Send + 'a> {
let Some(address) = address else {
return Box::new(self.by_block.values().flatten().map(|host| host.as_ref()));
};
Expand Down
4 changes: 3 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use graph::components::{
subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing},
};
use graph::data::store::scalar::Bytes;
use graph::data::store::EntityV;
use graph::data::subgraph::{
schema::{SubgraphError, SubgraphHealth},
SubgraphFeature,
Expand Down Expand Up @@ -1607,7 +1608,8 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi)
// VID is autogenerated for POI table and our input is ignored
entity_cache.set(key, EntityV::new(poi, 0))
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");
Expand Down
32 changes: 19 additions & 13 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::components::store::{self as s, Entity, EntityOperation};
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::data::store::{EntityV, EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::prelude::ENV_VARS;
use crate::schema::{EntityKey, InputSchema};
use crate::util::intern::Error as InternError;
Expand All @@ -29,8 +29,8 @@ pub enum GetScope {
#[derive(Debug, Clone)]
enum EntityOp {
Remove,
Update(Entity),
Overwrite(Entity),
Update(EntityV),
Overwrite(EntityV),
}

impl EntityOp {
Expand All @@ -41,7 +41,7 @@ impl EntityOp {
use EntityOp::*;
match (self, entity) {
(Remove, _) => Ok(None),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new)),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new.e)),
(Update(updates), Some(entity)) => {
let mut e = entity.borrow().clone();
e.merge_remove_null_fields(updates)?;
Expand All @@ -65,7 +65,7 @@ impl EntityOp {
match self {
// This is how `Overwrite` is constructed, by accumulating `Update` onto `Remove`.
Remove => *self = Overwrite(update),
Update(current) | Overwrite(current) => current.merge(update),
Update(current) | Overwrite(current) => current.e.merge(update.e),
}
}
}
Expand Down Expand Up @@ -278,9 +278,9 @@ impl EntityCache {
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
if query.matches(key, &entity.e) =>
{
Ok(Some(entity.clone()))
Ok(Some(entity.e.clone()))
}
EntityOp::Remove => Ok(None),
_ => Ok(None),
Expand Down Expand Up @@ -349,9 +349,9 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
pub fn set(&mut self, key: EntityKey, entity: EntityV) -> Result<(), anyhow::Error> {
// check the validate for derived fields
let is_valid = entity.validate(&key).is_ok();
let is_valid = entity.e.validate(&key).is_ok();

self.entity_op(key.clone(), EntityOp::Update(entity));

Expand Down Expand Up @@ -458,19 +458,22 @@ impl EntityCache {
// Entity was created
(None, EntityOp::Update(mut updates))
| (None, EntityOp::Overwrite(mut updates)) => {
updates.remove_null_fields();
let data = Arc::new(updates);
let vid = updates.vid;
updates.e.remove_null_fields();
let data = Arc::new(updates.e.clone());
self.current.insert(key.clone(), Some(data.cheap_clone()));
Some(Insert {
key,
data,
block,
end: None,
vid,
})
}
// Entity may have been changed
(Some(current), EntityOp::Update(updates)) => {
let mut data = current.as_ref().clone();
let vid = updates.vid;
data.merge_remove_null_fields(updates)
.map_err(|e| key.unknown_attribute(e))?;
let data = Arc::new(data);
Expand All @@ -481,21 +484,24 @@ impl EntityCache {
data,
block,
end: None,
vid,
})
} else {
None
}
}
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.clone()));
let vid = data.vid;
let data = Arc::new(data.e.clone());
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
data,
block,
end: None,
vid,
})
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::constraint_violation;
use crate::data::store::scalar::Bytes;
use crate::data::store::{Id, IdList, Value};
use crate::data::store::{EntityV, Id, IdList, Value};
use crate::data::value::Word;
use crate::data_source::CausalityRegion;
use crate::derive::CheapClone;
Expand Down Expand Up @@ -829,7 +829,7 @@ where
pub enum EntityOperation {
/// Locates the entity specified by `key` and sets its attributes according to the contents of
/// `data`. If no entity exists with this key, creates a new entity.
Set { key: EntityKey, data: Entity },
Set { key: EntityKey, data: EntityV },

/// Removes an entity with the specified key, if one exists.
Remove { key: EntityKey },
Expand Down
20 changes: 18 additions & 2 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ pub enum EntityModification {
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Update the entity by overwriting it
Overwrite {
key: EntityKey,
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Remove the entity
Remove { key: EntityKey, block: BlockNumber },
Expand All @@ -67,6 +69,7 @@ pub struct EntityWrite<'a> {
// The end of the block range for which this write is valid. The value
// of `end` itself is not included in the range
pub end: Option<BlockNumber>,
pub vid: i64,
}

impl std::fmt::Display for EntityWrite<'_> {
Expand All @@ -89,24 +92,28 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> {
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),
EntityModification::Overwrite {
key,
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: &data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),

EntityModification::Remove { .. } => Err(()),
Expand Down Expand Up @@ -213,11 +220,13 @@ impl EntityModification {
data,
block,
end,
vid,
} => Ok(Insert {
key,
data,
block,
end,
vid,
}),
Remove { key, .. } => {
return Err(constraint_violation!(
Expand Down Expand Up @@ -271,21 +280,23 @@ impl EntityModification {
}

impl EntityModification {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Insert {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Overwrite {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

Expand Down Expand Up @@ -1017,31 +1028,36 @@ mod test {

let value = value.clone();
let key = THING_TYPE.parse_key("one").unwrap();
let vid = 0;
match value {
Ins(block) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: None,
vid,
},
Ovw(block) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: None,
vid,
},
Rem(block) => EntityModification::Remove { key, block },
InsC(block, end) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: Some(end),
vid,
},
OvwC(block, end) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: Some(end),
vid,
},
}
}
Expand Down
10 changes: 10 additions & 0 deletions graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub struct BlockState {
// data source that have been processed.
pub processed_data_sources: Vec<StoredDynamicDataSource>,

pub vid_seq: i32,

// Marks whether a handler is currently executing.
in_handler: bool,

Expand All @@ -92,6 +94,7 @@ impl BlockState {
persisted_data_sources: Vec::new(),
handler_created_data_sources: Vec::new(),
processed_data_sources: Vec::new(),
vid_seq: 0,
in_handler: false,
metrics: BlockStateMetrics::new(),
}
Expand All @@ -109,6 +112,7 @@ impl BlockState {
persisted_data_sources,
handler_created_data_sources,
processed_data_sources,
vid_seq: _,
in_handler,
metrics,
} = self;
Expand Down Expand Up @@ -178,4 +182,10 @@ impl BlockState {
pub fn persist_data_source(&mut self, ds: StoredDynamicDataSource) {
self.persisted_data_sources.push(ds)
}

pub fn next_vid(&mut self, block_number: BlockNumber) -> i64 {
let vid = ((block_number as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
vid
}
}
Loading