Skip to content

Commit

Permalink
refactor(logs): use slog attributes for protocol clients (#763)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Gianelloni <[email protected]>
  • Loading branch information
wolf31o2 authored Oct 25, 2024
1 parent 65a3bf1 commit 80906aa
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 81 deletions.
61 changes: 53 additions & 8 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -97,7 +101,11 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: stopping client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("stopping client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
msg := NewMsgClientDone()
err = c.SendMessage(msg)
})
Expand All @@ -107,7 +115,18 @@ func (c *Client) Stop() error {
// GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive)
func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client called GetBlockRange(start: {Slot: %d, Hash: %x}, end: {Slot: %d, Hash: %x})", ProtocolName, start.Slot, start.Hash, end.Slot, end.Hash))
Debug(
fmt.Sprintf("calling GetBlockRange(start: {Slot: %d, Hash: %x}, end: {Slot: %d, Hash: %x})",
start.Slot,
start.Hash,
end.Slot,
end.Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Lock()
c.blockUseCallback = true
msg := NewMsgRequestRange(start, end)
Expand All @@ -129,7 +148,13 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
// GetBlock requests and returns a single block specified by the provided point
func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client called GetBlock(point: {Slot: %d, Hash: %x})", ProtocolName, point.Slot, point.Hash))
Debug(
fmt.Sprintf("calling GetBlock(point: {Slot: %d, Hash: %x})", point.Slot, point.Hash),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Lock()
c.blockUseCallback = false
msg := NewMsgRequestRange(point, point)
Expand Down Expand Up @@ -175,22 +200,37 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleStartBatch() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client start batch for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting batch",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.startBatchResultChan <- nil
return nil
}

func (c *Client) handleNoBlocks() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client no blocks found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("no blocks returned",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
err := fmt.Errorf("block(s) not found")
c.startBatchResultChan <- err
return nil
}

func (c *Client) handleBlock(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client block found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("block returned",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msg := msgGeneric.(*MsgBlock)
// Decode only enough to get the block type value
var wrappedBlock WrappedBlock
Expand All @@ -217,7 +257,12 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {

func (c *Client) handleBatchDone() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client batch done for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("batch done",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Unlock()
return nil
}
148 changes: 132 additions & 16 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func NewClient(
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -132,7 +136,11 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: stopping client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("stopping client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
msg := NewMsgDone()
Expand All @@ -146,7 +154,12 @@ func (c *Client) Stop() error {
// GetCurrentTip returns the current chain tip
func (c *Client) GetCurrentTip() (*Tip, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetCurrentTip()", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("calling GetCurrentTip()",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
done := atomic.Bool{}
requestResultChan := make(chan Tip, 1)
requestErrorChan := make(chan error, 1)
Expand Down Expand Up @@ -186,13 +199,25 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
waitingForCurrentTipChan = nil
case tip := <-waitingResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
Debug(
fmt.Sprintf("received tip results {Slot: %d, Hash: %x, BlockNumber: %d}", tip.Point.Slot, tip.Point.Hash, tip.BlockNumber),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
// The result from the other request is ready.
done.Store(true)
return &tip, nil
case tip := <-requestResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
Debug(
fmt.Sprintf("received tip results {Slot: %d, Hash: %x, BlockNumber: %d}", tip.Point.Slot, tip.Point.Hash, tip.BlockNumber),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
// If waitingForCurrentTipChan is full, the for loop that empties it might finish the
// loop before the select statement that writes to it is triggered. For that reason we
// require requestResultChan here.
Expand All @@ -215,16 +240,49 @@ func (c *Client) GetAvailableBlockRange(
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}

// Debug logging
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
Debug(
fmt.Sprintf(
"calling GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
Debug(
fmt.Sprintf(
"calling GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
intersectPoints[1].Slot,
intersectPoints[1].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
Debug(
fmt.Sprintf(
"calling GetAvailableBlockRange(intersectPoints: %+v)",
intersectPoints,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
}

// Find our chain intersection
Expand Down Expand Up @@ -300,16 +358,49 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}

// Debug logging
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
Debug(
fmt.Sprintf(
"calling Sync(intersectPoints: []{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
Debug(
fmt.Sprintf(
"calling Sync(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
intersectPoints[1].Slot,
intersectPoints[1].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
Debug(
fmt.Sprintf(
"calling Sync(intersectPoints: %+v)",
intersectPoints,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
}

intersectResultChan, cancel := c.wantIntersectFound()
Expand Down Expand Up @@ -478,13 +569,23 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleAwaitReply() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client await reply for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("waiting for next reply",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
return nil
}

func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client roll forward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("roll forward",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
firstBlockChan := func() chan<- clientPointResult {
select {
case ch := <-c.wantFirstBlockChan:
Expand Down Expand Up @@ -594,7 +695,12 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {

func (c *Client) handleRollBackward(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client roll backward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("roll backward",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msgRollBackward := msg.(*MsgRollBackward)
c.sendCurrentTip(msgRollBackward.Tip)
if len(c.wantFirstBlockChan) == 0 {
Expand All @@ -621,7 +727,12 @@ func (c *Client) handleRollBackward(msg protocol.Message) error {

func (c *Client) handleIntersectFound(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client intersect found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("chain intersect found",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msgIntersectFound := msg.(*MsgIntersectFound)
c.sendCurrentTip(msgIntersectFound.Tip)

Expand All @@ -635,7 +746,12 @@ func (c *Client) handleIntersectFound(msg protocol.Message) error {

func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client intersect not found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("chain intersect not found",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound)
c.sendCurrentTip(msgIntersectNotFound.Tip)

Expand Down
20 changes: 17 additions & 3 deletions protocol/handshake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.Protocol.Start()
// Send our ProposeVersions message
msg := NewMsgProposeVersions(c.config.ProtocolVersionMap)
Expand All @@ -97,7 +101,12 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleAcceptVersion(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client accept version for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("accepted version negotiation",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
if c.config.FinishedFunc == nil {
return fmt.Errorf(
"received handshake AcceptVersion message but no callback function is defined",
Expand All @@ -120,7 +129,12 @@ func (c *Client) handleAcceptVersion(msg protocol.Message) error {

func (c *Client) handleRefuse(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client refuse for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("refused handshake",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msg := msgGeneric.(*MsgRefuse)
var err error
switch msg.Reason[0].(uint64) {
Expand Down
Loading

0 comments on commit 80906aa

Please sign in to comment.