diff --git a/cmd/rpcdaemon/rpcservices/eth_backend.go b/cmd/rpcdaemon/rpcservices/eth_backend.go index b4878a9f503..edf3829b75d 100644 --- a/cmd/rpcdaemon/rpcservices/eth_backend.go +++ b/cmd/rpcdaemon/rpcservices/eth_backend.go @@ -283,6 +283,29 @@ func (back *RemoteBackend) SubscribeLogs(ctx context.Context, onNewLogs func(rep return nil } +func (back *RemoteBackend) SubscribeReceipts(ctx context.Context, onNewReceipts func(reply *remoteproto.SubscribeReceiptsReply), requestor *atomic.Value) error { + subscription, err := back.remoteEthBackend.SubscribeReceipts(ctx, grpc.WaitForReady(true)) + if err != nil { + if s, ok := status.FromError(err); ok { + return errors.New(s.Message()) + } + return err + } + requestor.Store(subscription.Send) + for { + receipts, err := subscription.Recv() + if errors.Is(err, io.EOF) { + log.Info("rpcdaemon: the receipts subscription channel was closed") + break + } + if err != nil { + return err + } + onNewReceipts(receipts) + } + return nil +} + func (back *RemoteBackend) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, uint64, bool, error) { return back.blockReader.TxnLookup(ctx, tx, txnHash) } diff --git a/execution/engineapi/engine_helpers/fork_validator.go b/execution/engineapi/engine_helpers/fork_validator.go index a37aff7b1e3..3d4c1ede057 100644 --- a/execution/engineapi/engine_helpers/fork_validator.go +++ b/execution/engineapi/engine_helpers/fork_validator.go @@ -141,7 +141,7 @@ func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) { } // FlushExtendingFork flush the current extending fork if fcu chooses its head hash as the its forkchoice. -func (fv *ForkValidator) FlushExtendingFork(tx kv.TemporalRwTx, accumulator *shards.Accumulator, recentLogs *shards.RecentLogs) error { +func (fv *ForkValidator) FlushExtendingFork(tx kv.TemporalRwTx, accumulator *shards.Accumulator, recentReceipts *shards.RecentReceipts) error { fv.lock.Lock() defer fv.lock.Unlock() start := time.Now() @@ -163,7 +163,7 @@ func (fv *ForkValidator) FlushExtendingFork(tx kv.TemporalRwTx, accumulator *sha timings[BlockTimingsFlushExtendingFork] = time.Since(start) fv.timingsCache.Add(fv.extendingForkHeadHash, timings) fv.extendingForkNotifications.Accumulator.CopyAndReset(accumulator) - fv.extendingForkNotifications.RecentLogs.CopyAndReset(recentLogs) + fv.extendingForkNotifications.RecentReceipts.CopyAndReset(recentReceipts) // Clean extending fork data fv.sharedDom = nil diff --git a/execution/execmodule/ethereum_execution.go b/execution/execmodule/ethereum_execution.go index 9a60f5a6b66..4d5eb3f979f 100644 --- a/execution/execmodule/ethereum_execution.go +++ b/execution/execmodule/ethereum_execution.go @@ -112,7 +112,7 @@ type EthereumExecutionModule struct { // Changes accumulator hook *stageloop.Hook accumulator *shards.Accumulator - recentLogs *shards.RecentLogs + recentReceipts *shards.RecentReceipts stateChangeConsumer shards.StateChangeConsumer // configuration @@ -133,7 +133,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.Temp executionPipeline *stagedsync.Sync, forkValidator *engine_helpers.ForkValidator, config *chain.Config, builderFunc builder.BlockBuilderFunc, hook *stageloop.Hook, accumulator *shards.Accumulator, - recentLogs *shards.RecentLogs, + recentReceipts *shards.RecentReceipts, stateChangeConsumer shards.StateChangeConsumer, logger log.Logger, engine rules.Engine, syncCfg ethconfig.Sync, @@ -151,7 +151,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.Temp semaphore: semaphore.NewWeighted(1), hook: hook, accumulator: accumulator, - recentLogs: recentLogs, + recentReceipts: recentReceipts, stateChangeConsumer: stateChangeConsumer, engine: engine, syncCfg: syncCfg, diff --git a/execution/execmodule/forkchoice.go b/execution/execmodule/forkchoice.go index 4ddd60f189f..25b09d0ced4 100644 --- a/execution/execmodule/forkchoice.go +++ b/execution/execmodule/forkchoice.go @@ -449,7 +449,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original ValidationError: validationError, }, false) } - if err := e.forkValidator.FlushExtendingFork(tx, e.accumulator, e.recentLogs); err != nil { + if err := e.forkValidator.FlushExtendingFork(tx, e.accumulator, e.recentReceipts); err != nil { sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, stateFlushingInParallel) return } diff --git a/execution/stagedsync/exec3_parallel.go b/execution/stagedsync/exec3_parallel.go index 08e4f6134ef..fff182f91a8 100644 --- a/execution/stagedsync/exec3_parallel.go +++ b/execution/stagedsync/exec3_parallel.go @@ -234,7 +234,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U } if !pe.isMining && !applyResult.isPartial && !execStage.CurrentSyncCycle.IsInitialCycle { - pe.cfg.notifications.RecentLogs.Add(applyResult.Receipts) + pe.cfg.notifications.RecentReceipts.Add(applyResult.Receipts, b.Transactions(), lastHeader) } } diff --git a/execution/stagedsync/exec3_serial.go b/execution/stagedsync/exec3_serial.go index 3dcd8fa801e..524d30fb820 100644 --- a/execution/stagedsync/exec3_serial.go +++ b/execution/stagedsync/exec3_serial.go @@ -433,7 +433,7 @@ func (se *serialExecutor) executeBlock(ctx context.Context, tasks []exec.Task, i } if !se.isMining && startTxIndex == 0 && !isInitialCycle { - se.cfg.notifications.RecentLogs.Add(blockReceipts) + se.cfg.notifications.RecentReceipts.Add(blockReceipts, txTask.Txs, txTask.Header) } checkReceipts := !se.cfg.vmConfig.StatelessExec && se.cfg.chainConfig.IsByzantium(txTask.BlockNumber()) && !se.cfg.vmConfig.NoReceipts && !se.isMining if txTask.BlockNumber() > 0 && startTxIndex == 0 { diff --git a/execution/stagedsync/stagebuilder.go b/execution/stagedsync/stagebuilder.go index e471a152692..4dbaea5b867 100644 --- a/execution/stagedsync/stagebuilder.go +++ b/execution/stagedsync/stagebuilder.go @@ -32,6 +32,8 @@ type ChainEventNotifier interface { OnNewPendingLogs(types.Logs) OnLogs([]*remoteproto.SubscribeLogsReply) HasLogSubscriptions() bool + OnReceipts([]*remoteproto.SubscribeReceiptsReply) + HasReceiptSubscriptions() bool } func MiningStages( diff --git a/execution/stagedsync/stageloop/stageloop.go b/execution/stagedsync/stageloop/stageloop.go index b7d51920e1f..f5d6ccd7c9f 100644 --- a/execution/stagedsync/stageloop/stageloop.go +++ b/execution/stagedsync/stageloop/stageloop.go @@ -512,7 +512,8 @@ func (h *Hook) sendNotifications(tx kv.Tx, finishStageBeforeSync, finishStageAft if err = stagedsync.NotifyNewHeaders(h.ctx, notifyFrom, notifyTo, h.notifications.Events, tx, h.logger); err != nil { return nil } - h.notifications.RecentLogs.Notify(h.notifications.Events, notifyFrom, notifyTo, isUnwind) + h.notifications.RecentReceipts.NotifyReceipts(h.notifications.Events, notifyFrom, notifyTo, isUnwind) + h.notifications.RecentReceipts.NotifyLogs(h.notifications.Events, notifyFrom, notifyTo, isUnwind) } currentHeader := rawdb.ReadCurrentHeader(tx) diff --git a/execution/tests/mock/mock_sentry.go b/execution/tests/mock/mock_sentry.go index fe11677d50e..33967aa9df1 100644 --- a/execution/tests/mock/mock_sentry.go +++ b/execution/tests/mock/mock_sentry.go @@ -549,7 +549,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK pipelineStages := stageloop.NewPipelineStages(mock.Ctx, db, &cfg, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, nil, forkValidator, tracer) mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks) - mock.Eth1ExecutionService = execmodule.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.RecentLogs, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx) + mock.Eth1ExecutionService = execmodule.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.RecentReceipts, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx) mock.sentriesClient.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize) diff --git a/execution/types/ethutils/receipt.go b/execution/types/ethutils/receipt.go index d1b48767a3e..621015c0213 100644 --- a/execution/types/ethutils/receipt.go +++ b/execution/types/ethutils/receipt.go @@ -28,6 +28,8 @@ import ( "github.com/erigontech/erigon/execution/protocol/misc" "github.com/erigontech/erigon/execution/types" "github.com/erigontech/erigon/execution/types/accounts" + "github.com/erigontech/erigon/node/gointerfaces" + "github.com/erigontech/erigon/node/gointerfaces/remoteproto" ) func MarshalReceipt( @@ -128,3 +130,85 @@ func MarshalReceipt( return fields } + +func MarshalSubscribeReceipt(protoReceipt *remoteproto.SubscribeReceiptsReply) map[string]interface{} { + receipt := make(map[string]interface{}) + + // Basic metadata - convert to proper hex strings + blockHash := common.Hash(gointerfaces.ConvertH256ToHash(protoReceipt.BlockHash)) + receipt["blockHash"] = blockHash + receipt["blockNumber"] = hexutil.Uint64(protoReceipt.BlockNumber) + txHash := common.Hash(gointerfaces.ConvertH256ToHash(protoReceipt.TransactionHash)) + receipt["transactionHash"] = txHash + receipt["transactionIndex"] = hexutil.Uint64(protoReceipt.TransactionIndex) + + // From address as hex string + from := common.Address(gointerfaces.ConvertH160toAddress(protoReceipt.From)) + receipt["from"] = from + + // To can be null for contract creation + if protoReceipt.To != nil { + toAddr := common.Address(gointerfaces.ConvertH160toAddress(protoReceipt.To)) + if toAddr != (common.Address{}) { + receipt["to"] = toAddr + } else { + receipt["to"] = nil + } + } else { + receipt["to"] = nil + } + + receipt["type"] = hexutil.Uint64(protoReceipt.Type) + receipt["status"] = hexutil.Uint64(protoReceipt.Status) + receipt["cumulativeGasUsed"] = hexutil.Uint64(protoReceipt.CumulativeGasUsed) + receipt["gasUsed"] = hexutil.Uint64(protoReceipt.GasUsed) + + if protoReceipt.ContractAddress != nil { + addr := common.Address(gointerfaces.ConvertH160toAddress(protoReceipt.ContractAddress)) + if addr != (common.Address{}) { + receipt["contractAddress"] = addr + } else { + receipt["contractAddress"] = nil + } + } else { + receipt["contractAddress"] = nil + } + + if len(protoReceipt.LogsBloom) > 0 { + receipt["logsBloom"] = hexutil.Bytes(protoReceipt.LogsBloom) + } + + logs := make([]map[string]interface{}, 0, len(protoReceipt.Logs)) + for _, protoLog := range protoReceipt.Logs { + logEntry := make(map[string]interface{}) + + if protoLog.Address != nil { + logEntry["address"] = common.Address(gointerfaces.ConvertH160toAddress(protoLog.Address)) + } + + topics := make([]common.Hash, len(protoLog.Topics)) + for i, topic := range protoLog.Topics { + topics[i] = common.Hash(gointerfaces.ConvertH256ToHash(topic)) + } + logEntry["topics"] = topics + logEntry["data"] = hexutil.Bytes(protoLog.Data) + + logs = append(logs, logEntry) + } + receipt["logs"] = logs + + if protoReceipt.BaseFee != nil { + baseFee := gointerfaces.ConvertH256ToUint256Int(protoReceipt.BaseFee) + receipt["effectiveGasPrice"] = (*hexutil.Big)(baseFee.ToBig()) + } + + if protoReceipt.BlobGasUsed > 0 { + receipt["blobGasUsed"] = hexutil.Uint64(protoReceipt.BlobGasUsed) + } + if protoReceipt.BlobGasPrice != nil { + blobGasPrice := gointerfaces.ConvertH256ToUint256Int(protoReceipt.BlobGasPrice) + receipt["blobGasPrice"] = (*hexutil.Big)(blobGasPrice.ToBig()) + } + + return receipt +} diff --git a/node/direct/eth_backend_client.go b/node/direct/eth_backend_client.go index d25f3f544c4..b23d8222f1b 100644 --- a/node/direct/eth_backend_client.go +++ b/node/direct/eth_backend_client.go @@ -209,6 +209,96 @@ func (c *SubscribeLogsStreamC) Recv() (*remoteproto.SubscribeLogsReply, error) { // -- end SubscribeLogs +// -- SubscribeReceipts + +func (s *EthBackendClientDirect) SubscribeReceipts(ctx context.Context, opts ...grpc.CallOption) (remoteproto.ETHBACKEND_SubscribeReceiptsClient, error) { + subscribeReceiptsRequestChan := make(chan *subscribeReceiptsRequest, 16384) + subscribeReceiptsReplyChan := make(chan *subscribeReceiptsReply, 16384) + srv := &SubscribeReceiptsStreamS{ + chSend: subscribeReceiptsReplyChan, + chRecv: subscribeReceiptsRequestChan, + ctx: ctx, + } + go func() { + defer close(subscribeReceiptsRequestChan) + defer close(subscribeReceiptsReplyChan) + srv.Err(s.server.SubscribeReceipts(srv)) + }() + cli := &SubscribeReceiptsStreamC{ + chSend: subscribeReceiptsRequestChan, + chRecv: subscribeReceiptsReplyChan, + ctx: ctx, + } + return cli, nil +} + +type SubscribeReceiptsStreamS struct { + chSend chan *subscribeReceiptsReply + chRecv chan *subscribeReceiptsRequest + ctx context.Context + grpc.ServerStream +} + +type subscribeReceiptsReply struct { + r *remoteproto.SubscribeReceiptsReply + err error +} + +type subscribeReceiptsRequest struct { + r *remoteproto.ReceiptsFilterRequest + err error +} + +func (s *SubscribeReceiptsStreamS) Send(m *remoteproto.SubscribeReceiptsReply) error { + s.chSend <- &subscribeReceiptsReply{r: m} + return nil +} + +func (s *SubscribeReceiptsStreamS) Recv() (*remoteproto.ReceiptsFilterRequest, error) { + select { + case m, ok := <-s.chRecv: + if !ok || m == nil { + return nil, io.EOF + } + return m.r, m.err + case <-s.ctx.Done(): + return nil, s.ctx.Err() + } +} + +func (s *SubscribeReceiptsStreamS) Err(err error) { + if err == nil { + return + } + s.chSend <- &subscribeReceiptsReply{err: err} +} + +type SubscribeReceiptsStreamC struct { + chSend chan *subscribeReceiptsRequest + chRecv chan *subscribeReceiptsReply + ctx context.Context + grpc.ClientStream +} + +func (c *SubscribeReceiptsStreamC) Send(m *remoteproto.ReceiptsFilterRequest) error { + c.chSend <- &subscribeReceiptsRequest{r: m} + return nil +} + +func (c *SubscribeReceiptsStreamC) Recv() (*remoteproto.SubscribeReceiptsReply, error) { + select { + case m, ok := <-c.chRecv: + if !ok || m == nil { + return nil, io.EOF + } + return m.r, m.err + case <-c.ctx.Done(): + return nil, c.ctx.Err() + } +} + +// -- end SubscribeReceipts + func (s *EthBackendClientDirect) CanonicalBodyForStorage(ctx context.Context, in *remoteproto.CanonicalBodyForStorageRequest, opts ...grpc.CallOption) (*remoteproto.CanonicalBodyForStorageReply, error) { return s.server.CanonicalBodyForStorage(ctx, in) } diff --git a/node/eth/backend.go b/node/eth/backend.go index 0eba40134e1..a53a8e5a298 100644 --- a/node/eth/backend.go +++ b/node/eth/backend.go @@ -991,7 +991,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger pipelineStages := stageloop.NewPipelineStages(ctx, backend.chainDB, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.silkworm, backend.forkValidator, tracer) backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks) - backend.eth1ExecutionServer = execmodule.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.RecentLogs, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx) + backend.eth1ExecutionServer = execmodule.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.RecentReceipts, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx) executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer) var executionEngine executionclient.ExecutionEngine @@ -1647,6 +1647,7 @@ func (s *Ethereum) SentryCtx() context.Context { func (s *Ethereum) SentryControlServer() *sentry_multi_client.MultiClient { return s.sentriesClient } + func (s *Ethereum) BlockIO() (services.FullBlockReader, *blockio.BlockWriter) { return s.blockReader, s.blockWriter } diff --git a/node/gointerfaces/remoteproto/ethbackend.pb.go b/node/gointerfaces/remoteproto/ethbackend.pb.go index b957194b19f..aca01caa726 100644 --- a/node/gointerfaces/remoteproto/ethbackend.pb.go +++ b/node/gointerfaces/remoteproto/ethbackend.pb.go @@ -1090,6 +1090,246 @@ func (x *SubscribeLogsReply) GetRemoved() bool { return false } +type ReceiptsFilterRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + AllTransactions bool `protobuf:"varint,1,opt,name=all_transactions,json=allTransactions,proto3" json:"all_transactions,omitempty"` + TransactionHashes []*typesproto.H256 `protobuf:"bytes,2,rep,name=transaction_hashes,json=transactionHashes,proto3" json:"transaction_hashes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReceiptsFilterRequest) Reset() { + *x = ReceiptsFilterRequest{} + mi := &file_remote_ethbackend_proto_msgTypes[21] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReceiptsFilterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReceiptsFilterRequest) ProtoMessage() {} + +func (x *ReceiptsFilterRequest) ProtoReflect() protoreflect.Message { + mi := &file_remote_ethbackend_proto_msgTypes[21] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReceiptsFilterRequest.ProtoReflect.Descriptor instead. +func (*ReceiptsFilterRequest) Descriptor() ([]byte, []int) { + return file_remote_ethbackend_proto_rawDescGZIP(), []int{21} +} + +func (x *ReceiptsFilterRequest) GetAllTransactions() bool { + if x != nil { + return x.AllTransactions + } + return false +} + +func (x *ReceiptsFilterRequest) GetTransactionHashes() []*typesproto.H256 { + if x != nil { + return x.TransactionHashes + } + return nil +} + +type SubscribeReceiptsReply struct { + state protoimpl.MessageState `protogen:"open.v1"` + BlockHash *typesproto.H256 `protobuf:"bytes,1,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` + BlockNumber uint64 `protobuf:"varint,2,opt,name=block_number,json=blockNumber,proto3" json:"block_number,omitempty"` + TransactionHash *typesproto.H256 `protobuf:"bytes,3,opt,name=transaction_hash,json=transactionHash,proto3" json:"transaction_hash,omitempty"` + TransactionIndex uint64 `protobuf:"varint,4,opt,name=transaction_index,json=transactionIndex,proto3" json:"transaction_index,omitempty"` + Type uint32 `protobuf:"varint,5,opt,name=type,proto3" json:"type,omitempty"` + Status uint64 `protobuf:"varint,6,opt,name=status,proto3" json:"status,omitempty"` + CumulativeGasUsed uint64 `protobuf:"varint,7,opt,name=cumulative_gas_used,json=cumulativeGasUsed,proto3" json:"cumulative_gas_used,omitempty"` + GasUsed uint64 `protobuf:"varint,8,opt,name=gas_used,json=gasUsed,proto3" json:"gas_used,omitempty"` + ContractAddress *typesproto.H160 `protobuf:"bytes,9,opt,name=contract_address,json=contractAddress,proto3" json:"contract_address,omitempty"` + LogsBloom []byte `protobuf:"bytes,10,opt,name=logs_bloom,json=logsBloom,proto3" json:"logs_bloom,omitempty"` + Logs []*SubscribeLogsReply `protobuf:"bytes,11,rep,name=logs,proto3" json:"logs,omitempty"` + From *typesproto.H160 `protobuf:"bytes,12,opt,name=from,proto3" json:"from,omitempty"` + To *typesproto.H160 `protobuf:"bytes,13,opt,name=to,proto3" json:"to,omitempty"` + TxType uint32 `protobuf:"varint,14,opt,name=tx_type,json=txType,proto3" json:"tx_type,omitempty"` + BaseFee *typesproto.H256 `protobuf:"bytes,15,opt,name=base_fee,json=baseFee,proto3" json:"base_fee,omitempty"` + BlockTime uint64 `protobuf:"varint,16,opt,name=block_time,json=blockTime,proto3" json:"block_time,omitempty"` + ExcessBlobGas uint64 `protobuf:"varint,17,opt,name=excess_blob_gas,json=excessBlobGas,proto3" json:"excess_blob_gas,omitempty"` + BlobGasUsed uint64 `protobuf:"varint,18,opt,name=blob_gas_used,json=blobGasUsed,proto3" json:"blob_gas_used,omitempty"` + BlobGasPrice *typesproto.H256 `protobuf:"bytes,19,opt,name=blob_gas_price,json=blobGasPrice,proto3" json:"blob_gas_price,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SubscribeReceiptsReply) Reset() { + *x = SubscribeReceiptsReply{} + mi := &file_remote_ethbackend_proto_msgTypes[22] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SubscribeReceiptsReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeReceiptsReply) ProtoMessage() {} + +func (x *SubscribeReceiptsReply) ProtoReflect() protoreflect.Message { + mi := &file_remote_ethbackend_proto_msgTypes[22] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeReceiptsReply.ProtoReflect.Descriptor instead. +func (*SubscribeReceiptsReply) Descriptor() ([]byte, []int) { + return file_remote_ethbackend_proto_rawDescGZIP(), []int{22} +} + +func (x *SubscribeReceiptsReply) GetBlockHash() *typesproto.H256 { + if x != nil { + return x.BlockHash + } + return nil +} + +func (x *SubscribeReceiptsReply) GetBlockNumber() uint64 { + if x != nil { + return x.BlockNumber + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetTransactionHash() *typesproto.H256 { + if x != nil { + return x.TransactionHash + } + return nil +} + +func (x *SubscribeReceiptsReply) GetTransactionIndex() uint64 { + if x != nil { + return x.TransactionIndex + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetType() uint32 { + if x != nil { + return x.Type + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetStatus() uint64 { + if x != nil { + return x.Status + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetCumulativeGasUsed() uint64 { + if x != nil { + return x.CumulativeGasUsed + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetGasUsed() uint64 { + if x != nil { + return x.GasUsed + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetContractAddress() *typesproto.H160 { + if x != nil { + return x.ContractAddress + } + return nil +} + +func (x *SubscribeReceiptsReply) GetLogsBloom() []byte { + if x != nil { + return x.LogsBloom + } + return nil +} + +func (x *SubscribeReceiptsReply) GetLogs() []*SubscribeLogsReply { + if x != nil { + return x.Logs + } + return nil +} + +func (x *SubscribeReceiptsReply) GetFrom() *typesproto.H160 { + if x != nil { + return x.From + } + return nil +} + +func (x *SubscribeReceiptsReply) GetTo() *typesproto.H160 { + if x != nil { + return x.To + } + return nil +} + +func (x *SubscribeReceiptsReply) GetTxType() uint32 { + if x != nil { + return x.TxType + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetBaseFee() *typesproto.H256 { + if x != nil { + return x.BaseFee + } + return nil +} + +func (x *SubscribeReceiptsReply) GetBlockTime() uint64 { + if x != nil { + return x.BlockTime + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetExcessBlobGas() uint64 { + if x != nil { + return x.ExcessBlobGas + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetBlobGasUsed() uint64 { + if x != nil { + return x.BlobGasUsed + } + return 0 +} + +func (x *SubscribeReceiptsReply) GetBlobGasPrice() *typesproto.H256 { + if x != nil { + return x.BlobGasPrice + } + return nil +} + type BlockRequest struct { state protoimpl.MessageState `protogen:"open.v1"` BlockHeight uint64 `protobuf:"varint,2,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` @@ -1100,7 +1340,7 @@ type BlockRequest struct { func (x *BlockRequest) Reset() { *x = BlockRequest{} - mi := &file_remote_ethbackend_proto_msgTypes[21] + mi := &file_remote_ethbackend_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1112,7 +1352,7 @@ func (x *BlockRequest) String() string { func (*BlockRequest) ProtoMessage() {} func (x *BlockRequest) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[21] + mi := &file_remote_ethbackend_proto_msgTypes[23] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1125,7 +1365,7 @@ func (x *BlockRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use BlockRequest.ProtoReflect.Descriptor instead. func (*BlockRequest) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{21} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{23} } func (x *BlockRequest) GetBlockHeight() uint64 { @@ -1152,7 +1392,7 @@ type BlockReply struct { func (x *BlockReply) Reset() { *x = BlockReply{} - mi := &file_remote_ethbackend_proto_msgTypes[22] + mi := &file_remote_ethbackend_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1164,7 +1404,7 @@ func (x *BlockReply) String() string { func (*BlockReply) ProtoMessage() {} func (x *BlockReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[22] + mi := &file_remote_ethbackend_proto_msgTypes[24] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1177,7 +1417,7 @@ func (x *BlockReply) ProtoReflect() protoreflect.Message { // Deprecated: Use BlockReply.ProtoReflect.Descriptor instead. func (*BlockReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{22} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{24} } func (x *BlockReply) GetBlockRlp() []byte { @@ -1203,7 +1443,7 @@ type TxnLookupRequest struct { func (x *TxnLookupRequest) Reset() { *x = TxnLookupRequest{} - mi := &file_remote_ethbackend_proto_msgTypes[23] + mi := &file_remote_ethbackend_proto_msgTypes[25] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1215,7 +1455,7 @@ func (x *TxnLookupRequest) String() string { func (*TxnLookupRequest) ProtoMessage() {} func (x *TxnLookupRequest) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[23] + mi := &file_remote_ethbackend_proto_msgTypes[25] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1228,7 +1468,7 @@ func (x *TxnLookupRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use TxnLookupRequest.ProtoReflect.Descriptor instead. func (*TxnLookupRequest) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{23} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{25} } func (x *TxnLookupRequest) GetTxnHash() *typesproto.H256 { @@ -1248,7 +1488,7 @@ type TxnLookupReply struct { func (x *TxnLookupReply) Reset() { *x = TxnLookupReply{} - mi := &file_remote_ethbackend_proto_msgTypes[24] + mi := &file_remote_ethbackend_proto_msgTypes[26] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1260,7 +1500,7 @@ func (x *TxnLookupReply) String() string { func (*TxnLookupReply) ProtoMessage() {} func (x *TxnLookupReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[24] + mi := &file_remote_ethbackend_proto_msgTypes[26] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1273,7 +1513,7 @@ func (x *TxnLookupReply) ProtoReflect() protoreflect.Message { // Deprecated: Use TxnLookupReply.ProtoReflect.Descriptor instead. func (*TxnLookupReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{24} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{26} } func (x *TxnLookupReply) GetBlockNumber() uint64 { @@ -1299,7 +1539,7 @@ type NodesInfoRequest struct { func (x *NodesInfoRequest) Reset() { *x = NodesInfoRequest{} - mi := &file_remote_ethbackend_proto_msgTypes[25] + mi := &file_remote_ethbackend_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1311,7 +1551,7 @@ func (x *NodesInfoRequest) String() string { func (*NodesInfoRequest) ProtoMessage() {} func (x *NodesInfoRequest) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[25] + mi := &file_remote_ethbackend_proto_msgTypes[27] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1324,7 +1564,7 @@ func (x *NodesInfoRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use NodesInfoRequest.ProtoReflect.Descriptor instead. func (*NodesInfoRequest) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{25} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{27} } func (x *NodesInfoRequest) GetLimit() uint32 { @@ -1343,7 +1583,7 @@ type AddPeerRequest struct { func (x *AddPeerRequest) Reset() { *x = AddPeerRequest{} - mi := &file_remote_ethbackend_proto_msgTypes[26] + mi := &file_remote_ethbackend_proto_msgTypes[28] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1355,7 +1595,7 @@ func (x *AddPeerRequest) String() string { func (*AddPeerRequest) ProtoMessage() {} func (x *AddPeerRequest) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[26] + mi := &file_remote_ethbackend_proto_msgTypes[28] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1368,7 +1608,7 @@ func (x *AddPeerRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AddPeerRequest.ProtoReflect.Descriptor instead. func (*AddPeerRequest) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{26} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{28} } func (x *AddPeerRequest) GetUrl() string { @@ -1387,7 +1627,7 @@ type RemovePeerRequest struct { func (x *RemovePeerRequest) Reset() { *x = RemovePeerRequest{} - mi := &file_remote_ethbackend_proto_msgTypes[27] + mi := &file_remote_ethbackend_proto_msgTypes[29] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1399,7 +1639,7 @@ func (x *RemovePeerRequest) String() string { func (*RemovePeerRequest) ProtoMessage() {} func (x *RemovePeerRequest) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[27] + mi := &file_remote_ethbackend_proto_msgTypes[29] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1412,7 +1652,7 @@ func (x *RemovePeerRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RemovePeerRequest.ProtoReflect.Descriptor instead. func (*RemovePeerRequest) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{27} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{29} } func (x *RemovePeerRequest) GetUrl() string { @@ -1431,7 +1671,7 @@ type NodesInfoReply struct { func (x *NodesInfoReply) Reset() { *x = NodesInfoReply{} - mi := &file_remote_ethbackend_proto_msgTypes[28] + mi := &file_remote_ethbackend_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1443,7 +1683,7 @@ func (x *NodesInfoReply) String() string { func (*NodesInfoReply) ProtoMessage() {} func (x *NodesInfoReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[28] + mi := &file_remote_ethbackend_proto_msgTypes[30] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1456,7 +1696,7 @@ func (x *NodesInfoReply) ProtoReflect() protoreflect.Message { // Deprecated: Use NodesInfoReply.ProtoReflect.Descriptor instead. func (*NodesInfoReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{28} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{30} } func (x *NodesInfoReply) GetNodesInfo() []*typesproto.NodeInfoReply { @@ -1475,7 +1715,7 @@ type PeersReply struct { func (x *PeersReply) Reset() { *x = PeersReply{} - mi := &file_remote_ethbackend_proto_msgTypes[29] + mi := &file_remote_ethbackend_proto_msgTypes[31] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1487,7 +1727,7 @@ func (x *PeersReply) String() string { func (*PeersReply) ProtoMessage() {} func (x *PeersReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[29] + mi := &file_remote_ethbackend_proto_msgTypes[31] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1500,7 +1740,7 @@ func (x *PeersReply) ProtoReflect() protoreflect.Message { // Deprecated: Use PeersReply.ProtoReflect.Descriptor instead. func (*PeersReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{29} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{31} } func (x *PeersReply) GetPeers() []*typesproto.PeerInfo { @@ -1519,7 +1759,7 @@ type AddPeerReply struct { func (x *AddPeerReply) Reset() { *x = AddPeerReply{} - mi := &file_remote_ethbackend_proto_msgTypes[30] + mi := &file_remote_ethbackend_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1531,7 +1771,7 @@ func (x *AddPeerReply) String() string { func (*AddPeerReply) ProtoMessage() {} func (x *AddPeerReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[30] + mi := &file_remote_ethbackend_proto_msgTypes[32] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1544,7 +1784,7 @@ func (x *AddPeerReply) ProtoReflect() protoreflect.Message { // Deprecated: Use AddPeerReply.ProtoReflect.Descriptor instead. func (*AddPeerReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{30} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{32} } func (x *AddPeerReply) GetSuccess() bool { @@ -1563,7 +1803,7 @@ type RemovePeerReply struct { func (x *RemovePeerReply) Reset() { *x = RemovePeerReply{} - mi := &file_remote_ethbackend_proto_msgTypes[31] + mi := &file_remote_ethbackend_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1575,7 +1815,7 @@ func (x *RemovePeerReply) String() string { func (*RemovePeerReply) ProtoMessage() {} func (x *RemovePeerReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[31] + mi := &file_remote_ethbackend_proto_msgTypes[33] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1588,7 +1828,7 @@ func (x *RemovePeerReply) ProtoReflect() protoreflect.Message { // Deprecated: Use RemovePeerReply.ProtoReflect.Descriptor instead. func (*RemovePeerReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{31} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{33} } func (x *RemovePeerReply) GetSuccess() bool { @@ -1607,7 +1847,7 @@ type PendingBlockReply struct { func (x *PendingBlockReply) Reset() { *x = PendingBlockReply{} - mi := &file_remote_ethbackend_proto_msgTypes[32] + mi := &file_remote_ethbackend_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1619,7 +1859,7 @@ func (x *PendingBlockReply) String() string { func (*PendingBlockReply) ProtoMessage() {} func (x *PendingBlockReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[32] + mi := &file_remote_ethbackend_proto_msgTypes[34] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1632,7 +1872,7 @@ func (x *PendingBlockReply) ProtoReflect() protoreflect.Message { // Deprecated: Use PendingBlockReply.ProtoReflect.Descriptor instead. func (*PendingBlockReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{32} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{34} } func (x *PendingBlockReply) GetBlockRlp() []byte { @@ -1651,7 +1891,7 @@ type EngineGetPayloadBodiesByHashV1Request struct { func (x *EngineGetPayloadBodiesByHashV1Request) Reset() { *x = EngineGetPayloadBodiesByHashV1Request{} - mi := &file_remote_ethbackend_proto_msgTypes[33] + mi := &file_remote_ethbackend_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1663,7 +1903,7 @@ func (x *EngineGetPayloadBodiesByHashV1Request) String() string { func (*EngineGetPayloadBodiesByHashV1Request) ProtoMessage() {} func (x *EngineGetPayloadBodiesByHashV1Request) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[33] + mi := &file_remote_ethbackend_proto_msgTypes[35] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1676,7 +1916,7 @@ func (x *EngineGetPayloadBodiesByHashV1Request) ProtoReflect() protoreflect.Mess // Deprecated: Use EngineGetPayloadBodiesByHashV1Request.ProtoReflect.Descriptor instead. func (*EngineGetPayloadBodiesByHashV1Request) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{33} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{35} } func (x *EngineGetPayloadBodiesByHashV1Request) GetHashes() []*typesproto.H256 { @@ -1696,7 +1936,7 @@ type EngineGetPayloadBodiesByRangeV1Request struct { func (x *EngineGetPayloadBodiesByRangeV1Request) Reset() { *x = EngineGetPayloadBodiesByRangeV1Request{} - mi := &file_remote_ethbackend_proto_msgTypes[34] + mi := &file_remote_ethbackend_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1708,7 +1948,7 @@ func (x *EngineGetPayloadBodiesByRangeV1Request) String() string { func (*EngineGetPayloadBodiesByRangeV1Request) ProtoMessage() {} func (x *EngineGetPayloadBodiesByRangeV1Request) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[34] + mi := &file_remote_ethbackend_proto_msgTypes[36] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1721,7 +1961,7 @@ func (x *EngineGetPayloadBodiesByRangeV1Request) ProtoReflect() protoreflect.Mes // Deprecated: Use EngineGetPayloadBodiesByRangeV1Request.ProtoReflect.Descriptor instead. func (*EngineGetPayloadBodiesByRangeV1Request) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{34} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{36} } func (x *EngineGetPayloadBodiesByRangeV1Request) GetStart() uint64 { @@ -1747,7 +1987,7 @@ type AAValidationRequest struct { func (x *AAValidationRequest) Reset() { *x = AAValidationRequest{} - mi := &file_remote_ethbackend_proto_msgTypes[35] + mi := &file_remote_ethbackend_proto_msgTypes[37] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1759,7 +1999,7 @@ func (x *AAValidationRequest) String() string { func (*AAValidationRequest) ProtoMessage() {} func (x *AAValidationRequest) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[35] + mi := &file_remote_ethbackend_proto_msgTypes[37] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1772,7 +2012,7 @@ func (x *AAValidationRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AAValidationRequest.ProtoReflect.Descriptor instead. func (*AAValidationRequest) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{35} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{37} } func (x *AAValidationRequest) GetTx() *typesproto.AccountAbstractionTransaction { @@ -1791,7 +2031,7 @@ type AAValidationReply struct { func (x *AAValidationReply) Reset() { *x = AAValidationReply{} - mi := &file_remote_ethbackend_proto_msgTypes[36] + mi := &file_remote_ethbackend_proto_msgTypes[38] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1803,7 +2043,7 @@ func (x *AAValidationReply) String() string { func (*AAValidationReply) ProtoMessage() {} func (x *AAValidationReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[36] + mi := &file_remote_ethbackend_proto_msgTypes[38] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1816,7 +2056,7 @@ func (x *AAValidationReply) ProtoReflect() protoreflect.Message { // Deprecated: Use AAValidationReply.ProtoReflect.Descriptor instead. func (*AAValidationReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{36} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{38} } func (x *AAValidationReply) GetValid() bool { @@ -1835,7 +2075,7 @@ type BlockForTxNumRequest struct { func (x *BlockForTxNumRequest) Reset() { *x = BlockForTxNumRequest{} - mi := &file_remote_ethbackend_proto_msgTypes[37] + mi := &file_remote_ethbackend_proto_msgTypes[39] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1847,7 +2087,7 @@ func (x *BlockForTxNumRequest) String() string { func (*BlockForTxNumRequest) ProtoMessage() {} func (x *BlockForTxNumRequest) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[37] + mi := &file_remote_ethbackend_proto_msgTypes[39] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1860,7 +2100,7 @@ func (x *BlockForTxNumRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use BlockForTxNumRequest.ProtoReflect.Descriptor instead. func (*BlockForTxNumRequest) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{37} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{39} } func (x *BlockForTxNumRequest) GetTxnum() uint64 { @@ -1880,7 +2120,7 @@ type BlockForTxNumResponse struct { func (x *BlockForTxNumResponse) Reset() { *x = BlockForTxNumResponse{} - mi := &file_remote_ethbackend_proto_msgTypes[38] + mi := &file_remote_ethbackend_proto_msgTypes[40] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1892,7 +2132,7 @@ func (x *BlockForTxNumResponse) String() string { func (*BlockForTxNumResponse) ProtoMessage() {} func (x *BlockForTxNumResponse) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[38] + mi := &file_remote_ethbackend_proto_msgTypes[40] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1905,7 +2145,7 @@ func (x *BlockForTxNumResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use BlockForTxNumResponse.ProtoReflect.Descriptor instead. func (*BlockForTxNumResponse) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{38} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{40} } func (x *BlockForTxNumResponse) GetBlockNumber() uint64 { @@ -1931,7 +2171,7 @@ type MinimumBlockAvailableReply struct { func (x *MinimumBlockAvailableReply) Reset() { *x = MinimumBlockAvailableReply{} - mi := &file_remote_ethbackend_proto_msgTypes[39] + mi := &file_remote_ethbackend_proto_msgTypes[41] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1943,7 +2183,7 @@ func (x *MinimumBlockAvailableReply) String() string { func (*MinimumBlockAvailableReply) ProtoMessage() {} func (x *MinimumBlockAvailableReply) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[39] + mi := &file_remote_ethbackend_proto_msgTypes[41] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1956,7 +2196,7 @@ func (x *MinimumBlockAvailableReply) ProtoReflect() protoreflect.Message { // Deprecated: Use MinimumBlockAvailableReply.ProtoReflect.Descriptor instead. func (*MinimumBlockAvailableReply) Descriptor() ([]byte, []int) { - return file_remote_ethbackend_proto_rawDescGZIP(), []int{39} + return file_remote_ethbackend_proto_rawDescGZIP(), []int{41} } func (x *MinimumBlockAvailableReply) GetBlockNum() uint64 { @@ -1976,7 +2216,7 @@ type SyncingReply_StageProgress struct { func (x *SyncingReply_StageProgress) Reset() { *x = SyncingReply_StageProgress{} - mi := &file_remote_ethbackend_proto_msgTypes[40] + mi := &file_remote_ethbackend_proto_msgTypes[42] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1988,7 +2228,7 @@ func (x *SyncingReply_StageProgress) String() string { func (*SyncingReply_StageProgress) ProtoMessage() {} func (x *SyncingReply_StageProgress) ProtoReflect() protoreflect.Message { - mi := &file_remote_ethbackend_proto_msgTypes[40] + mi := &file_remote_ethbackend_proto_msgTypes[42] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2082,7 +2322,34 @@ const file_remote_ethbackend_proto_rawDesc = "" + "\x06topics\x18\x06 \x03(\v2\v.types.H256R\x06topics\x126\n" + "\x10transaction_hash\x18\a \x01(\v2\v.types.H256R\x0ftransactionHash\x12+\n" + "\x11transaction_index\x18\b \x01(\x04R\x10transactionIndex\x12\x18\n" + - "\aremoved\x18\t \x01(\bR\aremoved\"]\n" + + "\aremoved\x18\t \x01(\bR\aremoved\"~\n" + + "\x15ReceiptsFilterRequest\x12)\n" + + "\x10all_transactions\x18\x01 \x01(\bR\x0fallTransactions\x12:\n" + + "\x12transaction_hashes\x18\x02 \x03(\v2\v.types.H256R\x11transactionHashes\"\xe7\x05\n" + + "\x16SubscribeReceiptsReply\x12*\n" + + "\n" + + "block_hash\x18\x01 \x01(\v2\v.types.H256R\tblockHash\x12!\n" + + "\fblock_number\x18\x02 \x01(\x04R\vblockNumber\x126\n" + + "\x10transaction_hash\x18\x03 \x01(\v2\v.types.H256R\x0ftransactionHash\x12+\n" + + "\x11transaction_index\x18\x04 \x01(\x04R\x10transactionIndex\x12\x12\n" + + "\x04type\x18\x05 \x01(\rR\x04type\x12\x16\n" + + "\x06status\x18\x06 \x01(\x04R\x06status\x12.\n" + + "\x13cumulative_gas_used\x18\a \x01(\x04R\x11cumulativeGasUsed\x12\x19\n" + + "\bgas_used\x18\b \x01(\x04R\agasUsed\x126\n" + + "\x10contract_address\x18\t \x01(\v2\v.types.H160R\x0fcontractAddress\x12\x1d\n" + + "\n" + + "logs_bloom\x18\n" + + " \x01(\fR\tlogsBloom\x12.\n" + + "\x04logs\x18\v \x03(\v2\x1a.remote.SubscribeLogsReplyR\x04logs\x12\x1f\n" + + "\x04from\x18\f \x01(\v2\v.types.H160R\x04from\x12\x1b\n" + + "\x02to\x18\r \x01(\v2\v.types.H160R\x02to\x12\x17\n" + + "\atx_type\x18\x0e \x01(\rR\x06txType\x12&\n" + + "\bbase_fee\x18\x0f \x01(\v2\v.types.H256R\abaseFee\x12\x1d\n" + + "\n" + + "block_time\x18\x10 \x01(\x04R\tblockTime\x12&\n" + + "\x0fexcess_blob_gas\x18\x11 \x01(\x04R\rexcessBlobGas\x12\"\n" + + "\rblob_gas_used\x18\x12 \x01(\x04R\vblobGasUsed\x121\n" + + "\x0eblob_gas_price\x18\x13 \x01(\v2\v.types.H256R\fblobGasPrice\"]\n" + "\fBlockRequest\x12!\n" + "\fblock_height\x18\x02 \x01(\x04R\vblockHeight\x12*\n" + "\n" + @@ -2135,7 +2402,7 @@ const file_remote_ethbackend_proto_rawDesc = "" + "\x06HEADER\x10\x00\x12\x10\n" + "\fPENDING_LOGS\x10\x01\x12\x11\n" + "\rPENDING_BLOCK\x10\x02\x12\x10\n" + - "\fNEW_SNAPSHOT\x10\x032\x80\r\n" + + "\fNEW_SNAPSHOT\x10\x032\xd8\r\n" + "\n" + "ETHBACKEND\x12=\n" + "\tEtherbase\x12\x18.remote.EtherbaseRequest\x1a\x16.remote.EtherbaseReply\x12@\n" + @@ -2147,7 +2414,8 @@ const file_remote_ethbackend_proto_rawDesc = "" + "\x0fProtocolVersion\x12\x1e.remote.ProtocolVersionRequest\x1a\x1c.remote.ProtocolVersionReply\x12I\n" + "\rClientVersion\x12\x1c.remote.ClientVersionRequest\x1a\x1a.remote.ClientVersionReply\x12?\n" + "\tSubscribe\x12\x18.remote.SubscribeRequest\x1a\x16.remote.SubscribeReply0\x01\x12J\n" + - "\rSubscribeLogs\x12\x19.remote.LogsFilterRequest\x1a\x1a.remote.SubscribeLogsReply(\x010\x01\x121\n" + + "\rSubscribeLogs\x12\x19.remote.LogsFilterRequest\x1a\x1a.remote.SubscribeLogsReply(\x010\x01\x12V\n" + + "\x11SubscribeReceipts\x12\x1d.remote.ReceiptsFilterRequest\x1a\x1e.remote.SubscribeReceiptsReply(\x010\x01\x121\n" + "\x05Block\x12\x14.remote.BlockRequest\x1a\x12.remote.BlockReply\x12g\n" + "\x17CanonicalBodyForStorage\x12&.remote.CanonicalBodyForStorageRequest\x1a$.remote.CanonicalBodyForStorageReply\x12I\n" + "\rCanonicalHash\x12\x1c.remote.CanonicalHashRequest\x1a\x1a.remote.CanonicalHashReply\x12F\n" + @@ -2178,7 +2446,7 @@ func file_remote_ethbackend_proto_rawDescGZIP() []byte { } var file_remote_ethbackend_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_remote_ethbackend_proto_msgTypes = make([]protoimpl.MessageInfo, 41) +var file_remote_ethbackend_proto_msgTypes = make([]protoimpl.MessageInfo, 43) var file_remote_ethbackend_proto_goTypes = []any{ (Event)(0), // 0: remote.Event (*EtherbaseRequest)(nil), // 1: remote.EtherbaseRequest @@ -2202,110 +2470,123 @@ var file_remote_ethbackend_proto_goTypes = []any{ (*SubscribeReply)(nil), // 19: remote.SubscribeReply (*LogsFilterRequest)(nil), // 20: remote.LogsFilterRequest (*SubscribeLogsReply)(nil), // 21: remote.SubscribeLogsReply - (*BlockRequest)(nil), // 22: remote.BlockRequest - (*BlockReply)(nil), // 23: remote.BlockReply - (*TxnLookupRequest)(nil), // 24: remote.TxnLookupRequest - (*TxnLookupReply)(nil), // 25: remote.TxnLookupReply - (*NodesInfoRequest)(nil), // 26: remote.NodesInfoRequest - (*AddPeerRequest)(nil), // 27: remote.AddPeerRequest - (*RemovePeerRequest)(nil), // 28: remote.RemovePeerRequest - (*NodesInfoReply)(nil), // 29: remote.NodesInfoReply - (*PeersReply)(nil), // 30: remote.PeersReply - (*AddPeerReply)(nil), // 31: remote.AddPeerReply - (*RemovePeerReply)(nil), // 32: remote.RemovePeerReply - (*PendingBlockReply)(nil), // 33: remote.PendingBlockReply - (*EngineGetPayloadBodiesByHashV1Request)(nil), // 34: remote.EngineGetPayloadBodiesByHashV1Request - (*EngineGetPayloadBodiesByRangeV1Request)(nil), // 35: remote.EngineGetPayloadBodiesByRangeV1Request - (*AAValidationRequest)(nil), // 36: remote.AAValidationRequest - (*AAValidationReply)(nil), // 37: remote.AAValidationReply - (*BlockForTxNumRequest)(nil), // 38: remote.BlockForTxNumRequest - (*BlockForTxNumResponse)(nil), // 39: remote.BlockForTxNumResponse - (*MinimumBlockAvailableReply)(nil), // 40: remote.MinimumBlockAvailableReply - (*SyncingReply_StageProgress)(nil), // 41: remote.SyncingReply.StageProgress - (*typesproto.H160)(nil), // 42: types.H160 - (*typesproto.H256)(nil), // 43: types.H256 - (*typesproto.NodeInfoReply)(nil), // 44: types.NodeInfoReply - (*typesproto.PeerInfo)(nil), // 45: types.PeerInfo - (*typesproto.AccountAbstractionTransaction)(nil), // 46: types.AccountAbstractionTransaction - (*emptypb.Empty)(nil), // 47: google.protobuf.Empty - (*BorTxnLookupRequest)(nil), // 48: remote.BorTxnLookupRequest - (*BorEventsRequest)(nil), // 49: remote.BorEventsRequest - (*typesproto.VersionReply)(nil), // 50: types.VersionReply - (*BorTxnLookupReply)(nil), // 51: remote.BorTxnLookupReply - (*BorEventsReply)(nil), // 52: remote.BorEventsReply + (*ReceiptsFilterRequest)(nil), // 22: remote.ReceiptsFilterRequest + (*SubscribeReceiptsReply)(nil), // 23: remote.SubscribeReceiptsReply + (*BlockRequest)(nil), // 24: remote.BlockRequest + (*BlockReply)(nil), // 25: remote.BlockReply + (*TxnLookupRequest)(nil), // 26: remote.TxnLookupRequest + (*TxnLookupReply)(nil), // 27: remote.TxnLookupReply + (*NodesInfoRequest)(nil), // 28: remote.NodesInfoRequest + (*AddPeerRequest)(nil), // 29: remote.AddPeerRequest + (*RemovePeerRequest)(nil), // 30: remote.RemovePeerRequest + (*NodesInfoReply)(nil), // 31: remote.NodesInfoReply + (*PeersReply)(nil), // 32: remote.PeersReply + (*AddPeerReply)(nil), // 33: remote.AddPeerReply + (*RemovePeerReply)(nil), // 34: remote.RemovePeerReply + (*PendingBlockReply)(nil), // 35: remote.PendingBlockReply + (*EngineGetPayloadBodiesByHashV1Request)(nil), // 36: remote.EngineGetPayloadBodiesByHashV1Request + (*EngineGetPayloadBodiesByRangeV1Request)(nil), // 37: remote.EngineGetPayloadBodiesByRangeV1Request + (*AAValidationRequest)(nil), // 38: remote.AAValidationRequest + (*AAValidationReply)(nil), // 39: remote.AAValidationReply + (*BlockForTxNumRequest)(nil), // 40: remote.BlockForTxNumRequest + (*BlockForTxNumResponse)(nil), // 41: remote.BlockForTxNumResponse + (*MinimumBlockAvailableReply)(nil), // 42: remote.MinimumBlockAvailableReply + (*SyncingReply_StageProgress)(nil), // 43: remote.SyncingReply.StageProgress + (*typesproto.H160)(nil), // 44: types.H160 + (*typesproto.H256)(nil), // 45: types.H256 + (*typesproto.NodeInfoReply)(nil), // 46: types.NodeInfoReply + (*typesproto.PeerInfo)(nil), // 47: types.PeerInfo + (*typesproto.AccountAbstractionTransaction)(nil), // 48: types.AccountAbstractionTransaction + (*emptypb.Empty)(nil), // 49: google.protobuf.Empty + (*BorTxnLookupRequest)(nil), // 50: remote.BorTxnLookupRequest + (*BorEventsRequest)(nil), // 51: remote.BorEventsRequest + (*typesproto.VersionReply)(nil), // 52: types.VersionReply + (*BorTxnLookupReply)(nil), // 53: remote.BorTxnLookupReply + (*BorEventsReply)(nil), // 54: remote.BorEventsReply } var file_remote_ethbackend_proto_depIdxs = []int32{ - 42, // 0: remote.EtherbaseReply.address:type_name -> types.H160 - 41, // 1: remote.SyncingReply.stages:type_name -> remote.SyncingReply.StageProgress - 43, // 2: remote.CanonicalHashReply.hash:type_name -> types.H256 - 43, // 3: remote.HeaderNumberRequest.hash:type_name -> types.H256 + 44, // 0: remote.EtherbaseReply.address:type_name -> types.H160 + 43, // 1: remote.SyncingReply.stages:type_name -> remote.SyncingReply.StageProgress + 45, // 2: remote.CanonicalHashReply.hash:type_name -> types.H256 + 45, // 3: remote.HeaderNumberRequest.hash:type_name -> types.H256 0, // 4: remote.SubscribeRequest.type:type_name -> remote.Event 0, // 5: remote.SubscribeReply.type:type_name -> remote.Event - 42, // 6: remote.LogsFilterRequest.addresses:type_name -> types.H160 - 43, // 7: remote.LogsFilterRequest.topics:type_name -> types.H256 - 42, // 8: remote.SubscribeLogsReply.address:type_name -> types.H160 - 43, // 9: remote.SubscribeLogsReply.block_hash:type_name -> types.H256 - 43, // 10: remote.SubscribeLogsReply.topics:type_name -> types.H256 - 43, // 11: remote.SubscribeLogsReply.transaction_hash:type_name -> types.H256 - 43, // 12: remote.BlockRequest.block_hash:type_name -> types.H256 - 43, // 13: remote.TxnLookupRequest.txn_hash:type_name -> types.H256 - 44, // 14: remote.NodesInfoReply.nodes_info:type_name -> types.NodeInfoReply - 45, // 15: remote.PeersReply.peers:type_name -> types.PeerInfo - 43, // 16: remote.EngineGetPayloadBodiesByHashV1Request.hashes:type_name -> types.H256 - 46, // 17: remote.AAValidationRequest.tx:type_name -> types.AccountAbstractionTransaction - 1, // 18: remote.ETHBACKEND.Etherbase:input_type -> remote.EtherbaseRequest - 3, // 19: remote.ETHBACKEND.NetVersion:input_type -> remote.NetVersionRequest - 6, // 20: remote.ETHBACKEND.NetPeerCount:input_type -> remote.NetPeerCountRequest - 47, // 21: remote.ETHBACKEND.Version:input_type -> google.protobuf.Empty - 47, // 22: remote.ETHBACKEND.Syncing:input_type -> google.protobuf.Empty - 8, // 23: remote.ETHBACKEND.ProtocolVersion:input_type -> remote.ProtocolVersionRequest - 10, // 24: remote.ETHBACKEND.ClientVersion:input_type -> remote.ClientVersionRequest - 18, // 25: remote.ETHBACKEND.Subscribe:input_type -> remote.SubscribeRequest - 20, // 26: remote.ETHBACKEND.SubscribeLogs:input_type -> remote.LogsFilterRequest - 22, // 27: remote.ETHBACKEND.Block:input_type -> remote.BlockRequest - 16, // 28: remote.ETHBACKEND.CanonicalBodyForStorage:input_type -> remote.CanonicalBodyForStorageRequest - 12, // 29: remote.ETHBACKEND.CanonicalHash:input_type -> remote.CanonicalHashRequest - 14, // 30: remote.ETHBACKEND.HeaderNumber:input_type -> remote.HeaderNumberRequest - 24, // 31: remote.ETHBACKEND.TxnLookup:input_type -> remote.TxnLookupRequest - 26, // 32: remote.ETHBACKEND.NodeInfo:input_type -> remote.NodesInfoRequest - 47, // 33: remote.ETHBACKEND.Peers:input_type -> google.protobuf.Empty - 27, // 34: remote.ETHBACKEND.AddPeer:input_type -> remote.AddPeerRequest - 28, // 35: remote.ETHBACKEND.RemovePeer:input_type -> remote.RemovePeerRequest - 47, // 36: remote.ETHBACKEND.PendingBlock:input_type -> google.protobuf.Empty - 48, // 37: remote.ETHBACKEND.BorTxnLookup:input_type -> remote.BorTxnLookupRequest - 49, // 38: remote.ETHBACKEND.BorEvents:input_type -> remote.BorEventsRequest - 36, // 39: remote.ETHBACKEND.AAValidation:input_type -> remote.AAValidationRequest - 38, // 40: remote.ETHBACKEND.BlockForTxNum:input_type -> remote.BlockForTxNumRequest - 47, // 41: remote.ETHBACKEND.MinimumBlockAvailable:input_type -> google.protobuf.Empty - 2, // 42: remote.ETHBACKEND.Etherbase:output_type -> remote.EtherbaseReply - 4, // 43: remote.ETHBACKEND.NetVersion:output_type -> remote.NetVersionReply - 7, // 44: remote.ETHBACKEND.NetPeerCount:output_type -> remote.NetPeerCountReply - 50, // 45: remote.ETHBACKEND.Version:output_type -> types.VersionReply - 5, // 46: remote.ETHBACKEND.Syncing:output_type -> remote.SyncingReply - 9, // 47: remote.ETHBACKEND.ProtocolVersion:output_type -> remote.ProtocolVersionReply - 11, // 48: remote.ETHBACKEND.ClientVersion:output_type -> remote.ClientVersionReply - 19, // 49: remote.ETHBACKEND.Subscribe:output_type -> remote.SubscribeReply - 21, // 50: remote.ETHBACKEND.SubscribeLogs:output_type -> remote.SubscribeLogsReply - 23, // 51: remote.ETHBACKEND.Block:output_type -> remote.BlockReply - 17, // 52: remote.ETHBACKEND.CanonicalBodyForStorage:output_type -> remote.CanonicalBodyForStorageReply - 13, // 53: remote.ETHBACKEND.CanonicalHash:output_type -> remote.CanonicalHashReply - 15, // 54: remote.ETHBACKEND.HeaderNumber:output_type -> remote.HeaderNumberReply - 25, // 55: remote.ETHBACKEND.TxnLookup:output_type -> remote.TxnLookupReply - 29, // 56: remote.ETHBACKEND.NodeInfo:output_type -> remote.NodesInfoReply - 30, // 57: remote.ETHBACKEND.Peers:output_type -> remote.PeersReply - 31, // 58: remote.ETHBACKEND.AddPeer:output_type -> remote.AddPeerReply - 32, // 59: remote.ETHBACKEND.RemovePeer:output_type -> remote.RemovePeerReply - 33, // 60: remote.ETHBACKEND.PendingBlock:output_type -> remote.PendingBlockReply - 51, // 61: remote.ETHBACKEND.BorTxnLookup:output_type -> remote.BorTxnLookupReply - 52, // 62: remote.ETHBACKEND.BorEvents:output_type -> remote.BorEventsReply - 37, // 63: remote.ETHBACKEND.AAValidation:output_type -> remote.AAValidationReply - 39, // 64: remote.ETHBACKEND.BlockForTxNum:output_type -> remote.BlockForTxNumResponse - 40, // 65: remote.ETHBACKEND.MinimumBlockAvailable:output_type -> remote.MinimumBlockAvailableReply - 42, // [42:66] is the sub-list for method output_type - 18, // [18:42] is the sub-list for method input_type - 18, // [18:18] is the sub-list for extension type_name - 18, // [18:18] is the sub-list for extension extendee - 0, // [0:18] is the sub-list for field type_name + 44, // 6: remote.LogsFilterRequest.addresses:type_name -> types.H160 + 45, // 7: remote.LogsFilterRequest.topics:type_name -> types.H256 + 44, // 8: remote.SubscribeLogsReply.address:type_name -> types.H160 + 45, // 9: remote.SubscribeLogsReply.block_hash:type_name -> types.H256 + 45, // 10: remote.SubscribeLogsReply.topics:type_name -> types.H256 + 45, // 11: remote.SubscribeLogsReply.transaction_hash:type_name -> types.H256 + 45, // 12: remote.ReceiptsFilterRequest.transaction_hashes:type_name -> types.H256 + 45, // 13: remote.SubscribeReceiptsReply.block_hash:type_name -> types.H256 + 45, // 14: remote.SubscribeReceiptsReply.transaction_hash:type_name -> types.H256 + 44, // 15: remote.SubscribeReceiptsReply.contract_address:type_name -> types.H160 + 21, // 16: remote.SubscribeReceiptsReply.logs:type_name -> remote.SubscribeLogsReply + 44, // 17: remote.SubscribeReceiptsReply.from:type_name -> types.H160 + 44, // 18: remote.SubscribeReceiptsReply.to:type_name -> types.H160 + 45, // 19: remote.SubscribeReceiptsReply.base_fee:type_name -> types.H256 + 45, // 20: remote.SubscribeReceiptsReply.blob_gas_price:type_name -> types.H256 + 45, // 21: remote.BlockRequest.block_hash:type_name -> types.H256 + 45, // 22: remote.TxnLookupRequest.txn_hash:type_name -> types.H256 + 46, // 23: remote.NodesInfoReply.nodes_info:type_name -> types.NodeInfoReply + 47, // 24: remote.PeersReply.peers:type_name -> types.PeerInfo + 45, // 25: remote.EngineGetPayloadBodiesByHashV1Request.hashes:type_name -> types.H256 + 48, // 26: remote.AAValidationRequest.tx:type_name -> types.AccountAbstractionTransaction + 1, // 27: remote.ETHBACKEND.Etherbase:input_type -> remote.EtherbaseRequest + 3, // 28: remote.ETHBACKEND.NetVersion:input_type -> remote.NetVersionRequest + 6, // 29: remote.ETHBACKEND.NetPeerCount:input_type -> remote.NetPeerCountRequest + 49, // 30: remote.ETHBACKEND.Version:input_type -> google.protobuf.Empty + 49, // 31: remote.ETHBACKEND.Syncing:input_type -> google.protobuf.Empty + 8, // 32: remote.ETHBACKEND.ProtocolVersion:input_type -> remote.ProtocolVersionRequest + 10, // 33: remote.ETHBACKEND.ClientVersion:input_type -> remote.ClientVersionRequest + 18, // 34: remote.ETHBACKEND.Subscribe:input_type -> remote.SubscribeRequest + 20, // 35: remote.ETHBACKEND.SubscribeLogs:input_type -> remote.LogsFilterRequest + 22, // 36: remote.ETHBACKEND.SubscribeReceipts:input_type -> remote.ReceiptsFilterRequest + 24, // 37: remote.ETHBACKEND.Block:input_type -> remote.BlockRequest + 16, // 38: remote.ETHBACKEND.CanonicalBodyForStorage:input_type -> remote.CanonicalBodyForStorageRequest + 12, // 39: remote.ETHBACKEND.CanonicalHash:input_type -> remote.CanonicalHashRequest + 14, // 40: remote.ETHBACKEND.HeaderNumber:input_type -> remote.HeaderNumberRequest + 26, // 41: remote.ETHBACKEND.TxnLookup:input_type -> remote.TxnLookupRequest + 28, // 42: remote.ETHBACKEND.NodeInfo:input_type -> remote.NodesInfoRequest + 49, // 43: remote.ETHBACKEND.Peers:input_type -> google.protobuf.Empty + 29, // 44: remote.ETHBACKEND.AddPeer:input_type -> remote.AddPeerRequest + 30, // 45: remote.ETHBACKEND.RemovePeer:input_type -> remote.RemovePeerRequest + 49, // 46: remote.ETHBACKEND.PendingBlock:input_type -> google.protobuf.Empty + 50, // 47: remote.ETHBACKEND.BorTxnLookup:input_type -> remote.BorTxnLookupRequest + 51, // 48: remote.ETHBACKEND.BorEvents:input_type -> remote.BorEventsRequest + 38, // 49: remote.ETHBACKEND.AAValidation:input_type -> remote.AAValidationRequest + 40, // 50: remote.ETHBACKEND.BlockForTxNum:input_type -> remote.BlockForTxNumRequest + 49, // 51: remote.ETHBACKEND.MinimumBlockAvailable:input_type -> google.protobuf.Empty + 2, // 52: remote.ETHBACKEND.Etherbase:output_type -> remote.EtherbaseReply + 4, // 53: remote.ETHBACKEND.NetVersion:output_type -> remote.NetVersionReply + 7, // 54: remote.ETHBACKEND.NetPeerCount:output_type -> remote.NetPeerCountReply + 52, // 55: remote.ETHBACKEND.Version:output_type -> types.VersionReply + 5, // 56: remote.ETHBACKEND.Syncing:output_type -> remote.SyncingReply + 9, // 57: remote.ETHBACKEND.ProtocolVersion:output_type -> remote.ProtocolVersionReply + 11, // 58: remote.ETHBACKEND.ClientVersion:output_type -> remote.ClientVersionReply + 19, // 59: remote.ETHBACKEND.Subscribe:output_type -> remote.SubscribeReply + 21, // 60: remote.ETHBACKEND.SubscribeLogs:output_type -> remote.SubscribeLogsReply + 23, // 61: remote.ETHBACKEND.SubscribeReceipts:output_type -> remote.SubscribeReceiptsReply + 25, // 62: remote.ETHBACKEND.Block:output_type -> remote.BlockReply + 17, // 63: remote.ETHBACKEND.CanonicalBodyForStorage:output_type -> remote.CanonicalBodyForStorageReply + 13, // 64: remote.ETHBACKEND.CanonicalHash:output_type -> remote.CanonicalHashReply + 15, // 65: remote.ETHBACKEND.HeaderNumber:output_type -> remote.HeaderNumberReply + 27, // 66: remote.ETHBACKEND.TxnLookup:output_type -> remote.TxnLookupReply + 31, // 67: remote.ETHBACKEND.NodeInfo:output_type -> remote.NodesInfoReply + 32, // 68: remote.ETHBACKEND.Peers:output_type -> remote.PeersReply + 33, // 69: remote.ETHBACKEND.AddPeer:output_type -> remote.AddPeerReply + 34, // 70: remote.ETHBACKEND.RemovePeer:output_type -> remote.RemovePeerReply + 35, // 71: remote.ETHBACKEND.PendingBlock:output_type -> remote.PendingBlockReply + 53, // 72: remote.ETHBACKEND.BorTxnLookup:output_type -> remote.BorTxnLookupReply + 54, // 73: remote.ETHBACKEND.BorEvents:output_type -> remote.BorEventsReply + 39, // 74: remote.ETHBACKEND.AAValidation:output_type -> remote.AAValidationReply + 41, // 75: remote.ETHBACKEND.BlockForTxNum:output_type -> remote.BlockForTxNumResponse + 42, // 76: remote.ETHBACKEND.MinimumBlockAvailable:output_type -> remote.MinimumBlockAvailableReply + 52, // [52:77] is the sub-list for method output_type + 27, // [27:52] is the sub-list for method input_type + 27, // [27:27] is the sub-list for extension type_name + 27, // [27:27] is the sub-list for extension extendee + 0, // [0:27] is the sub-list for field type_name } func init() { file_remote_ethbackend_proto_init() } @@ -2321,7 +2602,7 @@ func file_remote_ethbackend_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_remote_ethbackend_proto_rawDesc), len(file_remote_ethbackend_proto_rawDesc)), NumEnums: 1, - NumMessages: 41, + NumMessages: 43, NumExtensions: 0, NumServices: 1, }, diff --git a/node/gointerfaces/remoteproto/ethbackend_grpc.pb.go b/node/gointerfaces/remoteproto/ethbackend_grpc.pb.go index 632e3632db2..a8eef7d1c04 100644 --- a/node/gointerfaces/remoteproto/ethbackend_grpc.pb.go +++ b/node/gointerfaces/remoteproto/ethbackend_grpc.pb.go @@ -30,6 +30,7 @@ const ( ETHBACKEND_ClientVersion_FullMethodName = "/remote.ETHBACKEND/ClientVersion" ETHBACKEND_Subscribe_FullMethodName = "/remote.ETHBACKEND/Subscribe" ETHBACKEND_SubscribeLogs_FullMethodName = "/remote.ETHBACKEND/SubscribeLogs" + ETHBACKEND_SubscribeReceipts_FullMethodName = "/remote.ETHBACKEND/SubscribeReceipts" ETHBACKEND_Block_FullMethodName = "/remote.ETHBACKEND/Block" ETHBACKEND_CanonicalBodyForStorage_FullMethodName = "/remote.ETHBACKEND/CanonicalBodyForStorage" ETHBACKEND_CanonicalHash_FullMethodName = "/remote.ETHBACKEND/CanonicalHash" @@ -65,6 +66,8 @@ type ETHBACKENDClient interface { Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[SubscribeReply], error) // Only one subscription is needed to serve all the users, LogsFilterRequest allows to dynamically modifying the subscription SubscribeLogs(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[LogsFilterRequest, SubscribeLogsReply], error) + // Only one subscription is needed to serve all the users, ReceiptsFilterRequest allows to dynamically modifying the subscription + SubscribeReceipts(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ReceiptsFilterRequest, SubscribeReceiptsReply], error) // High-level method - can read block from db, snapshots or apply any other logic // it doesn't provide consistency // Request fields are optional - it's ok to request block only by hash or only by number @@ -203,6 +206,19 @@ func (c *eTHBACKENDClient) SubscribeLogs(ctx context.Context, opts ...grpc.CallO // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type ETHBACKEND_SubscribeLogsClient = grpc.BidiStreamingClient[LogsFilterRequest, SubscribeLogsReply] +func (c *eTHBACKENDClient) SubscribeReceipts(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ReceiptsFilterRequest, SubscribeReceiptsReply], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, ÐBACKEND_ServiceDesc.Streams[2], ETHBACKEND_SubscribeReceipts_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[ReceiptsFilterRequest, SubscribeReceiptsReply]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type ETHBACKEND_SubscribeReceiptsClient = grpc.BidiStreamingClient[ReceiptsFilterRequest, SubscribeReceiptsReply] + func (c *eTHBACKENDClient) Block(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (*BlockReply, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(BlockReply) @@ -371,6 +387,8 @@ type ETHBACKENDServer interface { Subscribe(*SubscribeRequest, grpc.ServerStreamingServer[SubscribeReply]) error // Only one subscription is needed to serve all the users, LogsFilterRequest allows to dynamically modifying the subscription SubscribeLogs(grpc.BidiStreamingServer[LogsFilterRequest, SubscribeLogsReply]) error + // Only one subscription is needed to serve all the users, ReceiptsFilterRequest allows to dynamically modifying the subscription + SubscribeReceipts(grpc.BidiStreamingServer[ReceiptsFilterRequest, SubscribeReceiptsReply]) error // High-level method - can read block from db, snapshots or apply any other logic // it doesn't provide consistency // Request fields are optional - it's ok to request block only by hash or only by number @@ -434,6 +452,9 @@ func (UnimplementedETHBACKENDServer) Subscribe(*SubscribeRequest, grpc.ServerStr func (UnimplementedETHBACKENDServer) SubscribeLogs(grpc.BidiStreamingServer[LogsFilterRequest, SubscribeLogsReply]) error { return status.Error(codes.Unimplemented, "method SubscribeLogs not implemented") } +func (UnimplementedETHBACKENDServer) SubscribeReceipts(grpc.BidiStreamingServer[ReceiptsFilterRequest, SubscribeReceiptsReply]) error { + return status.Errorf(codes.Unimplemented, "method SubscribeReceipts not implemented") +} func (UnimplementedETHBACKENDServer) Block(context.Context, *BlockRequest) (*BlockReply, error) { return nil, status.Error(codes.Unimplemented, "method Block not implemented") } @@ -644,6 +665,13 @@ func _ETHBACKEND_SubscribeLogs_Handler(srv interface{}, stream grpc.ServerStream // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type ETHBACKEND_SubscribeLogsServer = grpc.BidiStreamingServer[LogsFilterRequest, SubscribeLogsReply] +func _ETHBACKEND_SubscribeReceipts_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ETHBACKENDServer).SubscribeReceipts(&grpc.GenericServerStream[ReceiptsFilterRequest, SubscribeReceiptsReply]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type ETHBACKEND_SubscribeReceiptsServer = grpc.BidiStreamingServer[ReceiptsFilterRequest, SubscribeReceiptsReply] + func _ETHBACKEND_Block_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(BlockRequest) if err := dec(in); err != nil { @@ -1022,6 +1050,12 @@ var ETHBACKEND_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "SubscribeReceipts", + Handler: _ETHBACKEND_SubscribeReceipts_Handler, + ServerStreams: true, + ClientStreams: true, + }, }, Metadata: "remote/ethbackend.proto", } diff --git a/node/privateapi/ethbackend.go b/node/privateapi/ethbackend.go index c9073354b49..86a0209c95d 100644 --- a/node/privateapi/ethbackend.go +++ b/node/privateapi/ethbackend.go @@ -69,9 +69,10 @@ type EthBackendServer struct { bridgeStore bridge.Store latestBlockBuiltStore *builder.LatestBlockBuiltStore - logsFilter *LogsFilterAggregator - logger log.Logger - chainConfig *chain.Config + logsFilter *LogsFilterAggregator + receiptsFilter *ReceiptsFilterAggregator + logger log.Logger + chainConfig *chain.Config } type EthBackend interface { @@ -95,6 +96,7 @@ func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, notifi blockReader: blockReader, bridgeStore: bridgeStore, logsFilter: NewLogsFilterAggregator(notifications.Events), + receiptsFilter: NewReceiptsFilterAggregator(notifications.Events), logger: logger, latestBlockBuiltStore: latestBlockBuiltStore, chainConfig: chainConfig, @@ -121,6 +123,29 @@ func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, notifi } } }() + + rch, rclean := s.notifications.Events.AddReceiptsSubscription() + go func() { + var err error + defer rclean() + defer func() { + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Warn("[rpc] terminated subscription to `receipts` events", "reason", err) + } + } + }() + for { + select { + case <-s.ctx.Done(): + err = s.ctx.Err() + return + case receipts := <-rch: + s.receiptsFilter.distributeReceipts(receipts) + } + } + }() + return s } @@ -405,6 +430,13 @@ func (s *EthBackendServer) SubscribeLogs(server remoteproto.ETHBACKEND_Subscribe return errors.New("no logs filter available") } +func (s *EthBackendServer) SubscribeReceipts(server remoteproto.ETHBACKEND_SubscribeReceiptsServer) (err error) { + if s.receiptsFilter != nil { + return s.receiptsFilter.subscribeReceipts(server) + } + return errors.New("no receipts filter available") +} + func (s *EthBackendServer) BorTxnLookup(ctx context.Context, req *remoteproto.BorTxnLookupRequest) (*remoteproto.BorTxnLookupReply, error) { tx, err := s.db.BeginRo(ctx) if err != nil { diff --git a/node/privateapi/receiptsfilter.go b/node/privateapi/receiptsfilter.go new file mode 100644 index 00000000000..04f81bdca97 --- /dev/null +++ b/node/privateapi/receiptsfilter.go @@ -0,0 +1,165 @@ +// Copyright 2025 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package privateapi + +import ( + "fmt" + "io" + "sync" + + "github.com/erigontech/erigon/common" + "github.com/erigontech/erigon/node/gointerfaces" + "github.com/erigontech/erigon/node/gointerfaces/remoteproto" + "github.com/erigontech/erigon/node/shards" +) + +type ReceiptsFilterAggregator struct { + aggReceiptsFilter ReceiptsFilter + receiptsFilters map[uint64]*ReceiptsFilter + receiptsFilterLock sync.Mutex + nextFilterId uint64 + events *shards.Events +} + +type ReceiptsFilter struct { + allTxHashes int + txHashes map[common.Hash]int + sender remoteproto.ETHBACKEND_SubscribeReceiptsServer +} + +func NewReceiptsFilterAggregator(events *shards.Events) *ReceiptsFilterAggregator { + return &ReceiptsFilterAggregator{ + aggReceiptsFilter: ReceiptsFilter{ + txHashes: make(map[common.Hash]int), + }, + receiptsFilters: make(map[uint64]*ReceiptsFilter), + nextFilterId: 0, + events: events, + } +} + +func (a *ReceiptsFilterAggregator) insertReceiptsFilter(sender remoteproto.ETHBACKEND_SubscribeReceiptsServer) (uint64, *ReceiptsFilter) { + a.receiptsFilterLock.Lock() + defer a.receiptsFilterLock.Unlock() + filterId := a.nextFilterId + a.nextFilterId++ + filter := &ReceiptsFilter{ + txHashes: make(map[common.Hash]int), + sender: sender, + } + a.receiptsFilters[filterId] = filter + return filterId, filter +} + +func (a *ReceiptsFilterAggregator) checkEmpty() { + isEmpty := a.aggReceiptsFilter.allTxHashes == 0 && len(a.aggReceiptsFilter.txHashes) == 0 + a.events.EmptyReceiptSubscription(isEmpty) +} + +func (a *ReceiptsFilterAggregator) removeReceiptsFilter(filterId uint64, filter *ReceiptsFilter) { + a.receiptsFilterLock.Lock() + defer a.receiptsFilterLock.Unlock() + a.subtractReceiptsFilters(filter) + delete(a.receiptsFilters, filterId) + a.checkEmpty() +} + +func (a *ReceiptsFilterAggregator) updateReceiptsFilter(filter *ReceiptsFilter, filterReq *remoteproto.ReceiptsFilterRequest) { + a.receiptsFilterLock.Lock() + defer a.receiptsFilterLock.Unlock() + a.subtractReceiptsFilters(filter) + filter.txHashes = make(map[common.Hash]int) + + // Empty TransactionHashes slice (not nil) means subscribe to all + txHashes := filterReq.GetTransactionHashes() + if txHashes != nil && len(txHashes) == 0 { + filter.allTxHashes = 1 + } else if filterReq.GetAllTransactions() { + filter.allTxHashes = 1 + } else { + filter.allTxHashes = 0 + } + + // Process specific transaction hashes + for _, txHash := range txHashes { + filter.txHashes[gointerfaces.ConvertH256ToHash(txHash)] = 1 + } + + a.addReceiptsFilters(filter) + a.checkEmpty() +} + +func (a *ReceiptsFilterAggregator) addReceiptsFilters(filter *ReceiptsFilter) { + a.aggReceiptsFilter.allTxHashes += filter.allTxHashes + for txHash, count := range filter.txHashes { + a.aggReceiptsFilter.txHashes[txHash] += count + } +} + +func (a *ReceiptsFilterAggregator) subtractReceiptsFilters(filter *ReceiptsFilter) { + a.aggReceiptsFilter.allTxHashes -= filter.allTxHashes + for txHash, count := range filter.txHashes { + a.aggReceiptsFilter.txHashes[txHash] -= count + if a.aggReceiptsFilter.txHashes[txHash] == 0 { + delete(a.aggReceiptsFilter.txHashes, txHash) + } + } +} + +func (a *ReceiptsFilterAggregator) subscribeReceipts(server remoteproto.ETHBACKEND_SubscribeReceiptsServer) error { + filterId, filter := a.insertReceiptsFilter(server) + defer a.removeReceiptsFilter(filterId, filter) + var filterReq *remoteproto.ReceiptsFilterRequest + var recvErr error + for filterReq, recvErr = server.Recv(); recvErr == nil; filterReq, recvErr = server.Recv() { + a.updateReceiptsFilter(filter, filterReq) + } + if recvErr != io.EOF { + return fmt.Errorf("receiving receipts filter request: %w", recvErr) + } + return nil +} + +func (a *ReceiptsFilterAggregator) distributeReceipts(receipts []*remoteproto.SubscribeReceiptsReply) error { + a.receiptsFilterLock.Lock() + defer a.receiptsFilterLock.Unlock() + filtersToDelete := make(map[uint64]*ReceiptsFilter) + for _, receipt := range receipts { + if a.aggReceiptsFilter.allTxHashes == 0 { + txHash := gointerfaces.ConvertH256ToHash(receipt.TransactionHash) + if _, ok := a.aggReceiptsFilter.txHashes[txHash]; !ok { + continue + } + } + for filterId, filter := range a.receiptsFilters { + if filter.allTxHashes == 0 { + txHash := gointerfaces.ConvertH256ToHash(receipt.TransactionHash) + if _, ok := filter.txHashes[txHash]; !ok { + continue + } + } + if err := filter.sender.Send(receipt); err != nil { + filtersToDelete[filterId] = filter + } + } + } + for filterId, filter := range filtersToDelete { + a.subtractReceiptsFilters(filter) + delete(a.receiptsFilters, filterId) + } + return nil +} diff --git a/node/privateapi/receiptsfilter_test.go b/node/privateapi/receiptsfilter_test.go new file mode 100644 index 00000000000..d692f259a26 --- /dev/null +++ b/node/privateapi/receiptsfilter_test.go @@ -0,0 +1,302 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package privateapi + +import ( + "context" + "io" + "testing" + + "google.golang.org/grpc" + + "github.com/erigontech/erigon/common" + "github.com/erigontech/erigon/node/gointerfaces" + "github.com/erigontech/erigon/node/gointerfaces/remoteproto" + "github.com/erigontech/erigon/node/gointerfaces/typesproto" + "github.com/erigontech/erigon/node/shards" +) + +var ( + txHash1 = common.HexToHash("0xffc4978dfe7ab496f0158ae8916adae6ffd0c1fca4f09f7a7134556011357424") + txHash2 = common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") + txHash1H256 *typesproto.H256 + txHash2H256 *typesproto.H256 +) + +func init() { + txHash1H256 = gointerfaces.ConvertHashToH256(txHash1) + txHash2H256 = gointerfaces.ConvertHashToH256(txHash2) +} + +type testReceiptsServer struct { + received chan *remoteproto.ReceiptsFilterRequest + receiveCompleted chan struct{} + sent []*remoteproto.SubscribeReceiptsReply + ctx context.Context + grpc.ServerStream +} + +func newTestReceiptsServer(ctx context.Context) *testReceiptsServer { + ts := &testReceiptsServer{ + received: make(chan *remoteproto.ReceiptsFilterRequest, 256), + receiveCompleted: make(chan struct{}, 1), + sent: make([]*remoteproto.SubscribeReceiptsReply, 0), + ctx: ctx, + ServerStream: nil, + } + go func() { + <-ts.ctx.Done() + close(ts.received) + }() + return ts +} + +func (ts *testReceiptsServer) Send(m *remoteproto.SubscribeReceiptsReply) error { + ts.sent = append(ts.sent, m) + return nil +} + +func (ts *testReceiptsServer) Recv() (*remoteproto.ReceiptsFilterRequest, error) { + // notify receive completed when the last request has been processed + if len(ts.received) == 0 { + ts.receiveCompleted <- struct{}{} + } + + request, ok := <-ts.received + if !ok { + return nil, io.EOF + } + return request, nil +} + +func createReceipt(txHash common.Hash) *remoteproto.SubscribeReceiptsReply { + return &remoteproto.SubscribeReceiptsReply{ + BlockHash: gointerfaces.ConvertHashToH256([32]byte{1}), + BlockNumber: 100, + TransactionHash: gointerfaces.ConvertHashToH256(txHash), + TransactionIndex: 0, + Type: 0, + Status: 1, + CumulativeGasUsed: 21000, + GasUsed: 21000, + ContractAddress: nil, + Logs: []*remoteproto.SubscribeLogsReply{}, + LogsBloom: make([]byte, 256), + From: gointerfaces.ConvertAddressToH160([20]byte{2}), + To: gointerfaces.ConvertAddressToH160([20]byte{3}), + } +} + +func TestReceiptsFilter_EmptyFilter_DoesNotDistributeAnything(t *testing.T) { + events := shards.NewEvents() + agg := NewReceiptsFilterAggregator(events) + + ctx := t.Context() + srv := newTestReceiptsServer(ctx) + + // Empty filter - no transaction hashes specified + req1 := &remoteproto.ReceiptsFilterRequest{ + TransactionHashes: nil, + } + srv.received <- req1 + + go func() { + err := agg.subscribeReceipts(srv) + if err != nil { + t.Error(err) + } + }() + + <-srv.receiveCompleted + + // Try to distribute a receipt - but empty filter means nothing matches + receipt := createReceipt(txHash1) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt}) + + if len(srv.sent) != 0 { + t.Error("expected the sent slice to be empty for empty filter") + } +} + +func TestReceiptsFilter_AllTransactionsFilter_DistributesAllReceipts(t *testing.T) { + events := shards.NewEvents() + agg := NewReceiptsFilterAggregator(events) + + ctx := t.Context() + srv := newTestReceiptsServer(ctx) + + // Empty TransactionHashes means subscribe to all receipts + req1 := &remoteproto.ReceiptsFilterRequest{ + TransactionHashes: []*typesproto.H256{}, + } + srv.received <- req1 + + go func() { + err := agg.subscribeReceipts(srv) + if err != nil { + t.Error(err) + } + }() + + <-srv.receiveCompleted + + // Should distribute any receipt + receipt1 := createReceipt(txHash1) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt1}) + if len(srv.sent) != 1 { + t.Error("expected the sent slice to have the receipt present") + } + + receipt2 := createReceipt(txHash2) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt2}) + if len(srv.sent) != 2 { + t.Error("expected any receipt to be allowed through the filter") + } +} + +func TestReceiptsFilter_SpecificTransactionHash_OnlyAllowsThatTransactionThrough(t *testing.T) { + events := shards.NewEvents() + agg := NewReceiptsFilterAggregator(events) + + ctx := t.Context() + srv := newTestReceiptsServer(ctx) + + // Filter for specific transaction hash + req1 := &remoteproto.ReceiptsFilterRequest{ + TransactionHashes: []*typesproto.H256{txHash1H256}, + } + srv.received <- req1 + + go func() { + err := agg.subscribeReceipts(srv) + if err != nil { + t.Error(err) + } + }() + + <-srv.receiveCompleted + + // Try with non-matching transaction hash + receipt := createReceipt(txHash2) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt}) + if len(srv.sent) != 0 { + t.Error("the sent slice should be empty as the transaction hash didn't match") + } + + // Try with matching transaction hash + receipt = createReceipt(txHash1) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt}) + if len(srv.sent) != 1 { + t.Error("expected the receipt to be distributed as the transaction hash matched") + } +} + +func TestReceiptsFilter_MultipleTransactionHashes_AllowsAnyOfThem(t *testing.T) { + events := shards.NewEvents() + agg := NewReceiptsFilterAggregator(events) + + ctx := t.Context() + srv := newTestReceiptsServer(ctx) + + // Filter for multiple transaction hashes + req1 := &remoteproto.ReceiptsFilterRequest{ + TransactionHashes: []*typesproto.H256{txHash1H256, txHash2H256}, + } + srv.received <- req1 + + go func() { + err := agg.subscribeReceipts(srv) + if err != nil { + t.Error(err) + } + }() + + <-srv.receiveCompleted + + // Try with first transaction hash + receipt1 := createReceipt(txHash1) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt1}) + if len(srv.sent) != 1 { + t.Error("expected the receipt to be distributed as txHash1 matched") + } + + // Try with second transaction hash + receipt2 := createReceipt(txHash2) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt2}) + if len(srv.sent) != 2 { + t.Error("expected the receipt to be distributed as txHash2 matched") + } + + // Try with non-matching transaction hash + txHash3 := common.HexToHash("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + receipt3 := createReceipt(txHash3) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt3}) + if len(srv.sent) != 2 { + t.Error("the sent slice should not increase as txHash3 didn't match") + } +} + +func TestReceiptsFilter_UpdateFilter_ChangesWhatIsAllowed(t *testing.T) { + events := shards.NewEvents() + agg := NewReceiptsFilterAggregator(events) + + ctx := t.Context() + srv := newTestReceiptsServer(ctx) + + // Start with filter for txHash1 + req1 := &remoteproto.ReceiptsFilterRequest{ + TransactionHashes: []*typesproto.H256{txHash1H256}, + } + srv.received <- req1 + + go func() { + err := agg.subscribeReceipts(srv) + if err != nil { + t.Error(err) + } + }() + + <-srv.receiveCompleted + + // Should allow txHash1 + receipt1 := createReceipt(txHash1) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt1}) + if len(srv.sent) != 1 { + t.Error("expected txHash1 to be allowed") + } + + // Update filter to txHash2 + req2 := &remoteproto.ReceiptsFilterRequest{ + TransactionHashes: []*typesproto.H256{txHash2H256}, + } + srv.received <- req2 + <-srv.receiveCompleted + + // Now txHash1 should be rejected + receipt1Again := createReceipt(txHash1) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt1Again}) + if len(srv.sent) != 1 { + t.Error("expected txHash1 to be rejected after filter update") + } + + // And txHash2 should be allowed + receipt2 := createReceipt(txHash2) + _ = agg.distributeReceipts([]*remoteproto.SubscribeReceiptsReply{receipt2}) + if len(srv.sent) != 2 { + t.Error("expected txHash2 to be allowed after filter update") + } +} diff --git a/node/shards/events.go b/node/shards/events.go index c6a272fbbf0..225c6e380da 100644 --- a/node/shards/events.go +++ b/node/shards/events.go @@ -20,6 +20,8 @@ import ( "sync" "sync/atomic" + "github.com/holiman/uint256" + "github.com/erigontech/erigon/common" "github.com/erigontech/erigon/execution/types" "github.com/erigontech/erigon/node/gointerfaces" @@ -46,12 +48,15 @@ type Events struct { pendingTxsSubscriptions map[int]PendingTxsSubscription logsSubscriptions map[int]chan []*remoteproto.SubscribeLogsReply hasLogSubscriptions bool + receiptsSubscriptions map[int]chan []*remoteproto.SubscribeReceiptsReply + hasReceiptSubscriptions bool lock sync.RWMutex } func NewEvents() *Events { return &Events{ headerSubscriptions: map[int]chan [][]byte{}, + receiptsSubscriptions: map[int]chan []*remoteproto.SubscribeReceiptsReply{}, pendingLogsSubscriptions: map[int]PendingLogsSubscription{}, pendingBlockSubscriptions: map[int]PendingBlockSubscription{}, pendingTxsSubscriptions: map[int]PendingTxsSubscription{}, @@ -75,6 +80,31 @@ func (e *Events) AddHeaderSubscription() (chan [][]byte, func()) { } } +func (e *Events) AddReceiptsSubscription() (chan []*remoteproto.SubscribeReceiptsReply, func()) { + e.lock.Lock() + defer e.lock.Unlock() + ch := make(chan []*remoteproto.SubscribeReceiptsReply, 8) + e.id++ + id := e.id + e.receiptsSubscriptions[id] = ch + return ch, func() { + delete(e.receiptsSubscriptions, id) + close(ch) + } +} + +func (e *Events) EmptyReceiptSubscription(empty bool) { + e.lock.Lock() + defer e.lock.Unlock() + e.hasReceiptSubscriptions = !empty +} + +func (e *Events) HasReceiptSubscriptions() bool { + e.lock.RLock() + defer e.lock.RUnlock() + return e.hasReceiptSubscriptions +} + func (e *Events) AddNewSnapshotSubscription() (chan struct{}, func()) { e.lock.Lock() defer e.lock.Unlock() @@ -185,6 +215,14 @@ func (e *Events) OnLogs(logs []*remoteproto.SubscribeLogsReply) { } } +func (e *Events) OnReceipts(receipts []*remoteproto.SubscribeReceiptsReply) { + e.lock.Lock() + defer e.lock.Unlock() + for _, ch := range e.receiptsSubscriptions { + common.PrioritizedSend(ch, receipts) + } +} + func (e *Events) OnRetirementStart(started bool) { e.lock.Lock() defer e.lock.Unlock() @@ -205,7 +243,7 @@ type Notifications struct { Events *Events Accumulator *Accumulator // StateAccumulator StateChangesConsumer StateChangeConsumer - RecentLogs *RecentLogs + RecentReceipts *RecentReceipts LastNewBlockSeen atomic.Uint64 // This is used by eth_syncing as an heuristic to determine if the node is syncing or not. } @@ -217,7 +255,7 @@ func NewNotifications(StateChangesConsumer StateChangeConsumer) *Notifications { return &Notifications{ Events: NewEvents(), Accumulator: NewAccumulator(), - RecentLogs: NewRecentLogs(512), + RecentReceipts: NewRecentReceipts(512), StateChangesConsumer: StateChangesConsumer, } } @@ -226,25 +264,33 @@ func NewNotifications(StateChangesConsumer StateChangeConsumer) *Notifications { // - Erigon3 doesn't store logs in db (yet) // - need support unwind of receipts // - need send notification after `rwtx.Commit` (or user will recv notification, but can't request new data by RPC) -type RecentLogs struct { +type RecentReceipts struct { receipts map[uint64]types.Receipts + txs map[uint64][]types.Transaction + headers map[uint64]*types.Header limit uint64 mu sync.Mutex } -func NewRecentLogs(limit uint64) *RecentLogs { - return &RecentLogs{receipts: make(map[uint64]types.Receipts, limit), limit: limit} +func NewRecentReceipts(limit uint64) *RecentReceipts { + return &RecentReceipts{ + receipts: make(map[uint64]types.Receipts, limit), + txs: make(map[uint64][]types.Transaction, limit), + headers: make(map[uint64]*types.Header, limit), + limit: limit, + } } +// Notify sends log notifications (for logs subscription) // [from,to) -func (r *RecentLogs) Notify(n *Events, from, to uint64, isUnwind bool) { +func (r *RecentReceipts) NotifyLogs(n *Events, from, to uint64, isUnwind bool) { if !n.HasLogSubscriptions() { return } r.mu.Lock() defer r.mu.Unlock() for bn, receipts := range r.receipts { - if bn+r.limit < from { //evict old + if bn+r.limit < from { // evict old delete(r.receipts, bn) continue } @@ -291,7 +337,7 @@ func (r *RecentLogs) Notify(n *Events, from, to uint64, isUnwind bool) { } } -func (r *RecentLogs) Add(receipts types.Receipts) { +func (r *RecentReceipts) Add(receipts types.Receipts, txs []types.Transaction, header *types.Header) { if len(receipts) == 0 { return } @@ -311,23 +357,143 @@ func (r *RecentLogs) Add(receipts types.Receipts) { return } r.receipts[blockNum] = receipts + r.txs[blockNum] = txs + r.headers[blockNum] = header - //enforce `limit`: drop all items older than `limit` blocks + // enforce `limit`: drop all items older than `limit` blocks if len(r.receipts) <= int(r.limit) { return } for bn := range r.receipts { if bn+r.limit < blockNum { delete(r.receipts, bn) + delete(r.txs, bn) + delete(r.headers, bn) + } + } +} + +// NotifyReceipts sends receipt Proto notifications (for receipts subscription) +// [from,to) +func (r *RecentReceipts) NotifyReceipts(n *Events, from, to uint64, isUnwind bool) { + if !n.HasReceiptSubscriptions() { + return + } + r.mu.Lock() + defer r.mu.Unlock() + + for blockNum, receipts := range r.receipts { + if blockNum+r.limit < from { // evict old + delete(r.receipts, blockNum) + delete(r.txs, blockNum) + delete(r.headers, blockNum) + continue + } + if blockNum < from || blockNum >= to { + continue + } + + txs := r.txs[blockNum] + header := r.headers[blockNum] + if len(receipts) == 0 || len(txs) == 0 || header == nil { + continue + } + + var reply []*remoteproto.SubscribeReceiptsReply + signer := types.MakeSigner(nil, blockNum, 0) + + for _, receipt := range receipts { + if receipt == nil { + continue + } + + txIndex := receipt.TransactionIndex + if int(txIndex) >= len(txs) { + continue + } + txn := txs[txIndex] + + // Convert logs to Proto format + protoLogs := make([]*remoteproto.SubscribeLogsReply, 0, len(receipt.Logs)) + for _, l := range receipt.Logs { + protoLog := &remoteproto.SubscribeLogsReply{ + Address: gointerfaces.ConvertAddressToH160(l.Address), + BlockHash: gointerfaces.ConvertHashToH256(receipt.BlockHash), + BlockNumber: blockNum, + Data: l.Data, + LogIndex: uint64(l.Index), + Topics: make([]*typesproto.H256, 0, len(l.Topics)), + TransactionHash: gointerfaces.ConvertHashToH256(receipt.TxHash), + TransactionIndex: uint64(l.TxIndex), + Removed: isUnwind, + } + for _, topic := range l.Topics { + protoLog.Topics = append(protoLog.Topics, gointerfaces.ConvertHashToH256(topic)) + } + protoLogs = append(protoLogs, protoLog) + } + + // Build Proto receipt with all metadata + protoReceipt := &remoteproto.SubscribeReceiptsReply{ + BlockHash: gointerfaces.ConvertHashToH256(receipt.BlockHash), + BlockNumber: blockNum, + TransactionHash: gointerfaces.ConvertHashToH256(receipt.TxHash), + TransactionIndex: uint64(txIndex), + Type: uint32(receipt.Type), + Status: receipt.Status, + CumulativeGasUsed: receipt.CumulativeGasUsed, + GasUsed: receipt.GasUsed, + LogsBloom: receipt.Bloom[:], + Logs: protoLogs, + BlobGasUsed: receipt.BlobGasUsed, + } + + // Add contract address if present + if receipt.ContractAddress != (common.Address{}) { + protoReceipt.ContractAddress = gointerfaces.ConvertAddressToH160(receipt.ContractAddress) + } + + // Add transaction data (from/to) from txs array + if sender, err := txn.Sender(*signer); err == nil { + protoReceipt.From = gointerfaces.ConvertAddressToH160(sender.Value()) + } + if to := txn.GetTo(); to != nil { + protoReceipt.To = gointerfaces.ConvertAddressToH160(*to) + } + protoReceipt.TxType = uint32(txn.Type()) + + // Add header data + if header.BaseFee != nil { + baseFee, _ := uint256.FromBig(header.BaseFee) + protoReceipt.BaseFee = gointerfaces.ConvertUint256IntToH256(baseFee) + } + protoReceipt.BlockTime = header.Time + if header.ExcessBlobGas != nil { + protoReceipt.ExcessBlobGas = *header.ExcessBlobGas + } + + // Add blob gas price for EIP-4844 if needed + // Can be calculated from ExcessBlobGas + + reply = append(reply, protoReceipt) + } + + // Send batch per block + if len(reply) > 0 { + n.OnReceipts(reply) } } } -func (r *RecentLogs) CopyAndReset(target *RecentLogs) { +func (r *RecentReceipts) CopyAndReset(target *RecentReceipts) { r.mu.Lock() defer r.mu.Unlock() for blockNum, receipts := range r.receipts { - target.Add(receipts) + txs := r.txs[blockNum] + header := r.headers[blockNum] + target.Add(receipts, txs, header) delete(r.receipts, blockNum) + delete(r.txs, blockNum) + delete(r.headers, blockNum) } } diff --git a/node/shards/events_test.go b/node/shards/events_test.go index c1fab383145..bce475c0bf0 100644 --- a/node/shards/events_test.go +++ b/node/shards/events_test.go @@ -20,36 +20,37 @@ import ( "math/big" "testing" - "github.com/erigontech/erigon/execution/types" "github.com/stretchr/testify/require" + + "github.com/erigontech/erigon/execution/types" ) -func TestRecentLogs(t *testing.T) { +func TestRecentReceipts(t *testing.T) { t.Parallel() t.Run("Evict", func(t *testing.T) { - e := NewRecentLogs(3) - e.Add(types.Receipts{{BlockNumber: big.NewInt(1)}}) - e.Add(types.Receipts{{BlockNumber: big.NewInt(11)}}) - e.Add(types.Receipts{{BlockNumber: big.NewInt(21)}}) + e := NewRecentReceipts(3) + e.Add(types.Receipts{{BlockNumber: big.NewInt(1)}}, []types.Transaction{}, nil) + e.Add(types.Receipts{{BlockNumber: big.NewInt(11)}}, []types.Transaction{}, nil) + e.Add(types.Receipts{{BlockNumber: big.NewInt(21)}}, []types.Transaction{}, nil) require.Len(t, e.receipts, 3) - e.Add(types.Receipts{{BlockNumber: big.NewInt(31)}}) + e.Add(types.Receipts{{BlockNumber: big.NewInt(31)}}, []types.Transaction{}, nil) require.Len(t, e.receipts, 1) }) t.Run("Nil", func(t *testing.T) { - e := NewRecentLogs(3) - e.Add(types.Receipts{nil, {BlockNumber: big.NewInt(1)}}) - e.Add(types.Receipts{{BlockNumber: big.NewInt(21)}, nil}) - e.Add(types.Receipts{nil, nil, {BlockNumber: big.NewInt(31)}}) + e := NewRecentReceipts(3) + e.Add(types.Receipts{nil, {BlockNumber: big.NewInt(1)}}, []types.Transaction{}, nil) + e.Add(types.Receipts{{BlockNumber: big.NewInt(21)}, nil}, []types.Transaction{}, nil) + e.Add(types.Receipts{nil, nil, {BlockNumber: big.NewInt(31)}}, []types.Transaction{}, nil) require.Len(t, e.receipts, 3) }) t.Run("Order", func(t *testing.T) { - e := NewRecentLogs(3) - e.Add(types.Receipts{{BlockNumber: big.NewInt(1)}}) - e.Add(types.Receipts{{BlockNumber: big.NewInt(11)}}) - e.Add(types.Receipts{{BlockNumber: big.NewInt(1)}}) + e := NewRecentReceipts(3) + e.Add(types.Receipts{{BlockNumber: big.NewInt(1)}}, []types.Transaction{}, nil) + e.Add(types.Receipts{{BlockNumber: big.NewInt(11)}}, []types.Transaction{}, nil) + e.Add(types.Receipts{{BlockNumber: big.NewInt(1)}}, []types.Transaction{}, nil) require.Len(t, e.receipts, 2) - e.Add(types.Receipts{{BlockNumber: big.NewInt(11)}}) + e.Add(types.Receipts{{BlockNumber: big.NewInt(11)}}, []types.Transaction{}, nil) require.Len(t, e.receipts, 2) }) } diff --git a/rpc/filters/api.go b/rpc/filters/api.go index ec86260cefa..dddbe2b2f5a 100644 --- a/rpc/filters/api.go +++ b/rpc/filters/api.go @@ -48,6 +48,13 @@ func DefaultLogFilterOptions() LogFilterOptions { } } +// TransactionReceiptsFilter defines criteria for transaction receipts subscription. +// If TransactionHashes is nil or empty, receipts for all transactions included in new blocks will be delivered. +// Otherwise, only receipts for the specified transactions will be delivered. +type ReceiptsFilterCriteria struct { + TransactionHashes []common.Hash `json:"transactionHashes"` +} + /* // filter is a helper struct that holds meta information over the filter type // and associated subscription in the event system. diff --git a/rpc/jsonrpc/eth_filters.go b/rpc/jsonrpc/eth_filters.go index b983d6af6df..96682a0e5f3 100644 --- a/rpc/jsonrpc/eth_filters.go +++ b/rpc/jsonrpc/eth_filters.go @@ -23,6 +23,7 @@ import ( "github.com/erigontech/erigon/common/dbg" "github.com/erigontech/erigon/common/log/v3" "github.com/erigontech/erigon/execution/types" + "github.com/erigontech/erigon/execution/types/ethutils" "github.com/erigontech/erigon/rpc" "github.com/erigontech/erigon/rpc/filters" "github.com/erigontech/erigon/rpc/rpchelper" @@ -313,3 +314,43 @@ func (api *APIImpl) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc return rpcSub, nil } + +// TransactionReceipts send a notification each time a new receipt appears. +func (api *APIImpl) TransactionReceipts(ctx context.Context, crit filters.ReceiptsFilterCriteria) (*rpc.Subscription, error) { + if api.filters == nil { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + defer dbg.LogPanic() + receipts, id := api.filters.SubscribeReceipts(api.SubscribeLogsChannelSize, crit) + defer api.filters.UnsubscribeReceipts(id) + + for { + select { + case protoReceipt, ok := <-receipts: + if protoReceipt != nil { + receipt := ethutils.MarshalSubscribeReceipt(protoReceipt) + err := notifier.Notify(rpcSub.ID, []map[string]interface{}{receipt}) + if err != nil { + log.Warn("[rpc] error while notifying subscription", "err", err) + } + } + if !ok { + log.Warn("[rpc] receipts channel was closed") + return + } + case <-rpcSub.Err(): + return + } + } + }() + + return rpcSub, nil +} diff --git a/rpc/jsonrpc/eth_subscribe_test.go b/rpc/jsonrpc/eth_subscribe_test.go index b76a7f2989b..df4dca14aa2 100644 --- a/rpc/jsonrpc/eth_subscribe_test.go +++ b/rpc/jsonrpc/eth_subscribe_test.go @@ -37,6 +37,7 @@ import ( "github.com/erigontech/erigon/node/gointerfaces/sentryproto" "github.com/erigontech/erigon/node/privateapi" "github.com/erigontech/erigon/p2p/protocols/eth" + "github.com/erigontech/erigon/rpc/filters" "github.com/erigontech/erigon/rpc/rpchelper" ) @@ -102,3 +103,61 @@ func TestEthSubscribe(t *testing.T) { require.Equal(i, header.Number.Uint64()) } } + +func TestEthSubscribeReceipts(t *testing.T) { + m, require := mock.Mock(t), require.New(t) + + chain, err := blockgen.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 3, func(i int, b *blockgen.BlockGen) { + b.SetCoinbase(common.Address{1}) + }) + require.NoError(err) + + b, err := rlp.EncodeToBytes(ð.BlockHeadersPacket66{ + RequestId: 1, + BlockHeadersPacket: chain.Headers, + }) + require.NoError(err) + + m.ReceiveWg.Add(1) + for _, err = range m.Send(&sentryproto.InboundMessage{Id: sentryproto.MessageId_BLOCK_HEADERS_66, Data: b, PeerId: m.PeerId}) { + require.NoError(err) + } + m.ReceiveWg.Wait() + + ctx := context.Background() + logger := log.New() + backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications, m.BlockReader, nil, logger, builder.NewLatestBlockBuiltStore(), nil) + backendClient := direct.NewEthBackendClientDirect(backendServer) + backend := rpcservices.NewRemoteBackend(backendClient, m.DB, m.BlockReader) + + subscriptionReadyWg := sync.WaitGroup{} + subscriptionReadyWg.Add(1) + onNewSnapshot := func() { + subscriptionReadyWg.Done() + } + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, backend, nil, nil, onNewSnapshot, m.Log) + subscriptionReadyWg.Wait() + + _, id := ff.SubscribeReceipts(16, filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{}, + }) + defer ff.UnsubscribeReceipts(id) + + initialCycle, firstCycle := mock.MockInsertAsInitialCycle, false + + hook := stageloop.NewHook(m.Ctx, m.DB, m.Notifications, m.Sync, m.BlockReader, m.ChainConfig, m.Log, nil, nil, nil) + if err := m.DB.UpdateTemporal(m.Ctx, func(tx kv.TemporalRwTx) error { + sd, err := execctx.NewSharedDomains(m.Ctx, tx, log.Root()) + if err != nil { + return err + } + defer sd.Close() + if err := stageloop.StageLoopIteration(m.Ctx, m.DB, sd, tx, m.Sync, initialCycle, firstCycle, logger, m.BlockReader, hook); err != nil { + return err + } + return sd.Flush(m.Ctx, tx) + }); err != nil { + t.Fatal(err) + } + t.Logf("Successfully created receipt subscription with ID: %v", id) +} diff --git a/rpc/rpchelper/filter_id.go b/rpc/rpchelper/filter_id.go index d80691352ca..077ca524070 100644 --- a/rpc/rpchelper/filter_id.go +++ b/rpc/rpchelper/filter_id.go @@ -31,6 +31,7 @@ type ( PendingBlockSubID SubscriptionID PendingTxsSubID SubscriptionID LogsSubID SubscriptionID + ReceiptsSubID SubscriptionID ) var globalSubscriptionId uint64 diff --git a/rpc/rpchelper/filters.go b/rpc/rpchelper/filters.go index 198af4321ab..2b4f6505d70 100644 --- a/rpc/rpchelper/filters.go +++ b/rpc/rpchelper/filters.go @@ -52,13 +52,15 @@ type Filters struct { pendingBlock *types.Block - headsSubs *concurrent.SyncMap[HeadsSubID, Sub[*types.Header]] - pendingLogsSubs *concurrent.SyncMap[PendingLogsSubID, Sub[types.Logs]] - pendingBlockSubs *concurrent.SyncMap[PendingBlockSubID, Sub[*types.Block]] - pendingTxsSubs *concurrent.SyncMap[PendingTxsSubID, Sub[[]types.Transaction]] - logsSubs *LogsFilterAggregator - logsRequestor atomic.Value - onNewSnapshot func() + headsSubs *concurrent.SyncMap[HeadsSubID, Sub[*types.Header]] + pendingLogsSubs *concurrent.SyncMap[PendingLogsSubID, Sub[types.Logs]] + pendingBlockSubs *concurrent.SyncMap[PendingBlockSubID, Sub[*types.Block]] + pendingTxsSubs *concurrent.SyncMap[PendingTxsSubID, Sub[[]types.Transaction]] + logsSubs *LogsFilterAggregator + logsRequestor atomic.Value + receiptsSubs *ReceiptsFilterAggregator + receiptsRequestor atomic.Value + onNewSnapshot func() logsStores *concurrent.SyncMap[LogsSubID, []*types.Log] pendingHeadsStores *concurrent.SyncMap[HeadsSubID, []*types.Header] @@ -79,6 +81,7 @@ func New(ctx context.Context, config FiltersConfig, ethBackend ApiBackend, txPoo pendingTxsSubs: concurrent.NewSyncMap[PendingTxsSubID, Sub[[]types.Transaction]](), pendingLogsSubs: concurrent.NewSyncMap[PendingLogsSubID, Sub[types.Logs]](), pendingBlockSubs: concurrent.NewSyncMap[PendingBlockSubID, Sub[*types.Block]](), + receiptsSubs: NewReceiptsFilterAggregator(), logsSubs: NewLogsFilterAggregator(), onNewSnapshot: onNewSnapshot, logsStores: concurrent.NewSyncMap[LogsSubID, []*types.Log](), @@ -145,6 +148,34 @@ func New(ctx context.Context, config FiltersConfig, ethBackend ApiBackend, txPoo } }() + go func() { + if ethBackend == nil { + return + } + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Receipts"}).Inc() + for { + select { + case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Receipts"}).Dec() + return + default: + } + if err := ethBackend.SubscribeReceipts(ctx, ff.OnReceipts, &ff.receiptsRequestor); err != nil { + select { + case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Receipts"}).Dec() + return + default: + } + if grpcutil.IsEndOfStream(err) || grpcutil.IsRetryLater(err) { + time.Sleep(3 * time.Second) + continue + } + logger.Warn("rpc filters: error subscribing to receipts", "err", err) + } + } + }() + if txPool != nil { go func() { activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Inc() @@ -433,6 +464,55 @@ func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) bool { return true } +// SubscribeReceipts subscribes to transaction receipts and returns a channel to receive the receipts +// and a subscription ID to manage the subscription. +func (ff *Filters) SubscribeReceipts(size int, criteria filters.ReceiptsFilterCriteria) (<-chan *remoteproto.SubscribeReceiptsReply, ReceiptsSubID) { + sub := newChanSub[*remoteproto.SubscribeReceiptsReply](size) + id, f := ff.receiptsSubs.insertReceiptsFilter(sub) + f.transactionHashes = concurrent.NewSyncMap[common.Hash, int]() + if len(criteria.TransactionHashes) == 0 { + f.allTxHashes = 1 + } else { + txHashCount := 0 + maxTxHashes := ff.config.RpcSubscriptionFiltersMaxLogs // Reuse the same limit as logs + for _, txHash := range criteria.TransactionHashes { + if maxTxHashes == 0 || txHashCount < maxTxHashes { + f.transactionHashes.Put(txHash, 1) + txHashCount++ + } else { + break + } + } + } + ff.receiptsSubs.addReceiptsFilters(f) + rfr := ff.receiptsSubs.createFilterRequest() + loaded := ff.loadReceiptsRequester() + if loaded != nil { + if err := loaded.(func(*remoteproto.ReceiptsFilterRequest) error)(rfr); err != nil { + ff.logger.Warn("Could not update remote receipts filter", "err", err) + ff.receiptsSubs.removeReceiptsFilter(id) + } + } + return sub.ch, id +} + +// UnsubscribeReceipts unsubscribes from transaction receipts using the given subscription ID. +// It returns true if the unsubscription was successful, otherwise false. +func (ff *Filters) UnsubscribeReceipts(id ReceiptsSubID) bool { + removed := ff.receiptsSubs.removeReceiptsFilter(id) + if !removed { + return false + } + rfr := ff.receiptsSubs.createFilterRequest() + loaded := ff.loadReceiptsRequester() + if loaded != nil { + if err := loaded.(func(*remoteproto.ReceiptsFilterRequest) error)(rfr); err != nil { + ff.logger.Warn("Could not update remote receipts filter after unsubscribe", "err", err) + } + } + return true +} + // SubscribeLogs subscribes to logs using the specified filter criteria and returns a channel to receive the logs // and a subscription ID to manage the subscription. func (ff *Filters) SubscribeLogs(size int, criteria filters.FilterCriteria) (<-chan *types.Log, LogsSubID) { @@ -517,6 +597,13 @@ func (ff *Filters) loadLogsRequester() any { return ff.logsRequestor.Load() } +// loadReceiptsRequester loads the current receipts requester and returns it. +func (ff *Filters) loadReceiptsRequester() any { + ff.mu.Lock() + defer ff.mu.Unlock() + return ff.receiptsRequestor.Load() +} + func (ff *Filters) HasSubscription(id LogsSubID) bool { return ff.logsSubs.hasLogsFilter(id) } @@ -643,6 +730,11 @@ func (ff *Filters) onNewHeader(event *remoteproto.SubscribeReply) error { }) } +// OnReceipts handles a new receipt event from the remote and processes it. +func (ff *Filters) OnReceipts(reply *remoteproto.SubscribeReceiptsReply) { + ff.receiptsSubs.distributeReceipt(reply) +} + // OnNewTx handles a new transaction event from the transaction pool and processes it. func (ff *Filters) OnNewTx(reply *txpoolproto.OnAddReply) { txs := make([]types.Transaction, len(reply.RplTxs)) diff --git a/rpc/rpchelper/filters_test.go b/rpc/rpchelper/filters_test.go index 9f47d96438d..59d9122ed60 100644 --- a/rpc/rpchelper/filters_test.go +++ b/rpc/rpchelper/filters_test.go @@ -504,3 +504,275 @@ func TestFilters_AddPendingTxs(t *testing.T) { }) } } + +func createReceipt(txHash common.Hash) *remoteproto.SubscribeReceiptsReply { + return &remoteproto.SubscribeReceiptsReply{ + BlockHash: gointerfaces.ConvertHashToH256([32]byte{1}), + BlockNumber: 100, + TransactionHash: gointerfaces.ConvertHashToH256(txHash), + TransactionIndex: 0, + Type: 0, + Status: 1, + CumulativeGasUsed: 21000, + GasUsed: 21000, + ContractAddress: nil, + Logs: []*remoteproto.SubscribeLogsReply{}, + LogsBloom: make([]byte, 256), + From: gointerfaces.ConvertAddressToH160([20]byte{2}), + To: gointerfaces.ConvertAddressToH160([20]byte{3}), + } +} + +var ( + txHash1 = common.HexToHash("0xffc4978dfe7ab496f0158ae8916adae6ffd0c1fca4f09f7a7134556011357424") + txHash2 = common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") + txHash3 = common.HexToHash("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") +) + +func TestFilters_SingleReceiptsSubscription_OnlyTransactionHashesSubscribedAreBroadcast(t *testing.T) { + t.Parallel() + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + + criteria := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{txHash1}, + } + + outChan, _ := f.SubscribeReceipts(10, criteria) + + // Create a receipt for a different transaction hash + receipt := createReceipt(txHash2) + + f.OnReceipts(receipt) + + if len(outChan) != 0 { + t.Error("expected the subscription channel to be empty for non-matching txHash") + } + + // Now a receipt that the subscription cares about + receipt = createReceipt(txHash1) + + f.OnReceipts(receipt) + + if len(outChan) != 1 { + t.Error("expected a message in the channel for the subscribed transaction hash") + } +} + +func TestFilters_ReceiptsSubscription_EmptyFilterSubscribesToAll(t *testing.T) { + t.Parallel() + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + + // Empty TransactionHashes means subscribe to all receipts + criteria := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{}, + } + + outChan, _ := f.SubscribeReceipts(10, criteria) + + // Any receipt should be received + receipt1 := createReceipt(txHash1) + f.OnReceipts(receipt1) + + if len(outChan) != 1 { + t.Error("expected empty filter to receive all receipts") + } + + receipt2 := createReceipt(txHash2) + f.OnReceipts(receipt2) + + if len(outChan) != 2 { + t.Error("expected empty filter to receive all receipts") + } +} + +func TestFilters_TwoReceiptsSubscriptionsWithDifferentCriteria(t *testing.T) { + t.Parallel() + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + + // First subscription: all receipts + criteria1 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{}, + } + // Second subscription: specific transaction hash + criteria2 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{txHash1}, + } + + chan1, _ := f.SubscribeReceipts(256, criteria1) + chan2, _ := f.SubscribeReceipts(256, criteria2) + + // Create a receipt for txHash2 + receipt := createReceipt(txHash2) + + f.OnReceipts(receipt) + + if len(chan1) != 1 { + t.Error("expected channel 1 to receive the receipt, no filters") + } + if len(chan2) != 0 { + t.Error("expected channel 2 to be empty, it has a transaction hash filter") + } + + // Now a receipt that the second subscription cares about + receipt = createReceipt(txHash1) + + f.OnReceipts(receipt) + + if len(chan1) != 2 { + t.Error("expected the second receipt to be in the channel with no filters") + } + if len(chan2) != 1 { + t.Error("expected the channel with filters to receive the message as the filter matches") + } +} + +func TestFilters_ThreeReceiptsSubscriptionsWithDifferentCriteria(t *testing.T) { + t.Parallel() + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + + criteria1 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{}, + } + criteria2 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{txHash1}, + } + criteria3 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{txHash1, txHash2}, + } + + chan1, _ := f.SubscribeReceipts(256, criteria1) + chan2, _ := f.SubscribeReceipts(256, criteria2) + chan3, _ := f.SubscribeReceipts(256, criteria3) + + // Receipt for txHash3 (not subscribed by chan2 or chan3) + receipt := createReceipt(txHash3) + + f.OnReceipts(receipt) + + if len(chan1) != 1 { + t.Error("expected channel 1 to receive the receipt, no filters") + } + if len(chan2) != 0 { + t.Error("expected channel 2 to be empty, txHash doesn't match") + } + if len(chan3) != 0 { + t.Error("expected channel 3 to be empty, txHash doesn't match") + } + + // Receipt for txHash1 (subscribed by chan2 and chan3) + receipt = createReceipt(txHash1) + + f.OnReceipts(receipt) + + if len(chan1) != 2 { + t.Error("expected the second receipt to be in channel 1 with no filters") + } + if len(chan2) != 1 { + t.Error("expected channel 2 to contain a receipt as txHash matched") + } + if len(chan3) != 1 { + t.Error("expected channel 3 to contain a receipt as txHash matched") + } + + // Receipt for txHash2 (subscribed by chan3 only) + receipt = createReceipt(txHash2) + + f.OnReceipts(receipt) + + if len(chan1) != 3 { + t.Error("expected the third receipt to be in channel 1 with no filters") + } + if len(chan2) != 1 { + t.Error("expected channel 2 to still have 1 as txHash2 doesn't match") + } + if len(chan3) != 2 { + t.Error("expected channel 3 to have 2 receipts as txHash2 matched") + } +} + +func TestFilters_SubscribeReceiptsGeneratesCorrectReceiptsFilterRequest(t *testing.T) { + t.Parallel() + var lastFilterRequest *remoteproto.ReceiptsFilterRequest + loadRequester := func(r *remoteproto.ReceiptsFilterRequest) error { + lastFilterRequest = r + return nil + } + + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + f.receiptsRequestor.Store(loadRequester) + + // First request: subscribe to all receipts + criteria1 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{}, + } + _, id1 := f.SubscribeReceipts(1, criteria1) + + // Request should have AllTransactions=true and empty TransactionHashes + if !lastFilterRequest.AllTransactions { + t.Error("1: expected AllTransactions to be true for subscribe-all") + } + if len(lastFilterRequest.TransactionHashes) != 0 { + t.Error("1: expected transaction hashes to be empty for subscribe-all") + } + + // Second request: filter on a specific transaction hash + criteria2 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{txHash1}, + } + _, id2 := f.SubscribeReceipts(1, criteria2) + + // Request should have AllTransactions=true and include txHash1 + // Backend uses OR logic: send if (AllTransactions OR hash matches) + if !lastFilterRequest.AllTransactions { + t.Error("2: expected AllTransactions to be true") + } + if len(lastFilterRequest.TransactionHashes) != 1 { + t.Errorf("2: expected 1 transaction hash, got %d", len(lastFilterRequest.TransactionHashes)) + } + if gointerfaces.ConvertH256ToHash(lastFilterRequest.TransactionHashes[0]) != txHash1 { + t.Error("2: expected transaction hash to match txHash1") + } + + // Unsubscribe the first filter (subscribe-all) + f.UnsubscribeReceipts(id1) + + // Now request should only have txHash1 + if len(lastFilterRequest.TransactionHashes) != 1 { + t.Errorf("3: expected 1 transaction hash, got %d", len(lastFilterRequest.TransactionHashes)) + } + if gointerfaces.ConvertH256ToHash(lastFilterRequest.TransactionHashes[0]) != txHash1 { + t.Error("3: expected transaction hash to match txHash1") + } + + // Third request: filter on multiple transaction hashes + criteria3 := filters.ReceiptsFilterCriteria{ + TransactionHashes: []common.Hash{txHash2, txHash3}, + } + _, id3 := f.SubscribeReceipts(1, criteria3) + + // Request should have all three transaction hashes + if len(lastFilterRequest.TransactionHashes) != 3 { + t.Errorf("4: expected 3 transaction hashes, got %d", len(lastFilterRequest.TransactionHashes)) + } + + // Unsubscribe the second filter + f.UnsubscribeReceipts(id2) + + // Request should have only txHash2 and txHash3 + if len(lastFilterRequest.TransactionHashes) != 2 { + t.Errorf("5: expected 2 transaction hashes, got %d", len(lastFilterRequest.TransactionHashes)) + } + + // Unsubscribe the last filter + f.UnsubscribeReceipts(id3) + + // Request should be nil (no active subscriptions) + if lastFilterRequest.TransactionHashes != nil { + t.Error("6: expected transaction hashes to be nil with no subscriptions") + } +} diff --git a/rpc/rpchelper/interface.go b/rpc/rpchelper/interface.go index 33de37eb36b..40bc86b1e4d 100644 --- a/rpc/rpchelper/interface.go +++ b/rpc/rpchelper/interface.go @@ -39,6 +39,7 @@ type ApiBackend interface { ClientVersion(ctx context.Context) (string, error) Subscribe(ctx context.Context, cb func(*remoteproto.SubscribeReply)) error SubscribeLogs(ctx context.Context, cb func(*remoteproto.SubscribeLogsReply), requestor *atomic.Value) error + SubscribeReceipts(ctx context.Context, cb func(*remoteproto.SubscribeReceiptsReply), requestor *atomic.Value) error BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error) NodeInfo(ctx context.Context, limit uint32) ([]p2p.NodeInfo, error) Peers(ctx context.Context) ([]*p2p.PeerInfo, error) diff --git a/rpc/rpchelper/receiptsfilter.go b/rpc/rpchelper/receiptsfilter.go new file mode 100644 index 00000000000..3b4d54a1c16 --- /dev/null +++ b/rpc/rpchelper/receiptsfilter.go @@ -0,0 +1,180 @@ +// Copyright 2025 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package rpchelper + +import ( + "sync" + + "github.com/erigontech/erigon/common" + "github.com/erigontech/erigon/common/concurrent" + "github.com/erigontech/erigon/node/gointerfaces" + "github.com/erigontech/erigon/node/gointerfaces/remoteproto" +) + +type ReceiptsFilterAggregator struct { + aggReceiptsFilter ReceiptsFilter // Aggregation of all current receipt filters + receiptsFilters *concurrent.SyncMap[ReceiptsSubID, *ReceiptsFilter] // Filter for each subscriber + receiptsFilterLock sync.RWMutex +} + +// ReceiptsFilter filters receipts by transaction hashes +type ReceiptsFilter struct { + allTxHashes int // Counter: subscribe to all receipts if > 0 + transactionHashes *concurrent.SyncMap[common.Hash, int] // Transaction hashes to filter, with ref count + sender Sub[*remoteproto.SubscribeReceiptsReply] +} + +// Send sends a receipt to the subscriber +func (f *ReceiptsFilter) Send(receipt *remoteproto.SubscribeReceiptsReply) { + f.sender.Send(receipt) +} + +// Close closes the sender +func (f *ReceiptsFilter) Close() { + f.sender.Close() +} + +// NewReceiptsFilterAggregator creates a new ReceiptsFilterAggregator +func NewReceiptsFilterAggregator() *ReceiptsFilterAggregator { + return &ReceiptsFilterAggregator{ + aggReceiptsFilter: ReceiptsFilter{ + transactionHashes: concurrent.NewSyncMap[common.Hash, int](), + }, + receiptsFilters: concurrent.NewSyncMap[ReceiptsSubID, *ReceiptsFilter](), + } +} + +// insertReceiptsFilter inserts a new receipt filter with the specified sender +func (a *ReceiptsFilterAggregator) insertReceiptsFilter(sender Sub[*remoteproto.SubscribeReceiptsReply]) (ReceiptsSubID, *ReceiptsFilter) { + a.receiptsFilterLock.Lock() + defer a.receiptsFilterLock.Unlock() + + filterId := ReceiptsSubID(generateSubscriptionID()) + filter := &ReceiptsFilter{ + transactionHashes: concurrent.NewSyncMap[common.Hash, int](), + sender: sender, + } + a.receiptsFilters.Put(filterId, filter) + return filterId, filter +} + +// removeReceiptsFilter removes a receipt filter +func (a *ReceiptsFilterAggregator) removeReceiptsFilter(filterId ReceiptsSubID) bool { + a.receiptsFilterLock.Lock() + defer a.receiptsFilterLock.Unlock() + + filter, ok := a.receiptsFilters.Get(filterId) + if !ok { + return false + } + + filter.Close() + a.subtractReceiptsFilters(filter) + + _, ok = a.receiptsFilters.Delete(filterId) + return ok +} + +// addReceiptsFilters adds filter counts to the aggregate +func (a *ReceiptsFilterAggregator) addReceiptsFilters(f *ReceiptsFilter) { + a.aggReceiptsFilter.allTxHashes += f.allTxHashes + + f.transactionHashes.Range(func(txHash common.Hash, count int) error { + a.aggReceiptsFilter.transactionHashes.DoAndStore(txHash, func(value int, exists bool) int { + return value + count + }) + return nil + }) +} + +// subtractReceiptsFilters subtracts filter counts from the aggregate +func (a *ReceiptsFilterAggregator) subtractReceiptsFilters(f *ReceiptsFilter) { + a.aggReceiptsFilter.allTxHashes -= f.allTxHashes + + f.transactionHashes.Range(func(txHash common.Hash, count int) error { + a.aggReceiptsFilter.transactionHashes.Do(txHash, func(value int, exists bool) (int, bool) { + if exists { + newValue := value - count + if newValue <= 0 { + return 0, false + } + return newValue, true + } + return 0, false + }) + return nil + }) +} + +// createFilterRequest creates a ReceiptsFilterRequest from current state +func (a *ReceiptsFilterAggregator) createFilterRequest() *remoteproto.ReceiptsFilterRequest { + a.receiptsFilterLock.RLock() + defer a.receiptsFilterLock.RUnlock() + + req := &remoteproto.ReceiptsFilterRequest{ + AllTransactions: a.aggReceiptsFilter.allTxHashes >= 1, + } + + // Always add specific transaction hashes (even if also subscribing to all) + // Backend will use OR logic: send if (AllTransactions OR hash matches) + a.aggReceiptsFilter.transactionHashes.Range(func(txHash common.Hash, count int) error { + if count > 0 { + req.TransactionHashes = append(req.TransactionHashes, gointerfaces.ConvertHashToH256(txHash)) + } + return nil + }) + + return req +} + +// getAggMaps returns aggregated transaction hashes +func (a *ReceiptsFilterAggregator) getAggMaps() map[common.Hash]int { + a.receiptsFilterLock.RLock() + defer a.receiptsFilterLock.RUnlock() + + txHashes := make(map[common.Hash]int) + a.aggReceiptsFilter.transactionHashes.Range(func(k common.Hash, v int) error { + txHashes[k] = v + return nil + }) + return txHashes +} + +// distributeReceipt processes a receipt and distributes it to matching filters +func (a *ReceiptsFilterAggregator) distributeReceipt(receipt *remoteproto.SubscribeReceiptsReply) error { + a.receiptsFilterLock.RLock() + defer a.receiptsFilterLock.RUnlock() + + txHash := gointerfaces.ConvertH256ToHash(receipt.TransactionHash) + + a.receiptsFilters.Range(func(k ReceiptsSubID, filter *ReceiptsFilter) error { + // Check if this filter matches the receipt + if filter.allTxHashes == 0 { + // Filter has specific transaction hashes + if _, ok := filter.transactionHashes.Get(txHash); !ok { + return nil // This filter doesn't want this receipt + } + } + // allTxHashes > 0 means subscribe to all receipts + + // Send to subscriber + filter.sender.Send(receipt) + return nil + }) + + return nil +}