Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
1cd3d23
Initial wip using builder pattern
mpilquist Apr 8, 2025
5e5b39e
Generalize NeedAddress
mpilquist Apr 8, 2025
5e27b71
Dump builder pattern
mpilquist Apr 8, 2025
a86b18d
wip
mpilquist Apr 8, 2025
c06e969
wip
mpilquist Apr 9, 2025
eff4b58
wip
mpilquist Apr 9, 2025
89362b0
wip
mpilquist Apr 10, 2025
fcd9233
wip
mpilquist Apr 10, 2025
5e912ab
Drop JVM socket group implementations
mpilquist Apr 10, 2025
7081b7a
Unix socket tests
mpilquist Apr 10, 2025
dedda55
Unix socket tests
mpilquist Apr 10, 2025
17ade5c
Cleanup Bind
mpilquist Apr 10, 2025
e2fafad
s/Bind/ServerSocket/
mpilquist Apr 10, 2025
fad0e04
Push old localAddress back down to Socket
mpilquist Apr 10, 2025
598d3ed
Native
mpilquist Apr 11, 2025
b649d00
JS tests passing
mpilquist Apr 16, 2025
4a00566
JS duplication reduction
mpilquist Apr 16, 2025
e22041b
Scalafmt
mpilquist Apr 16, 2025
906d8d5
Fix compilation errors
mpilquist Apr 17, 2025
058e616
Implement getLocalAddressGen on JVM
mpilquist Apr 17, 2025
7648b71
Cleanup
mpilquist Apr 18, 2025
01322d5
Progress on fixing addresses
mpilquist Apr 18, 2025
42773ce
Fixed addresses
mpilquist Apr 18, 2025
6de7fbe
Scalafmt
mpilquist Apr 18, 2025
4399a1a
Test unix socket addresses
mpilquist Apr 18, 2025
18eb5d2
IP address tests
mpilquist Apr 18, 2025
1e9703e
Socket option cleanup
mpilquist Apr 18, 2025
f27f191
Socket option cleanup
mpilquist Apr 18, 2025
5285acb
Socket option cleanup
mpilquist Apr 18, 2025
78bc8cc
Deprecate old socket group methods
mpilquist Apr 18, 2025
a0ab96b
Address cleanup
mpilquist Apr 19, 2025
8c28c69
Add address and peerAddress, deprecate localAddress and remoteAddress
mpilquist Apr 20, 2025
6533318
Deprecate Socket#isOpen
mpilquist Apr 21, 2025
e7ca920
Remove some unnecesssary changes from net facade
mpilquist Apr 21, 2025
f4bc9c8
Cleanup in selecting ip sockets provider
mpilquist Apr 21, 2025
896738a
Unify Network implementations
mpilquist Apr 21, 2025
e47817c
Fix 2.12 compilation
mpilquist Apr 21, 2025
f3c5f70
Update to ip4s 3.7.0
mpilquist Apr 21, 2025
0ee5c57
Fix JS 2.12 compilation
mpilquist Apr 21, 2025
0807afb
Mima fixes
mpilquist Apr 21, 2025
91c25f6
Fix native 2.12 warnings
mpilquist Apr 21, 2025
861f7ea
Fix selecting socket address NPE
mpilquist Apr 21, 2025
7395a77
Fix JVM unix sockets test
mpilquist Apr 21, 2025
8a0df13
Fix site docs
mpilquist Apr 21, 2025
47815b2
Deprecate old UnixSockets
mpilquist Apr 21, 2025
5af859e
Set client socket options on JVM Unix
mpilquist Apr 22, 2025
e09f425
Remove explicit DNS lookups from JS IP socket connect & bind
mpilquist Apr 23, 2025
c063029
Scalafmt
mpilquist Apr 23, 2025
6c7f330
Make SO_REUSEPORT lazy loaded
mpilquist May 5, 2025
21af914
Merge branch 'main' into topic/net2
mpilquist May 9, 2025
3df2f3d
Revamped datagram support
mpilquist Jun 4, 2025
9ef0cf7
Remove accidentally added PeerCredentials
mpilquist Jun 4, 2025
72b1197
Fix test compilation
mpilquist Jun 4, 2025
7a83700
Fix mima warnings
mpilquist Jun 4, 2025
4248d68
Scalafmt
mpilquist Jun 4, 2025
e6aeb4d
Fix warnings
mpilquist Jun 4, 2025
e0d78b7
Fix warnings
mpilquist Jun 4, 2025
8542919
Add temp debug to UnixDatagramSuite
mpilquist Jun 5, 2025
dac60c1
Scalafmt
mpilquist Jun 5, 2025
d963953
Debug
mpilquist Jun 5, 2025
73f640d
Debug
mpilquist Jun 5, 2025
e02d9e5
Exclude UnixDatagramSuite from Linux due to bug in jnr-unixsocket
mpilquist Jun 5, 2025
8488321
Bridge deprecated datagram soccket options
mpilquist Jun 6, 2025
645f93f
Docs
mpilquist Jun 9, 2025
4d2924c
Scalafmt
mpilquist Jun 9, 2025
855070e
Scalafmt
mpilquist Jun 9, 2025
c4c9ee3
Add support for ip4s NetworkInterface
mpilquist Jun 22, 2025
562cbc5
Mima
mpilquist Jun 22, 2025
e8721b0
Bump to ip4s 3.8.0-RC1
mpilquist Jun 26, 2025
05e5e99
Bump to ip4s 3.8.0-RC1
mpilquist Jun 26, 2025
e6a8728
Merge branch 'main' into topic/net2
mpilquist Jun 26, 2025
426ceaa
Merge branch 'main' into topic/net2
mpilquist Jul 11, 2025
80642f9
Merge branch 'main' into topic/net2
mpilquist Aug 4, 2025
0bf4faf
Merge branch 'main' into topic/net2
mpilquist Aug 25, 2025
1153afa
Merge branch 'main' into topic/net2
mpilquist Aug 25, 2025
6e49f13
Scalafmt
mpilquist Aug 25, 2025
02d73e5
Downgrade GHA runner for macos to fix multicast tests
mpilquist Aug 25, 2025
b05bd18
Merge branch 'main' into topic/net2
mpilquist Aug 26, 2025
de6e955
Address deprecation warnings
mpilquist Aug 26, 2025
cf74c2d
Merge branch 'main' into topic/net2
mpilquist Sep 1, 2025
fe722ce
Update fromKeyStoreFile to take a Path and improve error message from…
mpilquist Sep 2, 2025
17f6852
Fix 2.12 compilation
mpilquist Sep 2, 2025
01f139d
Fix spinloop bug in TLSEngine
mpilquist Sep 3, 2025
57ab186
Drop .only tag
mpilquist Sep 3, 2025
ea03a6e
Change byte limit in test
mpilquist Sep 3, 2025
1d91556
Change explicit intercept to attempt to handle behavior differences o…
mpilquist Sep 3, 2025
fad230e
Rewrote test to be an example of a client that only sends partial han…
mpilquist Sep 4, 2025
39f1318
Merge pull request #3599 from typelevel/topic/fix-spinloop-in-tlsengine
mpilquist Sep 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform)
.settings(
name := "fs2-io",
tlVersionIntroduced ~= { _.updated("3", "3.1.0") },
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.6.0",
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.6.0-91-51bd018-SNAPSHOT",
tlJdkRelease := None
)
.jvmSettings(
Expand Down
136 changes: 8 additions & 128 deletions io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,140 +23,20 @@ package fs2
package io
package net

import java.net.InetSocketAddress
import java.nio.channels.{
AsynchronousCloseException,
AsynchronousServerSocketChannel,
AsynchronousSocketChannel,
CompletionHandler
}
import java.nio.channels.AsynchronousChannelGroup
import cats.syntax.all._
import cats.effect.kernel.{Async, Resource}
import com.comcast.ip4s.{Dns, Host, IpAddress, Port, SocketAddress}
import com.comcast.ip4s.{Host, IpAddress, Ipv4Address, Port, SocketAddress}

private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
private[fs2] def unsafe[F[_]: Async: Dns](
channelGroup: AsynchronousChannelGroup
): SocketGroup[F] =
new AsyncSocketGroup[F](channelGroup)

private final class AsyncSocketGroup[F[_]: Async: Dns](channelGroup: AsynchronousChannelGroup)
extends AbstractAsyncSocketGroup[F] {

def client(
to: SocketAddress[Host],
options: List[SocketOption]
): Resource[F, Socket[F]] = {
def setup: Resource[F, AsynchronousSocketChannel] =
Resource
.make(
Async[F].delay(
AsynchronousSocketChannel.open(channelGroup)
)
)(ch => Async[F].delay(if (ch.isOpen) ch.close else ()))
.evalTap(ch => Async[F].delay(options.foreach(opt => ch.setOption(opt.key, opt.value))))

def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] =
to.resolve[F].flatMap { ip =>
Async[F].async[AsynchronousSocketChannel] { cb =>
Async[F]
.delay {
ch.connect(
ip.toInetSocketAddress,
null,
new CompletionHandler[Void, Void] {
def completed(result: Void, attachment: Void): Unit =
cb(Right(ch))
def failed(rsn: Throwable, attachment: Void): Unit =
cb(Left(rsn))
}
)
}
.as(Some(Async[F].delay(ch.close())))
}
}

setup.evalMap(ch => connect(ch) *> Socket.forAsync(ch))
}

def serverResource(
address: Option[Host],
port: Option[Port],
options: List[SocketOption]
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = {
def fromIpSockets[F[_]: Async](ipSockets: IpSocketsProvider[F]): SocketGroup[F] = new SocketGroup[F] {
def client(to: SocketAddress[Host], options: List[SocketOption]) =
ipSockets.connect(to, options)

val setup: Resource[F, AsynchronousServerSocketChannel] =
Resource.eval(address.traverse(_.resolve[F])).flatMap { addr =>
Resource
.make(
Async[F].delay(
AsynchronousServerSocketChannel.open(channelGroup)
)
)(sch => Async[F].delay(if (sch.isOpen) sch.close()))
.evalTap(ch =>
Async[F].delay(
ch.bind(
new InetSocketAddress(
addr.map(_.toInetAddress).orNull,
port.map(_.value).getOrElse(0)
)
)
)
)
}
def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): Stream[F, Socket[F]] =
Stream.resource(serverResource(address, port, options)).flatMap(_._2)

def acceptIncoming(
sch: AsynchronousServerSocketChannel
): Stream[F, Socket[F]] = {
def go: Stream[F, Socket[F]] = {
def acceptChannel = Resource.makeFull[F, AsynchronousSocketChannel] { poll =>
poll {
Async[F].async[AsynchronousSocketChannel] { cb =>
Async[F]
.delay {
sch.accept(
null,
new CompletionHandler[AsynchronousSocketChannel, Void] {
def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit =
cb(Right(ch))
def failed(rsn: Throwable, attachment: Void): Unit =
cb(Left(rsn))
}
)
}
.as(Some(Async[F].delay(sch.close())))
}
}
}(ch => Async[F].delay(if (ch.isOpen) ch.close else ()))

def setOpts(ch: AsynchronousSocketChannel) =
Async[F].delay {
options.foreach(o => ch.setOption(o.key, o.value))
}

Stream.resource(acceptChannel.attempt).flatMap {
case Left(_) => Stream.empty[F]
case Right(accepted) => Stream.eval(setOpts(accepted) *> Socket.forAsync(accepted))
} ++ go
}

go.handleErrorWith {
case err: AsynchronousCloseException =>
Stream.eval(Async[F].delay(sch.isOpen)).flatMap { isOpen =>
if (isOpen) Stream.raiseError[F](err)
else Stream.empty
}
case err => Stream.raiseError[F](err)
}
}

setup.map { sch =>
val jLocalAddress = sch.getLocalAddress.asInstanceOf[java.net.InetSocketAddress]
val localAddress = SocketAddress.fromInetSocketAddress(jLocalAddress)
(localAddress, acceptIncoming(sch))
}
}
def serverResource(address: Option[Host], port: Option[Port], options: List[SocketOption]): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
ipSockets.bind(SocketAddress(address.getOrElse(Ipv4Address.Wildcard), port.getOrElse(Port.Wildcard)), options).evalMap(b => b.socketInfo.localAddress.tupleRight(b.accept))
}

}
83 changes: 83 additions & 0 deletions io/jvm-native/src/main/scala/fs2/io/net/SocketInfoPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io
package net

import com.comcast.ip4s.{GenSocketAddress, IpAddress, SocketAddress}
import cats.effect.Async

import java.net.InetSocketAddress
import java.nio.channels.NetworkChannel

import scala.jdk.CollectionConverters.*

private[net] trait SocketInfoCompanionPlatform {
private[net] def forAsync[F[_]](ch: NetworkChannel)(implicit F: Async[F]): SocketInfo[F] =
new AsyncSocketInfo[F] {
def asyncInstance = F
def channel = ch
}

private[net] trait AsyncSocketInfo[F[_]] extends SocketInfo[F] {

implicit protected def asyncInstance: Async[F]
protected def channel: NetworkChannel

override def localAddress: F[SocketAddress[IpAddress]] =
asyncInstance.delay(
SocketAddress.fromInetSocketAddress(
channel.getLocalAddress.asInstanceOf[InetSocketAddress]
)
)

override def localAddressGen: F[GenSocketAddress] =
asyncInstance.delay(
channel.getLocalAddress match {
case addr: InetSocketAddress => SocketAddress.fromInetSocketAddress(addr)
// TODO handle unix sockets
}
)

override def supportedOptions: F[Set[SocketOption.Key[_]]] =
asyncInstance.delay {
channel.supportedOptions.asScala.toSet
}

override def getOption[A](key: SocketOption.Key[A]): F[Option[A]] =
asyncInstance.delay {
try {
Some(channel.getOption(key))
} catch {
case _: UnsupportedOperationException => None
}
}

override def setOption[A](key: SocketOption.Key[A], value: A): F[Unit] =
asyncInstance.delay {
channel.setOption(key, value)
()
}
}

}

17 changes: 17 additions & 0 deletions io/jvm-native/src/main/scala/fs2/io/net/SocketOptionPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,21 @@ private[net] trait SocketOptionCompanionPlatform {

def noDelay(value: Boolean): SocketOption =
boolean(StandardSocketOptions.TCP_NODELAY, value)

val UnixServerSocketDeleteIfExists: Key[JBoolean] = new Key[JBoolean] {
def name() = "FS2_UNIX_DELETE_IF_EXISTS"
def `type`() = classOf[JBoolean]
}

def unixServerSocketDeleteIfExists(value: JBoolean): SocketOption =
boolean(UnixServerSocketDeleteIfExists, value)

val UnixServerSocketDeleteOnClose: Key[JBoolean] = new Key[JBoolean] {
def name() = "FS2_UNIX_DELETE_ON_CLOSE"
def `type`() = classOf[JBoolean]
}

def unixServerSocketDeleteOnClose(value: Boolean): SocketOption =
boolean(UnixServerSocketDeleteOnClose, value)

}
16 changes: 7 additions & 9 deletions io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package fs2
package io
package net

import com.comcast.ip4s.{IpAddress, SocketAddress}
import com.comcast.ip4s.{GenSocketAddress, IpAddress, SocketAddress}
import cats.effect.Async
import cats.effect.std.Mutex
import cats.syntax.all._
Expand Down Expand Up @@ -108,7 +108,10 @@ private[net] trait SocketCompanionPlatform {
readMutex: Mutex[F],
writeMutex: Mutex[F]
)(implicit F: Async[F])
extends BufferedReads[F](readMutex) {
extends BufferedReads[F](readMutex) with SocketInfo.AsyncSocketInfo[F] {

protected def asyncInstance = F
protected def channel = ch

protected def readChunk(buffer: ByteBuffer): F[Int] =
F.async[Int] { cb =>
Expand Down Expand Up @@ -139,20 +142,15 @@ private[net] trait SocketCompanionPlatform {
}
}

def localAddress: F[SocketAddress[IpAddress]] =
F.delay(
SocketAddress.fromInetSocketAddress(
ch.getLocalAddress.asInstanceOf[InetSocketAddress]
)
)

def remoteAddress: F[SocketAddress[IpAddress]] =
F.delay(
SocketAddress.fromInetSocketAddress(
ch.getRemoteAddress.asInstanceOf[InetSocketAddress]
)
)

override def remoteAddressGen: F[GenSocketAddress] = ???

def isOpen: F[Boolean] = F.delay(ch.isOpen)

def endOfOutput: F[Unit] =
Expand Down
Loading
Loading