@@ -9,6 +9,8 @@ package topology
99import (
1010 "context"
1111 "fmt"
12+ "io"
13+ "io/ioutil"
1214 "net"
1315 "sync"
1416 "sync/atomic"
@@ -788,17 +790,27 @@ var (
788790//
789791// It calls the package-global BGReadCallback function, if set, with the
790792// address, timings, and any errors that occurred.
791- func bgRead (pool * pool , conn * connection ) {
792- var start , read time.Time
793- start = time .Now ()
794- errs := make ([]error , 0 )
795- connClosed := false
793+ func bgRead (pool * pool , conn * connection , size int32 ) {
794+ var err error
795+ start := time .Now ()
796796
797797 defer func () {
798+ read := time .Now ()
799+ errs := make ([]error , 0 )
800+ connClosed := false
801+ if err != nil {
802+ errs = append (errs , err )
803+ connClosed = true
804+ err = conn .close ()
805+ if err != nil {
806+ errs = append (errs , fmt .Errorf ("error closing conn after reading: %w" , err ))
807+ }
808+ }
809+
798810 // No matter what happens, always check the connection back into the
799811 // pool, which will either make it available for other operations or
800812 // remove it from the pool if it was closed.
801- err : = pool .checkInNoEvent (conn )
813+ err = pool .checkInNoEvent (conn )
802814 if err != nil {
803815 errs = append (errs , fmt .Errorf ("error checking in: %w" , err ))
804816 }
@@ -808,34 +820,37 @@ func bgRead(pool *pool, conn *connection) {
808820 }
809821 }()
810822
811- err : = conn .nc .SetReadDeadline (time .Now ().Add (BGReadTimeout ))
823+ err = conn .nc .SetReadDeadline (time .Now ().Add (BGReadTimeout ))
812824 if err != nil {
813- errs = append (errs , fmt .Errorf ("error setting a read deadline: %w" , err ))
814-
815- connClosed = true
816- err := conn .close ()
817- if err != nil {
818- errs = append (errs , fmt .Errorf ("error closing conn after setting read deadline: %w" , err ))
819- }
820-
825+ err = fmt .Errorf ("error setting a read deadline: %w" , err )
821826 return
822827 }
823828
824- // The context here is only used for cancellation, not deadline timeout, so
825- // use context.Background(). The read timeout is set by calling
826- // SetReadDeadline above.
827- _ , _ , err = conn .read (context .Background ())
828- read = time .Now ()
829- if err != nil {
830- errs = append (errs , fmt .Errorf ("error reading: %w" , err ))
831-
832- connClosed = true
833- err := conn .close ()
829+ if size == 0 {
830+ var sizeBuf [4 ]byte
831+ _ , err = io .ReadFull (conn .nc , sizeBuf [:])
834832 if err != nil {
835- errs = append (errs , fmt .Errorf ("error closing conn after reading: %w" , err ))
833+ err = fmt .Errorf ("error reading the message size: %w" , err )
834+ return
836835 }
837-
838- return
836+ size = (int32 (sizeBuf [0 ])) | (int32 (sizeBuf [1 ]) << 8 ) | (int32 (sizeBuf [2 ]) << 16 ) | (int32 (sizeBuf [3 ]) << 24 )
837+ if size < 4 {
838+ err = fmt .Errorf ("malformed message length: %d" , size )
839+ return
840+ }
841+ maxMessageSize := conn .desc .MaxMessageSize
842+ if maxMessageSize == 0 {
843+ maxMessageSize = defaultMaxMessageSize
844+ }
845+ if uint32 (size ) > maxMessageSize {
846+ err = errResponseTooLarge
847+ return
848+ }
849+ size -= 4
850+ }
851+ _ , err = io .CopyN (ioutil .Discard , conn .nc , int64 (size ))
852+ if err != nil {
853+ err = fmt .Errorf ("error reading message of %d: %w" , size , err )
839854 }
840855}
841856
@@ -886,9 +901,10 @@ func (p *pool) checkInNoEvent(conn *connection) error {
886901 // means that connections in "awaiting response" state are checked in but
887902 // not usable, which is not covered by the current pool events. We may need
888903 // to add pool event information in the future to communicate that.
889- if conn .awaitingResponse {
890- conn .awaitingResponse = false
891- go bgRead (p , conn )
904+ if conn .awaitingResponse != nil {
905+ size := * conn .awaitingResponse
906+ conn .awaitingResponse = nil
907+ go bgRead (p , conn , size )
892908 return nil
893909 }
894910
0 commit comments