-
Notifications
You must be signed in to change notification settings - Fork 291
/
util.go
180 lines (153 loc) · 4.42 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package adder
import (
"context"
"errors"
"sync"
"github.com/ipfs-cluster/ipfs-cluster/api"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// ErrBlockAdder is returned when adding a block to multiple destinations
// fails on all of them.
var ErrBlockAdder = errors.New("failed to put block on all destinations")
// BlockStreamer helps streaming nodes to multiple destinations, as long as
// one of them is still working.
//
// A BlockStreamer is created with NewBlockStreamer, which starts the
// streaming in the background. Use Done to wait for it to finish and Err to
// check whether streaming failed on every destination.
type BlockStreamer struct {
// dests are the peers handed to MultiStream as stream destinations.
dests []peer.ID
// rpcClient is the client used to perform the MultiStream call.
rpcClient *rpc.Client
// blocks is the channel from which the nodes to stream are read.
blocks <-chan api.NodeWithMeta

// ctx is derived from the context given to NewBlockStreamer; its Done
// channel is what Done returns.
ctx context.Context
// cancel cancels ctx. It is called when streamBlocks returns.
cancel context.CancelFunc
// errMu guards err.
errMu sync.Mutex
// err holds the result reported by Err.
err error
}
// NewBlockStreamer builds a BlockStreamer from an rpc client, the peers that
// blocks were allocated to and the channel on which the blocks to stream
// arrive.
//
// Streaming is launched right away in a background goroutine. The streamer
// works on a context derived from ctx, so cancelling ctx also cancels the
// streamer's context.
func NewBlockStreamer(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, blocks <-chan api.NodeWithMeta) *BlockStreamer {
	streamCtx, stop := context.WithCancel(ctx)

	// err is left at its zero value (nil) until streaming reports a failure.
	streamer := &BlockStreamer{
		dests:     dests,
		rpcClient: rpcClient,
		blocks:    blocks,
		ctx:       streamCtx,
		cancel:    stop,
	}

	go streamer.streamBlocks()
	return streamer
}
// Done returns a channel which gets closed when the BlockStreamer has
// finished. It is the Done channel of the streamer's own context, which is
// derived from the context passed to NewBlockStreamer, so it is also closed
// when that parent context is cancelled.
func (bs *BlockStreamer) Done() <-chan struct{} {
return bs.ctx.Done()
}
// setErr stores err as the streamer's result, holding errMu while writing so
// that concurrent Err calls observe a consistent value.
func (bs *BlockStreamer) setErr(err error) {
	bs.errMu.Lock()
	defer bs.errMu.Unlock()

	bs.err = err
}
// Err reports the error recorded by the BlockStreamer, if any; for example
// ErrBlockAdder when blocks could not be put on any destination. It returns
// nil while no error has been recorded.
func (bs *BlockStreamer) Err() error {
	bs.errMu.Lock()
	recorded := bs.err
	bs.errMu.Unlock()

	return recorded
}
// streamBlocks hands bs.blocks to a single MultiStream call targeting the
// "IPFSConnector"."BlockStream" RPC method on every destination, then
// inspects the returned errors:
//
//   - when there are as many errors as destinations, the combined error is
//     logged and ErrBlockAdder is recorded (see Err);
//   - when only some destinations failed, the errors are logged as warnings
//     and no error is recorded.
//
// The streamer's context is cancelled on return, which closes Done().
func (bs *BlockStreamer) streamBlocks() {
	defer bs.cancel()

	// Nothing is expected on the replies channel; a goroutine drains it
	// regardless. NOTE(review): the drain loop only ends once the channel is
	// closed, which this function does not do itself - presumably MultiStream
	// closes it; confirm.
	replies := make(chan struct{})
	go func() {
		for range replies {
		}
	}()

	streamErrs := bs.rpcClient.MultiStream(
		bs.ctx,
		bs.dests,
		"IPFSConnector",
		"BlockStream",
		bs.blocks,
		replies,
	)

	combined := multierr.Combine(streamErrs...)
	failures := len(multierr.Errors(combined))

	// FIXME: replicate everywhere.
	switch {
	case failures == len(bs.dests):
		// Every destination failed. NOTE(review): with an empty dests list
		// both counts are zero, so this branch appears to be taken as well.
		logger.Error(combined)
		bs.setErr(ErrBlockAdder)
	case combined != nil:
		logger.Warning("there were errors streaming blocks, but at least one destination succeeded")
		logger.Warning(combined)
	}
}
// IpldNodeToNodeWithMeta wraps an ipld.Node into an api.NodeWithMeta carrying
// the node's CID, its raw data and the size reported by n.Size() as CumSize.
//
// A failure to obtain the size is only logged as a warning; the conversion
// still proceeds with whatever size value n.Size() returned.
func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
	cumSize, sizeErr := n.Size()
	if sizeErr != nil {
		logger.Warn(sizeErr)
	}

	meta := api.NodeWithMeta{
		Cid:     api.NewCid(n.Cid()),
		Data:    n.RawData(),
		CumSize: cumSize,
	}
	return meta
}
// BlockAllocate asks the "Cluster"."BlockAllocate" RPC method which peers
// blocks should be allocated to. The request is a pin built from pinOpts with
// an undefined CID, and the call is addressed to the empty peer ID.
//
// It returns the peers filled in by the call together with the call's error.
func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) ([]peer.ID, error) {
	// Find where to allocate this file.
	request := api.PinWithOpts(api.CidUndef, pinOpts)

	var allocations []peer.ID
	err := rpc.CallContext(ctx, "", "Cluster", "BlockAllocate", request, &allocations)
	return allocations, err
}
// Pin sends a pin request for the given pin to the "Cluster"."Pin" RPC
// method of the local peer and returns the error from that call. The pin
// returned by the RPC is discarded.
//
// When pin.ReplicationFactorMin is negative, the pin's Allocations are
// replaced with an empty list before the request is sent.
func Pin(ctx context.Context, rpc *rpc.Client, pin api.Pin) error {
	if pin.ReplicationFactorMin < 0 {
		pin.Allocations = []peer.ID{}
	}
	logger.Debugf("adder pinning %+v", pin)

	// An empty destination means we use ourself to pin.
	const self = ""

	var reply api.Pin
	err := rpc.CallContext(ctx, self, "Cluster", "Pin", pin, &reply)
	return err
}
// ErrDAGNotFound is the error reported whenever a block is requested from a
// BaseDAGService: Get returns it and GetMany emits it on its output channel.
var ErrDAGNotFound = errors.New("dagservice: a Get operation was attempted while cluster-adding (this is likely a bug)")
// BaseDAGService partially implements an ipld.DAGService.
// It provides the methods which are not needed by ClusterDAGServices
// (Get*, Remove*) so that they can save adding this code: Get and GetMany
// always fail with ErrDAGNotFound, while Remove and RemoveMany do nothing.
type BaseDAGService struct {
}
// Get always returns a nil node and ErrDAGNotFound. Both ctx and key are
// ignored.
func (dag BaseDAGService) Get(ctx context.Context, key cid.Cid) (ipld.Node, error) {
return nil, ErrDAGNotFound
}
// GetMany returns a channel that delivers exactly one NodeOption whose Err is
// ErrDAGNotFound and is then closed. Both ctx and keys are ignored.
func (dag BaseDAGService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
	notFound := &ipld.NodeOption{Err: ErrDAGNotFound}

	// Buffer of one so the send below never blocks without a reader.
	results := make(chan *ipld.NodeOption, 1)
	results <- notFound
	close(results)

	return results
}
// Remove is a no-op: it ignores ctx and key and always returns nil.
func (dag BaseDAGService) Remove(ctx context.Context, key cid.Cid) error {
return nil
}
// RemoveMany is a no-op: it ignores ctx and keys and always returns nil.
func (dag BaseDAGService) RemoveMany(ctx context.Context, keys []cid.Cid) error {
return nil
}