Skip to content

Commit

Permalink
feat: log PubSub ignores and rejects noisily
Browse files Browse the repository at this point in the history
  • Loading branch information
arajasek committed Jun 12, 2023
1 parent 1e29616 commit fa47c71
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
err := xerrors.Errorf("validate block: %s", rerr)
recordFailure(ctx, metrics.BlockValidationFailure, err.Error())
bv.flagPeer(pid)
log.Errorf("REJECTING block message from %s", pid)
res = pubsub.ValidationReject
return
}
Expand Down Expand Up @@ -339,6 +340,7 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
log.Warnf("failed to decode incoming message: %s", err)
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
log.Errorf("REJECTING: failed to decode signed message from %s", pid)
return pubsub.ValidationReject
}

Expand All @@ -361,8 +363,10 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
case xerrors.Is(err, messagepool.ErrGasFeeCapTooLow):
fallthrough
case xerrors.Is(err, messagepool.ErrNonceTooLow):
//log.Errorf("IGNORING message %s from %s", m.Message.Cid(), pid)
return pubsub.ValidationIgnore
default:
log.Errorf("REJECTING message %s from %s, because %s", m.Message.Cid(), pid, err)
return pubsub.ValidationReject
}
}
Expand Down Expand Up @@ -488,30 +492,35 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
if pid == v.self {
log.Debug("ignoring indexer message from self")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
log.Errorf("IGNORING indexer message from %s", pid)
return pubsub.ValidationIgnore
}
originPeer := msg.GetFrom()
if originPeer == v.self {
log.Debug("ignoring indexer message originating from self")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
log.Errorf("IGNORING indexer message from %s", pid)
return pubsub.ValidationIgnore
}

idxrMsg := message.Message{}
err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data))
if err != nil {
log.Errorw("Could not decode indexer pubsub message", "err", err)
log.Errorf("REJECTING indexer message from %s", pid)
return pubsub.ValidationReject
}
if len(idxrMsg.ExtraData) == 0 {
log.Debugw("ignoring messsage missing miner id", "peer", originPeer)
log.Errorf("IGNORING indexer message from %s", pid)
return pubsub.ValidationIgnore
}

// Get miner info from lotus
minerAddr, err := address.NewFromBytes(idxrMsg.ExtraData)
if err != nil {
log.Warnw("cannot parse extra data as miner address", "err", err, "extraData", idxrMsg.ExtraData)
log.Errorf("REJECTING indexer message from %s", pid)
return pubsub.ValidationReject
}

Expand All @@ -532,6 +541,7 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
seqno := binary.BigEndian.Uint64(msg.Message.GetSeqno())
if seqno <= msgInfo.lastSeqno {
log.Debugf("ignoring replayed indexer message")
log.Errorf("IGNORING indexer message from %s", pid)
return pubsub.ValidationIgnore
}
msgInfo.lastSeqno = seqno
Expand All @@ -543,6 +553,7 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
if err != nil {
log.Warnw("cannot authenticate messsage", "err", err, "peer", originPeer, "minerID", minerAddr)
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
log.Errorf("REJECTING indexer message from %s", pid)
return pubsub.ValidationReject
}
msgInfo.peerID = originPeer
Expand Down

0 comments on commit fa47c71

Please sign in to comment.