Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add PacketAddr support to Trojan #1919

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion proxy/trojan/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/v2fly/v2ray-core/v5/common"
"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
"github.com/v2fly/v2ray-core/v5/common/protocol"
"github.com/v2fly/v2ray-core/v5/common/retry"
"github.com/v2fly/v2ray-core/v5/common/session"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/v2fly/v2ray-core/v5/proxy"
"github.com/v2fly/v2ray-core/v5/transport"
"github.com/v2fly/v2ray-core/v5/transport/internet"
"github.com/v2fly/v2ray-core/v5/transport/internet/udp"
)

// Client is an inbound handler for trojan protocol
Expand Down Expand Up @@ -85,6 +87,51 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)

if packetConn, err := packetaddr.ToPacketAddrConn(link, destination); err == nil {
postRequest := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)

var buffer [2048]byte
_, addr, err := packetConn.ReadFrom(buffer[:])
if err != nil {
return newError("failed to read a packet").Base(err)
}
dest := net.DestinationFromAddr(addr)

bufferWriter := buf.NewBufferedWriter(buf.NewWriter(conn))
connWriter := &ConnWriter{Writer: bufferWriter, Target: dest, Account: account}
packetWriter := &PacketWriter{Writer: connWriter, Target: dest}

// write some request payload to buffer
if _, err := packetWriter.WriteTo(buffer[:], addr); err != nil {
return newError("failed to write a request payload").Base(err)
}

// Flush; bufferWriter.WriteMultiBuffer now is bufferWriter.writer.WriteMultiBuffer
if err = bufferWriter.SetBuffered(false); err != nil {
return newError("failed to flush payload").Base(err).AtWarning()
}

return udp.CopyPacketConn(packetWriter, packetConn, udp.UpdateActivity(timer))
}

getResponse := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)

packetReader := &PacketReader{Reader: conn}
splitReader := &PacketSplitReader{Reader: packetReader}

return udp.CopyPacketConn(packetConn, splitReader, udp.UpdateActivity(timer))
}

responseDoneAndCloseWriter := task.OnSuccess(getResponse, task.Close(link.Writer))
if err := task.Run(ctx, postRequest, responseDoneAndCloseWriter); err != nil {
return newError("connection ends").Base(err)
}

return nil
}

postRequest := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)

Expand All @@ -100,7 +147,7 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter

// write some request payload to buffer
if err = buf.CopyOnceTimeout(link.Reader, bodyWriter, proxy.FirstPayloadTimeout); err != nil && err != buf.ErrNotTimeoutReader && err != buf.ErrReadTimeout {
return newError("failed to write A request payload").Base(err).AtWarning()
return newError("failed to write a request payload").Base(err).AtWarning()
}

// Flush; bufferWriter.WriteMultiBuffer now is bufferWriter.writer.WriteMultiBuffer
Expand Down
96 changes: 57 additions & 39 deletions proxy/trojan/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proxy/trojan/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ option java_multiple_files = true;

import "common/protocol/user.proto";
import "common/protocol/server_spec.proto";
import "common/net/packetaddr/config.proto";

message Account {
string password = 1;
Expand All @@ -28,4 +29,5 @@ message ClientConfig {
message ServerConfig {
repeated v2ray.core.common.protocol.User users = 1;
repeated Fallback fallbacks = 3;
v2ray.core.net.packetaddr.PacketAddrType packet_encoding = 4;
}
30 changes: 30 additions & 0 deletions proxy/trojan/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package trojan
import (
"encoding/binary"
"io"
gonet "net"

"github.com/v2fly/v2ray-core/v5/common/buf"
"github.com/v2fly/v2ray-core/v5/common/net"
Expand Down Expand Up @@ -128,6 +129,12 @@ func (w *PacketWriter) WriteMultiBufferWithMetadata(mb buf.MultiBuffer, dest net
return nil
}

func (w *PacketWriter) WriteTo(payload []byte, addr gonet.Addr) (int, error) {
dest := net.DestinationFromAddr(addr)

return w.writePacket(payload, dest)
}

func (w *PacketWriter) writePacket(payload []byte, dest net.Destination) (int, error) { // nolint: unparam
buffer := buf.StackNew()
defer buffer.Release()
Expand Down Expand Up @@ -279,3 +286,26 @@ func (r *PacketReader) ReadMultiBufferWithMetadata() (*PacketPayload, error) {

return &PacketPayload{Target: dest, Buffer: mb}, nil
}

type PacketSplitReader struct {
Reader *PacketReader
Payload *PacketPayload
}

func (r *PacketSplitReader) ReadFrom(p []byte) (n int, addr gonet.Addr, err error) {
if r.Payload == nil || r.Payload.Buffer.IsEmpty() {
r.Payload, err = r.Reader.ReadMultiBufferWithMetadata()
if err != nil {
return
}
}

addr = &gonet.UDPAddr{
IP: r.Payload.Target.Address.IP(),
Port: int(r.Payload.Target.Port),
}

r.Payload.Buffer, n = buf.SplitBytes(r.Payload.Buffer, p)

return
}
29 changes: 20 additions & 9 deletions proxy/trojan/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/v2fly/v2ray-core/v5/common/errors"
"github.com/v2fly/v2ray-core/v5/common/log"
"github.com/v2fly/v2ray-core/v5/common/net"
"github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
"github.com/v2fly/v2ray-core/v5/common/protocol"
udp_proto "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
"github.com/v2fly/v2ray-core/v5/common/retry"
Expand All @@ -33,9 +34,10 @@ func init() {

// Server is an inbound connection handler that handles messages in trojan protocol.
type Server struct {
policyManager policy.Manager
validator *Validator
fallbacks map[string]map[string]*Fallback // or nil
policyManager policy.Manager
validator *Validator
fallbacks map[string]map[string]*Fallback // or nil
packetEncoding packetaddr.PacketAddrType
}

// NewServer creates a new trojan inbound handler.
Expand All @@ -54,8 +56,9 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {

v := core.MustFromContext(ctx)
server := &Server{
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
validator: validator,
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
validator: validator,
packetEncoding: config.PacketEncoding,
}

if config.Fallbacks != nil {
Expand Down Expand Up @@ -204,7 +207,15 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
}

func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReader, clientWriter *PacketWriter, dispatcher routing.Dispatcher) error {
udpServer := udp.NewSplitDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
udpDispatcherConstructor := udp.NewSplitDispatcher
switch s.packetEncoding {
case packetaddr.PacketAddrType_None:
case packetaddr.PacketAddrType_Packet:
packetAddrDispatcherFactory := udp.NewPacketAddrDispatcherCreator(ctx)
udpDispatcherConstructor = packetAddrDispatcherFactory.NewPacketAddrDispatcher
}

udpServer := udpDispatcherConstructor(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
if err := clientWriter.WriteMultiBufferWithMetadata(buf.MultiBuffer{packet.Payload}, packet.Source); err != nil {
newError("failed to write response").Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
}
Expand All @@ -225,8 +236,8 @@ func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReade
}
return nil
}

ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
currentPacketCtx := ctx
currentPacketCtx = log.ContextWithAccessMessage(currentPacketCtx, &log.AccessMessage{
From: inbound.Source,
To: p.Target,
Status: log.AccessAccepted,
Expand All @@ -236,7 +247,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReade
newError("tunnelling request to ", p.Target).WriteToLog(session.ExportIDToError(ctx))

for _, b := range p.Buffer {
udpServer.Dispatch(ctx, p.Target, b)
udpServer.Dispatch(currentPacketCtx, p.Target, b)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions proxy/trojan/simplified/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func init() {
}
return
}(),
PacketEncoding: simplifiedServer.PacketEncoding,
}
return common.CreateObject(ctx, fullServer)
}))
Expand Down
Loading