Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions io/src/main/scala/fs2/io/net/SocketGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ trait SocketGroup[F[_]] {
* client sockets -- one per client that connects to the bound address/port.
*
* When the stream terminates, all open connections will terminate as well.
* Because of this, make sure to handle errors in the client socket Streams.
*
* @param address address to accept connections from; none for all interfaces
* @param port port to bind
Expand All @@ -71,6 +72,8 @@ trait SocketGroup[F[_]] {
): Stream[F, Socket[F]]

/** Like [[server]] but provides the `SocketAddress` of the bound server socket before providing accepted sockets.
*
* Make sure to handle errors in the client socket Streams.
*/
def serverResource(
address: Option[Host] = None,
Expand Down
3 changes: 3 additions & 0 deletions site/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def echoServer[F[_]: Concurrent: Network]: F[Unit] =
.interleave(Stream.constant("\n"))
.through(text.utf8Encode)
.through(client.writes)
.handleErrorWith(_ => Stream.empty) // handle errors of client sockets
}.parJoin(100).compile.drain
```

Expand All @@ -152,6 +153,8 @@ we read from the client socket, UTF-8 decode the received bytes, extract individ

Since we mapped over the infinite client stream, we end up with a `Stream[F, Stream[F, Unit]]`. We flatten this to a single `Stream[F, Unit]` via `parJoin(100)`, which runs up to 100 of the inner streams concurrently. As inner streams finish, new inner streams are pulled from the source. Hence, `parJoin` is controlling the maximum number of concurrent client requests our server processes.

In joining all these streams together, be prudent to handle errors in the client streams.

The pattern of `Network[F].server(address).map(handleClient).parJoin(maxConcurrentClients)` is very common when working with server sockets.

A simpler echo server could be implemented with this core logic:
Expand Down