Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: upgrade the otelconnect dependency and fix breaking change #205

Merged
merged 2 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,21 +1164,34 @@ func (x *actorSystem) enableClustering(ctx context.Context) {
func (x *actorSystem) enableRemoting(ctx context.Context) {
// add some logging information
x.logger.Info("enabling remoting...")
// create a function to handle the observability
interceptor := func(tp trace.TracerProvider, mp otelmetric.MeterProvider) connect.Interceptor {
return otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(tp),
otelconnect.WithMeterProvider(mp),

// define a variable to hold the interceptor
var interceptor *otelconnect.Interceptor
var err error
// only set the observability when metric or trace is enabled
if x.metricEnabled.Load() || x.traceEnabled.Load() {
// create an interceptor and panic
interceptor, err = otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(x.telemetry.TracerProvider),
otelconnect.WithMeterProvider(x.telemetry.MeterProvider),
)
// panic when there is an error
if err != nil {
x.logger.Panic(errors.Wrap(err, "failed to initialize observability feature"))
}
}

// create the handler option
var opts []connect.HandlerOption
// set handler options when interceptor is defined
if interceptor != nil {
opts = append(opts, connect.WithInterceptors(interceptor))
}

// create a http service mux
mux := http.NewServeMux()
// create the resource and handler
path, handler := internalpbconnect.NewRemotingServiceHandler(
x,
connect.WithInterceptors(interceptor(x.telemetry.TracerProvider, x.telemetry.MeterProvider)),
)
path, handler := internalpbconnect.NewRemotingServiceHandler(x, opts...)
mux.Handle(path, handler)
// create the address
serverAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
Expand Down
45 changes: 40 additions & 5 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,18 @@ func RemoteTell(ctx context.Context, to *addresspb.Address, message proto.Messag
return ErrInvalidRemoteMessage(err)
}

// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)
// prepare the rpcRequest to send
Expand All @@ -225,11 +232,18 @@ func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message
return nil, ErrInvalidRemoteMessage(err)
}

// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)
// prepare the rpcRequest to send
Expand All @@ -252,11 +266,18 @@ func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message

// RemoteLookup look for an actor address on a remote node.
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) {
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(host, port),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)

Expand All @@ -283,6 +304,13 @@ func RemoteLookup(ctx context.Context, host string, port int, name string) (addr

// RemoteBatchTell sends bulk asynchronous messages to an actor
func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...proto.Message) error {
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return err
}

// define a variable holding the remote messages
var remoteMessages []*anypb.Any
// iterate the list of messages and pack them
Expand All @@ -301,7 +329,7 @@ func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...pro
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)

Expand All @@ -320,6 +348,13 @@ func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...pro

// RemoteBatchAsk sends bulk messages to an actor with responses expected
func RemoteBatchAsk(ctx context.Context, to *addresspb.Address, messages ...proto.Message) (responses []*anypb.Any, err error) {
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return nil, err
}

// define a variable holding the remote messages
var remoteMessages []*anypb.Any
// iterate the list of messages and pack them
Expand All @@ -338,7 +373,7 @@ func RemoteBatchAsk(ctx context.Context, to *addresspb.Address, messages ...prot
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)

Expand Down
87 changes: 69 additions & 18 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,18 @@ func (p *pid) BatchAsk(ctx context.Context, to PID, messages ...proto.Message) (
// RemoteLookup look for an actor address on a remote node. If the actorSystem is nil then the lookup will be done
// using the same actor system as the PID actor system
func (p *pid) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) {
// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
p.httpClient,
http.URL(host, port),
connect.WithInterceptors(p.interceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// prepare the request to send
Expand Down Expand Up @@ -827,12 +833,18 @@ func (p *pid) RemoteTell(ctx context.Context, to *addresspb.Address, message pro
return err
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
p.httpClient,
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(p.interceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// construct the from address
Expand Down Expand Up @@ -874,12 +886,18 @@ func (p *pid) RemoteAsk(ctx context.Context, to *addresspb.Address, message prot
return nil, err
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
p.httpClient,
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(p.interceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// construct the from address
Expand Down Expand Up @@ -939,12 +957,18 @@ func (p *pid) RemoteBatchTell(ctx context.Context, to *addresspb.Address, messag
Id: p.ActorPath().ID().String(),
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// prepare the remote batch tell request
Expand Down Expand Up @@ -987,12 +1011,18 @@ func (p *pid) RemoteBatchAsk(ctx context.Context, to *addresspb.Address, message
Id: p.ActorPath().ID().String(),
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// prepare the remote batch tell request
Expand Down Expand Up @@ -1376,14 +1406,6 @@ func (p *pid) passivationListener() {
p.logger.Infof("Actor=%s successfully passivated", p.ActorPath().String())
}

// interceptor create an interceptor based upon the telemetry provided
func (p *pid) interceptor() connect.Interceptor {
return otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(p.telemetry.TracerProvider),
otelconnect.WithMeterProvider(p.telemetry.MeterProvider),
)
}

// setBehavior is a utility function that helps set the actor behavior
func (p *pid) setBehavior(behavior Behavior) {
p.semaphore.Lock()
Expand Down Expand Up @@ -1559,3 +1581,32 @@ func (p *pid) registerMetrics() error {

return err
}

// gRPCClientConnectionOptions returns the gRPC client connections options
func (p *pid) gRPCClientConnectionOptions() ([]connect.ClientOption, error) {
// define a variable to hold the interceptor
var interceptor *otelconnect.Interceptor
var err error
// only set the observability when metric or trace is enabled
if p.metricEnabled.Load() || p.traceEnabled.Load() {
// create an interceptor and panic
interceptor, err = otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(p.telemetry.TracerProvider),
otelconnect.WithMeterProvider(p.telemetry.MeterProvider))
// panic when there is an error
if err != nil {
return nil, errors.Wrap(err, "failed to initialize observability feature")
}
}

// create the handler option
clientConnectionOptions := []connect.ClientOption{
connect.WithGRPC(),
}
// add the grpc connection
// set handler options when interceptor is defined
if interceptor != nil {
clientConnectionOptions = append(clientConnectionOptions, connect.WithInterceptors(interceptor))
}
return clientConnectionOptions, err
}
7 changes: 6 additions & 1 deletion examples/actor-cluster/dnssd/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,14 @@ func (s *AccountService) Stop(ctx context.Context) error {
func (s *AccountService) listenAndServe() {
// create a http service mux
mux := http.NewServeMux()
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
if err != nil {
s.logger.Panic(err)
}
// create the resource and handler
path, handler := samplepbconnect.NewAccountServiceHandler(s,
connect.WithInterceptors(otelconnect.NewInterceptor()))
connect.WithInterceptors(interceptor))
mux.Handle(path, handler)
// create the address
serverAddr := fmt.Sprintf(":%d", s.port)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
connectrpc.com/connect v1.14.0
connectrpc.com/otelconnect v0.6.0
connectrpc.com/otelconnect v0.7.0
github.com/buraksezer/olric v0.5.4
github.com/caarlos0/env/v10 v10.0.0
github.com/cespare/xxhash/v2 v2.2.0
Expand Down Expand Up @@ -53,7 +53,7 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA=
connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s=
connectrpc.com/otelconnect v0.6.0 h1:VJAdQL9+sgdUw9+7+J+jq8pQo/h1S7tSFv2+vDcR7bU=
connectrpc.com/otelconnect v0.6.0/go.mod h1:jdcs0uiwXQVmSMgTJ2dAaWR5VbpNd7QKNkuoH7n86RA=
connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY=
connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/RoaringBitmap/roaring v1.2.1/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/RoaringBitmap/roaring v1.5.0 h1:V0VCSiHjroItEYCM3guC8T83ehi5QMt3oM9EefTTOms=
Expand Down Expand Up @@ -65,8 +65,9 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
Expand Down
Loading