-
Notifications
You must be signed in to change notification settings - Fork 196
Revive the previous Filecoin.EthSubscribe PR
#5749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 44 commits
0cb4a4e
cba87c3
b6ceae6
44b4cca
222bc9e
86240dc
a543037
57aef9d
693a6d6
65c61a4
fe0417b
a5e0982
55225b0
562a63c
0cd8d86
aa7a4f4
09c09c0
a1caba8
f19f808
12e458a
09b8871
5d0ed6c
e41cfa1
28f7a94
1910206
7464d10
de8213c
cc3dbe1
3ad7f33
4d5f1cd
0fe9ce5
b986e48
2e994b8
5377199
1e02435
d569eb4
065b89f
76e130d
dcb8576
a25d9a7
0e68b0f
c1f4969
6a7de37
16f06fe
e3811e8
290b5c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; | |
| #[cfg(test)] | ||
| use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; | ||
| use crate::message::{ChainMessage, SignedMessage}; | ||
| use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec}; | ||
| use crate::rpc::types::{ApiTipsetKey, Event}; | ||
| use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError}; | ||
| use crate::shim::clock::ChainEpoch; | ||
|
|
@@ -45,6 +46,81 @@ use tokio::sync::{ | |
| Mutex, | ||
| broadcast::{self, Receiver as Subscriber}, | ||
| }; | ||
| use tokio::task::JoinHandle; | ||
|
|
||
| const HEAD_CHANNEL_CAPACITY: usize = 10; | ||
|
|
||
| /// Subscribes to head changes from the chain store and broadcasts new blocks. | ||
| /// | ||
| /// # Notes | ||
| /// | ||
| /// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, | ||
| /// allowing manual cleanup if needed. | ||
| pub(crate) fn new_heads<DB: Blockstore>( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some docs would be helpful.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| data: &crate::rpc::RPCState<DB>, | ||
| ) -> (Subscriber<ApiHeaders>, JoinHandle<()>) { | ||
| let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); | ||
|
|
||
| let mut subscriber = data.chain_store().publisher().subscribe(); | ||
|
|
||
| let handle = tokio::spawn(async move { | ||
| while let Ok(v) = subscriber.recv().await { | ||
| let headers = match v { | ||
| HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()), | ||
| }; | ||
| if sender.send(headers).is_err() { | ||
|
akaladarshi marked this conversation as resolved.
Outdated
|
||
| break; | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| (receiver, handle) | ||
| } | ||
|
|
||
| /// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs. | ||
| /// | ||
| /// # Notes | ||
| /// | ||
| /// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, | ||
| /// allowing manual cleanup if needed. | ||
| pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>( | ||
| ctx: &Ctx<DB>, | ||
| filter: Option<EthFilterSpec>, | ||
| ) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) { | ||
| let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); | ||
|
|
||
| let mut subscriber = ctx.chain_store().publisher().subscribe(); | ||
|
|
||
| let ctx = ctx.clone(); | ||
|
|
||
| let handle = tokio::spawn(async move { | ||
| while let Ok(v) = subscriber.recv().await { | ||
| match v { | ||
| HeadChange::Apply(ts) => { | ||
| match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { | ||
| Ok(logs) => { | ||
| if !logs.is_empty() { | ||
| if let Err(e) = sender.send(logs) { | ||
| tracing::error!( | ||
| "Failed to send logs for tipset {}: {}", | ||
| ts.key(), | ||
| e | ||
| ); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| Err(e) => { | ||
| tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| (receiver, handle) | ||
| } | ||
|
|
||
| pub enum ChainGetMessage {} | ||
| impl RpcMethod<1> for ChainGetMessage { | ||
|
|
@@ -721,7 +797,7 @@ pub(crate) fn chain_notify<DB: Blockstore>( | |
| _params: Params<'_>, | ||
| data: &crate::rpc::RPCState<DB>, | ||
| ) -> Subscriber<Vec<ApiHeadChange>> { | ||
| let (sender, receiver) = broadcast::channel(100); | ||
| let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); | ||
|
|
||
| // As soon as the channel is created, send the current tipset | ||
| let current = data.chain_store().heaviest_tipset(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,8 @@ | |
| pub(crate) mod errors; | ||
| mod eth_tx; | ||
| pub mod filter; | ||
| pub mod pubsub; | ||
| pub(crate) mod pubsub_trait; | ||
| mod trace; | ||
| pub mod types; | ||
| mod utils; | ||
|
|
@@ -1325,7 +1327,7 @@ async fn new_eth_tx_receipt<DB: Blockstore + Send + Sync + 'static>( | |
| Ok(tx_receipt) | ||
| } | ||
|
|
||
| async fn eth_logs_for_block_and_transaction<DB: Blockstore + Send + Sync + 'static>( | ||
| pub async fn eth_logs_for_block_and_transaction<DB: Blockstore + Send + Sync + 'static>( | ||
| ctx: &Ctx<DB>, | ||
| ts: &Arc<Tipset>, | ||
| block_hash: &EthHash, | ||
|
|
@@ -1336,22 +1338,33 @@ async fn eth_logs_for_block_and_transaction<DB: Blockstore + Send + Sync + 'stat | |
| ..Default::default() | ||
| }; | ||
|
|
||
| eth_logs_with_filter(ctx, ts, Some(spec), Some(tx_hash)).await | ||
| } | ||
|
|
||
| pub async fn eth_logs_with_filter<DB: Blockstore + Send + Sync + 'static>( | ||
| ctx: &Ctx<DB>, | ||
| ts: &Arc<Tipset>, | ||
| spec: Option<EthFilterSpec>, | ||
| tx_hash: Option<&EthHash>, | ||
| ) -> anyhow::Result<Vec<EthLog>> { | ||
| let mut events = vec![]; | ||
| EthEventHandler::collect_events( | ||
| ctx, | ||
| ts, | ||
| Some(&spec), | ||
| spec.as_ref(), | ||
| SkipEvent::OnUnresolvedAddress, | ||
| &mut events, | ||
| ) | ||
| .await?; | ||
|
|
||
| let logs = eth_filter_logs_from_events(ctx, &events)?; | ||
| let out: Vec<_> = logs | ||
| .into_iter() | ||
| .filter(|log| &log.transaction_hash == tx_hash) | ||
| .collect(); | ||
| Ok(out) | ||
| Ok(match tx_hash { | ||
| Some(hash) => logs | ||
| .into_iter() | ||
| .filter(|log| &log.transaction_hash == hash) | ||
| .collect(), | ||
| None => logs, // no tx hash, keep all logs | ||
| }) | ||
| } | ||
|
|
||
| fn get_signed_message<DB: Blockstore>(ctx: &Ctx<DB>, message_cid: Cid) -> Result<SignedMessage> { | ||
|
|
@@ -2639,6 +2652,56 @@ impl RpcMethod<1> for EthUninstallFilter { | |
| } | ||
| } | ||
|
|
||
| pub enum EthUnsubscribe {} | ||
| impl RpcMethod<0> for EthUnsubscribe { | ||
| const NAME: &'static str = "Filecoin.EthUnsubscribe"; | ||
| const NAME_ALIAS: Option<&'static str> = Some("eth_unsubscribe"); | ||
| const PARAM_NAMES: [&'static str; 0] = []; | ||
| const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all(); | ||
| const PERMISSION: Permission = Permission::Read; | ||
| const SUBSCRIPTION: bool = true; | ||
|
|
||
| type Params = (); | ||
| type Ok = (); | ||
|
|
||
| // This method is a placeholder and is never actually called. | ||
| // Subscription handling is performed in [`pubsub.rs`](pubsub). | ||
| // | ||
| // We still need to implement the [`RpcMethod`] trait to expose method metadata | ||
| // like [`NAME`](Self::NAME), [`NAME_ALIAS`](Self::NAME_ALIAS), [`PERMISSION`](Self::PERMISSION), etc.. | ||
| async fn handle( | ||
| _: Ctx<impl Blockstore + Send + Sync + 'static>, | ||
| (): Self::Params, | ||
| ) -> Result<Self::Ok, ServerError> { | ||
| Ok(()) | ||
| } | ||
| } | ||
|
Comment on lines
+2654
to
+2678
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainVerify placeholder implementation will be completed The 🏁 Script executed: #!/bin/bash
# Verify if the pubsub module contains the actual subscription implementation
fd -t f -e rs pubsub --exec grep -l "EthUnsubscribe\|unsubscribe" {}Length of output: 104 🏁 Script executed: #!/bin/bash
# Check for EthUnsubscribe occurrences in pubsub.rs
grep -R "EthUnsubscribe" -n src/rpc/methods/eth/pubsub.rs
# Check for unsubscribe logic in pubsub.rs
grep -R "unsubscribe" -n src/rpc/methods/eth/pubsub.rsLength of output: 199 Implement actual unsubscribe logic for EthUnsubscribe The current Please update:
🤖 Prompt for AI Agents |
||
|
|
||
| pub enum EthSubscribe {} | ||
| impl RpcMethod<0> for EthSubscribe { | ||
| const NAME: &'static str = "Filecoin.EthSubscribe"; | ||
| const NAME_ALIAS: Option<&'static str> = Some("eth_subscribe"); | ||
| const PARAM_NAMES: [&'static str; 0] = []; | ||
| const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all(); | ||
| const PERMISSION: Permission = Permission::Read; | ||
| const SUBSCRIPTION: bool = true; | ||
|
|
||
| type Params = (); | ||
| type Ok = (); | ||
|
|
||
| // This method is a placeholder and is never actually called. | ||
| // Subscription handling is performed in [`pubsub.rs`](pubsub). | ||
| // | ||
| // We still need to implement the [`RpcMethod`] trait to expose method metadata | ||
| // like [`NAME`](Self::NAME), [`NAME_ALIAS`](Self::NAME_ALIAS), [`PERMISSION`](Self::PERMISSION), etc.. | ||
| async fn handle( | ||
| _: Ctx<impl Blockstore + Send + Sync + 'static>, | ||
| (): Self::Params, | ||
| ) -> Result<Self::Ok, ServerError> { | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| pub enum EthAddressToFilecoinAddress {} | ||
| impl RpcMethod<1> for EthAddressToFilecoinAddress { | ||
| const NAME: &'static str = "Filecoin.EthAddressToFilecoinAddress"; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should claim we added support for
EthUnsubscribe. As the AI reviewer noted, it's a placeholder implementation.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the
EthSubscribeseems also empty?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments to explain why we need them.