Skip to content

Commit

Permalink
Tunnel: newtunnelconnection
Browse files Browse the repository at this point in the history
Creates a tunneled connection with a pool key that `aquire` can find on subsequent requests if the connection is kept alive
  • Loading branch information
gustafsson committed Jan 9, 2024
1 parent e3ab46b commit 044874a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 70 deletions.
25 changes: 0 additions & 25 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -610,31 +610,6 @@ function sslconnection(::Type{SSLContext}, tcp::TCPSocket, host::AbstractString;
return io
end

function sslupgrade(::Type{IOType}, c::Connection{T},
host::AbstractString;
pool::Union{Nothing, Pool}=nothing,
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
keepalive::Bool=true,
readtimeout::Int=0,
kw...)::Connection{IOType} where {T, IOType}
# initiate the upgrade to SSL
# if the upgrade fails, an error will be thrown and the original c will be closed
# in ConnectionRequest
tls = if readtimeout > 0
try_with_timeout(readtimeout) do _
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
end
else
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
end
# success, now we turn it into a new Connection
conn = Connection(host, "", 0, require_ssl_verification, keepalive, tls)
# release the "old" one, but don't return the connection since we're hijacking the socket
release(getpool(pool, T), connectionkey(c))
# and return the new one
return acquire(() -> conn, getpool(pool, IOType), connectionkey(conn); forcenew=true)
end

function Base.show(io::IO, c::Connection)
nwaiting = applicable(tcpsocket, c.io) ? bytesavailable(tcpsocket(c.io)) : 0
print(
Expand Down
1 change: 1 addition & 0 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ include("Connections.jl") ;using .Connections
const ConnectionPool = Connections
include("StatusCodes.jl") ;using .StatusCodes
include("Messages.jl") ;using .Messages
include("Tunnel.jl") ;using .Tunnel
include("cookies.jl") ;using .Cookies
include("Streams.jl") ;using .Streams

Expand Down
103 changes: 103 additions & 0 deletions src/Tunnel.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
module Tunnel

export newtunnelconnection

using Sockets, LoggingExtras, NetworkOptions, URIs
using ConcurrentUtilities: acquire, try_with_timeout

using ..Connections, ..Messages, ..Exceptions
using ..Connections: connection_limit_warning, getpool, getconnection, sslconnection, connectionkey, connection_isvalid

function newtunnelconnection(;
target_type::Type{<:IO},
target_host::AbstractString,
target_port::AbstractString,
proxy_type::Type{<:IO},
proxy_host::AbstractString,
proxy_port::AbstractString,
proxy_auth::AbstractString="",
pool::Union{Nothing, Pool}=nothing,
connection_limit=nothing,
forcenew::Bool=false,
idle_timeout=typemax(Int),
connect_timeout::Int=30,
readtimeout::Int=30,
keepalive::Bool=true,
kw...)
connection_limit_warning(connection_limit)

if isempty(target_port)
target_port = istcptype(target_type) ? "80" : "443"
end

require_ssl_verification = get(kw, :require_ssl_verification, NetworkOptions.verify_host(target_host, "SSL"))
host_key = proxy_host * "/" * target_host
port_key = proxy_port * "/" * target_port
key = (host_key, port_key, require_ssl_verification, keepalive, true)

return acquire(
getpool(pool, target_type),
key;
forcenew=forcenew,
isvalid=c->connection_isvalid(c, Int(idle_timeout))) do

conn = Connection(host_key, port_key, idle_timeout, require_ssl_verification, keepalive,
try_with_timeout0(connect_timeout) do _
getconnection(proxy_type, proxy_host, proxy_port; keepalive, kw...)
end
)
try
try_with_timeout0(readtimeout) do _
connect_tunnel(conn, target_host, target_port, proxy_auth)
end

if !istcptype(target_type)
tls = try_with_timeout0(readtimeout) do _
sslconnection(target_type, conn.io, target_host; keepalive, kw...)
end

# success, now we turn it into a new Connection
conn = Connection(host_key, port_key, idle_timeout, require_ssl_verification, keepalive, tls)
end

@assert connectionkey(conn) === key

conn
catch ex
close(conn)
rethrow()
end
end
end

function connect_tunnel(io, target_host, target_port, proxy_auth)
target = "$(URIs.hoststring(target_host)):$(target_port)"
@debugv 1 "📡 CONNECT HTTPS tunnel to $target"
headers = Dict("Host" => target)
if (!isempty(proxy_auth))
headers["Proxy-Authorization"] = proxy_auth
end
request = Request("CONNECT", target, headers)
# @debugv 2 "connect_tunnel: writing headers"
writeheaders(io, request)
# @debugv 2 "connect_tunnel: reading headers"
readheaders(io, request.response)
# @debugv 2 "connect_tunnel: done reading headers"
if request.response.status != 200
throw(StatusError(request.response.status,
request.method, request.target, request.response))
end
end

function try_with_timeout0(f, timeout, ::Type{T}=Any) where {T}
if timeout > 0
try_with_timeout(f, timeout, T)
else
f(Ref(false))
end
end

istcptype(::Type{TCPSocket}) = true
istcptype(::Type{<:IO}) = false

end # module Tunnel
68 changes: 25 additions & 43 deletions src/clientlayers/ConnectionRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module ConnectionRequest
using URIs, Sockets, Base64, LoggingExtras, ConcurrentUtilities, ExceptionUnwrapping
using MbedTLS: SSLContext, SSLConfig
using OpenSSL: SSLStream
using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions
using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions, ..Tunnel
import ..SOCKET_TYPE_TLS

islocalhost(host::AbstractString) = host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "0000:0000:0000:0000:0000:0000:0000:0001" || host == "0:0:0:0:0:0:0:1"
Expand Down Expand Up @@ -77,8 +77,31 @@ function connectionlayer(handler)
IOType = sockettype(url, socket_type, socket_type_tls)
start_time = time()
try
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, connect_timeout=connect_timeout, kw...)
if !isnothing(proxy) && req.url.scheme in ("https", "wss", "ws")
target_IOType = sockettype(target_url, socket_type, socket_type_tls)

io = newtunnelconnection(;
target_type=target_IOType,
target_host=target_url.host,
target_port=target_url.port,
proxy_type=IOType,
proxy_host=url.host,
proxy_port=url.port,
proxy_auth=header(req, "Proxy-Authorization"),
connect_timeout,
readtimeout,
kw...
)

req.headers = filter(x->x.first != "Proxy-Authorization", req.headers)
else
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, connect_timeout=connect_timeout, kw...)
end
catch e
if e isa StatusError
return e.response
end

if logerrors
msg = current_exceptions_to_string()
@error msg type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag
Expand All @@ -91,31 +114,6 @@ function connectionlayer(handler)

shouldreuse = !(target_url.scheme in ("ws", "wss"))
try
if proxy !== nothing && target_url.scheme in ("https", "wss", "ws")
shouldreuse = false
# tunnel request
if target_url.scheme in ("https", "wss")
target_url = URI(target_url, port=443)
elseif target_url.scheme in ("ws", ) && target_url.port == ""
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
end
r = if readtimeout > 0
try_with_timeout(readtimeout) do _
connect_tunnel(io, target_url, req)
end
else
connect_tunnel(io, target_url, req)
end
if r.status != 200
close(io)
return r
end
if target_url.scheme in ("https", "wss")
io = Connections.sslupgrade(socket_type_tls, io, target_url.host; readtimeout=readtimeout, kw...)
end
req.headers = filter(x->x.first != "Proxy-Authorization", req.headers)
end

stream = Stream(req.response, io)
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, logtag=logtag, kw...)
catch e
Expand Down Expand Up @@ -153,20 +151,4 @@ end

sockettype(url::URI, tcp, tls) = url.scheme in ("wss", "https") ? tls : tcp

function connect_tunnel(io, target_url, req)
target = "$(URIs.hoststring(target_url.host)):$(target_url.port)"
@debugv 1 "📡 CONNECT HTTPS tunnel to $target"
headers = Dict("Host" => target)
if (auth = header(req, "Proxy-Authorization"); !isempty(auth))
headers["Proxy-Authorization"] = auth
end
request = Request("CONNECT", target, headers)
# @debugv 2 "connect_tunnel: writing headers"
writeheaders(io, request)
# @debugv 2 "connect_tunnel: reading headers"
readheaders(io, request.response)
# @debugv 2 "connect_tunnel: done reading headers"
return request.response
end

end # module ConnectionRequest
4 changes: 2 additions & 2 deletions test/pool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ end
@testset "Only one tunnel should be established with sequential requests" begin
https_request_ip_through_proxy()
https_request_ip_through_proxy()
@test_broken connectcount == 1
@test connectcount == 1
end

@testset "parallell tunnels should be established with parallell requests" begin
n_asyncgetters = 3
asyncgetters = [@async https_request_ip_through_proxy() for _ in 1:n_asyncgetters]
wait.(asyncgetters)
@test_broken connectcount == n_asyncgetters
@test connectcount == n_asyncgetters
end

finally
Expand Down

0 comments on commit 044874a

Please sign in to comment.