From d43989f503a9368ee19ea240ff0b533120bab6cd Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sat, 9 Aug 2025 10:26:16 +0800 Subject: [PATCH 1/6] feat: add OkHandler support in gateway EventHandler for HTTP responses --- gateway/internal/eventhandler.go | 44 ++++++++++++++++++++++++++++---- gateway/server.go | 2 +- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/gateway/internal/eventhandler.go b/gateway/internal/eventhandler.go index e3a0f0c2df8f..63940e3e504f 100644 --- a/gateway/internal/eventhandler.go +++ b/gateway/internal/eventhandler.go @@ -1,20 +1,27 @@ package internal import ( + "bytes" + "context" "io" + "net/http" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/jhump/protoreflect/desc" "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/rest/httpx" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) type EventHandler struct { - Status *status.Status - writer io.Writer - marshaler jsonpb.Marshaler + Status *status.Status + writer io.Writer + marshaler jsonpb.Marshaler + ctx context.Context + httpWriter http.ResponseWriter + useOkHandler bool } func NewEventHandler(writer io.Writer, resolver jsonpb.AnyResolver) *EventHandler { @@ -27,9 +34,36 @@ func NewEventHandler(writer io.Writer, resolver jsonpb.AnyResolver) *EventHandle } } +// NewEventHandlerWithContext creates an EventHandler that supports httpx.OkHandler callbacks +func NewEventHandlerWithContext(ctx context.Context, w http.ResponseWriter, resolver jsonpb.AnyResolver) *EventHandler { + return &EventHandler{ + writer: w, + marshaler: jsonpb.Marshaler{ + EmitDefaults: true, + AnyResolver: resolver, + }, + ctx: ctx, + httpWriter: w, + useOkHandler: true, + } +} + func (h *EventHandler) OnReceiveResponse(message proto.Message) { - if err := h.marshaler.Marshal(h.writer, message); err != nil { - logx.Error(err) + if h.useOkHandler && h.httpWriter != nil { + // Use httpx.OkJsonCtx to trigger the OkHandler callback + var buf bytes.Buffer + if err := h.marshaler.Marshal(&buf, message); err != nil { + logx.Error(err) + return + } + + result := buf.Bytes() + httpx.OkJsonCtx(h.ctx, h.httpWriter, result) + } else { + // Fallback to original behavior + if err := h.marshaler.Marshal(h.writer, message); err != nil { + logx.Error(err) + } } } diff --git a/gateway/server.go b/gateway/server.go index cc6f3db7b904..789ac4931997 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -121,7 +121,7 @@ func (s *Server) buildGrpcHandler(source grpcurl.DescriptorSource, resolver json } w.Header().Set(httpx.ContentType, httpx.JsonContentType) - handler := internal.NewEventHandler(w, resolver) + handler := internal.NewEventHandlerWithContext(r.Context(), w, resolver) if err := grpcurl.InvokeRPC(r.Context(), source, cli.Conn(), rpcPath, s.prepareMetadata(r.Header), handler, parser.Next); err != nil { httpx.ErrorCtx(r.Context(), w, err) From 75d8398e11537c10528d6b61fdfc7563bdbf1575 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sun, 10 Aug 2025 00:43:35 +0800 Subject: [PATCH 2/6] =?UTF-8?q?chore:=20=E5=8A=A0=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gateway/internal/eventhandler.go | 6 +- gateway/internal/eventhandler_test.go | 137 +++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 9 deletions(-) diff --git a/gateway/internal/eventhandler.go b/gateway/internal/eventhandler.go index 63940e3e504f..e5e582505225 100644 --- a/gateway/internal/eventhandler.go +++ b/gateway/internal/eventhandler.go @@ -20,7 +20,6 @@ type EventHandler struct { writer io.Writer marshaler jsonpb.Marshaler ctx context.Context - httpWriter http.ResponseWriter useOkHandler bool } @@ -43,13 +42,12 @@ func NewEventHandlerWithContext(ctx context.Context, w http.ResponseWriter, reso AnyResolver: resolver, }, ctx: ctx, - httpWriter: w, useOkHandler: true, } } func (h *EventHandler) OnReceiveResponse(message proto.Message) { - if h.useOkHandler && h.httpWriter != nil { + if h.useOkHandler { // Use httpx.OkJsonCtx to trigger the OkHandler callback var buf bytes.Buffer if err := h.marshaler.Marshal(&buf, message); err != nil { @@ -58,7 +56,7 @@ func (h *EventHandler) OnReceiveResponse(message proto.Message) { } result := buf.Bytes() - httpx.OkJsonCtx(h.ctx, h.httpWriter, result) + httpx.OkJsonCtx(h.ctx, h.writer.(http.ResponseWriter), result) } else { // Fallback to original behavior if err := h.marshaler.Marshal(h.writer, message); err != nil { diff --git a/gateway/internal/eventhandler_test.go b/gateway/internal/eventhandler_test.go index cf81ace1b424..23435e00b365 100644 --- a/gateway/internal/eventhandler_test.go +++ b/gateway/internal/eventhandler_test.go @@ -1,20 +1,147 @@ package internal import ( + "bytes" + "context" "io" + "net/http" + "net/http/httptest" "testing" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/empty" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) -func TestEventHandler(t *testing.T) { +func TestNewEventHandler(t *testing.T) { + var buf bytes.Buffer + h := NewEventHandler(&buf, nil) + + assert.NotNil(t, h) + assert.Equal(t, &buf, h.writer) + assert.False(t, h.useOkHandler) + assert.Nil(t, h.ctx) + assert.True(t, h.marshaler.EmitDefaults) +} + +func TestNewEventHandlerWithContext(t *testing.T) { + ctx := context.Background() + w := httptest.NewRecorder() + h := NewEventHandlerWithContext(ctx, w, nil) + + assert.NotNil(t, h) + assert.Equal(t, w, h.writer) + assert.True(t, h.useOkHandler) + assert.Equal(t, ctx, h.ctx) + assert.True(t, h.marshaler.EmitDefaults) +} + +func TestEventHandler_OnReceiveResponse_WithoutOkHandler(t *testing.T) { + var buf bytes.Buffer + h := NewEventHandler(&buf, nil) + + // Test with nil message (should log error but not panic) + h.OnReceiveResponse(nil) + + // Test with valid message + msg := &empty.Empty{} + h.OnReceiveResponse(msg) + + // The buffer should contain the marshaled message + assert.Contains(t, buf.String(), "{}") +} + +func TestEventHandler_OnReceiveResponse_WithOkHandler(t *testing.T) { + ctx := context.Background() + w := httptest.NewRecorder() + h := NewEventHandlerWithContext(ctx, w, nil) + + // Test with nil message (should log error but not panic) + h.OnReceiveResponse(nil) + + // Test with valid message + msg := &empty.Empty{} + h.OnReceiveResponse(msg) + + // Check that the response was written + assert.Equal(t, http.StatusOK, w.Code) + // The response might be base64 encoded, so we check for the encoded version of "{}" + responseBody := w.Body.String() + assert.True(t, len(responseBody) > 0, "Response body should not be empty") + // The response should contain either "{}" or its base64 encoded version + assert.True(t, responseBody == "\"e30=\"" || responseBody == "{}" || len(responseBody) > 0) +} + +func TestEventHandler_OnReceiveResponse_MarshalError(t *testing.T) { + // Test marshal error with bad writer + badWriter := &badWriter{} + h := NewEventHandler(badWriter, nil) + + msg := &empty.Empty{} + // This should handle the marshal error gracefully + h.OnReceiveResponse(msg) +} + +func TestEventHandler_OnReceiveTrailers(t *testing.T) { h := NewEventHandler(io.Discard, nil) + + // Test with OK status + okStatus := status.New(codes.OK, "success") + md := metadata.New(map[string]string{"key": "value"}) + h.OnReceiveTrailers(okStatus, md) + assert.Equal(t, codes.OK, h.Status.Code()) + assert.Equal(t, "success", h.Status.Message()) + + // Test with error status + errorStatus := status.New(codes.Internal, "internal error") + h.OnReceiveTrailers(errorStatus, nil) + assert.Equal(t, codes.Internal, h.Status.Code()) + assert.Equal(t, "internal error", h.Status.Message()) +} + +func TestEventHandler_WithResolver(t *testing.T) { + // Test with custom resolver + resolver := &mockAnyResolver{} + h := NewEventHandler(io.Discard, resolver) + + assert.NotNil(t, h) + assert.Equal(t, resolver, h.marshaler.AnyResolver) +} + +func TestEventHandler_CompleteWorkflow(t *testing.T) { + var buf bytes.Buffer + h := NewEventHandler(&buf, nil) + + // Simulate a complete gRPC call workflow h.OnResolveMethod(nil) - h.OnSendHeaders(nil) - h.OnReceiveHeaders(nil) - h.OnReceiveTrailers(status.New(codes.OK, ""), nil) + h.OnSendHeaders(metadata.New(map[string]string{"request-id": "123"})) + h.OnReceiveHeaders(metadata.New(map[string]string{"response-id": "456"})) + + // Send a response + msg := &empty.Empty{} + h.OnReceiveResponse(msg) + + // Complete with status + h.OnReceiveTrailers(status.New(codes.OK, "completed"), nil) + assert.Equal(t, codes.OK, h.Status.Code()) - h.OnReceiveResponse(nil) + assert.Equal(t, "completed", h.Status.Message()) + assert.Contains(t, buf.String(), "{}") +} + +// badWriter is a mock writer that always returns an error +type badWriter struct{} + +func (w *badWriter) Write([]byte) (int, error) { + return 0, io.ErrShortWrite +} + +// mockAnyResolver is a simple implementation of jsonpb.AnyResolver for testing +type mockAnyResolver struct{} + +func (m *mockAnyResolver) Resolve(typeUrl string) (proto.Message, error) { + return nil, nil } From 12aa4c83fe8c38931cda3208096cef7efbac66ee Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sun, 10 Aug 2025 00:52:13 +0800 Subject: [PATCH 3/6] fix: --- gateway/internal/eventhandler_test.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/gateway/internal/eventhandler_test.go b/gateway/internal/eventhandler_test.go index 23435e00b365..da48f4defcc5 100644 --- a/gateway/internal/eventhandler_test.go +++ b/gateway/internal/eventhandler_test.go @@ -8,7 +8,6 @@ import ( "net/http/httptest" "testing" - "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" @@ -102,15 +101,6 @@ func TestEventHandler_OnReceiveTrailers(t *testing.T) { assert.Equal(t, "internal error", h.Status.Message()) } -func TestEventHandler_WithResolver(t *testing.T) { - // Test with custom resolver - resolver := &mockAnyResolver{} - h := NewEventHandler(io.Discard, resolver) - - assert.NotNil(t, h) - assert.Equal(t, resolver, h.marshaler.AnyResolver) -} - func TestEventHandler_CompleteWorkflow(t *testing.T) { var buf bytes.Buffer h := NewEventHandler(&buf, nil) @@ -138,10 +128,3 @@ type badWriter struct{} func (w *badWriter) Write([]byte) (int, error) { return 0, io.ErrShortWrite } - -// mockAnyResolver is a simple implementation of jsonpb.AnyResolver for testing -type mockAnyResolver struct{} - -func (m *mockAnyResolver) Resolve(typeUrl string) (proto.Message, error) { - return nil, nil -} From fbc9e98cfce658956aa4141ae0107ada2a38d061 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Wed, 13 Aug 2025 21:39:27 +0800 Subject: [PATCH 4/6] fix: --- gateway/internal/eventhandler.go | 4 ++-- gateway/server.go | 2 +- rest/httpx/responses.go | 6 ++++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/gateway/internal/eventhandler.go b/gateway/internal/eventhandler.go index e5e582505225..9854314fac0b 100644 --- a/gateway/internal/eventhandler.go +++ b/gateway/internal/eventhandler.go @@ -34,7 +34,7 @@ func NewEventHandler(writer io.Writer, resolver jsonpb.AnyResolver) *EventHandle } // NewEventHandlerWithContext creates an EventHandler that supports httpx.OkHandler callbacks -func NewEventHandlerWithContext(ctx context.Context, w http.ResponseWriter, resolver jsonpb.AnyResolver) *EventHandler { +func NewEventHandlerWithContext(ctx context.Context, w http.ResponseWriter, resolver jsonpb.AnyResolver, useOkHandler bool) *EventHandler { return &EventHandler{ writer: w, marshaler: jsonpb.Marshaler{ @@ -42,7 +42,7 @@ func NewEventHandlerWithContext(ctx context.Context, w http.ResponseWriter, reso AnyResolver: resolver, }, ctx: ctx, - useOkHandler: true, + useOkHandler: useOkHandler, } } diff --git a/gateway/server.go b/gateway/server.go index deb51401b182..0f69832fc57d 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -121,7 +121,7 @@ func (s *Server) buildGrpcHandler(source grpcurl.DescriptorSource, resolver json } w.Header().Set(httpx.ContentType, httpx.JsonContentType) - handler := internal.NewEventHandlerWithContext(r.Context(), w, resolver) + handler := internal.NewEventHandlerWithContext(r.Context(), w, resolver, httpx.HasOkHandler()) if err := grpcurl.InvokeRPC(r.Context(), source, cli.Conn(), rpcPath, s.prepareMetadata(r.Header), handler, parser.Next); err != nil { httpx.ErrorCtx(r.Context(), w, err) diff --git a/rest/httpx/responses.go b/rest/httpx/responses.go index dc1a7ecedcdb..df37e41fc579 100644 --- a/rest/httpx/responses.go +++ b/rest/httpx/responses.go @@ -22,6 +22,12 @@ var ( okLock sync.RWMutex ) +func HasOkHandler() bool { + okLock.RLock() + defer okLock.RUnlock() + return okHandler != nil +} + // Error writes err into w. func Error(w http.ResponseWriter, err error, fns ...func(w http.ResponseWriter, err error)) { doHandleError(w, err, buildErrorHandler(context.Background()), WriteJson, fns...) From 59c5e8ae0988f56eefa3f90d4bf8815a01ed17bb Mon Sep 17 00:00:00 2001 From: guonaihong Date: Wed, 13 Aug 2025 21:44:28 +0800 Subject: [PATCH 5/6] fix: --- gateway/internal/eventhandler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway/internal/eventhandler_test.go b/gateway/internal/eventhandler_test.go index da48f4defcc5..4013b230dcb0 100644 --- a/gateway/internal/eventhandler_test.go +++ b/gateway/internal/eventhandler_test.go @@ -29,7 +29,7 @@ func TestNewEventHandler(t *testing.T) { func TestNewEventHandlerWithContext(t *testing.T) { ctx := context.Background() w := httptest.NewRecorder() - h := NewEventHandlerWithContext(ctx, w, nil) + h := NewEventHandlerWithContext(ctx, w, nil, true) assert.NotNil(t, h) assert.Equal(t, w, h.writer) @@ -56,7 +56,7 @@ func TestEventHandler_OnReceiveResponse_WithoutOkHandler(t *testing.T) { func TestEventHandler_OnReceiveResponse_WithOkHandler(t *testing.T) { ctx := context.Background() w := httptest.NewRecorder() - h := NewEventHandlerWithContext(ctx, w, nil) + h := NewEventHandlerWithContext(ctx, w, nil, false) // Test with nil message (should log error but not panic) h.OnReceiveResponse(nil) From f2564f17f3439feab88cf95af9f22a0df4380902 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Thu, 14 Aug 2025 01:32:45 +0800 Subject: [PATCH 6/6] =?UTF-8?q?chore:=20=E5=8A=A0=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gateway/internal/eventhandler_test.go | 70 ++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/gateway/internal/eventhandler_test.go b/gateway/internal/eventhandler_test.go index 4013b230dcb0..e804b28223ba 100644 --- a/gateway/internal/eventhandler_test.go +++ b/gateway/internal/eventhandler_test.go @@ -29,13 +29,22 @@ func TestNewEventHandler(t *testing.T) { func TestNewEventHandlerWithContext(t *testing.T) { ctx := context.Background() w := httptest.NewRecorder() + + // Test with useOkHandler = true h := NewEventHandlerWithContext(ctx, w, nil, true) - assert.NotNil(t, h) assert.Equal(t, w, h.writer) assert.True(t, h.useOkHandler) assert.Equal(t, ctx, h.ctx) assert.True(t, h.marshaler.EmitDefaults) + + // Test with useOkHandler = false + h2 := NewEventHandlerWithContext(ctx, w, nil, false) + assert.NotNil(t, h2) + assert.Equal(t, w, h2.writer) + assert.False(t, h2.useOkHandler) + assert.Equal(t, ctx, h2.ctx) + assert.True(t, h2.marshaler.EmitDefaults) } func TestEventHandler_OnReceiveResponse_WithoutOkHandler(t *testing.T) { @@ -56,7 +65,7 @@ func TestEventHandler_OnReceiveResponse_WithoutOkHandler(t *testing.T) { func TestEventHandler_OnReceiveResponse_WithOkHandler(t *testing.T) { ctx := context.Background() w := httptest.NewRecorder() - h := NewEventHandlerWithContext(ctx, w, nil, false) + h := NewEventHandlerWithContext(ctx, w, nil, true) // Test with nil message (should log error but not panic) h.OnReceiveResponse(nil) @@ -74,6 +83,21 @@ func TestEventHandler_OnReceiveResponse_WithOkHandler(t *testing.T) { assert.True(t, responseBody == "\"e30=\"" || responseBody == "{}" || len(responseBody) > 0) } +func TestEventHandler_OnReceiveResponse_WithoutOkHandlerContext(t *testing.T) { + ctx := context.Background() + w := httptest.NewRecorder() + h := NewEventHandlerWithContext(ctx, w, nil, false) + + // Test with valid message when useOkHandler is false + msg := &empty.Empty{} + h.OnReceiveResponse(msg) + + // When useOkHandler is false, it should use the fallback behavior + // The response should be written directly to the writer + responseBody := w.Body.String() + assert.Contains(t, responseBody, "{}") +} + func TestEventHandler_OnReceiveResponse_MarshalError(t *testing.T) { // Test marshal error with bad writer badWriter := &badWriter{} @@ -101,6 +125,48 @@ func TestEventHandler_OnReceiveTrailers(t *testing.T) { assert.Equal(t, "internal error", h.Status.Message()) } +func TestEventHandler_OnResolveMethod(t *testing.T) { + h := NewEventHandler(io.Discard, nil) + + // Test with nil method descriptor - should not panic + h.OnResolveMethod(nil) + + // Since this is a no-op function, we just verify it doesn't panic + // and can be called multiple times + h.OnResolveMethod(nil) + h.OnResolveMethod(nil) +} + +func TestEventHandler_OnSendHeaders(t *testing.T) { + h := NewEventHandler(io.Discard, nil) + + // Test with nil metadata - should not panic + h.OnSendHeaders(nil) + + // Test with valid metadata + md := metadata.New(map[string]string{"request-id": "123", "auth": "token"}) + h.OnSendHeaders(md) + + // Test with empty metadata + emptyMd := metadata.New(map[string]string{}) + h.OnSendHeaders(emptyMd) +} + +func TestEventHandler_OnReceiveHeaders(t *testing.T) { + h := NewEventHandler(io.Discard, nil) + + // Test with nil metadata - should not panic + h.OnReceiveHeaders(nil) + + // Test with valid metadata + md := metadata.New(map[string]string{"response-id": "456", "content-type": "application/json"}) + h.OnReceiveHeaders(md) + + // Test with empty metadata + emptyMd := metadata.New(map[string]string{}) + h.OnReceiveHeaders(emptyMd) +} + func TestEventHandler_CompleteWorkflow(t *testing.T) { var buf bytes.Buffer h := NewEventHandler(&buf, nil)