Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start removing allFutures #125

Merged
merged 10 commits into from
Apr 11, 2020
39 changes: 39 additions & 0 deletions libp2p/errors.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# this module will be further extended in PR
# https://github.com/status-im/nim-libp2p/pull/107/

import chronos
import chronicles
import macros

# could not figure how to make it with a simple template
# sadly nim needs more love for hygenic templates
# so here goes the macro, its based on the proc/template version
# and uses quote do so it's quite readable

macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
let nexclude = exclude.len
case nexclude
of 0:
quote do:
let pos = instantiationInfo()
for res in `futs`:
if res.failed:
let exc = res.readError()
# We still don't abort but warn
warn "Something went wrong in a future",
error=exc.name, file=pos.filename, line=pos.line
else:
quote do:
let pos = instantiationInfo()
for res in `futs`:
block check:
if res.failed:
let exc = res.readError()
for i in 0..<`nexclude`:
if exc of `exclude`[i]:
trace "Ignoring an error (no warning)",
error=exc.name, file=pos.filename, line=pos.line
break check
# We still don't abort but warn
warn "Something went wrong in a future",
error=exc.name, file=pos.filename, line=pos.line
24 changes: 20 additions & 4 deletions libp2p/muxers/mplex/lpchannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import types,
../../stream/bufferstream,
../../stream/lpstream,
../../connection,
../../utility
../../utility,
../../errors

logScope:
topic = "MplexChannel"
Expand Down Expand Up @@ -93,12 +94,27 @@ proc resetMessage(s: LPChannel) {.async.} =
await s.conn.writeMsg(s.id, s.resetCode)

proc resetByRemote*(s: LPChannel) {.async.} =
await allFutures(s.close(), s.closedByRemote())
# Immediately block futher calls
s.isReset = true
await s.cleanUp()

# start and await async teardown
let
futs = await allFinished(
s.close(),
s.closedByRemote(),
s.cleanUp()
)

checkFutures(futs, [LPStreamEOFError])

proc reset*(s: LPChannel) {.async.} =
await allFutures(s.resetMessage(), s.resetByRemote())
let
futs = await allFinished(
s.resetMessage(),
s.resetByRemote()
)

checkFutures(futs, [LPStreamEOFError])

method closed*(s: LPChannel): bool =
trace "closing lpchannel", id = s.id, initiator = s.initiator
Expand Down
11 changes: 9 additions & 2 deletions libp2p/muxers/mplex/mplex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import ../muxer,
../../connection,
../../stream/lpstream,
../../utility,
../../errors,
coder,
types,
lpchannel
Expand Down Expand Up @@ -154,10 +155,16 @@ method newStream*(m: Mplex,

method close*(m: Mplex) {.async, gcsafe.} =
trace "closing mplex muxer"

if not m.connection.closed():
await m.connection.close()

await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())),
allFutures(toSeq(m.local.values).mapIt(it.reset()))])
let
futs = await allFinished(
toSeq(m.remote.values).mapIt(it.reset()) &
toSeq(m.local.values).mapIt(it.reset()))

checkFutures(futs)

m.remote.clear()
m.local.clear()
24 changes: 16 additions & 8 deletions libp2p/muxers/muxer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import chronos, chronicles
import ../protocols/protocol,
../connection
../connection,
../errors

logScope:
topic = "Muxer"
Expand Down Expand Up @@ -45,15 +46,22 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider

method init(c: MuxerProvider) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let muxer = c.newMuxer(conn)
var handlerFut = if not isNil(c.muxerHandler):
c.muxerHandler(muxer)
else:
var dummyFut = newFuture[void]()
dummyFut.complete(); dummyFut
let
muxer = c.newMuxer(conn)

if not isNil(c.streamHandler):
muxer.streamHandler = c.streamHandler

await allFutures(muxer.handle(), handlerFut)
var futs = newSeq[Future[void]]()

futs &= muxer.handle()

# finally await both the futures
if not isNil(c.muxerHandler):
futs &= c.muxerHandler(muxer)

# log and re-raise on errors
futs = await allFinished(futs)
checkFutures(futs)

c.handler = handler
13 changes: 10 additions & 3 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import pubsub,
../../connection,
../../peerinfo,
../../peer,
../../utility
../../utility,
../../errors

logScope:
topic = "FloodSub"
Expand Down Expand Up @@ -85,10 +86,13 @@ method rpcHandler*(f: FloodSub,

# forward the message to all peers interested in it
var sent: seq[Future[void]]
# start the future but do not wait yet
for p in toSendPeers:
if p in f.peers and f.peers[p].id != peer.id:
sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
await allFutures(sent)
# wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)

method init(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
Expand Down Expand Up @@ -118,10 +122,13 @@ method publish*(f: FloodSub,
trace "publishing on topic", name = topic
let msg = newMessage(f.peerInfo, data, topic)
var sent: seq[Future[void]]
# start the future but do not wait yet
for p in f.floodsub[topic]:
trace "publishing message", name = topic, peer = p, data = data.shortLog
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
await allFutures(sent)
# wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)

method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} =
Expand Down
9 changes: 6 additions & 3 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import pubsub,
../protocol,
../../peerinfo,
../../connection,
../../peer
../../peer,
../../errors

logScope:
topic = "GossipSub"
Expand Down Expand Up @@ -220,7 +221,8 @@ method rpcHandler(g: GossipSub,
if msgs.len > 0:
trace "forwarding message to", peerId = id
sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)]))
await allFutures(sent)
sent = await allFinished(sent)
checkFutures(sent)

var respControl: ControlMessage
if m.control.isSome:
Expand Down Expand Up @@ -408,7 +410,8 @@ method publish*(g: GossipSub,
trace "publishing on topic", name = topic
g.mcache.put(msg)
sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])]))
await allFutures(sent)
sent = await allFinished(sent)
checkFutures(sent)

method start*(g: GossipSub) {.async.} =
## start pubsub
Expand Down
5 changes: 2 additions & 3 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,8 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
# TODO: add timeout to validator
pending.add(p.validators[topic].mapIt(it(topic, message)))

await allFutures(pending)
if pending.allIt(it.read()): # only if all passed
result = true
let futs = await allFinished(pending)
result = futs.allIt(not it.failed and it.read())

proc newPubSub*(p: typedesc[PubSub],
peerInfo: PeerInfo,
Expand Down
15 changes: 12 additions & 3 deletions libp2p/switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import connection,
protocols/identify,
protocols/pubsub/pubsub,
muxers/muxer,
errors,
peer

logScope:
Expand Down Expand Up @@ -309,11 +310,19 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc stop*(s: Switch) {.async.} =
trace "stopping switch"

# we want to report erros but we do not want to fail
# or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up
var futs = newSeq[Future[void]]()

if s.pubSub.isSome:
await s.pubSub.get().stop()
futs &= s.pubSub.get().stop()

futs &= toSeq(s.connections.values).mapIt(s.cleanupConn(it))
futs &= s.transports.mapIt(it.close())

await allFutures(toSeq(s.connections.values).mapIt(s.cleanupConn(it)))
await allFutures(s.transports.mapIt(it.close()))
futs = await allFinished(futs)
checkFutures(futs)

proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
## Subscribe to pub sub peer
Expand Down
3 changes: 2 additions & 1 deletion libp2p/transports/tcptransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
t.server.stop()
t.server.close()
await t.server.join()
trace "transport stopped"

trace "transport stopped"

method listen*(t: TcpTransport,
ma: MultiAddress,
Expand Down
6 changes: 4 additions & 2 deletions libp2p/transports/transport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import sequtils
import chronos, chronicles
import ../connection,
../multiaddress,
../multicodec
../multicodec,
../errors

type
ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.}
Expand All @@ -33,7 +34,8 @@ proc newTransport*(t: typedesc[Transport]): t {.gcsafe.} =
method close*(t: Transport) {.base, async, gcsafe.} =
## stop and cleanup the transport
## including all outstanding connections
await allFutures(t.connections.mapIt(it.close()))
let futs = await allFinished(t.connections.mapIt(it.close()))
checkFutures(futs)

method listen*(t: Transport,
ma: MultiAddress,
Expand Down