Skip to content

Commit 1e575ea

Browse files
authored
rpc service and stageloop logger updates (#7696)
This is another update to logging to replace the root logger with a contextual logger
1 parent 2ea9eb5 commit 1e575ea

File tree

7 files changed

+19
-20
lines changed

7 files changed

+19
-20
lines changed

cmd/rpcdaemon/cli/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,6 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp
489489
// register apis and create handler stack
490490
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)
491491

492-
logger.Trace("TraceRequests = %t\n", cfg.TraceRequests)
493492
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger)
494493

495494
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
@@ -609,7 +608,6 @@ type engineInfo struct {
609608
}
610609

611610
func startAuthenticatedRpcServer(cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) (*engineInfo, error) {
612-
logger.Trace("TraceRequests = %t\n", cfg.TraceRequests)
613611
srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, logger)
614612

615613
engineListener, engineSrv, engineHttpEndpoint, err := createEngineListener(cfg, rpcAPI, logger)

rpc/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func newClient(initctx context.Context, connect reconnectFunc, logger log.Logger
198198
if err != nil {
199199
return nil, err
200200
}
201-
c := initClient(conn, randomIDGenerator(), new(serviceRegistry), logger)
201+
c := initClient(conn, randomIDGenerator(), &serviceRegistry{logger: logger}, logger)
202202
c.reconnectFunc = connect
203203
return c, nil
204204
}

rpc/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
134134
if conn.remoteAddr() != "" {
135135
h.logger = h.logger.New("conn", conn.remoteAddr())
136136
}
137-
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe")
137+
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe), "unsubscribe", h.logger)
138138
return h
139139
}
140140

rpc/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type Server struct {
5959

6060
// NewServer creates a new server instance with no registered handlers.
6161
func NewServer(batchConcurrency uint, traceRequests, disableStreaming bool, logger log.Logger) *Server {
62-
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency,
62+
server := &Server{services: serviceRegistry{logger: logger}, idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1, batchConcurrency: batchConcurrency,
6363
disableStreaming: disableStreaming, traceRequests: traceRequests, logger: logger}
6464
// Register the default service providing meta information about the RPC service such
6565
// as the services and methods it offers.

rpc/service.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ var (
4141
type serviceRegistry struct {
4242
mu sync.Mutex
4343
services map[string]service
44+
logger log.Logger
4445
}
4546

4647
// service represents a registered object.
@@ -59,14 +60,15 @@ type callback struct {
5960
errPos int // err return idx, of -1 when method cannot return error
6061
isSubscribe bool // true if this is a subscription callback
6162
streamable bool // support JSON streaming (more efficient for large responses)
63+
logger log.Logger
6264
}
6365

6466
func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
6567
rcvrVal := reflect.ValueOf(rcvr)
6668
if name == "" {
6769
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
6870
}
69-
callbacks := suitableCallbacks(rcvrVal)
71+
callbacks := suitableCallbacks(rcvrVal, r.logger)
7072
if len(callbacks) == 0 {
7173
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
7274
}
@@ -116,7 +118,7 @@ func (r *serviceRegistry) subscription(service, name string) *callback {
116118
// suitableCallbacks iterates over the methods of the given type. It determines if a method
117119
// satisfies the criteria for a RPC callback or a subscription callback and adds it to the
118120
// collection of callbacks. See server documentation for a summary of these criteria.
119-
func suitableCallbacks(receiver reflect.Value) map[string]*callback {
121+
func suitableCallbacks(receiver reflect.Value, logger log.Logger) map[string]*callback {
120122
typ := receiver.Type()
121123
callbacks := make(map[string]*callback)
122124
for m := 0; m < typ.NumMethod(); m++ {
@@ -125,7 +127,7 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback {
125127
continue // method not exported
126128
}
127129
name := formatName(method.Name)
128-
cb := newCallback(receiver, method.Func, name)
130+
cb := newCallback(receiver, method.Func, name, logger)
129131
if cb == nil {
130132
continue // function invalid
131133
}
@@ -136,9 +138,9 @@ func suitableCallbacks(receiver reflect.Value) map[string]*callback {
136138

137139
// newCallback turns fn (a function) into a callback object. It returns nil if the function
138140
// is unsuitable as an RPC callback.
139-
func newCallback(receiver, fn reflect.Value, name string) *callback {
141+
func newCallback(receiver, fn reflect.Value, name string, logger log.Logger) *callback {
140142
fntype := fn.Type()
141-
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
143+
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype), logger: logger}
142144
// Determine parameter types. They must all be exported or builtin types.
143145
c.makeArgTypes()
144146

@@ -149,7 +151,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
149151
outs[i] = fntype.Out(i)
150152
}
151153
if len(outs) > 2 {
152-
log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)))
154+
logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - maximum 2 return values are allowed, got %d", name, len(outs)))
153155
return nil
154156
}
155157
// If an error is returned, it must be the last returned value.
@@ -158,7 +160,7 @@ func newCallback(receiver, fn reflect.Value, name string) *callback {
158160
c.errPos = 0
159161
case len(outs) == 2:
160162
if isErrorType(outs[0]) || !isErrorType(outs[1]) {
161-
log.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name))
163+
logger.Warn(fmt.Sprintf("Cannot register RPC callback [%s] - error must the last return value", name))
162164
return nil
163165
}
164166
c.errPos = 1
@@ -214,7 +216,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value
214216
// Catch panic while running the callback.
215217
defer func() {
216218
if err := recover(); err != nil {
217-
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack()))
219+
c.logger.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, dbg.Stack()))
218220
errRes = errors.New("method handler crashed")
219221
}
220222
}()

turbo/stages/headerdownload/header_algos.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
libcommon "github.com/ledgerwatch/erigon-lib/common"
1818
"github.com/ledgerwatch/erigon-lib/etl"
1919
"github.com/ledgerwatch/erigon-lib/kv"
20-
"github.com/ledgerwatch/log/v3"
2120
"golang.org/x/exp/slices"
2221

2322
"github.com/ledgerwatch/erigon/dataflow"
@@ -303,7 +302,7 @@ func (hd *HeaderDownload) logAnchorState() {
303302
sort.Strings(ss)
304303
hd.logger.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
305304
for _, s := range ss {
306-
log.Debug(s)
305+
hd.logger.Debug(s)
307306
}
308307
}
309308

@@ -476,7 +475,7 @@ func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [6
476475
}
477476
}
478477
}
479-
//log.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8])
478+
//hd.logger.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8])
480479
}
481480

482481
func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.Time, timeout time.Duration) {
@@ -570,7 +569,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
570569
if terminalTotalDifficulty != nil {
571570
if td.Cmp(terminalTotalDifficulty) >= 0 {
572571
hd.highestInDb = link.blockHeight
573-
log.Info(POSPandaBanner)
572+
hd.logger.Info(POSPandaBanner)
574573
dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderInserted)
575574
return true, true, 0, lastTime, nil
576575
}

turbo/stages/stageloop.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ func StageLoop(ctx context.Context,
9999
return
100100
}
101101

102-
log.Error("Staged Sync", "err", err)
102+
logger.Error("Staged Sync", "err", err)
103103
if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil {
104-
log.Error("Failed to recover header sentriesClient", "err", recoveryErr)
104+
logger.Error("Failed to recover header sentriesClient", "err", recoveryErr)
105105
}
106106
time.Sleep(500 * time.Millisecond) // just to avoid too much similar errors in logs
107107
continue
@@ -112,7 +112,7 @@ func StageLoop(ctx context.Context,
112112

113113
if loopMinTime != 0 {
114114
waitTime := loopMinTime - time.Since(start)
115-
log.Info("Wait time until next loop", "for", waitTime)
115+
logger.Info("Wait time until next loop", "for", waitTime)
116116
c := time.After(waitTime)
117117
select {
118118
case <-ctx.Done():

0 commit comments

Comments
 (0)