Tech Debt Cleanup and Docs Update (#219)
* refactor(graphsync): cleanup status code utilities

* refactor(responsemanager): reorganize for clarity

Clarify actor pattern and responsibilities across threads

* refactor(requestmanager): reorganize for clarity

refactor requestmanager to clearly designate responsibilities in actor pattern

* refactor(asyncloader): remove goroutines

remove the actor pattern from asyncloader, as it's not needed. It works well as a simple locked data
structure and is about 10x simpler

* docs(README): cleanup and update for LinkSystem branch

* docs(architecture): update to explain actor pattern

* docs(architecture.md): update for link system

update references in architecture doc to reflect IPLD linksystem

* refactor(style): cleanup errors, remove unused field
hannahhoward authored Sep 24, 2021
1 parent 48cf3df commit 9c1504a
Showing 21 changed files with 1,606 additions and 1,714 deletions.
110 changes: 7 additions & 103 deletions README.md
@@ -43,7 +43,7 @@ If your existing library (i.e. `go-ipfs` or `go-filecoin`) uses these other olde

## Install

`go-graphsync` requires Go >= 1.11 and can be installed using Go modules
`go-graphsync` requires Go >= 1.13 and can be installed using Go modules

## Usage

@@ -58,24 +58,22 @@ import (

var ctx context.Context
var host libp2p.Host
var loader ipld.Loader
var storer ipld.Storer
var lsys ipld.LinkSystem

network := gsnet.NewFromLibp2pHost(host)
exchange := graphsync.New(ctx, network, loader, storer)
exchange := graphsync.New(ctx, network, lsys)
```

Parameter Notes:

1. `context` is just the parent context for all of GraphSync
2. `network` is a network abstraction provided to Graphsync on top
of libp2p. This allows graphsync to be tested without the actual network
3. `loader` is used to load blocks from content ids from the local block store. It's used when RESPONDING to requests from other clients. It should conform to the IPLD loader interface: https://github.com/ipld/go-ipld-prime/blob/master/linking.go
4. `storer` is used to store incoming blocks to the local block store. It's used when REQUESTING a graphsync query, to store blocks locally once they are validated as part of the correct response. It should conform to the IPLD storer interface: https://github.com/ipld/go-ipld-prime/blob/master/linking.go
3. `lsys` is a go-ipld-prime `LinkSystem`, which provides mechanisms for loading and constructing go-ipld-prime nodes from a link, and for saving go-ipld-prime nodes as serialized data

### Using GraphSync With An IPFS BlockStore

GraphSync provides two convenience functions in the `storeutil` package for
GraphSync provides a convenience function in the `storeutil` package for
integrating with BlockStores from IPFS.

```golang
@@ -92,103 +92,9 @@ var host libp2p.Host
var bs blockstore.Blockstore

network := gsnet.NewFromLibp2pHost(host)
loader := storeutil.LoaderForBlockstore(bs)
storer := storeutil.StorerForBlockstore(bs)
lsys := storeutil.LinkSystemForBlockstore(bs)

exchange := graphsync.New(ctx, network, loader, storer)
```

### Write A Loader For An IPFS BlockStore

If you are using a traditional go-ipfs-blockstore, your link loading function looks like this:

```golang
type BlockStore interface {
Get(lnk cid.Cid) (blocks.Block, error)
}
```

or, more generally:

```golang
type Cid2BlockFn func (lnk cid.Cid) (blocks.Block, error)
```

in `go-ipld-prime`, the signature for a link loader is as follows:

```golang
type Loader func(lnk Link, lnkCtx LinkContext) (io.Reader, error)
```

`go-ipld-prime` intentionally keeps its interfaces as abstract as possible to limit dependencies on other ipfs/filecoin specific packages. An IPLD Link is an abstraction for a CID, and IPLD expects io.Reader's rather than an actual block. IPLD provides a `cidLink` package for working with Links that use CIDs as the underlying data, and it's safe to assume that's the type in use if your code deals only with CIDs. A conversion would look something like this:

```golang
import (
ipld "github.com/ipld/go-ipld-prime"
cidLink "github.com/ipld/go-ipld-prime/linking/cid"
)

func LoaderFromCid2BlockFn(cid2BlockFn Cid2BlockFn) ipld.Loader {
return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) {
asCidLink, ok := lnk.(cidlink.Link)
if !ok {
return nil, fmt.Errorf("Unsupported Link Type")
}
block, err := cid2BlockFn(asCidLink.Cid)
if err != nil {
return nil, err
}
return bytes.NewReader(block.RawData()), nil
}
}
```

### Write A Storer From An IPFS BlockStore

If you are using a traditional go-ipfs-blockstore, your storage function looks like this:

```golang
type BlockStore interface {
Put(blocks.Block) error
}
```

or, more generally:

```golang
type BlockStoreFn func (blocks.Block) (error)
```

in `go-ipld-prime`, the signature for a link storer is a bit different:

```golang
type StoreCommitter func(Link) error
type Storer func(lnkCtx LinkContext) (io.Writer, StoreCommitter, error)
```

`go-ipld-prime` stores in two parts to support streaming -- the storer is called and returns an IO.Writer and a function to commit changes when finished. Here's how you can write a storer from a traditional block storing signature.

```golang
import (
blocks "github.com/ipfs/go-block-format"
ipld "github.com/ipld/go-ipld-prime"
cidLink "github.com/ipld/go-ipld-prime/linking/cid"
)

func StorerFromBlockStoreFn(blockStoreFn BlockStoreFn) ipld.Storer {
return func(lnkCtx ipld.LinkContext) (io.Writer, ipld.StoreCommitter, error) {
var buffer bytes.Buffer
committer := func(lnk ipld.Link) error {
asCidLink, ok := lnk.(cidlink.Link)
if !ok {
return fmt.Errorf("Unsupported Link Type")
}
block := blocks.NewBlockWithCid(buffer.Bytes(), asCidLink.Cid)
return blockStoreFn(block)
}
return &buffer, committer, nil
}
}
exchange := graphsync.New(ctx, network, lsys)
```

### Calling Graphsync
36 changes: 26 additions & 10 deletions docs/architecture.md
@@ -11,6 +11,7 @@ This document explains the basic architecture for the go implementation of the G
- [Requestor Implementation](#requestor-implementation)
- [Responder Implementation](#responder-implementation)
- [Message Sending Layer](#message-sending-layer)
- [Miscellaneous](#miscellaneous)

## Overview

Expand All @@ -28,7 +29,7 @@ go-graphsync also depends on the following external dependencies:

1. A network implementation, which provides basic functions for sending and receiving messages on a network.

2. A local blockstore implementation, expressed by a `loader` function and a `storer` function.
2. A local blockstore implementation, expressed by an IPLD `LinkSystem`.

## Request Lifecycle

@@ -47,13 +48,13 @@ This order of these requirements corresponds roughly with the sequence they're e

However, if you reverse the order of these requirements, it becomes clear that a GraphSync request is really an IPLD Selector Query performed locally that happens to be backed by another remote peer performing the same query on its machine and feeding the results to the requestor.

Selector queries, as implemented in the `go-ipld-prime` library, rely on a loader function to load data any time a link boundary is crossed during a query. The loader can be configured each time a selector query is performed. We use this to support network communication on both sides of a GraphSync query.
Selector queries, as implemented in the `go-ipld-prime` library, rely on a function to load data any time a link boundary is crossed during a query. The loader can be configured each time a selector query is performed. We use this to support network communication on both sides of a GraphSync query.

On the requestor side, instead of supplying the local storage loader, we supply it with a different loader that waits for responses from the network -- and also simultaneously stores them in local storage as they are loaded. Blocks that come back on the network that are never loaded as part of the local Selector traversal are simply dropped. Moreover, we can take advantage of the fact that blocks get stored locally as they are traversed to limit network traffic -- there's no need to send back a block twice because we can safely assume in a single query, once a block is traversed once, it's in the requestors local storage.
On the requestor side, instead of supplying a function to read from local storage, we supply a function that waits for responses from the network -- and also simultaneously stores them in local storage as they are loaded. Blocks that come back on the network that are never loaded as part of the local Selector traversal are simply dropped. Moreover, we can take advantage of the fact that blocks get stored locally as they are traversed to limit network traffic -- there's no need to send back a block twice because we can safely assume in a single query, once a block is traversed once, it's in the requestor's local storage.

On the responder side, we employ a similar method -- while an IPLD Selector query operates at the finer grain of traversing IPLD Nodes, what we really care about is when it crosses a link boundary. At this point, IPLD asks the Loader to load the link, and here, we provide IPLD with a loader that wraps the local storage loader but also transmits every block loaded across the network.
On the responder side, we employ a similar method -- while an IPLD Selector query operates at the finer grain of traversing IPLD Nodes, what we really care about is when it crosses a link boundary. At this point, IPLD calls out to a function to load the link, and here, we provide IPLD with a function that loads from local storage but also transmits every block loaded across the network.

So, effectively what we are doing is using intercepted loaders on both sides to handle the transmitting and receiving of data across the network.
So, effectively what we are doing is using intercepted block loaders on both sides to handle the transmitting and receiving of data across the network.

While the actual code operates in a way that is slightly more complicated, the basic sequence of a single GraphSync request is as follows:

@@ -71,9 +72,8 @@ Having outlined all the steps to execute a single roundtrip Graphsync request, t
To do this, GraphSync maintains several independent threads of execution (i.e. goroutines). Specifically:
- On the requestor side:
1. We maintain an independent thread to make and track requests (RequestManager)
2. We maintain an independent thread to feed incoming blocks to selector verifications (AsyncLoader)
3. Each outgoing request has an independent thread performing selector verification
4. Each outgoing request has an independent thread collecting and buffering final responses before they are returned to the caller. Graphsync returns responses to the caller through a channel. If the caller fails to immediately read the response channel, this should not block other requests from being processed.
2. Each outgoing request has an independent thread performing selector verification
3. Each outgoing request has an independent thread collecting and buffering final responses before they are returned to the caller. Graphsync returns responses to the caller through a channel. If the caller fails to immediately read the response channel, this should not block other requests from being processed.
- On the responder side:
1. We maintain an independent thread to receive incoming requests and track outgoing responses. As each incoming request is received, it's put into a prioritized queue.
2. We maintain a fixed number of threads that continuously pull the highest priority request from the queue and perform the selector query for that request. We marshal and deduplicate outgoing responses and blocks before they are sent back. This minimizes data sent on the wire and allows queries to proceed without getting blocked by the network.
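The responder-side pattern -- a prioritized queue drained by a fixed pool of workers -- can be sketched as follows. This is a minimal stand-in, not graphsync's actual `peertaskqueue`; the linear scan for the best priority is for brevity (a heap would be used in practice):

```go
package main

import (
	"fmt"
	"sync"
)

// request models an incoming graphsync request with a priority
// (illustrative, not graphsync's actual type).
type request struct {
	id       int
	priority int
}

// priorityQueue is a locked queue that pops the highest-priority pending
// request, standing in for graphsync's prioritized request queue.
type priorityQueue struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending []request
	closed  bool
}

func newPriorityQueue() *priorityQueue {
	q := &priorityQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *priorityQueue) push(r request) {
	q.mu.Lock()
	q.pending = append(q.pending, r)
	q.mu.Unlock()
	q.cond.Signal()
}

// pop blocks until a request is available, returning false once the queue
// is closed and fully drained.
func (q *priorityQueue) pop() (request, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.pending) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.pending) == 0 {
		return request{}, false
	}
	best := 0
	for i, r := range q.pending {
		if r.priority > q.pending[best].priority {
			best = i
		}
	}
	r := q.pending[best]
	q.pending = append(q.pending[:best], q.pending[best+1:]...)
	return r, true
}

func (q *priorityQueue) close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

func main() {
	q := newPriorityQueue()
	for i, p := range []int{1, 5, 3} {
		q.push(request{id: i, priority: p})
	}
	q.close()

	// A fixed number of workers continuously pull the highest-priority request.
	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				r, ok := q.pop()
				if !ok {
					return
				}
				fmt.Println("executing request", r.id, "priority", r.priority)
			}
		}()
	}
	wg.Wait()
}
```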
@@ -93,7 +93,7 @@ The network implementation needs to provide basic lower level utilities for send

### Local Blockstore Implementation

Interacting with a local blockstore is expressed by a `loader` function and a `storer` function. The `loader` function takes an IPLD Link and returns an `io.Reader` for corresponding block data, while the `storer` takes a Link and returns a `io.Writer` to write corresponding block data, plus a commit function to call when the data is ready to transfer to permanent storage.
Interacting with a local blockstore is expressed via an IPLD `LinkSystem`. The block loading function in an IPLD `LinkSystem` takes an IPLD Link and returns an `io.Reader` for the corresponding block data, while the block storing function takes a Link and returns an `io.Writer` to write the corresponding block data, plus a commit function to call when the data is ready to transfer to permanent storage.

## Requestor Implementation

@@ -172,9 +172,25 @@ The message consists of a PeerManager which tracks peers, and a message queue fo

The message queue system contains a mechanism for applying backpressure to a query execution to make sure that a slow network connection doesn't cause us to load all the blocks for the query into memory while we wait for messages to go over the network. Whenever you attempt to queue data into the message queue, you provide an estimated size for the data that will be held in memory until the message goes out. Internally, the message queue uses the Allocator to track memory usage, and the call to queue data will block if there is too much data buffered in memory. When messages are sent out, memory is released, which unblocks requests to queue data for the message queue.
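The backpressure mechanism boils down to a bounded memory accountant: allocation blocks while the buffered total would exceed the limit, and release wakes blocked senders. This is a minimal sketch with invented names; graphsync's actual Allocator additionally tracks per-peer totals and respects contexts:

```go
package main

import (
	"fmt"
	"sync"
)

// allocator applies backpressure: AllocateBlockMemory blocks until the
// requested bytes fit under the limit, and ReleaseBlockMemory frees them
// once the corresponding message has gone out on the wire.
type allocator struct {
	mu    sync.Mutex
	cond  *sync.Cond
	used  uint64
	limit uint64
}

func newAllocator(limit uint64) *allocator {
	a := &allocator{limit: limit}
	a.cond = sync.NewCond(&a.mu)
	return a
}

func (a *allocator) AllocateBlockMemory(size uint64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for a.used+size > a.limit {
		a.cond.Wait() // too much buffered; block the query execution
	}
	a.used += size
}

func (a *allocator) ReleaseBlockMemory(size uint64) {
	a.mu.Lock()
	a.used -= size
	a.mu.Unlock()
	a.cond.Broadcast() // unblock pending allocations
}

func main() {
	a := newAllocator(100)
	a.AllocateBlockMemory(80)
	done := make(chan struct{})
	go func() {
		a.AllocateBlockMemory(50) // blocks: 80+50 exceeds the limit
		close(done)
	}()
	a.ReleaseBlockMemory(80) // message sent; memory freed
	<-done
	fmt.Println("second allocation proceeded after release")
}
```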

## Hooks And Listeners
## Miscellaneous

### Hooks And Listeners

go-graphsync provides a variety of points in the request/response lifecycle where one can provide a hook to inspect the current state of the request/response and potentially take action. These hooks provide the core mechanisms for authenticating requests, processing graphsync extensions, pausing and resuming, and generally enabling a higher-level consumer of graphsync to precisely control the request/response lifecycle.

Graphsync also provides listeners that enable a caller to be notified when various asynchronous events happen in the request/response lifecycle. Currently graphsync contains an internal pubsub notification system (see [notifications](../notifications)) to escalate low-level asynchronous events back to high-level modules that pass them to external listeners. A future refactor might look for a way to remove this notification system, as it adds additional complexity.
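A hook chain of this kind can be sketched as below. The names (`hooks`, `processIncoming`) and the boolean accept/reject result are invented for illustration -- graphsync's real hooks receive richer request/hook-action arguments:

```go
package main

import "fmt"

// incomingRequestHook inspects a request and decides whether to accept it
// (illustrative signature; real hooks take structured arguments).
type incomingRequestHook func(peer string, root string) bool

type hooks struct {
	incoming []incomingRequestHook
}

// register adds a hook and returns an unregister function, mirroring the
// register/unregister style of graphsync's hook APIs.
func (h *hooks) register(hk incomingRequestHook) func() {
	h.incoming = append(h.incoming, hk)
	idx := len(h.incoming) - 1
	return func() { h.incoming[idx] = nil }
}

// processIncoming runs every registered hook; any rejection stops the request.
func (h *hooks) processIncoming(peer, root string) bool {
	for _, hk := range h.incoming {
		if hk != nil && !hk(peer, root) {
			return false
		}
	}
	return true
}

func main() {
	h := &hooks{}
	unregister := h.register(func(peer, root string) bool {
		return peer == "trusted-peer" // a simple authentication hook
	})
	fmt.Println(h.processIncoming("trusted-peer", "some-root")) // true
	fmt.Println(h.processIncoming("unknown-peer", "some-root")) // false
	unregister()
	fmt.Println(h.processIncoming("unknown-peer", "some-root")) // true: no hooks left
}
```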

### Actor Pattern In RequestManager And ResponseManager

To manage concurrency in a predictable way, the RequestManager and the ResponseManager are informally implemented using the [Actor model](https://en.wikipedia.org/wiki/Actor_model) employed in distributed systems languages like Erlang.

Each has isolated, internal state and a semi-asynchronous message queue (just a Go channel with a 16-message buffer). The internal thread takes messages off the queue and dispatches them to methods that modify internal state.

Each implementation is spread out across three files:
- client.go - the public interface whose methods dispatch messages to the internal thread
- server.go - the methods run inside the thread that actually process messages and modify internal state
- messages.go - the different messages that are sent through the main message queue

To achieve the kind of dynamic dispatch one expects from the actor pattern based on message type, we use the visitor pattern to simulate [sum types](https://making.pusher.com/alternatives-to-sum-types-in-go/). This does make the implementation fairly verbose.
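The client/server/messages split and the visitor-style dispatch can be sketched in miniature. Message and state names here are invented for illustration, not graphsync's actual types:

```go
package main

import "fmt"

// server holds the actor's isolated internal state; only the internal
// thread ever touches it.
type server struct {
	inProgress map[int]string
}

// message is the visitor-style interface: each concrete message type knows
// how to apply itself to the server's state, simulating a sum type.
type message interface {
	handle(s *server)
}

type newRequestMessage struct {
	id   int
	root string
}

func (m *newRequestMessage) handle(s *server) {
	s.inProgress[m.id] = m.root
}

type cancelRequestMessage struct {
	id int
}

func (m *cancelRequestMessage) handle(s *server) {
	delete(s.inProgress, m.id)
}

func main() {
	// The client side just sends messages into a buffered channel...
	messages := make(chan message, 16)
	done := make(chan struct{})

	// ...and the internal thread pulls them off and dispatches via handle,
	// so all state mutation is serialized on one goroutine.
	s := &server{inProgress: make(map[int]string)}
	go func() {
		for m := range messages {
			m.handle(s)
		}
		close(done)
	}()

	messages <- &newRequestMessage{id: 1, root: "some-root"}
	messages <- &newRequestMessage{id: 2, root: "another-root"}
	messages <- &cancelRequestMessage{id: 1}
	close(messages)
	<-done
	fmt.Println("requests in progress:", len(s.inProgress)) // prints 1
}
```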

However, implementing actors provides a more predictable way to handle concurrency issues than traditional select statements and helps make the logic of complex classes like the RequestManager and ResponseManager easier to follow.
Binary file modified docs/processes.png
1 change: 0 additions & 1 deletion docs/processes.puml
@@ -13,7 +13,6 @@ if (operation type) then (outgoing request or incoming response)
partition "Graphsync Requestor Implementation" {
:RequestManager;
if (operation type) then (incoming response)
:AsyncLoader;
partition "Verifying Queries" {
fork
:ipld.Traverse;
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/ipfs/go-graphsync

go 1.12
go 1.13

require (
github.com/gogo/protobuf v1.3.2
3 changes: 1 addition & 2 deletions impl/graphsync.go
@@ -196,7 +196,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestAllocator: requestAllocator,
}

asyncLoader.Startup()
requestManager.SetDelegate(peerManager)
requestManager.Startup()
responseManager.Startup()
@@ -206,7 +205,7 @@

// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
return gs.requestManager.NewRequest(ctx, p, root, selector, extensions...)
}

// RegisterIncomingRequestHook adds a hook that runs when a request is received
15 changes: 6 additions & 9 deletions message/message.go
@@ -21,26 +21,23 @@ import (

// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
// DEPRECATED: use status.IsSuccess()
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
return status == graphsync.RequestCompletedFull ||
status == graphsync.RequestCompletedPartial
return status.IsSuccess()
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
// DEPRECATED: use status.IsFailure()
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
return status == graphsync.RequestFailedBusy ||
status == graphsync.RequestFailedContentNotFound ||
status == graphsync.RequestFailedLegal ||
status == graphsync.RequestFailedUnknown ||
status == graphsync.RequestCancelled ||
status == graphsync.RequestRejected
return status.IsFailure()
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
// DEPRECATED: use status.IsTerminal()
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
return status.IsTerminal()
}

// Exportable is an interface that can serialize to a protobuf
