Skip to content

Commit

Permalink
Feat/recover id event jobs (#549)
Browse files Browse the repository at this point in the history
* feat: add doc store

* feat: add doc order and event recover

* feat: add recover

* feat: add start block
  • Loading branch information
imotai authored Jul 4, 2023
1 parent 8d9e2e9 commit f1ac0b0
Show file tree
Hide file tree
Showing 12 changed files with 359 additions and 32 deletions.
5 changes: 1 addition & 4 deletions java/src/test/java/network/db3/provider/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ public void testAddCollection() {
Optional<Db3DatabaseV2.Collection> collection = client.getCollection(result.getDb(), "col1");
Assert.assertEquals(collection.isPresent(), true);
Assert.assertEquals(collection.get().getName(), "col1");
String doc = """{
"name":"a"}
""";
String doc = "{\"name\":1}";
AddDocResult addDocResult = client.addDoc(result.getDb(), "col1", doc);
Assert.assertNotNull(addDocResult.getMutationId());
Thread.sleep(1000 * 3);
Expand Down
2 changes: 1 addition & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "db3.js",
"version": "0.3.16",
"version": "0.3.17",
"description": "DB3 Network Javascript API",
"author": "dbpunk labs",
"keywords": [
Expand Down
4 changes: 3 additions & 1 deletion sdk/src/store/database_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ export async function createEventDatabase(
contractAddress: string,
tables: string[],
abi: string,
evmNodeUrl: string
evmNodeUrl: string,
startBlock: string
) {
const collections = tables.map((name) => {
const collection: CollectionMutation = {
Expand All @@ -73,6 +74,7 @@ export async function createEventDatabase(
tables: collections,
eventsJsonAbi: abi,
evmNodeUrl,
startBlock,
}
const body: Mutation_BodyWrapper = {
body: {
Expand Down
3 changes: 2 additions & 1 deletion sdk/tests/client_v2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ describe('test db3.js client module', () => {
'0x0d500B1d8E8eF31E21C99d1Db9A6444d3ADf1270',
['Transfer', 'Deposit', 'Approval', 'Withdrawal'],
abi,
evmNodeUrl
evmNodeUrl,
0
)
console.log(response)
await new Promise((r) => setTimeout(r, 10000))
Expand Down
24 changes: 22 additions & 2 deletions src/event/src/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct EventProcessorConfig {
pub abi: String,
pub target_events: HashSet<String>,
pub contract_addr: String,
pub start_block: u64,
}

pub struct EventProcessor {
Expand Down Expand Up @@ -100,7 +101,18 @@ impl EventProcessor {
.map_err(|e| DB3Error::StoreEventError(format!("{e}")))?;
let db_addr = DB3Address::from_hex(self.config.db_addr.as_str())
.map_err(|e| DB3Error::StoreEventError(format!("{e}")))?;
let filter = Filter::new().address(address);
let filter = match self.config.start_block == 0 {
true => Filter::new().address(address),
false => {
info!(
"start process contract from block {} with address {}",
self.config.start_block, self.config.contract_addr
);
Filter::new()
.from_block(self.config.start_block)
.address(address)
}
};
let mut stream = self
.provider
.subscribe_logs(&filter)
Expand Down Expand Up @@ -193,7 +205,15 @@ impl EventProcessor {
Token::Uint(value) | Token::Int(value) => {
serde_json::value::Value::String(value.to_string())
}
_ => todo!(),
Token::FixedBytes(bytes) | Token::Bytes(bytes) => {
serde_json::value::Value::String(hex::encode(bytes))
}
Token::Bool(value) => serde_json::value::Value::Bool(*value),
Token::Array(tokens) | Token::FixedArray(tokens) | Token::Tuple(tokens) => {
serde_json::value::Value::Array(
tokens.iter().map(|t| Self::param_to_value(t)).collect(),
)
}
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ pub enum DB3Command {
/// the admin address which can change the configuration this node
#[clap(long, default_value = "0x0000000000000000000000000000000000000000")]
admin_addr: String,
/// this is just for upgrade the node
#[clap(long, default_value = "10000")]
doc_id_start: i64,
},

/// Start db3 interactive console
Expand Down Expand Up @@ -258,6 +261,7 @@ impl DB3Command {
contract_addr,
evm_node_url,
admin_addr,
doc_id_start,
} => {
let log_level = if verbose {
LevelFilter::DEBUG
Expand All @@ -283,6 +287,7 @@ impl DB3Command {
contract_addr.as_str(),
evm_node_url.as_str(),
admin_addr.as_str(),
doc_id_start,
)
.await;
let running = Arc::new(AtomicBool::new(true));
Expand Down Expand Up @@ -348,6 +353,7 @@ impl DB3Command {
scan_max_limit: 1000,
enable_doc_store: true,
doc_store_conf,
doc_start_id: 0,
};
let addr = format!("{public_host}:{public_grpc_port}");
let indexer = IndexerNodeImpl::new(
Expand All @@ -361,6 +367,7 @@ impl DB3Command {
)
.unwrap();
let indexer_for_syncing = indexer.clone();
indexer.recover().await.unwrap();
let listen = tokio::spawn(async move {
info!("start syncing data from storage node");
indexer_for_syncing
Expand Down Expand Up @@ -418,6 +425,7 @@ impl DB3Command {
contract_addr: &str,
evm_node_url: &str,
admin_addr: &str,
doc_start_id: i64,
) {
let addr = format!("{public_host}:{public_grpc_port}");

Expand Down Expand Up @@ -458,6 +466,7 @@ impl DB3Command {
scan_max_limit: 1000,
enable_doc_store: false,
doc_store_conf: DocStoreConfig::default(),
doc_start_id,
};

let (sender, receiver) = tokio::sync::mpsc::channel::<(
Expand All @@ -483,6 +492,7 @@ impl DB3Command {
addr, network_id
);
std::fs::create_dir_all(rollup_data_path).unwrap();
storage_node.recover().unwrap();
storage_node.keep_subscription(receiver).await.unwrap();
storage_node.start_to_produce_block().await;
storage_node.start_to_rollup().await;
Expand Down
68 changes: 51 additions & 17 deletions src/node/src/indexer_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use db3_proto::db3_indexer_proto::{
GetSystemStatusRequest, RunQueryRequest, RunQueryResponse, SetupRequest, SetupResponse,
};
use db3_proto::db3_mutation_v2_proto::mutation::body_wrapper::Body;
use db3_proto::db3_mutation_v2_proto::EventDatabaseMutation;
use db3_proto::db3_mutation_v2_proto::MutationAction;
use db3_proto::db3_storage_proto::block_response::MutationWrapper;
use db3_proto::db3_storage_proto::event_message;
Expand Down Expand Up @@ -85,6 +84,28 @@ impl IndexerNodeImpl {
})
}

pub async fn recover(&self) -> Result<()> {
self.db_store.recover_db_state()?;
let databases = self.db_store.get_all_event_db()?;
for database in databases {
let address_ref: &[u8] = database.address.as_ref();
let db_address = DB3Address::try_from(address_ref)?;
let (collections, _) = self.db_store.get_collection_of_database(&db_address)?;
let tables = collections.iter().map(|c| c.name.to_string()).collect();
self.start_an_event_task(
&db_address,
database.evm_node_url.as_str(),
database.events_json_abi.as_str(),
&tables,
database.contract_address.as_str(),
0,
)
.await?;
info!("recover the event db {} done", db_address.to_hex());
}
Ok(())
}

/// start standalone indexer block syncer
/// 1. subscribe db3 event
/// 2. handle event to sync db3 node block
Expand Down Expand Up @@ -160,18 +181,19 @@ impl IndexerNodeImpl {
async fn start_an_event_task(
&self,
db: &DB3Address,
mutation: &EventDatabaseMutation,
evm_node_url: &str,
abi: &str,
tables: &Vec<String>,
contract_address: &str,
start_block: u64,
) -> Result<()> {
let config = EventProcessorConfig {
evm_node_url: mutation.evm_node_url.to_string(),
evm_node_url: evm_node_url.to_string(),
db_addr: db.to_hex(),
abi: mutation.events_json_abi.to_string(),
target_events: mutation
.tables
.iter()
.map(|t| t.collection_name.to_string())
.collect(),
contract_addr: mutation.contract_address.to_string(),
abi: abi.to_string(),
target_events: tables.iter().map(|t| t.to_string()).collect(),
contract_addr: contract_address.to_string(),
start_block,
};
let processor = Arc::new(
EventProcessor::new(config, self.db_store.clone())
Expand All @@ -181,14 +203,14 @@ impl IndexerNodeImpl {
match self.processor_mapping.lock() {
Ok(mut mapping) => {
//TODO limit the total count
if mapping.contains_key(mutation.contract_address.as_str()) {
warn!("contract addr {} exist", mutation.contract_address.as_str());
if mapping.contains_key(contract_address) {
warn!("contract addr {} exist", contract_address);
return Err(DB3Error::WriteStoreError(format!(
"contract_addr {} exist",
mutation.contract_address.as_str()
contract_address
)));
}
mapping.insert(mutation.contract_address.to_string(), processor.clone());
mapping.insert(contract_address.to_string(), processor.clone());
}
_ => todo!(),
}
Expand Down Expand Up @@ -237,9 +259,21 @@ impl IndexerNodeImpl {
order,
)
.map_err(|e| DB3Error::WriteStoreError(format!("{e}")))?;
self.start_an_event_task(db_id.address(), mutation)
.await
.map_err(|e| DB3Error::WriteStoreError(format!("{e}")))?;
let tables = mutation
.tables
.iter()
.map(|t| t.collection_name.to_string())
.collect();
self.start_an_event_task(
db_id.address(),
mutation.evm_node_url.as_str(),
mutation.events_json_abi.as_str(),
&tables,
mutation.contract_address.as_str(),
mutation.start_block,
)
.await
.map_err(|e| DB3Error::WriteStoreError(format!("{e}")))?;
let db_id_hex = db_id.to_hex();
info!(
"add event database with addr {} from owner {}",
Expand Down
2 changes: 2 additions & 0 deletions src/node/src/storage_node_light_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl StorageNodeV2Impl {
c.min_rollup_size, c.rollup_interval, c.network_id
);
}
self.db_store.recover_db_state()?;
Ok(())
}

Expand Down Expand Up @@ -955,6 +956,7 @@ mod tests {
scan_max_limit: 1000,
enable_doc_store: false,
doc_store_conf: DocStoreConfig::default(),
doc_start_id: 0,
};
StorageNodeV2Config {
store_config,
Expand Down
4 changes: 4 additions & 0 deletions src/proto/proto/db3_database_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ message EventDatabase {
uint64 ttl = 5;
string events_json_abi = 6;
string evm_node_url = 7;
uint64 start_block = 8;
}

message DatabaseMessage {
Expand All @@ -54,12 +55,15 @@ message CollectionState {
message DatabaseState {
uint64 total_doc_count = 2;
uint64 total_col_count = 3;
int64 doc_order = 4;
}

message DatabaseStatePersistence {
string addr = 1;
uint64 total_doc_count = 2;
uint64 total_col_count = 3;
map<string, CollectionState> collection_states = 4;
int64 doc_order = 5;
}

message EventTable {
Expand Down
1 change: 1 addition & 0 deletions src/proto/proto/db3_mutation_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ message EventDatabaseMutation {
repeated CollectionMutation tables = 4;
string events_json_abi = 5;
string evm_node_url = 6;
uint64 start_block = 7;
}

message DocumentMask {
Expand Down
17 changes: 15 additions & 2 deletions src/storage/src/db_doc_key_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
use db3_crypto::db3_address::DB3Address;
use db3_error::Result;
use db3_crypto::db3_address::{DB3Address, DB3_ADDRESS_LENGTH};
use db3_error::{DB3Error, Result};
use std::fmt;
const DOC_PREFIX: &str = "/doc/";
const KEY_LENGTH: usize = DOC_PREFIX.len() + DB3_ADDRESS_LENGTH + 8;
/// DBDocKey with db address, doc id
pub struct DbDocKeyV2<'a>(pub &'a DB3Address, pub i64);
impl<'a> DbDocKeyV2<'a> {
Expand All @@ -30,6 +31,18 @@ impl<'a> DbDocKeyV2<'a> {
encoded_key.extend_from_slice(self.1.to_be_bytes().as_ref());
Ok(encoded_key)
}

pub fn decode_id(key: &[u8]) -> Result<i64> {
if key.len() != KEY_LENGTH {
return Err(DB3Error::KeyCodecError(
"invalid doc key length".to_string(),
));
}
Ok(i64::from_be_bytes(
<[u8; 8]>::try_from(&key[KEY_LENGTH - 8..KEY_LENGTH])
.map_err(|e| DB3Error::KeyCodecError(format!("get doc id err {e}")))?,
))
}
}

impl fmt::Display for DbDocKeyV2<'_> {
Expand Down
Loading

0 comments on commit f1ac0b0

Please sign in to comment.