Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: Move some stats handler calls to gRPC layer, and add local address to peer.Peer #6716

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@

func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }

func (ht *serverHandlerTransport) LocalAddr() net.Addr { return nil } // Server Handler transport has no access to local addr (was simply not calling sh with local addr).

Check warning on line 170 in internal/transport/handler_server.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/handler_server.go#L170

Added line #L170 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this can be obtained with: ht.req.Context().Value(http.LocalAddrContextKey)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, done. Since you only want peer though will be added to the Peer() method and this will be deleted.


func (ht *serverHandlerTransport) Peer() *peer.Peer {
return &peer.Peer{
Addr: ht.RemoteAddr(),
}
}

// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
// the empty string if unknown.
type strAddr string
Expand Down Expand Up @@ -347,7 +355,7 @@
return err
}

func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
func (ht *serverHandlerTransport) HandleStreams(_ context.Context, startStream func(*Stream)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't ignoring the context mess everything up?

Can grpc not pass the http.Request.Context() (with things like Peer added) so that it doesn't need to be ignored here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was being ignored in master already, and it uses the requests Context (ctx := ht.req.Context()). I think this is the same functionality, and I don't think I want to add something to gRPC layer using the http.Request since it's a server handler transport specific concept.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline; decided to move peer creation to construction time and read it in gRPC before calling this.

// With this transport type there will be exactly 1 stream: this HTTP request.

ctx := ht.req.Context()
Expand All @@ -371,16 +379,16 @@
}()

req := ht.req

s := &Stream{
id: 0, // irrelevant
requestRead: func(int) {},
cancel: cancel,
buf: newRecvBuffer(),
st: ht,
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
contentSubtype: ht.contentSubtype,
id: 0, // irrelevant
requestRead: func(int) {},
cancel: cancel,
buf: newRecvBuffer(),
st: ht,
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
contentSubtype: ht.contentSubtype,
headerWireLength: 0, // doesn't know header wire length, will call into stats handler as 0.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one doesn't seem to be available. I think we'll need:

golang/go#18997

Please leave a comment in the code referencing the issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks.

}
pr := &peer.Peer{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reuse Peer()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point. Done.

Addr: ht.RemoteAddr(),
Expand All @@ -390,15 +398,6 @@
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
for _, sh := range ht.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: ht.RemoteAddr(),
Compression: s.recvCompress,
}
sh.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
Expand Down
10 changes: 5 additions & 5 deletions internal/transport/handler_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (s) TestHandlerTransport_HandleStreams(t *testing.T) {
st.ht.WriteStatus(s, status.New(codes.OK, ""))
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
context.Background(), func(s *Stream) { go handleStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down Expand Up @@ -347,7 +347,7 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
st.ht.WriteStatus(s, status.New(statusCode, msg))
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
context.Background(), func(s *Stream) { go handleStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down Expand Up @@ -396,7 +396,7 @@ func (s) TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
ht.WriteStatus(s, status.New(codes.DeadlineExceeded, "too slow"))
}
ht.HandleStreams(
func(s *Stream) { go runStream(s) },
context.Background(), func(s *Stream) { go runStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down Expand Up @@ -448,7 +448,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
func testHandlerTransportHandleStreams(t *testing.T, handleStream func(st *handleStreamTest, s *Stream)) {
st := newHandleStreamTest(t)
st.ht.HandleStreams(
func(s *Stream) { go handleStream(st, s) },
context.Background(), func(s *Stream) { go handleStream(st, s) },
)
}

Expand Down Expand Up @@ -481,7 +481,7 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
hst.ht.WriteStatus(s, st)
}
hst.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
context.Background(), func(s *Stream) { go handleStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down
69 changes: 16 additions & 53 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ var serverConnectionCounter uint64
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
done chan struct{}
conn net.Conn
loopy *loopyWriter
Expand Down Expand Up @@ -244,7 +243,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

done := make(chan struct{})
t := &http2Server{
ctx: setConnection(context.Background(), rawConn),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
Expand All @@ -267,8 +265,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
bufferPool: newBufferPool(),
}
t.logger = prefixLoggerForServerTransport(t)
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())

t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
Expand All @@ -277,14 +273,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
for _, sh := range t.stats {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
return nil, err
Expand Down Expand Up @@ -342,7 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
Expand All @@ -369,10 +357,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(

buf := newRecvBuffer()
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
headerWireLength: int(frame.Header().Length),
}
var (
// if false, content-type was missing or invalid
Expand Down Expand Up @@ -511,9 +500,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.state = streamReadDone
}
if timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
s.ctx, s.cancel = context.WithCancel(ctx)
}

// Attach the received metadata to the context.
Expand Down Expand Up @@ -592,18 +581,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
for _, sh := range t.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
Header: mdata.Copy(),
}
sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
Expand All @@ -629,7 +606,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream)) {
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
defer close(t.readerDone)
for {
t.controlBuf.throttle()
Expand Down Expand Up @@ -664,7 +641,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if err := t.operateHeaders(frame, handle); err != nil {
if err := t.operateHeaders(ctx, frame, handle); err != nil {
t.Close(err)
break
}
Expand Down Expand Up @@ -1242,10 +1219,6 @@ func (t *http2Server) Close(err error) {
for _, s := range streams {
s.cancel()
}
for _, sh := range t.stats {
connEnd := &stats.ConnEnd{}
sh.HandleConn(t.ctx, connEnd)
}
}

// deleteStream deletes the stream s from transport's active streams.
Expand Down Expand Up @@ -1311,6 +1284,10 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo
})
}

func (t *http2Server) LocalAddr() net.Addr {
return t.localAddr
}

func (t *http2Server) RemoteAddr() net.Addr {
return t.remoteAddr
}
Expand Down Expand Up @@ -1433,7 +1410,8 @@ func (t *http2Server) getOutFlowWindow() int64 {
}
}

func (t *http2Server) getPeer() *peer.Peer {
// Peer returns the peer of the transport.
func (t *http2Server) Peer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
Expand All @@ -1449,18 +1427,3 @@ func getJitter(v time.Duration) time.Duration {
j := grpcrand.Int63n(2*r) - r
return time.Duration(j)
}

type connectionKey struct{}

// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
conn, _ := ctx.Value(connectionKey{}).(net.Conn)
return conn
}

// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
return context.WithValue(ctx, connectionKey{}, conn)
}
24 changes: 22 additions & 2 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -265,7 +266,8 @@ type Stream struct {
// headerValid indicates whether a valid header was received. Only
// meaningful after headerChan is closed (always call waitOnHeader() before
// reading its value). Not valid on server side.
headerValid bool
headerValid bool
headerWireLength int // Only set on server side.

// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
Expand Down Expand Up @@ -425,6 +427,12 @@ func (s *Stream) Context() context.Context {
return s.ctx
}

// SetContext sets the context of the stream. This will be deleted once the
// stats handler callouts all move to gRPC layer.
func (s *Stream) SetContext(ctx context.Context) {
s.ctx = ctx
}

// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
Expand All @@ -437,6 +445,12 @@ func (s *Stream) Status() *status.Status {
return s.status
}

// HeaderWireLength returns the size of the headers of the stream as received
// from the wire. Valid only on the server.
func (s *Stream) HeaderWireLength() int {
return s.headerWireLength
}

// SetHeader sets the header metadata. This can be called multiple times.
// Server side only.
// This should not be called in parallel to other data writes.
Expand Down Expand Up @@ -698,7 +712,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the Context given is used as the base context for all streams started on this transport."?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per comment above, will wait until further discussion on the server handler transport to add this docstring.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to do this, added this comment.

HandleStreams(func(*Stream))
HandleStreams(context.Context, func(*Stream))

// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.
Expand All @@ -717,9 +731,15 @@ type ServerTransport interface {
// handlers will be terminated asynchronously.
Close(err error)

// LocalAddr returns the local network address.
LocalAddr() net.Addr

// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr

// Peer returns the peer of the server transport.
Peer() *peer.Peer
Comment on lines +734 to +735
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we delete Local/RemoteAddr and only have this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done. There's an in flight PR for this but the contributor isn't getting back to my comments so I'll go ahead and just add it to this PR.


// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain(debugData string)

Expand Down
Loading
Loading