Skip to content

Commit

Permalink
refactor: upgrade the otelconnect dependency and fix breaking change (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Jan 13, 2024
1 parent 3acc3d3 commit 78d9ceb
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 38 deletions.
31 changes: 22 additions & 9 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,21 +1164,34 @@ func (x *actorSystem) enableClustering(ctx context.Context) {
func (x *actorSystem) enableRemoting(ctx context.Context) {
// add some logging information
x.logger.Info("enabling remoting...")
// create a function to handle the observability
interceptor := func(tp trace.TracerProvider, mp otelmetric.MeterProvider) connect.Interceptor {
return otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(tp),
otelconnect.WithMeterProvider(mp),

// define a variable to hold the interceptor
var interceptor *otelconnect.Interceptor
var err error
// only set the observability when metric or trace is enabled
if x.metricEnabled.Load() || x.traceEnabled.Load() {
// create an interceptor and panic
interceptor, err = otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(x.telemetry.TracerProvider),
otelconnect.WithMeterProvider(x.telemetry.MeterProvider),
)
// panic when there is an error
if err != nil {
x.logger.Panic(errors.Wrap(err, "failed to initialize observability feature"))
}
}

// create the handler option
var opts []connect.HandlerOption
// set handler options when interceptor is defined
if interceptor != nil {
opts = append(opts, connect.WithInterceptors(interceptor))
}

// create a http service mux
mux := http.NewServeMux()
// create the resource and handler
path, handler := internalpbconnect.NewRemotingServiceHandler(
x,
connect.WithInterceptors(interceptor(x.telemetry.TracerProvider, x.telemetry.MeterProvider)),
)
path, handler := internalpbconnect.NewRemotingServiceHandler(x, opts...)
mux.Handle(path, handler)
// create the address
serverAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
Expand Down
45 changes: 40 additions & 5 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,18 @@ func RemoteTell(ctx context.Context, to *addresspb.Address, message proto.Messag
return ErrInvalidRemoteMessage(err)
}

// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)
// prepare the rpcRequest to send
Expand All @@ -225,11 +232,18 @@ func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message
return nil, ErrInvalidRemoteMessage(err)
}

// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)
// prepare the rpcRequest to send
Expand All @@ -252,11 +266,18 @@ func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message

// RemoteLookup look for an actor address on a remote node.
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) {
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(host, port),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)

Expand All @@ -283,6 +304,13 @@ func RemoteLookup(ctx context.Context, host string, port int, name string) (addr

// RemoteBatchTell sends bulk asynchronous messages to an actor
func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...proto.Message) error {
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return err
}

// define a variable holding the remote messages
var remoteMessages []*anypb.Any
// iterate the list of messages and pack them
Expand All @@ -301,7 +329,7 @@ func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...pro
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)

Expand All @@ -320,6 +348,13 @@ func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...pro

// RemoteBatchAsk sends bulk messages to an actor with responses expected
func RemoteBatchAsk(ctx context.Context, to *addresspb.Address, messages ...proto.Message) (responses []*anypb.Any, err error) {
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
// handle the error
if err != nil {
return nil, err
}

// define a variable holding the remote messages
var remoteMessages []*anypb.Any
// iterate the list of messages and pack them
Expand All @@ -338,7 +373,7 @@ func RemoteBatchAsk(ctx context.Context, to *addresspb.Address, messages ...prot
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithInterceptors(interceptor),
connect.WithGRPC(),
)

Expand Down
87 changes: 69 additions & 18 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,18 @@ func (p *pid) BatchAsk(ctx context.Context, to PID, messages ...proto.Message) (
// RemoteLookup look for an actor address on a remote node. If the actorSystem is nil then the lookup will be done
// using the same actor system as the PID actor system
func (p *pid) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) {
// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
p.httpClient,
http.URL(host, port),
connect.WithInterceptors(p.interceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// prepare the request to send
Expand Down Expand Up @@ -827,12 +833,18 @@ func (p *pid) RemoteTell(ctx context.Context, to *addresspb.Address, message pro
return err
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
p.httpClient,
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(p.interceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// construct the from address
Expand Down Expand Up @@ -874,12 +886,18 @@ func (p *pid) RemoteAsk(ctx context.Context, to *addresspb.Address, message prot
return nil, err
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
p.httpClient,
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(p.interceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// construct the from address
Expand Down Expand Up @@ -939,12 +957,18 @@ func (p *pid) RemoteBatchTell(ctx context.Context, to *addresspb.Address, messag
Id: p.ActorPath().ID().String(),
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// prepare the remote batch tell request
Expand Down Expand Up @@ -987,12 +1011,18 @@ func (p *pid) RemoteBatchAsk(ctx context.Context, to *addresspb.Address, message
Id: p.ActorPath().ID().String(),
}

// get the gRPC client connection options
clientConnectionOptions, err := p.gRPCClientConnectionOptions()
// handle the error
if err != nil {
return nil, err
}

// create an instance of remote client service
remoteClient := internalpbconnect.NewRemotingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithGRPC(),
clientConnectionOptions...,
)

// prepare the remote batch tell request
Expand Down Expand Up @@ -1376,14 +1406,6 @@ func (p *pid) passivationListener() {
p.logger.Infof("Actor=%s successfully passivated", p.ActorPath().String())
}

// interceptor create an interceptor based upon the telemetry provided
func (p *pid) interceptor() connect.Interceptor {
return otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(p.telemetry.TracerProvider),
otelconnect.WithMeterProvider(p.telemetry.MeterProvider),
)
}

// setBehavior is a utility function that helps set the actor behavior
func (p *pid) setBehavior(behavior Behavior) {
p.semaphore.Lock()
Expand Down Expand Up @@ -1559,3 +1581,32 @@ func (p *pid) registerMetrics() error {

return err
}

// gRPCClientConnectionOptions returns the gRPC client connections options
func (p *pid) gRPCClientConnectionOptions() ([]connect.ClientOption, error) {
// define a variable to hold the interceptor
var interceptor *otelconnect.Interceptor
var err error
// only set the observability when metric or trace is enabled
if p.metricEnabled.Load() || p.traceEnabled.Load() {
// create an interceptor and panic
interceptor, err = otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(p.telemetry.TracerProvider),
otelconnect.WithMeterProvider(p.telemetry.MeterProvider))
// panic when there is an error
if err != nil {
return nil, errors.Wrap(err, "failed to initialize observability feature")
}
}

// create the handler option
clientConnectionOptions := []connect.ClientOption{
connect.WithGRPC(),
}
// add the grpc connection
// set handler options when interceptor is defined
if interceptor != nil {
clientConnectionOptions = append(clientConnectionOptions, connect.WithInterceptors(interceptor))
}
return clientConnectionOptions, err
}
7 changes: 6 additions & 1 deletion examples/actor-cluster/dnssd/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,14 @@ func (s *AccountService) Stop(ctx context.Context) error {
func (s *AccountService) listenAndServe() {
// create a http service mux
mux := http.NewServeMux()
// create an interceptor
interceptor, err := otelconnect.NewInterceptor()
if err != nil {
s.logger.Panic(err)
}
// create the resource and handler
path, handler := samplepbconnect.NewAccountServiceHandler(s,
connect.WithInterceptors(otelconnect.NewInterceptor()))
connect.WithInterceptors(interceptor))
mux.Handle(path, handler)
// create the address
serverAddr := fmt.Sprintf(":%d", s.port)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
connectrpc.com/connect v1.14.0
connectrpc.com/otelconnect v0.6.0
connectrpc.com/otelconnect v0.7.0
github.com/buraksezer/olric v0.5.4
github.com/caarlos0/env/v10 v10.0.0
github.com/cespare/xxhash/v2 v2.2.0
Expand Down Expand Up @@ -53,7 +53,7 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA=
connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s=
connectrpc.com/otelconnect v0.6.0 h1:VJAdQL9+sgdUw9+7+J+jq8pQo/h1S7tSFv2+vDcR7bU=
connectrpc.com/otelconnect v0.6.0/go.mod h1:jdcs0uiwXQVmSMgTJ2dAaWR5VbpNd7QKNkuoH7n86RA=
connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY=
connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/RoaringBitmap/roaring v1.2.1/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/RoaringBitmap/roaring v1.5.0 h1:V0VCSiHjroItEYCM3guC8T83ehi5QMt3oM9EefTTOms=
Expand Down Expand Up @@ -65,8 +65,9 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
Expand Down

0 comments on commit 78d9ceb

Please sign in to comment.