From e60f4eef27af1eee5a6e01f47aeb13ddba4ba629 Mon Sep 17 00:00:00 2001 From: Parship Chowdhury Date: Sun, 27 Jul 2025 14:24:19 +0000 Subject: [PATCH 1/4] Fix Zipkin receiver keep-alive flag not being applied to HTTP server Signed-off-by: Parship Chowdhury --- cmd/collector/app/handler/zipkin_receiver.go | 69 ++++++++++++++++++- .../app/handler/zipkin_receiver_test.go | 53 ++++++++++++++ 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/cmd/collector/app/handler/zipkin_receiver.go b/cmd/collector/app/handler/zipkin_receiver.go index c18dc9fc5c2..c4a928ce77e 100644 --- a/cmd/collector/app/handler/zipkin_receiver.go +++ b/cmd/collector/app/handler/zipkin_receiver.go @@ -5,7 +5,11 @@ package handler import ( "context" + "errors" "fmt" + "net/http" + "reflect" + "unsafe" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver" "go.opentelemetry.io/collector/component" @@ -25,6 +29,61 @@ var ( zipkinID = component.NewID(zipkinComponentType) ) +// zipkinReceiverWrapper wraps the Opentelemetry zipkin receiver to apply keep alive settings +type zipkinReceiverWrapper struct { + receiver.Traces + keepAlive bool + logger *zap.Logger +} + +// start wraps the original start method to apply keep-alive settings +func (w *zipkinReceiverWrapper) Start(ctx context.Context, host component.Host) error { + err := w.Traces.Start(ctx, host) + if err != nil { + return err + } + + if !w.keepAlive { + if err := w.disableKeepAlive(); err != nil { + w.logger.Warn("Failed to disable keep-alive on Zipkin receiver", zap.Error(err)) + } else { + w.logger.Info("Disabled keep-alive on Zipkin receiver") + } + } + + return nil +} + +// disableKeepAlive use reflection and unsafe operations to access the internal HTTP server and disable keep alive +func (w *zipkinReceiverWrapper) disableKeepAlive() error { + receiverValue := reflect.ValueOf(w.Traces) + if receiverValue.Kind() == reflect.Ptr { + receiverValue = receiverValue.Elem() + } + + serverField := receiverValue.FieldByName("server") + if !serverField.IsValid() { + return errors.New("server field not found in zipkin receiver") + } + + if serverField.Kind() != reflect.Ptr || serverField.Type() != reflect.TypeOf((*http.Server)(nil)) { + return errors.New("server field is not of type *http.Server") + } + + if serverField.IsNil() { + return errors.New("server field is nil") + } + + serverPtr := (*http.Server)(unsafe.Pointer(serverField.Pointer())) + if serverPtr == nil { + return errors.New("server is nil") + } + serverPtr.SetKeepAlivesEnabled(false) + w.logger.Debug("Successfully disabled keep-alive on Zipkin HTTP server") + + return nil +} + // StartZipkinReceiver starts Zipkin receiver from OTEL Collector. func StartZipkinReceiver( options *flags.CollectorOptions, @@ -90,8 +149,14 @@ func startZipkinReceiver( if err != nil { return nil, fmt.Errorf("could not create Zipkin receiver: %w", err) } - if err := rcvr.Start(context.Background(), &otelHost{logger: logger}); err != nil { + wrappedReceiver := &zipkinReceiverWrapper{ // wrap the receiver to apply keep-alive settings + Traces: rcvr, + keepAlive: options.Zipkin.KeepAlive, + logger: logger, + } + + if err := wrappedReceiver.Start(context.Background(), &otelHost{logger: logger}); err != nil { return nil, fmt.Errorf("could not start Zipkin receiver: %w", err) } - return rcvr, nil + return wrappedReceiver, nil } diff --git a/cmd/collector/app/handler/zipkin_receiver_test.go b/cmd/collector/app/handler/zipkin_receiver_test.go index ec0452e704c..f2ef4827b7d 100644 --- a/cmd/collector/app/handler/zipkin_receiver_test.go +++ b/cmd/collector/app/handler/zipkin_receiver_test.go @@ -11,6 +11,7 @@ import ( "io" "net/http" "os" + "reflect" "testing" gogojsonpb "github.com/gogo/protobuf/jsonpb" @@ -160,3 +161,55 @@ func TestStartZipkinReceiver_Error(t *testing.T) { _, err = startZipkinReceiver(opts, logger, spanProcessor, tm, f, consumer.NewTraces, createTracesReceiver) assert.ErrorContains(t, err, "could not create Zipkin receiver") } + +func TestZipkinReceiverKeepAlive(t *testing.T) { + spanProcessor := &mockSpanProcessor{} + logger, _ := testutils.NewLogger() + tm := &tenancy.Manager{} + + testCases := []struct { + name string + keepAlive bool + }{ + { + name: "KeepAlive enabled", + keepAlive: true, + }, + { + name: "KeepAlive disabled", + keepAlive: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := &flags.CollectorOptions{} + opts.Zipkin.Endpoint = ":0" + opts.Zipkin.KeepAlive = tc.keepAlive + + rec, err := StartZipkinReceiver(opts, logger, spanProcessor, tm) + require.NoError(t, err) + defer func() { + require.NoError(t, rec.Shutdown(context.Background())) + }() + + wrapper, ok := rec.(*zipkinReceiverWrapper) + require.True(t, ok, "receiver should be wrapped with zipkinReceiverWrapper") + assert.Equal(t, tc.keepAlive, wrapper.keepAlive, "keepAlive setting should match") + + // Try to access the internal server to verify keep-alive setting + // this is a test only verification using reflection + receiverValue := reflect.ValueOf(wrapper.Traces) + if receiverValue.Kind() == reflect.Ptr { + receiverValue = receiverValue.Elem() + } + + serverField := receiverValue.FieldByName("server") + if serverField.IsValid() && !serverField.IsNil() { + // we can not directly check the keep alive setting on the server + // because it's an internal state but we can verify if our wrapper was applied correctly or not + t.Logf("Server field found and is not nil for keepAlive=%v", tc.keepAlive) + } + }) + } +} From 8e8fafee91f4c49197495c354db1e75f66393a64 Mon Sep 17 00:00:00 2001 From: Parship Chowdhury Date: Sun, 27 Jul 2025 15:11:16 +0000 Subject: [PATCH 2/4] increase code coverage Signed-off-by: Parship Chowdhury --- cmd/collector/app/handler/zipkin_receiver.go | 8 ++ .../app/handler/zipkin_receiver_test.go | 79 +++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/cmd/collector/app/handler/zipkin_receiver.go b/cmd/collector/app/handler/zipkin_receiver.go index c4a928ce77e..6764382f187 100644 --- a/cmd/collector/app/handler/zipkin_receiver.go +++ b/cmd/collector/app/handler/zipkin_receiver.go @@ -56,11 +56,19 @@ func (w *zipkinReceiverWrapper) Start(ctx context.Context, host component.Host) // disableKeepAlive use reflection and unsafe operations to access the internal HTTP server and disable keep alive func (w *zipkinReceiverWrapper) disableKeepAlive() error { + if w.Traces == nil { + return errors.New("receiver is nil") + } + receiverValue := reflect.ValueOf(w.Traces) if receiverValue.Kind() == reflect.Ptr { receiverValue = receiverValue.Elem() } + if receiverValue.Kind() != reflect.Struct { + return errors.New("receiver is not a struct") + } + serverField := receiverValue.FieldByName("server") if !serverField.IsValid() { return errors.New("server field not found in zipkin receiver") diff --git a/cmd/collector/app/handler/zipkin_receiver_test.go b/cmd/collector/app/handler/zipkin_receiver_test.go index f2ef4827b7d..bdc65183559 100644 --- a/cmd/collector/app/handler/zipkin_receiver_test.go +++ b/cmd/collector/app/handler/zipkin_receiver_test.go @@ -213,3 +213,82 @@ func TestZipkinReceiverKeepAlive(t *testing.T) { }) } } + +func TestZipkinReceiverWrapper_DisableKeepAlive_ErrorCases(t *testing.T) { + logger, _ := testutils.NewLogger() + + tests := []struct { + name string + receiver receiver.Traces + expectedErrMsg string + }{ + { + name: "nil receiver", + receiver: nil, + expectedErrMsg: "receiver is nil", + }, + { + name: "receiver without server field", + receiver: &mockReceiverWithoutServerField{}, + expectedErrMsg: "server field not found in zipkin receiver", + }, + { + name: "receiver with wrong server field type", + receiver: &mockReceiverWithWrongServerType{server: "not a server"}, + expectedErrMsg: "server field is not of type *http.Server", + }, + { + name: "receiver with nil server field", + receiver: &mockReceiverWithNilServer{server: nil}, + expectedErrMsg: "server field is nil", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + wrapper := &zipkinReceiverWrapper{ + Traces: tt.receiver, + keepAlive: false, + logger: logger, + } + + err := wrapper.disableKeepAlive() + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedErrMsg) + }) + } +} + +type mockReceiverWithoutServerField struct{} + +func (*mockReceiverWithoutServerField) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (*mockReceiverWithoutServerField) Shutdown(_ context.Context) error { + return nil +} + +type mockReceiverWithWrongServerType struct { + server string +} + +func (*mockReceiverWithWrongServerType) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (*mockReceiverWithWrongServerType) Shutdown(_ context.Context) error { + return nil +} + +type mockReceiverWithNilServer struct { + server *http.Server +} + +func (*mockReceiverWithNilServer) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (*mockReceiverWithNilServer) Shutdown(_ context.Context) error { + return nil +} From 0896565c78d3525a08856ae0780594534116be32 Mon Sep 17 00:00:00 2001 From: Parship Chowdhury Date: Sun, 27 Jul 2025 15:28:08 +0000 Subject: [PATCH 3/4] increase code coverage 2 Signed-off-by: Parship Chowdhury --- .../app/handler/zipkin_receiver_test.go | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/cmd/collector/app/handler/zipkin_receiver_test.go b/cmd/collector/app/handler/zipkin_receiver_test.go index bdc65183559..6e5cdb52549 100644 --- a/cmd/collector/app/handler/zipkin_receiver_test.go +++ b/cmd/collector/app/handler/zipkin_receiver_test.go @@ -242,6 +242,11 @@ func TestZipkinReceiverWrapper_DisableKeepAlive_ErrorCases(t *testing.T) { receiver: &mockReceiverWithNilServer{server: nil}, expectedErrMsg: "server field is nil", }, + { + name: "non-struct receiver (string type)", + receiver: func() receiver.Traces { s := mockStringReceiver("test"); return &s }(), + expectedErrMsg: "receiver is not a struct", + }, } for _, tt := range tests { @@ -292,3 +297,57 @@ func (*mockReceiverWithNilServer) Start(_ context.Context, _ component.Host) err func (*mockReceiverWithNilServer) Shutdown(_ context.Context) error { return nil } + +type mockStringReceiver string + +func (*mockStringReceiver) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (*mockStringReceiver) Shutdown(_ context.Context) error { + return nil +} + +func TestZipkinReceiverWrapper_DisableKeepAlive_SuccessPath(t *testing.T) { + logger, _ := testutils.NewLogger() + + mockReceiver := &mockReceiverWithValidServer{ + server: &http.Server{}, + } + + wrapper := &zipkinReceiverWrapper{ + Traces: mockReceiver, + keepAlive: false, + logger: logger, + } + + err := wrapper.disableKeepAlive() + require.NoError(t, err) +} + +type mockReceiverWithValidServer struct { + server *http.Server +} + +func (*mockReceiverWithValidServer) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (*mockReceiverWithValidServer) Shutdown(_ context.Context) error { + return nil +} + +func TestZipkinReceiverWrapper_Start_WithDisableKeepAliveError(t *testing.T) { + logger, _ := testutils.NewLogger() + + mockReceiver := &mockReceiverWithoutServerField{} + + wrapper := &zipkinReceiverWrapper{ + Traces: mockReceiver, + keepAlive: false, + logger: logger, + } + + err := wrapper.Start(context.Background(), &otelHost{logger: logger}) + require.NoError(t, err) +} From ffde2eb499f096a393ee475feaa4fe9f1fdb18b8 Mon Sep 17 00:00:00 2001 From: Parship Chowdhury Date: Sun, 27 Jul 2025 20:09:50 +0000 Subject: [PATCH 4/4] removed if serverPtr == nil check as it was unreachable Signed-off-by: Parship Chowdhury --- cmd/collector/app/handler/zipkin_receiver.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/collector/app/handler/zipkin_receiver.go b/cmd/collector/app/handler/zipkin_receiver.go index 6764382f187..05d137a47f6 100644 --- a/cmd/collector/app/handler/zipkin_receiver.go +++ b/cmd/collector/app/handler/zipkin_receiver.go @@ -83,9 +83,6 @@ func (w *zipkinReceiverWrapper) disableKeepAlive() error { } serverPtr := (*http.Server)(unsafe.Pointer(serverField.Pointer())) - if serverPtr == nil { - return errors.New("server is nil") - } serverPtr.SetKeepAlivesEnabled(false) w.logger.Debug("Successfully disabled keep-alive on Zipkin HTTP server")