Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,24 +322,24 @@ func EmbeddedServices(ctx context.Context,
func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
stateCache kvcache.Cache, blockReader services.FullBlockReader, engine consensus.EngineReader,
ff *rpchelper.Filters, agg *libstate.Aggregator, err error) {
ff *rpchelper.Filters, err error) {
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("either remote db or local db must be specified")
}
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("open tls cert: %w", err)
}
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to execution service privateApi: %w", err)
}

remoteBackendClient := remote.NewETHBACKENDClient(conn)
remoteKvClient := remote.NewKVClient(conn)
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to remoteKv: %w", err)
}

// Configure DB first
Expand All @@ -364,10 +364,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
return nil, nil, nil, nil, nil, nil, nil, ff, compatErr
}
db = rwKv

Expand All @@ -386,10 +386,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
return nil
}); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
if cc == nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
}
cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots
if !cfg.Snap.Enabled {
Expand All @@ -407,8 +407,9 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
allBorSnapshots.LogStat("bor:remote")

cr := rawdb.NewCanonicalReader()
if agg, err = libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

Expand Down Expand Up @@ -460,7 +461,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

db, err = temporal.New(rwKv, agg)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, err
}
stateCache = kvcache.NewDummy()
}
Expand All @@ -484,7 +485,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to txpool api: %w", err)
}
}

Expand Down Expand Up @@ -515,7 +516,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
logger.Warn("[rpc] Opening Bor db", "path", borDbPath)
borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
// Skip the compatibility check, until we have a schema in erigon-lib

Expand Down Expand Up @@ -558,7 +559,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()

ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger)
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, err
}

func StartRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error {
Expand Down
4 changes: 2 additions & 2 deletions cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
logger := debug.SetupCobra(cmd, "sentry")
db, backend, txPool, mining, stateCache, blockReader, engine, ff, agg, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
db, backend, txPool, mining, stateCache, blockReader, engine, ff, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("Could not connect to DB", "err", err)
Expand All @@ -49,7 +49,7 @@ func main() {
defer db.Close()
defer engine.Close()

apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, agg, cfg, engine, logger)
apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger)
rpc.PreAllocateRPCMetricLabels(apiList)
if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil {
logger.Error(err.Error())
Expand Down
6 changes: 6 additions & 0 deletions core/types/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,9 @@ func (r *Receipt) DeriveFieldsV3ForSingleReceipt(txnIdx int, blockHash libcommon
}
return nil
}

// TODO: maybe make it more prettier (only for debug purposes)
func (r *Receipt) String() string {
str := fmt.Sprintf("Receipt of tx %+v", *r)
return str
}
11 changes: 3 additions & 8 deletions erigon-lib/txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
f.logger.Warn("[txpool.recvMessage] sentry not ready yet", "err", err)
continue
}

if err := f.receiveMessage(f.ctx, sentryClient); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -175,7 +174,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
}
return err
}

var req *sentry.InboundMessage
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
Expand All @@ -184,17 +182,14 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
return ctx.Err()
default:
}
return err
}
if req == nil {
return nil
return fmt.Errorf("txpool.receiveMessage: %w", err)
}
if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err)
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err)
}
if f.wg != nil {
f.wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
}
}

s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, &httpRpcCfg, s.engine, s.logger)
s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, &httpRpcCfg, s.engine, s.logger)

if config.SilkwormRpcDaemon && httpRpcCfg.Enabled {
interface_log_settings := silkworm.RpcInterfaceLogSettings{
Expand Down
23 changes: 19 additions & 4 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ package eth
import (
"context"
"fmt"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/log/v3"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
Expand Down Expand Up @@ -160,12 +159,17 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader
return bodies
}

func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
type ReceiptsGetter interface {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI:

  • we also have turbo/services/interfaces.go. But define interfaces on consumer side is also good.
  • can use next trick:
var _ evmtypes.IntraBlockState = new(IntraBlockState) // compile-time interface-check

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in an another file

GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []libcommon.Address) (types.Receipts, error)
}

func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGetter ReceiptsGetter, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)

for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
Expand All @@ -183,7 +187,12 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
if b == nil {
return nil, nil
}
results := rawdb.ReadReceipts(db, b, s)

results, err := receiptsGetter.GetReceipts(ctx, cfg, db, b, s)
if err != nil {
return nil, err
}

if results == nil {
header, err := rawdb.ReadHeaderByHash(db, hash)
if err != nil {
Expand All @@ -193,6 +202,12 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
continue
}
}
// For debug
//println("receipts:")
//for _, result := range results {
// println(result.String())
//}

// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
return nil, fmt.Errorf("failed to encode receipt: %w", err)
Expand Down
104 changes: 63 additions & 41 deletions p2p/sentry/sentry_multi_client/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"encoding/hex"
"errors"
"fmt"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/turbo/jsonrpc/receipts"
"golang.org/x/sync/semaphore"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -291,9 +295,13 @@ type MultiClient struct {
// decouple sentry multi client from header and body downloading logic is done
disableBlockDownload bool

logger log.Logger
logger log.Logger
getReceiptsActiveGoroutineNumber *semaphore.Weighted
ethApiWrapper eth.ReceiptsGetter
}

var _ eth.ReceiptsGetter = new(receipts.Generator) // compile-time interface-check

func NewMultiClient(
db kv.RwDB,
chainConfig *chain.Config,
Expand Down Expand Up @@ -342,6 +350,14 @@ func NewMultiClient(
bd = &bodydownload.BodyDownload{}
}

receiptsCacheLimit := 32
receiptsCache, err := lru.New[common.Hash, []*types.Receipt](receiptsCacheLimit)
if err != nil {
return nil, err
}

receiptsGenerator := receipts.NewGenerator(receiptsCache, blockReader, engine)

cs := &MultiClient{
Hd: hd,
Bd: bd,
Expand All @@ -356,6 +372,8 @@ func NewMultiClient(
maxBlockBroadcastPeers: maxBlockBroadcastPeers,
disableBlockDownload: disableBlockDownload,
logger: logger,
getReceiptsActiveGoroutineNumber: semaphore.NewWeighted(1),
ethApiWrapper: receiptsGenerator,
}

return cs, nil
Expand Down Expand Up @@ -696,45 +714,50 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry
return nil
}

func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320
//var query eth.GetReceiptsPacket66
//if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
// return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
//}
//tx, err := cs.db.BeginRo(ctx)
//if err != nil {
// return err
//}
//defer tx.Rollback()
//receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket)
//if err != nil {
// return err
//}
//tx.Rollback()
//b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
// RequestId: query.RequestId,
// ReceiptsRLPPacket: receipts,
//})
//if err != nil {
// return fmt.Errorf("encode header response: %w", err)
//}
//outreq := proto_sentry.SendMessageByIdRequest{
// PeerId: inreq.PeerId,
// Data: &proto_sentry.OutboundMessageData{
// Id: proto_sentry.MessageId_RECEIPTS_66,
// Data: b,
// },
//}
//_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
//if err != nil {
// if isPeerNotFoundErr(err) {
// return nil
// }
// return fmt.Errorf("send bodies response: %w", err)
//}
////cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b)))
//return nil
func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentryClient direct.SentryClient) error {
err := cs.getReceiptsActiveGoroutineNumber.Acquire(ctx, 1)
if err != nil {
return err
}
defer cs.getReceiptsActiveGoroutineNumber.Release(1)
var query eth.GetReceiptsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
}

tx, err := cs.db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()

receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket)
if err != nil {
return err
}
b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
RequestId: query.RequestId,
ReceiptsRLPPacket: receiptsList,
})
if err != nil {
return fmt.Errorf("encode header response: %w", err)
}
outreq := proto_sentry.SendMessageByIdRequest{
PeerId: inreq.PeerId,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_RECEIPTS_66,
Data: b,
},
}
_, err = sentryClient.SendMessageById(ctx, &outreq, &grpc.OnFinishCallOption{})
if err != nil {
if isPeerNotFoundErr(err) {
return nil
}
return fmt.Errorf("send receipts response: %w", err)
}
//println(fmt.Sprintf("[%s] GetReceipts responseLen %d", sentry.ConvertH512ToPeerID(inreq.PeerId), len(b)))
return nil
}

func MakeInboundMessage() *proto_sentry.InboundMessage {
Expand All @@ -747,7 +770,6 @@ func (cs *MultiClient) HandleInboundMessage(ctx context.Context, message *proto_
err = fmt.Errorf("%+v, msgID=%s, trace: %s", rec, message.Id.String(), dbg.Stack())
}
}() // avoid crash because Erigon's core does many things

err = cs.handleInboundMessage(ctx, message, sentry)

if (err != nil) && rlp.IsInvalidRLPError(err) {
Expand Down
2 changes: 1 addition & 1 deletion turbo/engineapi/engine_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (e *EngineServer) Start(
txPool txpool.TxpoolClient,
mining txpool.MiningClient,
) {
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader)

ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.Feecap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger)

Expand Down
Loading