Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Response Assembler Refactor #138

Merged
merged 15 commits into from
Dec 23, 2020
8 changes: 4 additions & 4 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type GraphSync struct {
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
peerResponseManager *responseassembler.ResponseAssembler
responseAssembler *responseassembler.ResponseAssembler
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *responderhooks.IncomingRequestHooks
Expand Down Expand Up @@ -141,17 +141,17 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
peerResponseManager := responseassembler.New(ctx, allocator, peerManager)
responseAssembler := responseassembler.New(ctx, allocator, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
graphSync := &GraphSync{
network: network,
loader: loader,
storer: storer,
requestManager: requestManager,
responseManager: responseManager,
asyncLoader: asyncLoader,
peerResponseManager: peerResponseManager,
responseAssembler: responseAssembler,
peerTaskQueue: peerTaskQueue,
peerManager: peerManager,
incomingRequestHooks: incomingRequestHooks,
Expand Down
7 changes: 7 additions & 0 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ func (mq *MessageQueue) extractOutgoingMessage() (gsmsg.GraphSyncMessage, gsmsg.
}
builder := mq.builders[0]
mq.builders = mq.builders[1:]
// if there are more queued messages, signal we still have more work
if len(mq.builders) > 0 {
select {
case mq.outgoingWork <- struct{}{}:
default:
}
}
mq.buildersLk.Unlock()
if builder.Empty() {
return gsmsg.GraphSyncMessage{}, gsmsg.Topic(0), errEmptyMessage
Expand Down
58 changes: 58 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,61 @@ func TestDedupingMessages(t *testing.T) {
}
}
}

func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

peer := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue.Startup()
waitGroup.Add(1)

// generate large blocks before proceeding
blks := testutil.GenerateBlocksOfSize(5, 1000000)
messageQueue.BuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[0])
}, []notifications.Notifee{})
waitGroup.Wait()
var message gsmsg.GraphSyncMessage
testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")

msgBlks := message.Blocks()
require.Len(t, msgBlks, 1, "number of blks in first message was not 1")
require.True(t, blks[0].Cid().Equals(msgBlks[0].Cid()))

// Send 3 very large blocks
messageQueue.BuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[1])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[2].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[2])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[3].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[3])
}, []notifications.Notifee{})

testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")
msgBlks = message.Blocks()
require.Len(t, msgBlks, 1, "number of blks in first message was not 1")
require.True(t, blks[1].Cid().Equals(msgBlks[0].Cid()))

testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")
msgBlks = message.Blocks()
require.Len(t, msgBlks, 1, "number of blks in first message was not 1")
require.True(t, blks[2].Cid().Equals(msgBlks[0].Cid()))

testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")
msgBlks = message.Blocks()
require.Len(t, msgBlks, 1, "number of blks in first message was not 1")
require.True(t, blks[3].Cid().Equals(msgBlks[0].Cid()))
}
30 changes: 15 additions & 15 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type queryExecutor struct {
blockHooks BlockHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
peerManager ResponseAssembler
responseAssembler ResponseAssembler
loader ipld.Loader
queryQueue QueryQueue
messages chan responseManagerMessage
Expand Down Expand Up @@ -117,7 +117,7 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context,
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := qe.peerManager.Transaction(p, request.ID(), func(transaction responseassembler.PeerResponseTransactionBuilder) error {
err := qe.responseAssembler.Transaction(p, request.ID(), func(transaction responseassembler.TransactionBuilder) error {
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
Expand Down Expand Up @@ -163,14 +163,14 @@ func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p pee
}
key, err := dedupkey.DecodeDedupKey(dedupData)
if err != nil {
_ = qe.peerManager.Transaction(p, request.ID(), func(prtb responseassembler.PeerResponseTransactionBuilder) error {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(prtb responseassembler.TransactionBuilder) error {
prtb.FinishWithError(graphsync.RequestFailedUnknown)
prtb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.peerManager.DedupKey(p, request.ID(), key)
qe.responseAssembler.DedupKey(p, request.ID(), key)
return nil
}

Expand All @@ -181,7 +181,7 @@ func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
_ = qe.peerManager.Transaction(p, request.ID(), func(prtb responseassembler.PeerResponseTransactionBuilder) error {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(prtb responseassembler.TransactionBuilder) error {
prtb.FinishWithError(graphsync.RequestFailedUnknown)
prtb.AddNotifee(failNotifee)
return nil
Expand All @@ -196,7 +196,7 @@ func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p
if err != nil {
return err
}
qe.peerManager.IgnoreBlocks(p, request.ID(), links)
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}

Expand All @@ -210,7 +210,7 @@ func (qe *queryExecutor) executeQuery(
updateChan := make(chan []gsmsg.GraphSyncRequest)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
var err error
_ = qe.peerManager.Transaction(p, request.ID(), func(transaction responseassembler.PeerResponseTransactionBuilder) error {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(transaction responseassembler.TransactionBuilder) error {
err = qe.checkForUpdates(p, request, signals, updateChan, transaction)
if _, ok := err.(hooks.ErrPaused); !ok && err != nil {
return nil
Expand All @@ -234,15 +234,15 @@ func (qe *queryExecutor) executeQuery(
return err
})
var code graphsync.ResponseStatusCode
_ = qe.peerManager.Transaction(p, request.ID(), func(peerResponseSender responseassembler.PeerResponseTransactionBuilder) error {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(transactionBuilder responseassembler.TransactionBuilder) error {
if err != nil {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
code = graphsync.RequestPaused
return nil
}
if isContextErr(err) {
peerResponseSender.FinishWithCancel()
transactionBuilder.FinishWithCancel()
code = graphsync.RequestCancelled
return nil
}
Expand All @@ -255,11 +255,11 @@ func (qe *queryExecutor) executeQuery(
} else {
code = graphsync.RequestFailedUnknown
}
peerResponseSender.FinishWithError(graphsync.RequestCancelled)
transactionBuilder.FinishWithError(graphsync.RequestCancelled)
} else {
code = peerResponseSender.FinishRequest()
code = transactionBuilder.FinishRequest()
}
peerResponseSender.AddNotifee(notifications.Notifee{Data: code, Subscriber: sub})
transactionBuilder.AddNotifee(notifications.Notifee{Data: code, Subscriber: sub})
return nil
})
return code, err
Expand All @@ -270,11 +270,11 @@ func (qe *queryExecutor) checkForUpdates(
request gsmsg.GraphSyncRequest,
signals signals,
updateChan chan []gsmsg.GraphSyncRequest,
peerResponseSender responseassembler.PeerResponseTransactionBuilder) error {
transactionBuilder responseassembler.TransactionBuilder) error {
for {
select {
case <-signals.pauseSignal:
peerResponseSender.PauseRequest()
transactionBuilder.PauseRequest()
return hooks.ErrPaused{}
case err := <-signals.errSignal:
return err
Expand All @@ -288,7 +288,7 @@ func (qe *queryExecutor) checkForUpdates(
for _, update := range updates {
result := qe.updateHooks.ProcessUpdateHooks(p, request, update)
for _, extension := range result.Extensions {
peerResponseSender.SendExtensionData(extension)
transactionBuilder.SendExtensionData(extension)
}
if result.Err != nil {
return result.Err
Expand Down
49 changes: 30 additions & 19 deletions responsemanager/responseassembler/responseassembler.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
Package responseassembler assembles responses that are queued for sending in outgoing messages

The response assembler's Transaction method allows a caller to specify response actions that will go into a single
libp2p2 message. The response assembler will also deduplicate blocks that have already been sent over the network in
a previous message
*/
package responseassembler

import (
Expand All @@ -13,10 +20,11 @@ import (
)

// Transaction is a series of operations that should be send together in a single response
type Transaction func(PeerResponseTransactionBuilder) error
type Transaction func(TransactionBuilder) error

// PeerResponseTransactionBuilder is a limited interface for sending responses inside a transaction
type PeerResponseTransactionBuilder interface {
// TransactionBuilder is a limited interface for assembling responses inside a transaction, so that they are included
// in the same message on the protocol
type TransactionBuilder interface {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like ResponseBuilder or MessageBuilder more for this. I can see how this 'adds' to a transaction, but the name suggests it 'builds' a transaction in the sense that a message builder builds a message, but it doesn't really. I can also see TransactionContext, TransactionMutator or TransactionAPI as this represents the limited set of message building functionality available within a transaction.

SendResponse(
link ipld.Link,
data []byte,
Expand All @@ -29,8 +37,7 @@ type PeerResponseTransactionBuilder interface {
AddNotifee(notifications.Notifee)
}

// PeerMessageHandler is an interface that can send a response for a given peer across
// the network.
// PeerMessageHandler is an interface that can queues a response for a given peer to go out over the network
type PeerMessageHandler interface {
BuildMessage(p peer.ID, blkSize uint64, buildResponseFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}
Expand All @@ -40,15 +47,16 @@ type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
}

// ResponseAssembler manages message queues for peers
// ResponseAssembler manages assembling responses to go out over the network
// in libp2p messages
type ResponseAssembler struct {
*peermanager.PeerManager
allocator Allocator
peerHandler PeerMessageHandler
ctx context.Context
}

// New generates a new peer manager for sending responses
// New generates a new ResponseAssembler for sending responses
func New(ctx context.Context, allocator Allocator, peerHandler PeerMessageHandler) *ResponseAssembler {
return &ResponseAssembler{
PeerManager: peermanager.New(ctx, func(ctx context.Context, p peer.ID) peermanager.PeerHandler {
Expand All @@ -60,40 +68,43 @@ func New(ctx context.Context, allocator Allocator, peerHandler PeerMessageHandle
}
}

func (prm *ResponseAssembler) DedupKey(p peer.ID, requestID graphsync.RequestID, key string) {
prm.GetProcess(p).(*peerLinkTracker).DedupKey(requestID, key)
// DedupKey indicates that outgoing blocks should be deduplicated in a seperate bucket (only with requests that share
// supplied key string)
func (ra *ResponseAssembler) DedupKey(p peer.ID, requestID graphsync.RequestID, key string) {
ra.GetProcess(p).(*peerLinkTracker).DedupKey(requestID, key)
}

func (prm *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link) {
prm.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links)
// IgnoreBlocks indicates that a list of keys that should be ignored when sending blocks
func (ra *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link) {
ra.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links)
}

// Transaction Build A Response
func (prm *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error {
// Transaction build a response, and queues it for sending in the next outgoing message
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Transaction build a response, and queues it for sending in the next outgoing message
// Transaction builds a response, and queues it for sending in the next outgoing message

func (ra *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error {
prts := &transactionBuilder{
requestID: requestID,
linkTracker: prm.GetProcess(p).(*peerLinkTracker),
linkTracker: ra.GetProcess(p).(*peerLinkTracker),
}
err := transaction(prts)
if err == nil {
prm.execute(p, prts.operations, prts.notifees)
ra.execute(p, prts.operations, prts.notifees)
}
return err
}

func (prs *ResponseAssembler) execute(p peer.ID, operations []responseOperation, notifees []notifications.Notifee) {
func (ra *ResponseAssembler) execute(p peer.ID, operations []responseOperation, notifees []notifications.Notifee) {
size := uint64(0)
for _, op := range operations {
size += op.size()
}
if size > 0 {
select {
case <-prs.allocator.AllocateBlockMemory(p, size):
case <-prs.ctx.Done():
case <-ra.allocator.AllocateBlockMemory(p, size):
case <-ra.ctx.Done():
return
}
}
prs.peerHandler.BuildMessage(p, size, func(responseBuilder *gsmsg.Builder) {
ra.peerHandler.BuildMessage(p, size, func(responseBuilder *gsmsg.Builder) {
for _, op := range operations {
op.build(responseBuilder)
}
Expand Down
Loading