Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cmd/rpcdaemon/rpcservices/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,29 @@ func (back *RemoteBackend) SubscribeLogs(ctx context.Context, onNewLogs func(rep
return nil
}

func (back *RemoteBackend) SubscribeReceipts(ctx context.Context, onNewReceipts func(reply *remoteproto.SubscribeReceiptsReply), requestor *atomic.Value) error {
subscription, err := back.remoteEthBackend.SubscribeReceipts(ctx, grpc.WaitForReady(true))
if err != nil {
if s, ok := status.FromError(err); ok {
return errors.New(s.Message())
}
return err
}
requestor.Store(subscription.Send)
for {
receipts, err := subscription.Recv()
if errors.Is(err, io.EOF) {
log.Info("rpcdaemon: the receipts subscription channel was closed")
break
}
if err != nil {
return err
}
onNewReceipts(receipts)
}
return nil
}

func (back *RemoteBackend) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, uint64, bool, error) {
return back.blockReader.TxnLookup(ctx, tx, txnHash)
}
Expand Down
4 changes: 2 additions & 2 deletions execution/engineapi/engine_helpers/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
}

// FlushExtendingFork flush the current extending fork if fcu chooses its head hash as the its forkchoice.
func (fv *ForkValidator) FlushExtendingFork(tx kv.TemporalRwTx, accumulator *shards.Accumulator, recentLogs *shards.RecentLogs) error {
func (fv *ForkValidator) FlushExtendingFork(tx kv.TemporalRwTx, accumulator *shards.Accumulator, recentReceipts *shards.RecentReceipts) error {
fv.lock.Lock()
defer fv.lock.Unlock()
start := time.Now()
Expand All @@ -163,7 +163,7 @@ func (fv *ForkValidator) FlushExtendingFork(tx kv.TemporalRwTx, accumulator *sha
timings[BlockTimingsFlushExtendingFork] = time.Since(start)
fv.timingsCache.Add(fv.extendingForkHeadHash, timings)
fv.extendingForkNotifications.Accumulator.CopyAndReset(accumulator)
fv.extendingForkNotifications.RecentLogs.CopyAndReset(recentLogs)
fv.extendingForkNotifications.RecentReceipts.CopyAndReset(recentReceipts)
// Clean extending fork data
fv.sharedDom = nil

Expand Down
6 changes: 3 additions & 3 deletions execution/execmodule/ethereum_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type EthereumExecutionModule struct {
// Changes accumulator
hook *stageloop.Hook
accumulator *shards.Accumulator
recentLogs *shards.RecentLogs
recentReceipts *shards.RecentReceipts
stateChangeConsumer shards.StateChangeConsumer

// configuration
Expand All @@ -133,7 +133,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.Temp
executionPipeline *stagedsync.Sync, forkValidator *engine_helpers.ForkValidator,
config *chain.Config, builderFunc builder.BlockBuilderFunc,
hook *stageloop.Hook, accumulator *shards.Accumulator,
recentLogs *shards.RecentLogs,
recentReceipts *shards.RecentReceipts,
stateChangeConsumer shards.StateChangeConsumer,
logger log.Logger, engine rules.Engine,
syncCfg ethconfig.Sync,
Expand All @@ -151,7 +151,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.Temp
semaphore: semaphore.NewWeighted(1),
hook: hook,
accumulator: accumulator,
recentLogs: recentLogs,
recentReceipts: recentReceipts,
stateChangeConsumer: stateChangeConsumer,
engine: engine,
syncCfg: syncCfg,
Expand Down
2 changes: 1 addition & 1 deletion execution/execmodule/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
ValidationError: validationError,
}, false)
}
if err := e.forkValidator.FlushExtendingFork(tx, e.accumulator, e.recentLogs); err != nil {
if err := e.forkValidator.FlushExtendingFork(tx, e.accumulator, e.recentReceipts); err != nil {
sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, stateFlushingInParallel)
return
}
Expand Down
2 changes: 1 addition & 1 deletion execution/stagedsync/exec3_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
}

if !pe.isMining && !applyResult.isPartial && !execStage.CurrentSyncCycle.IsInitialCycle {
pe.cfg.notifications.RecentLogs.Add(applyResult.Receipts)
pe.cfg.notifications.RecentReceipts.Add(applyResult.Receipts, b.Transactions(), lastHeader)
}
}

Expand Down
2 changes: 1 addition & 1 deletion execution/stagedsync/exec3_serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (se *serialExecutor) executeBlock(ctx context.Context, tasks []exec.Task, i
}

if !se.isMining && startTxIndex == 0 && !isInitialCycle {
se.cfg.notifications.RecentLogs.Add(blockReceipts)
se.cfg.notifications.RecentReceipts.Add(blockReceipts, txTask.Txs, txTask.Header)
}
checkReceipts := !se.cfg.vmConfig.StatelessExec && se.cfg.chainConfig.IsByzantium(txTask.BlockNumber()) && !se.cfg.vmConfig.NoReceipts && !se.isMining
if txTask.BlockNumber() > 0 && startTxIndex == 0 {
Expand Down
2 changes: 2 additions & 0 deletions execution/stagedsync/stagebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type ChainEventNotifier interface {
OnNewPendingLogs(types.Logs)
OnLogs([]*remoteproto.SubscribeLogsReply)
HasLogSubscriptions() bool
OnReceipts([]*remoteproto.SubscribeReceiptsReply)
HasReceiptSubscriptions() bool
}

func MiningStages(
Expand Down
3 changes: 2 additions & 1 deletion execution/stagedsync/stageloop/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,8 @@ func (h *Hook) sendNotifications(tx kv.Tx, finishStageBeforeSync, finishStageAft
if err = stagedsync.NotifyNewHeaders(h.ctx, notifyFrom, notifyTo, h.notifications.Events, tx, h.logger); err != nil {
return nil
}
h.notifications.RecentLogs.Notify(h.notifications.Events, notifyFrom, notifyTo, isUnwind)
h.notifications.RecentReceipts.NotifyReceipts(h.notifications.Events, notifyFrom, notifyTo, isUnwind)
h.notifications.RecentReceipts.NotifyLogs(h.notifications.Events, notifyFrom, notifyTo, isUnwind)
}

currentHeader := rawdb.ReadCurrentHeader(tx)
Expand Down
2 changes: 1 addition & 1 deletion execution/tests/mock/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
pipelineStages := stageloop.NewPipelineStages(mock.Ctx, db, &cfg, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, nil, forkValidator, tracer)
mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks)

mock.Eth1ExecutionService = execmodule.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.RecentLogs, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx)
mock.Eth1ExecutionService = execmodule.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.RecentReceipts, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx)

mock.sentriesClient.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize)

Expand Down
84 changes: 84 additions & 0 deletions execution/types/ethutils/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/erigontech/erigon/execution/protocol/misc"
"github.com/erigontech/erigon/execution/types"
"github.com/erigontech/erigon/execution/types/accounts"
"github.com/erigontech/erigon/node/gointerfaces"
"github.com/erigontech/erigon/node/gointerfaces/remoteproto"
)

func MarshalReceipt(
Expand Down Expand Up @@ -128,3 +130,85 @@ func MarshalReceipt(

return fields
}

func MarshalSubscribeReceipt(protoReceipt *remoteproto.SubscribeReceiptsReply) map[string]interface{} {
receipt := make(map[string]interface{})

// Basic metadata - convert to proper hex strings
blockHash := common.Hash(gointerfaces.ConvertH256ToHash(protoReceipt.BlockHash))
receipt["blockHash"] = blockHash
receipt["blockNumber"] = hexutil.Uint64(protoReceipt.BlockNumber)
txHash := common.Hash(gointerfaces.ConvertH256ToHash(protoReceipt.TransactionHash))
receipt["transactionHash"] = txHash
receipt["transactionIndex"] = hexutil.Uint64(protoReceipt.TransactionIndex)

// From address as hex string
from := common.Address(gointerfaces.ConvertH160toAddress(protoReceipt.From))
receipt["from"] = from

// To can be null for contract creation
if protoReceipt.To != nil {
toAddr := common.Address(gointerfaces.ConvertH160toAddress(protoReceipt.To))
if toAddr != (common.Address{}) {
receipt["to"] = toAddr
} else {
receipt["to"] = nil
}
} else {
receipt["to"] = nil
}

receipt["type"] = hexutil.Uint64(protoReceipt.Type)
receipt["status"] = hexutil.Uint64(protoReceipt.Status)
receipt["cumulativeGasUsed"] = hexutil.Uint64(protoReceipt.CumulativeGasUsed)
receipt["gasUsed"] = hexutil.Uint64(protoReceipt.GasUsed)

if protoReceipt.ContractAddress != nil {
addr := common.Address(gointerfaces.ConvertH160toAddress(protoReceipt.ContractAddress))
if addr != (common.Address{}) {
receipt["contractAddress"] = addr
} else {
receipt["contractAddress"] = nil
}
} else {
receipt["contractAddress"] = nil
}

if len(protoReceipt.LogsBloom) > 0 {
receipt["logsBloom"] = hexutil.Bytes(protoReceipt.LogsBloom)
}

logs := make([]map[string]interface{}, 0, len(protoReceipt.Logs))
for _, protoLog := range protoReceipt.Logs {
logEntry := make(map[string]interface{})

if protoLog.Address != nil {
logEntry["address"] = common.Address(gointerfaces.ConvertH160toAddress(protoLog.Address))
}

topics := make([]common.Hash, len(protoLog.Topics))
for i, topic := range protoLog.Topics {
topics[i] = common.Hash(gointerfaces.ConvertH256ToHash(topic))
}
logEntry["topics"] = topics
logEntry["data"] = hexutil.Bytes(protoLog.Data)

logs = append(logs, logEntry)
}
receipt["logs"] = logs

if protoReceipt.BaseFee != nil {
baseFee := gointerfaces.ConvertH256ToUint256Int(protoReceipt.BaseFee)
receipt["effectiveGasPrice"] = (*hexutil.Big)(baseFee.ToBig())
}

if protoReceipt.BlobGasUsed > 0 {
receipt["blobGasUsed"] = hexutil.Uint64(protoReceipt.BlobGasUsed)
}
if protoReceipt.BlobGasPrice != nil {
blobGasPrice := gointerfaces.ConvertH256ToUint256Int(protoReceipt.BlobGasPrice)
receipt["blobGasPrice"] = (*hexutil.Big)(blobGasPrice.ToBig())
}

return receipt
}
90 changes: 90 additions & 0 deletions node/direct/eth_backend_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,96 @@ func (c *SubscribeLogsStreamC) Recv() (*remoteproto.SubscribeLogsReply, error) {

// -- end SubscribeLogs

// -- SubscribeReceipts

func (s *EthBackendClientDirect) SubscribeReceipts(ctx context.Context, opts ...grpc.CallOption) (remoteproto.ETHBACKEND_SubscribeReceiptsClient, error) {
subscribeReceiptsRequestChan := make(chan *subscribeReceiptsRequest, 16384)
subscribeReceiptsReplyChan := make(chan *subscribeReceiptsReply, 16384)
srv := &SubscribeReceiptsStreamS{
chSend: subscribeReceiptsReplyChan,
chRecv: subscribeReceiptsRequestChan,
ctx: ctx,
}
go func() {
defer close(subscribeReceiptsRequestChan)
defer close(subscribeReceiptsReplyChan)
srv.Err(s.server.SubscribeReceipts(srv))
}()
cli := &SubscribeReceiptsStreamC{
chSend: subscribeReceiptsRequestChan,
chRecv: subscribeReceiptsReplyChan,
ctx: ctx,
}
return cli, nil
}

type SubscribeReceiptsStreamS struct {
chSend chan *subscribeReceiptsReply
chRecv chan *subscribeReceiptsRequest
ctx context.Context
grpc.ServerStream
}

type subscribeReceiptsReply struct {
r *remoteproto.SubscribeReceiptsReply
err error
}

type subscribeReceiptsRequest struct {
r *remoteproto.ReceiptsFilterRequest
err error
}

func (s *SubscribeReceiptsStreamS) Send(m *remoteproto.SubscribeReceiptsReply) error {
s.chSend <- &subscribeReceiptsReply{r: m}
return nil
}

func (s *SubscribeReceiptsStreamS) Recv() (*remoteproto.ReceiptsFilterRequest, error) {
select {
case m, ok := <-s.chRecv:
if !ok || m == nil {
return nil, io.EOF
}
return m.r, m.err
case <-s.ctx.Done():
return nil, s.ctx.Err()
}
}

func (s *SubscribeReceiptsStreamS) Err(err error) {
if err == nil {
return
}
s.chSend <- &subscribeReceiptsReply{err: err}
}

type SubscribeReceiptsStreamC struct {
chSend chan *subscribeReceiptsRequest
chRecv chan *subscribeReceiptsReply
ctx context.Context
grpc.ClientStream
}

func (c *SubscribeReceiptsStreamC) Send(m *remoteproto.ReceiptsFilterRequest) error {
c.chSend <- &subscribeReceiptsRequest{r: m}
return nil
}

func (c *SubscribeReceiptsStreamC) Recv() (*remoteproto.SubscribeReceiptsReply, error) {
select {
case m, ok := <-c.chRecv:
if !ok || m == nil {
return nil, io.EOF
}
return m.r, m.err
case <-c.ctx.Done():
return nil, c.ctx.Err()
}
}

// -- end SubscribeReceipts

func (s *EthBackendClientDirect) CanonicalBodyForStorage(ctx context.Context, in *remoteproto.CanonicalBodyForStorageRequest, opts ...grpc.CallOption) (*remoteproto.CanonicalBodyForStorageReply, error) {
return s.server.CanonicalBodyForStorage(ctx, in)
}
Expand Down
3 changes: 2 additions & 1 deletion node/eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

pipelineStages := stageloop.NewPipelineStages(ctx, backend.chainDB, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.silkworm, backend.forkValidator, tracer)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks)
backend.eth1ExecutionServer = execmodule.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.RecentLogs, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
backend.eth1ExecutionServer = execmodule.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.RecentReceipts, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)

var executionEngine executionclient.ExecutionEngine
Expand Down Expand Up @@ -1647,6 +1647,7 @@ func (s *Ethereum) SentryCtx() context.Context {
func (s *Ethereum) SentryControlServer() *sentry_multi_client.MultiClient {
return s.sentriesClient
}

func (s *Ethereum) BlockIO() (services.FullBlockReader, *blockio.BlockWriter) {
return s.blockReader, s.blockWriter
}
Expand Down
Loading
Loading