Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,6 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp
// register apis and create handler stack
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)

logger.Trace("TraceRequests = %t\n", cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger)

allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
Expand Down Expand Up @@ -609,7 +608,6 @@ type engineInfo struct {
}

func startAuthenticatedRpcServer(cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) (*engineInfo, error) {
logger.Trace("TraceRequests = %t\n", cfg.TraceRequests)
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger)

engineListener, engineSrv, engineHttpEndpoint, err := createEngineListener(cfg, rpcAPI, logger)
Expand Down
2 changes: 1 addition & 1 deletion rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newClient(initctx context.Context, connect reconnectFunc, logger log.Logger
if err != nil {
return nil, err
}
c := initClient(conn, randomIDGenerator(), new(serviceRegistry), logger)
c := initClient(conn, randomIDGenerator(), &serviceRegistry{logger: logger}, logger)
c.reconnectFunc = connect
return c, nil
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
if conn.remoteAddr() != "" {
h.logger = h.logger.New("conn", conn.remoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe")
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe", h.logger)
return h
}

Expand Down
2 changes: 1 addition & 1 deletion rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Server struct {

// NewServer creates a new server instance with no registered handlers.
func NewServer(batchConcurrency uint, traceRequests, disableStreaming bool, logger log.Logger) *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency,
server := &Server{services: serviceRegistry{logger: logger}, idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency,
disableStreaming: disableStreaming, traceRequests: traceRequests, logger: logger}
// Register the default service providing meta information about the RPC service such
// as the services and methods it offers.
Expand Down
18 changes: 10 additions & 8 deletions rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
type serviceRegistry struct {
mu sync.Mutex
services map[string]service
logger log.Logger
}

// service represents a registered object.
Expand All @@ -59,14 +60,15 @@ type callback struct {
errPos int // err return idx, of -1 when method cannot return error
isSubscribe bool // true if this is a subscription callback
streamable bool // support JSON streaming (more efficient for large responses)
logger log.Logger
}

func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
rcvrVal := reflect.ValueOf(rcvr)
if name == "" {
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
}
callbacks := suitableCallbacks(rcvrVal)
callbacks := suitableCallbacks(rcvrVal, r.logger)
if len(callbacks) == 0 {
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
Expand Down Expand Up @@ -116,7 +118,7 @@ func (r *serviceRegistry) subscription(service, name string) *callback {
// suitableCallbacks iterates over the methods of the given type. It determines if a method
// satisfies the criteria for a RPC callback or a subscription callback and adds it to the
// collection of callbacks. See server documentation for a summary of these criteria.
func suitableCallbacks(receiver reflect.Value) map[string]*callback {
func suitableCallbacks(receiver reflect.Value, logger log.Logger) map[string]*callback {
typ := receiver.Type()
callbacks := make(map[string]*callback)
for m := 0; m < typ.NumMethod(); m++ {
Expand All @@ -125,7 +127,7 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback {
continue // method not exported
}
name := formatName(method.Name)
cb := newCallback(receiver, method.Func, name)
cb := newCallback(receiver, method.Func, name, logger)
if cb == nil {
continue // function invalid
}
Expand All @@ -136,9 +138,9 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback {

// newCallback turns fn (a function) into a callback object. It returns nil if the function
// is unsuitable as an RPC callback.
func newCallback(receiver, fn reflect.Value, name string) *callback {
func newCallback(receiver, fn reflect.Value, name string, logger log.Logger) *callback {
fntype := fn.Type()
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype), logger: logger}
// Determine parameter types. They must all be exported or builtin types.
c.makeArgTypes()

Expand All @@ -149,7 +151,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
outs[i] = fntype.Out(i)
}
if len(outs) > 2 {
log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)))
logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)))
return nil
}
// If an error is returned, it must be the last returned value.
Expand All @@ -158,7 +160,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
c.errPos = 0
case len(outs) == 2:
if isErrorType(outs[0]) || !isErrorType(outs[1]) {
log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name))
logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name))
return nil
}
c.errPos = 1
Expand Down Expand Up @@ -214,7 +216,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value
// Catch panic while running the callback.
defer func() {
if err := recover(); err != nil {
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack()))
c.logger.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack()))
errRes = errors.New("method handler crashed")
}
}()
Expand Down
7 changes: 3 additions & 4 deletions turbo/stages/headerdownload/header_algos.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"

"github.com/ledgerwatch/erigon/dataflow"
Expand Down Expand Up @@ -303,7 +302,7 @@ func (hd *HeaderDownload) logAnchorState() {
sort.Strings(ss)
hd.logger.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
for _, s := range ss {
log.Debug(s)
hd.logger.Debug(s)
}
}

Expand Down Expand Up @@ -476,7 +475,7 @@ func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [6
}
}
}
//log.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8])
//hd.logger.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8])
}

func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.Time, timeout time.Duration) {
Expand Down Expand Up @@ -570,7 +569,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
if terminalTotalDifficulty != nil {
if td.Cmp(terminalTotalDifficulty) >= 0 {
hd.highestInDb = link.blockHeight
log.Info(POSPandaBanner)
hd.logger.Info(POSPandaBanner)
dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderInserted)
return true, true, 0, lastTime, nil
}
Expand Down
6 changes: 3 additions & 3 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func StageLoop(ctx context.Context,
return
}

log.Error("Staged Sync", "err", err)
logger.Error("Staged Sync", "err", err)
if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil {
log.Error("Failed to recover header sentriesClient", "err", recoveryErr)
logger.Error("Failed to recover header sentriesClient", "err", recoveryErr)
}
time.Sleep(500 * time.Millisecond) // just to avoid too much similar errors in logs
continue
Expand All @@ -112,7 +112,7 @@ func StageLoop(ctx context.Context,

if loopMinTime != 0 {
waitTime := loopMinTime - time.Since(start)
log.Info("Wait time until next loop", "for", waitTime)
logger.Info("Wait time until next loop", "for", waitTime)
c := time.After(waitTime)
select {
case <-ctx.Done():
Expand Down