Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions partialmessages/partialmsgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ type Message interface {
// Idempotent.
type peerState struct {
// The parts metadata the peer has sent us
// (i.e. our view of what parts the peer has and what parts the peer is requesting/willing to provider)
partsMetadata PartsMetadata
// The parts metadata this node has sent to the peer
// (i.e. the peer's view of the parts we have and what parts the peer thinks we're requesting/willing to provide)
sentPartsMetadata PartsMetadata
}

Expand Down Expand Up @@ -247,13 +249,22 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message,
if requestedPartial && havePeersPartsMetadata {
// This peer has previously asked for a certain part. We'll give
// them what we can.
// We pass our view of the peer's parts metadata to the partial message implementation
// so that it can return parts to send to the peer based on what it has and what it is actually requesting.
// We could update this function to also returns the peer's updated parts metadata based on what is actually being sent.
// Then we can do: pState.partsMetadata = updatedPartsMetadata returned by this function.
pm, err := partial.PartialMessageBytes(pState.partsMetadata)
if err != nil {
log.Warn("partial message extension failed to get partial message bytes", "error", err)
// Possibly a bad request, we'll delete the request as we will likely error next time we try to handle it
state.clearPeerMetadata(p)
continue
}
// This merge should not update the peer's request bitmask. Only it's available bitmask based on what is being sent
// which in turn depends on it's request bitmask and what we have available.
// Either we let partial.PartialMessageBytes return the entire updated parts metadata
// or change this function to "MergePeerPartsMetadata(existing, sending now)"
// The function needs to know that it should not change the peer's request bitmask as all.
pState.partsMetadata = e.MergePartsMetadata(topic, pState.partsMetadata, myPartsMeta)
if len(pm) > 0 {
log.Debug("Respond to peer's IWant")
Expand All @@ -271,13 +282,20 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message,
rpc.PartialMessage = eagerData
// Merge the peer's empty partsMetadata with the parts we eagerly pushed.
// This tracks what has been sent to the peer and avoids sending duplicates.
// Same as above. Set the peer's parts metadata to whatever we are sending here and don't touch the request bitmask
// at all i.e. it stays at 0. previous "pState.partsMetadata" will be nil here.
pState.partsMetadata = e.MergePartsMetadata(topic, pState.partsMetadata, eagerPartsMeta)
}

// Maybe a better API for both the above Merges is peerState = MergeForOutgoingRPC(existing peerState struct, partsMetadata to send)
// Or we let partial.PartialMessageBytes/EagerPartialMessageBytes return the updated parts metadata/peerState.

// Only send parts metadata if it was different then before
if pState.sentPartsMetadata == nil || !bytes.Equal(myPartsMeta, pState.sentPartsMetadata) {
log.Debug("Including parts metadata")
sendRPC = true
// No change here, this is simple. We're updating the peer's view of what we have and what we're requesting to
// the bitmasks we're sending to them.
pState.sentPartsMetadata = myPartsMeta
rpc.PartsMetadata = myPartsMeta
}
Expand Down Expand Up @@ -370,6 +388,8 @@ func (e *PartialMessagesExtension) EmitGossip(topic string, peers []peer.ID) {
}

if !bytes.Equal(rpc.PartsMetadata, pState.sentPartsMetadata) {
// No change, we're just updating the peer's view of what we have and what we're requesting to
// the bitmasks we're sending to them here.
pState.sentPartsMetadata = rpc.PartsMetadata
e.sendRPC(peer, rpc)
}
Expand Down Expand Up @@ -404,6 +424,9 @@ func (e *PartialMessagesExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessag
pState = &peerState{}
state.peerState[from] = pState
}
// We're updating our view of what the peer has/is requesting by merging their existing parts metadata
// with the one they've sent us here. In this case, we'd OR the available bitmask and overwrite their request bitmask
// to what they've sent here.
pState.partsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.partsMetadata, rpc.PartsMetadata)
}

Expand All @@ -415,6 +438,10 @@ func (e *PartialMessagesExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessag
// This works if they are responding to our request or
// if they send data eagerly. In the latter case, they will update our
// view when they receive our parts metadata.
// This merge should not update the request bitmask (because it is their view of what we're requesting which does not change here
// on an incoming RPC.)
// The right API here is either "MergeForIncomingRPC(existing peerState struct, rpc.PartsMetadata)"
// Or we let e.OnIncomingRPC below return the updated parts metadata(also pass it current peerState struct).
pState.sentPartsMetadata = e.MergePartsMetadata(rpc.GetTopicID(), pState.sentPartsMetadata, pState.partsMetadata)
}

Expand Down