5
5
import dev .waterdog .waterdogpe .network .serverinfo .ServerInfoType ;
6
6
import dev .waterdog .waterdogpe .player .ProxiedPlayer ;
7
7
import io .netty .bootstrap .Bootstrap ;
8
- import io .netty .channel .ChannelFuture ;
9
- import io .netty .channel .EventLoop ;
10
- import io .netty .channel .EventLoopGroup ;
8
+ import io .netty .channel .*;
11
9
import io .netty .channel .epoll .Epoll ;
12
10
import io .netty .channel .epoll .EpollEventLoopGroup ;
13
11
import io .netty .channel .epoll .EpollSocketChannel ;
14
12
import io .netty .channel .nio .NioEventLoopGroup ;
15
13
import io .netty .channel .socket .SocketChannel ;
16
14
import io .netty .channel .socket .nio .NioSocketChannel ;
15
+ import io .netty .handler .ssl .util .InsecureTrustManagerFactory ;
16
+ import io .netty .incubator .codec .quic .*;
17
17
import io .netty .util .concurrent .Future ;
18
18
import io .netty .util .concurrent .Promise ;
19
19
import net .jodah .expiringmap .internal .NamedThreadFactory ;
20
20
import org .nethergames .proxytransport .impl .TransportChannelInitializer ;
21
21
22
22
import java .net .InetSocketAddress ;
23
+ import java .util .HashMap ;
23
24
import java .util .concurrent .ThreadFactory ;
25
+ import java .util .concurrent .TimeUnit ;
24
26
25
27
public class CustomTransportServerInfo extends ServerInfo {
26
28
public static final int availableCPU = Runtime .getRuntime ().availableProcessors ();
27
- public static final ThreadFactory downstreamThreadFactory = new NamedThreadFactory ("TCP -Downstream %s" );
29
+ public static final ThreadFactory downstreamThreadFactory = new NamedThreadFactory ("QUIC -Downstream %s" );
28
30
public static final EventLoopGroup downstreamLoopGroup = Epoll .isAvailable () ? new EpollEventLoopGroup (availableCPU , downstreamThreadFactory ) : new NioEventLoopGroup (availableCPU , downstreamThreadFactory );
29
31
30
- public static final String TYPE_IDENT = "tcp " ;
32
+ public static final String TYPE_IDENT = "quic " ;
31
33
public static final ServerInfoType TYPE = ServerInfoType .builder ()
32
34
.identifier (TYPE_IDENT )
33
35
.serverInfoFactory (CustomTransportServerInfo ::new )
34
36
.register ();
35
37
38
+ private final HashMap <InetSocketAddress , QuicChannel > serverConnections = new HashMap <>();
39
+
36
40
public CustomTransportServerInfo (String serverName , InetSocketAddress address , InetSocketAddress publicAddress ) {
37
41
super (serverName , address , publicAddress );
38
42
}
@@ -44,19 +48,83 @@ public ServerInfoType getServerType() {
44
48
45
49
@ Override
46
50
public Future <ClientConnection > createConnection (ProxiedPlayer proxiedPlayer ) {
51
+ proxiedPlayer .getLogger ().info ("Creating connection to " + this .getAddress ());
47
52
EventLoop eventLoop = proxiedPlayer .getProxy ().getWorkerEventLoopGroup ().next ();
48
53
Promise <ClientConnection > promise = eventLoop .newPromise ();
54
+
55
+ this .createServerConnection (eventLoop , this .getAddress ()).addListener ((Future <QuicChannel > future ) -> {
56
+ proxiedPlayer .getLogger ().info ("Created server connection to " + this .getAddress ());
57
+ if (future .isSuccess ()) {
58
+ QuicChannel quicChannel = future .getNow ();
59
+
60
+ quicChannel .createStream (QuicStreamType .BIDIRECTIONAL , new TransportChannelInitializer (proxiedPlayer , this , promise )).addListener ((Future <QuicStreamChannel > streamFuture ) -> {
61
+ if (!streamFuture .isSuccess ()) {
62
+ promise .tryFailure (streamFuture .cause ());
63
+ quicChannel .close ();
64
+ }
65
+ });
66
+ } else {
67
+ promise .tryFailure (future .cause ());
68
+ }
69
+ });
70
+
71
+ return promise ;
72
+ }
73
+
74
+ private Future <QuicChannel > createServerConnection (EventLoopGroup eventLoopGroup , InetSocketAddress address ) {
75
+ EventLoop eventLoop = eventLoopGroup .next ();
76
+
77
+ if (serverConnections .containsKey (address )) {
78
+ return eventLoop .newSucceededFuture (serverConnections .get (address ));
79
+ }
80
+
81
+ Promise <QuicChannel > promise = eventLoop .newPromise ();
82
+
83
+ QuicSslContext sslContext = QuicSslContextBuilder .forClient ().trustManager (InsecureTrustManagerFactory .INSTANCE ).applicationProtocols ("ng" ).build ();
84
+ System .out .println ("1 Creating server connection to " + address );
85
+ ChannelHandler codec = new QuicClientCodecBuilder ()
86
+ .sslContext (sslContext )
87
+ .maxIdleTimeout (5000 , TimeUnit .MILLISECONDS )
88
+ .initialMaxData (10000000 )
89
+ .initialMaxStreamDataBidirectionalLocal (1000000 )
90
+ .build ();
91
+
92
+ System .out .println ("4 Creating server connection to " + address );
49
93
50
94
new Bootstrap ()
51
95
.group (downstreamLoopGroup )
52
- .handler (new TransportChannelInitializer (proxiedPlayer , this , promise ))
53
- .localAddress (new InetSocketAddress ("0.0.0.0" , 0 ))
96
+ .handler (codec )
54
97
.channel (getProperSocketChannel ())
55
- .remoteAddress (this .getAddress ())
56
- .connect ().addListener ((ChannelFuture future ) -> {
57
- if (!future .isSuccess ()) {
58
- promise .tryFailure (future .cause ());
59
- future .channel ().close ();
98
+ .remoteAddress (address )
99
+ .bind (0 ).addListener ((ChannelFuture channelFuture ) -> {
100
+ if (channelFuture .isSuccess ()) {
101
+ System .out .println ("Connected to " + address );
102
+ QuicChannel .newBootstrap (channelFuture .channel ())
103
+ .streamHandler (new ChannelInboundHandlerAdapter () {
104
+ @ Override
105
+ public void channelActive (ChannelHandlerContext ctx ) {
106
+ ctx .close ();
107
+ }
108
+ })
109
+ .remoteAddress (address )
110
+ .connect ().addListener ((Future <QuicChannel > quicChannelFuture ) -> {
111
+ if (quicChannelFuture .isSuccess ()) {
112
+ System .out .println ("Connected to " + address + " via QUIC" );
113
+ QuicChannel quicChannel = quicChannelFuture .getNow ();
114
+ quicChannel .closeFuture ().addListener (f -> serverConnections .remove (address ));
115
+ serverConnections .put (address , quicChannel );
116
+
117
+ promise .trySuccess (quicChannel );
118
+ } else {
119
+ System .out .println ("Failed to connect to " + address + " via QUIC" );
120
+ promise .tryFailure (quicChannelFuture .cause ());
121
+ channelFuture .channel ().close ();
122
+ }
123
+ });
124
+ } else {
125
+ System .out .println ("Failed to connect to " + address );
126
+ promise .tryFailure (channelFuture .cause ());
127
+ channelFuture .channel ().close ();
60
128
}
61
129
});
62
130
0 commit comments