Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance http outbound to work with https and http/2 upstreams #2488

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/proxyman/outbound/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
conn := net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader))

if config := tls.ConfigFromStreamSettings(h.streamSettings); config != nil {
tlsConfig := config.GetTLSConfig(tls.WithDestination(dest), tls.WithNextProto("h2"))
tlsConfig := config.GetTLSConfig(tls.WithDestination(dest))
conn = tls.Client(conn, tlsConfig)
}

Expand Down
16 changes: 9 additions & 7 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,13 @@ func (c *TLSCertConfig) Build() (*tls.Certificate, error) {
}

type TLSConfig struct {
Insecure bool `json:"allowInsecure"`
InsecureCiphers bool `json:"allowInsecureCiphers"`
Certs []*TLSCertConfig `json:"certificates"`
ServerName string `json:"serverName"`
ALPN *StringList `json:"alpn"`
DiableSystemRoot bool `json:"disableSystemRoot"`
Insecure bool `json:"allowInsecure"`
InsecureCiphers bool `json:"allowInsecureCiphers"`
Certs []*TLSCertConfig `json:"certificates"`
ServerName string `json:"serverName"`
ALPN *StringList `json:"alpn"`
DisableSessionResumption bool `json:"disableSessionResumption"`
DisableSystemRoot bool `json:"disableSystemRoot"`
}

// Build implements Buildable.
Expand All @@ -302,7 +303,8 @@ func (c *TLSConfig) Build() (proto.Message, error) {
if c.ALPN != nil && len(*c.ALPN) > 0 {
config.NextProtocol = []string(*c.ALPN)
}
config.DisableSystemRoot = c.DiableSystemRoot
config.DisableSessionResumption = c.DisableSessionResumption
config.DisableSystemRoot = c.DisableSystemRoot
return config, nil
}

Expand Down
208 changes: 176 additions & 32 deletions proxy/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
package http

import (
"bufio"
"context"
"encoding/base64"
"io"
"strings"
"net/http"
"net/url"
"sync"

"golang.org/x/net/http2"

"v2ray.com/core"
"v2ray.com/core/common"
Expand All @@ -20,13 +25,24 @@ import (
"v2ray.com/core/features/policy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls"
)

type Client struct {
serverPicker protocol.ServerPicker
policyManager policy.Manager
}

type h2Conn struct {
rawConn net.Conn
h2Conn *http2.ClientConn
}

var (
cachedH2Mutex sync.Mutex
cachedH2Conns map[net.Destination]h2Conn
)

// NewClient create a new http client based on the given config.
func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
serverList := protocol.NewServerList()
Expand Down Expand Up @@ -54,25 +70,26 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.")
}
destination := outbound.Target
target := outbound.Target

if destination.Network == net.Network_UDP {
if target.Network == net.Network_UDP {
return newError("UDP is not supported by HTTP outbound")
}

var server *protocol.ServerSpec
var user *protocol.MemoryUser
var conn internet.Connection

if err := retry.ExponentialBackoff(5, 100).On(func() error {
server = c.serverPicker.PickServer()
server := c.serverPicker.PickServer()
dest := server.Destination()
rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return err
}
conn = rawConn
user = server.PickUser()
targetAddr := target.NetAddr()

return nil
netConn, err := setUpHttpTunnel(ctx, dest, targetAddr, user, dialer)
if netConn != nil {
conn = internet.Connection(netConn)
}
return err
}); err != nil {
return newError("failed to find an available destination").Base(err)
}
Expand All @@ -84,16 +101,10 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
}()

p := c.policyManager.ForLevel(0)

user := server.PickUser()
if user != nil {
p = c.policyManager.ForLevel(user.Level)
}

if err := setUpHttpTunnel(conn, conn, &destination, user); err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, p.Timeouts.ConnectionIdle)

Expand All @@ -115,30 +126,163 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
}

// setUpHttpTunnel will create a socket tunnel via HTTP CONNECT method
func setUpHttpTunnel(reader io.Reader, writer io.Writer, destination *net.Destination, user *protocol.MemoryUser) error {
var headers []string
destNetAddr := destination.NetAddr()
headers = append(headers, "CONNECT "+destNetAddr+" HTTP/1.1")
headers = append(headers, "Host: "+destNetAddr)
func setUpHttpTunnel(ctx context.Context, dest net.Destination, target string, user *protocol.MemoryUser, dialer internet.Dialer) (net.Conn, error) {
req := (&http.Request{
Method: "CONNECT",
URL: &url.URL{Host: target},
Header: make(http.Header),
Host: target,
}).WithContext(ctx)

if user != nil && user.Account != nil {
account := user.Account.(*Account)
auth := account.GetUsername() + ":" + account.GetPassword()
headers = append(headers, "Proxy-Authorization: Basic "+base64.StdEncoding.EncodeToString([]byte(auth)))
req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(auth)))
}
headers = append(headers, "Proxy-Connection: Keep-Alive")

b := buf.New()
b.WriteString(strings.Join(headers, "\r\n") + "\r\n\r\n")
if err := buf.WriteAllBytes(writer, b.Bytes()); err != nil {
return err
connectHttp1 := func(rawConn net.Conn) (net.Conn, error) {
req.Proto = "HTTP/1.1"
req.ProtoMajor = 1
req.ProtoMinor = 1

err := req.Write(rawConn)
if err != nil {
rawConn.Close()
return nil, err
}

resp, err := http.ReadResponse(bufio.NewReader(rawConn), req)
if err != nil {
rawConn.Close()
return nil, err
}

if resp.StatusCode != http.StatusOK {
rawConn.Close()
return nil, newError("Proxy responded with non 200 code: " + resp.Status)
}
return rawConn, nil
}

b.Clear()
if _, err := b.ReadFrom(reader); err != nil {
return err
connectHttp2 := func(rawConn net.Conn, h2clientConn *http2.ClientConn) (net.Conn, error) {
req.Proto = "HTTP/2.0"
req.ProtoMajor = 2
req.ProtoMinor = 0
pr, pw := io.Pipe()
req.Body = pr

resp, err := h2clientConn.RoundTrip(req)
if err != nil {
rawConn.Close()
return nil, err
}

if resp.StatusCode != http.StatusOK {
rawConn.Close()
return nil, newError("Proxy responded with non 200 code: " + resp.Status)
}
return newHttp2Conn(rawConn, pw, resp.Body), nil
}

return nil
cachedH2Mutex.Lock()
defer cachedH2Mutex.Unlock()

if cachedConn, found := cachedH2Conns[dest]; found {
if cachedConn.rawConn != nil && cachedConn.h2Conn != nil {
rc := cachedConn.rawConn
cc := cachedConn.h2Conn
if cc.CanTakeNewRequest() {
proxyConn, err := connectHttp2(rc, cc)
if err != nil {
return nil, err
}

return proxyConn, nil
}
}
}

rawConn, err := dialer.Dial(ctx, dest)
if err != nil {
return nil, err
}

nextProto := ""
if tlsConn, ok := rawConn.(*tls.Conn); ok {
if err := tlsConn.Handshake(); err != nil {
rawConn.Close()
return nil, err
}
nextProto = tlsConn.ConnectionState().NegotiatedProtocol
}

switch nextProto {
case "":
fallthrough
case "http/1.1":
return connectHttp1(rawConn)
case "h2":
t := http2.Transport{}
h2clientConn, err := t.NewClientConn(rawConn)
if err != nil {
rawConn.Close()
return nil, err
}

proxyConn, err := connectHttp2(rawConn, h2clientConn)
if err != nil {
rawConn.Close()
return nil, err
}

if cachedH2Conns == nil {
cachedH2Conns = make(map[net.Destination]h2Conn)
}

cachedH2Conns[dest] = h2Conn{
rawConn: rawConn,
h2Conn: h2clientConn,
}

return proxyConn, err
default:
return nil, newError("negotiated unsupported application layer protocol: " + nextProto)
}
}

func newHttp2Conn(c net.Conn, pipedReqBody *io.PipeWriter, respBody io.ReadCloser) net.Conn {
return &http2Conn{Conn: c, in: pipedReqBody, out: respBody}
}

type http2Conn struct {
net.Conn
in *io.PipeWriter
out io.ReadCloser
}

func (h *http2Conn) Read(p []byte) (n int, err error) {
return h.out.Read(p)
}

func (h *http2Conn) Write(p []byte) (n int, err error) {
return h.in.Write(p)
}

func (h *http2Conn) Close() error {
h.in.Close()
return h.out.Close()
}

func (h *http2Conn) CloseConn() error {
return h.Conn.Close()
}

func (h *http2Conn) CloseWrite() error {
return h.in.Close()
}

func (h *http2Conn) CloseRead() error {
return h.out.Close()
}

func init() {
Expand Down
27 changes: 22 additions & 5 deletions transport/internet/http/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (

var (
globalDialerMap map[net.Destination]*http.Client
globalDailerAccess sync.Mutex
globalDialerAccess sync.Mutex
)

func getHTTPClient(ctx context.Context, dest net.Destination, tlsSettings *tls.Config) (*http.Client, error) {
globalDailerAccess.Lock()
defer globalDailerAccess.Unlock()
globalDialerAccess.Lock()
defer globalDialerAccess.Unlock()

if globalDialerMap == nil {
globalDialerMap = make(map[net.Destination]*http.Client)
Expand Down Expand Up @@ -54,9 +54,26 @@ func getHTTPClient(ctx context.Context, dest net.Destination, tlsSettings *tls.C
if err != nil {
return nil, err
}
return gotls.Client(pconn, tlsConfig), nil

cn := gotls.Client(pconn, tlsConfig)
if err := cn.Handshake(); err != nil {
return nil, err
}
if !tlsConfig.InsecureSkipVerify {
if err := cn.VerifyHostname(tlsConfig.ServerName); err != nil {
return nil, err
}
}
state := cn.ConnectionState()
if p := state.NegotiatedProtocol; p != http2.NextProtoTLS {
return nil, newError("http2: unexpected ALPN protocol " + p + "; want q" + http2.NextProtoTLS).AtError()
}
if !state.NegotiatedProtocolIsMutual {
return nil, newError("http2: could not negotiate protocol mutually").AtError()
}
return cn, nil
},
TLSClientConfig: tlsSettings.GetTLSConfig(tls.WithDestination(dest), tls.WithNextProto("h2")),
TLSClientConfig: tlsSettings.GetTLSConfig(tls.WithDestination(dest)),
}

client := &http.Client{
Expand Down
2 changes: 1 addition & 1 deletion transport/internet/tcp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
}

if config := tls.ConfigFromStreamSettings(streamSettings); config != nil {
tlsConfig := config.GetTLSConfig(tls.WithDestination(dest), tls.WithNextProto("h2"))
tlsConfig := config.GetTLSConfig(tls.WithDestination(dest))
if config.IsExperiment8357() {
conn = tls.UClient(conn, tlsConfig)
} else {
Expand Down
Loading