diff --git a/partialmessages/partialmsgs.go b/partialmessages/partialmsgs.go index c86dbfeb..7a7460e1 100644 --- a/partialmessages/partialmsgs.go +++ b/partialmessages/partialmsgs.go @@ -52,8 +52,10 @@ type Message interface { // Idempotent. type peerState struct { // The parts metadata the peer has sent us + // (i.e. our view of what parts the peer has and what parts the peer is requesting/willing to provider) partsMetadata PartsMetadata // The parts metadata this node has sent to the peer + // (i.e. the peer's view of the parts we have and what parts the peer thinks we're requesting/willing to provide) sentPartsMetadata PartsMetadata } @@ -247,6 +249,10 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message, if requestedPartial && havePeersPartsMetadata { // This peer has previously asked for a certain part. We'll give // them what we can. + // We pass our view of the peer's parts metadata to the partial message implementation + // so that it can return parts to send to the peer based on what it has and what it is actually requesting. + // We could update this function to also returns the peer's updated parts metadata based on what is actually being sent. + // Then we can do: pState.partsMetadata = updatedPartsMetadata returned by this function. pm, err := partial.PartialMessageBytes(pState.partsMetadata) if err != nil { log.Warn("partial message extension failed to get partial message bytes", "error", err) @@ -254,6 +260,11 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message, state.clearPeerMetadata(p) continue } + // This merge should not update the peer's request bitmask. Only it's available bitmask based on what is being sent + // which in turn depends on it's request bitmask and what we have available. + // Either we let partial.PartialMessageBytes return the entire updated parts metadata + // or change this function to "MergePeerPartsMetadata(existing, sending now)" + // The function needs to know that it should not change the peer's request bitmask as all. pState.partsMetadata = e.MergePartsMetadata(topic, pState.partsMetadata, myPartsMeta) if len(pm) > 0 { log.Debug("Respond to peer's IWant") @@ -271,13 +282,20 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message, rpc.PartialMessage = eagerData // Merge the peer's empty partsMetadata with the parts we eagerly pushed. // This tracks what has been sent to the peer and avoids sending duplicates. + // Same as above. Set the peer's parts metadata to whatever we are sending here and don't touch the request bitmask + // at all i.e. it stays at 0. previous "pState.partsMetadata" will be nil here. pState.partsMetadata = e.MergePartsMetadata(topic, pState.partsMetadata, eagerPartsMeta) } + // Maybe a better API for both the above Merges is peerState = MergeForOutgoingRPC(existing peerState struct, partsMetadata to send) + // Or we let partial.PartialMessageBytes/EagerPartialMessageBytes return the updated parts metadata/peerState. + // Only send parts metadata if it was different then before if pState.sentPartsMetadata == nil || !bytes.Equal(myPartsMeta, pState.sentPartsMetadata) { log.Debug("Including parts metadata") sendRPC = true + // No change here, this is simple. We're updating the peer's view of what we have and what we're requesting to + // the bitmasks we're sending to them. pState.sentPartsMetadata = myPartsMeta rpc.PartsMetadata = myPartsMeta } @@ -370,6 +388,8 @@ func (e *PartialMessagesExtension) EmitGossip(topic string, peers []peer.ID) { } if !bytes.Equal(rpc.PartsMetadata, pState.sentPartsMetadata) { + // No change, we're just updating the peer's view of what we have and what we're requesting to + // the bitmasks we're sending to them here. pState.sentPartsMetadata = rpc.PartsMetadata e.sendRPC(peer, rpc) } @@ -404,6 +424,9 @@ func (e *PartialMessagesExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessag pState = &peerState{} state.peerState[from] = pState } + // We're updating our view of what the peer has/is requesting by merging their existing parts metadata + // with the one they've sent us here. In this case, we'd OR the available bitmask and overwrite their request bitmask + // to what they've sent here. pState.partsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.partsMetadata, rpc.PartsMetadata) } @@ -415,6 +438,10 @@ func (e *PartialMessagesExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessag // This works if they are responding to our request or // if they send data eagerly. In the latter case, they will update our // view when they receive our parts metadata. + // This merge should not update the request bitmask (because it is their view of what we're requesting which does not change here + // on an incoming RPC.) + // The right API here is either "MergeForIncomingRPC(existing peerState struct, rpc.PartsMetadata)" + // Or we let e.OnIncomingRPC below return the updated parts metadata(also pass it current peerState struct). pState.sentPartsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.sentPartsMetadata, pState.partsMetadata) }