Skip to content

Commit

Permalink
modify the original function implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
button-chen authored Oct 23, 2024
1 parent c7a50f9 commit b6cb227
Showing 1 changed file with 2 additions and 149 deletions.
151 changes: 2 additions & 149 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,7 @@ type writeRequest struct {
// STOMP server is specified by network and addr. STOMP protocol
// options can be specified in opts.
func Dial(network, addr string, opts ...func(*Conn) error) (*Conn, error) {
c, err := net.Dial(network, addr)
if err != nil {
return nil, err
}

host, _, err := net.SplitHostPort(c.RemoteAddr().String())
if err != nil {
c.Close()
return nil, err
}

// Add option to set host and make it the first option in list,
// so that if host has been explicitly specified it will override.
opts = append([]func(*Conn) error{ConnOpt.Host(host)}, opts...)

return Connect(c, opts...)
return DialWithContext(context.Background(), network, addr, opts...)
}

func DialWithContext(ctx context.Context, network, addr string, opts ...func(*Conn) error) (*Conn, error) {
Expand All @@ -109,139 +94,7 @@ func DialWithContext(ctx context.Context, network, addr string, opts ...func(*Co
// been created by the program. The opts parameter provides the
// opportunity to specify STOMP protocol options.
func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error) {
reader := frame.NewReader(conn)
writer := frame.NewWriter(conn)

c := &Conn{
conn: conn,
closeMutex: &sync.Mutex{},
}

options, err := newConnOptions(c, opts)
if err != nil {
return nil, err
}

c.log = options.Logger

if options.ReadBufferSize > 0 {
reader = frame.NewReaderSize(conn, options.ReadBufferSize)
}

if options.WriteBufferSize > 0 {
writer = frame.NewWriterSize(conn, options.ReadBufferSize)
}

readChannelCapacity := 20
writeChannelCapacity := 20

if options.ReadChannelCapacity > 0 {
readChannelCapacity = options.ReadChannelCapacity
}

if options.WriteChannelCapacity > 0 {
writeChannelCapacity = options.WriteChannelCapacity
}

c.hbGracePeriodMultiplier = options.HeartBeatGracePeriodMultiplier

c.readCh = make(chan *frame.Frame, readChannelCapacity)
c.writeCh = make(chan writeRequest, writeChannelCapacity)

if options.Host == "" {
// host not specified yet, attempt to get from net.Conn if possible
if connection, ok := conn.(net.Conn); ok {
host, _, err := net.SplitHostPort(connection.RemoteAddr().String())
if err == nil {
options.Host = host
}
}
// if host is still blank, use default
if options.Host == "" {
options.Host = "default"
}
}

connectFrame, err := options.NewFrame()
if err != nil {
return nil, err
}

err = writer.Write(connectFrame)
if err != nil {
return nil, err
}

response, err := reader.Read()
if err != nil {
return nil, err
}
if response == nil {
return nil, errors.New("unexpected empty frame")
}

if response.Command != frame.CONNECTED {
return nil, newError(response)
}

c.server = response.Header.Get(frame.Server)
c.session = response.Header.Get(frame.Session)

if versionString := response.Header.Get(frame.Version); versionString != "" {
version := Version(versionString)
if err = version.CheckSupported(); err != nil {
return nil, Error{
Message: err.Error(),
Frame: response,
}
}
c.version = version
} else {
// no version in the response, so assume version 1.0
c.version = V10
}

if heartBeat, ok := response.Header.Contains(frame.HeartBeat); ok {
readTimeout, writeTimeout, err := frame.ParseHeartBeat(heartBeat)
if err != nil {
return nil, Error{
Message: err.Error(),
Frame: response,
}
}

if readTimeout < options.ReadTimeout {
readTimeout = options.ReadTimeout
}

c.readTimeout = readTimeout
c.writeTimeout = writeTimeout

if c.readTimeout > 0 {
// Add time to the read timeout to account for time
// delay in other station transmitting timeout
c.readTimeout += options.HeartBeatError
}
if c.writeTimeout > options.HeartBeatError {
// Reduce time from the write timeout to account
// for time delay in transmitting to the other station
c.writeTimeout -= options.HeartBeatError
}
}

c.msgSendTimeout = options.MsgSendTimeout
c.rcvReceiptTimeout = options.RcvReceiptTimeout
c.disconnectReceiptTimeout = options.DisconnectReceiptTimeout
c.unsubscribeReceiptTimeout = options.UnsubscribeReceiptTimeout

if options.ResponseHeadersCallback != nil {
options.ResponseHeadersCallback(response.Header)
}

go readLoop(c, reader)
go processLoop(c, writer)

return c, nil
return ConnectWithContext(context.Background(), conn, opts...)
}

func ConnectWithContext(ctx context.Context, conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error) {
Expand Down

0 comments on commit b6cb227

Please sign in to comment.