Skip to content

Commit

Permalink
feat: add metric instrumentation (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Dec 31, 2023
1 parent f862fff commit f51555f
Show file tree
Hide file tree
Showing 13 changed files with 526 additions and 25 deletions.
49 changes: 44 additions & 5 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ import (
internalpb "github.com/tochemey/goakt/internal/v1"
"github.com/tochemey/goakt/internal/v1/internalpbconnect"
"github.com/tochemey/goakt/log"
"github.com/tochemey/goakt/metric"
addresspb "github.com/tochemey/goakt/pb/address/v1"
eventspb "github.com/tochemey/goakt/pb/events/v1"
"github.com/tochemey/goakt/pkg/types"
"github.com/tochemey/goakt/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
Expand Down Expand Up @@ -189,6 +190,8 @@ type actorSystem struct {
// specifies whether tracing is enabled
traceEnabled atomic.Bool
tracer trace.Tracer
// specifies whether metric is enabled
metricEnabled atomic.Bool
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand Down Expand Up @@ -224,13 +227,14 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
eventsStream: eventstream.New(),
partitionHasher: hash.DefaultHasher(),
actorInitTimeout: DefaultInitTimeout,
tracer: trace.NewNoopTracerProvider().Tracer(name),
tracer: noop.NewTracerProvider().Tracer(name),
}
// set the atomic settings
system.hasStarted.Store(false)
system.remotingEnabled.Store(false)
system.clusterEnabled.Store(false)
system.traceEnabled.Store(false)
system.metricEnabled.Store(false)

// apply the various options
for _, opt := range opts {
Expand All @@ -239,11 +243,21 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {

// set the tracer when tracing is enabled
if system.traceEnabled.Load() {
system.tracer = otel.GetTracerProvider().Tracer(name)
system.tracer = system.telemetry.Tracer
}

// set the message scheduler
system.scheduler = newScheduler(system.logger, system.shutdownTimeout, withSchedulerCluster(system.cluster))

// set metric and return the registration error in case there is one
// re-register metric when metric is enabled
if system.metricEnabled.Load() {
// log the error but don't panic
if err := system.registerMetrics(); err != nil {
return nil, err
}
}

return system, nil
}

Expand Down Expand Up @@ -404,6 +418,11 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
opts = append(opts, withTracing())
}

// set the pid metric option
if x.metricEnabled.Load() {
opts = append(opts, withMetric())
}

// create an instance of the actor ref
pid, err := newPID(spanCtx,
actorPath,
Expand Down Expand Up @@ -1146,7 +1165,7 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
// add some logging information
x.logger.Info("enabling remoting...")
// create a function to handle the observability
interceptor := func(tp trace.TracerProvider, mp metric.MeterProvider) connect.Interceptor {
interceptor := func(tp trace.TracerProvider, mp otelmetric.MeterProvider) connect.Interceptor {
return otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(tp),
otelconnect.WithMeterProvider(mp),
Expand Down Expand Up @@ -1281,3 +1300,23 @@ func (x *actorSystem) housekeeper() {
// add some logging
x.logger.Info("Housekeeping has stopped...")
}

// registerMetrics register the PID metrics with OTel instrumentation.
func (x *actorSystem) registerMetrics() error {
// grab the OTel meter
meter := x.telemetry.Meter
// create an instance of the ActorMetrics
metrics, err := metric.NewActorSystemMetric(meter)
// handle the error
if err != nil {
return err
}

// register the metrics
_, err = meter.RegisterCallback(func(ctx context.Context, observer otelmetric.Observer) error {
observer.ObserveInt64(metrics.ActorsCount(), int64(x.NumActors()))
return nil
}, metrics.ActorsCount())

return err
}
49 changes: 48 additions & 1 deletion actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ import (
"github.com/tochemey/goakt/log"
addresspb "github.com/tochemey/goakt/pb/address/v1"
eventspb "github.com/tochemey/goakt/pb/events/v1"
"github.com/tochemey/goakt/telemetry"
testpb "github.com/tochemey/goakt/test/data/pb/v1"
testkit "github.com/tochemey/goakt/testkit/discovery"
"github.com/travisjeffery/go-dynaport"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -874,7 +877,7 @@ func TestActorSystem(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, consumer)

// create the blackhole actor
// create the black hole actor
actor := &BlackHole{}
actorRef, err := sys.Spawn(ctx, "BlackHole ", actor)
assert.NoError(t, err)
Expand Down Expand Up @@ -1023,4 +1026,48 @@ func TestActorSystem(t *testing.T) {
provider.AssertExpectations(t)
})
})
t.Run("With Metric enabled", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
// create an instance of telemetry
tel := telemetry.New(telemetry.WithMeterProvider(mp))

ctx := context.TODO()
sys, _ := NewActorSystem("testSys",
WithMetric(),
WithTelemetry(tel),
WithStash(5),
WithLogger(log.DiscardLogger))

// start the actor system
err := sys.Start(ctx)
assert.NoError(t, err)

actor := NewTester()
actorRef, err := sys.Spawn(ctx, "Test", actor)
assert.NoError(t, err)
assert.NotNil(t, actorRef)

// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = Tell(ctx, actorRef, message)
// perform some assertions
require.NoError(t, err)

// Should collect 4 metrics, 3 for the actor and 1 for the actor system
got := &metricdata.ResourceMetrics{}
err = r.Collect(ctx, got)
require.NoError(t, err)
assert.Len(t, got.ScopeMetrics, 1)
assert.Len(t, got.ScopeMetrics[0].Metrics, 4)

// stop the actor after some time
time.Sleep(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
assert.NoError(t, err)
})
})
}
7 changes: 7 additions & 0 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,10 @@ func WithTracing() Option {
system.traceEnabled.Store(true)
})
}

// WithMetric enables metrics
func WithMetric() Option {
return OptionFunc(func(system *actorSystem) {
system.metricEnabled.Store(true)
})
}
5 changes: 5 additions & 0 deletions actors/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func TestOptions(t *testing.T) {
option: WithTracing(),
expected: actorSystem{traceEnabled: atomicTrue},
},
{
name: "WithMetric",
option: WithMetric(),
expected: actorSystem{metricEnabled: atomicTrue},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
81 changes: 73 additions & 8 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ import (
internalpb "github.com/tochemey/goakt/internal/v1"
"github.com/tochemey/goakt/internal/v1/internalpbconnect"
"github.com/tochemey/goakt/log"
"github.com/tochemey/goakt/metric"
addresspb "github.com/tochemey/goakt/pb/address/v1"
eventspb "github.com/tochemey/goakt/pb/events/v1"
messagespb "github.com/tochemey/goakt/pb/messages/v1"
"github.com/tochemey/goakt/pkg/http"
"github.com/tochemey/goakt/pkg/slices"
"github.com/tochemey/goakt/pkg/types"
"github.com/tochemey/goakt/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -148,7 +150,7 @@ type PID interface {
// unstash unstashes the oldest message in the stash and prepends to the mailbox
unstash() error
// toDeadletters add the given message to the deadletters queue
emitDeadletter(recvCtx ReceiveContext, err error)
emitDeadletter(receiveCtx ReceiveContext, err error)
// removeChild is a utility function to remove child actor
removeChild(pid PID)
}
Expand Down Expand Up @@ -238,6 +240,11 @@ type pid struct {
// define tracing
traceEnabled atomic.Bool
tracer trace.Tracer

// set the metric settings
restartCount *atomic.Int64
childrenCount *atomic.Int64
metricEnabled atomic.Bool
}

// enforce compilation error
Expand Down Expand Up @@ -265,7 +272,9 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
stashBuffer: nil,
stashSemaphore: sync.Mutex{},
eventsStream: nil,
tracer: trace.NewNoopTracerProvider().Tracer("PID"),
tracer: noop.NewTracerProvider().Tracer("PID"),
restartCount: atomic.NewInt64(0),
childrenCount: atomic.NewInt64(0),
}

// set some of the defaults values
Expand All @@ -278,6 +287,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
pid.replyTimeout.Store(DefaultReplyTimeout)
pid.initTimeout.Store(DefaultInitTimeout)
pid.traceEnabled.Store(false)
pid.metricEnabled.Store(false)

// set the custom options to override the default values
for _, opt := range opts {
Expand All @@ -300,7 +310,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption

// set the tracer when tracing is enabled
if pid.traceEnabled.Load() {
pid.tracer = otel.GetTracerProvider().Tracer("PID")
pid.tracer = pid.telemetry.Tracer
}

// initialize the actor and init processing public
Expand All @@ -314,6 +324,14 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
go pid.passivationListener()
}

// register metrics. However, we don't panic when we fail to register
if pid.metricEnabled.Load() {
if err := pid.registerMetrics(); err != nil {
// return the error
return nil, errors.Wrapf(err, "failed to register actor=%s metrics", pid.ActorPath().String())
}
}

// return the actor reference
return pid, nil
}
Expand All @@ -335,6 +353,9 @@ func (p *pid) Child(name string) (PID, error) {

// check whether the child actor already exist and just return the PID
if cid, ok := p.children.Get(childActorPath); ok {
// increment the children count
p.childrenCount.Inc()
// return the child PID
return cid, nil
}
return nil, ErrActorNotFound(childActorPath.String())
Expand Down Expand Up @@ -498,6 +519,9 @@ func (p *pid) Restart(ctx context.Context) error {

// successful restart
span.SetStatus(codes.Ok, "Restart")
// increment the restart count
p.restartCount.Inc()
// return
return nil
}

Expand Down Expand Up @@ -559,6 +583,11 @@ func (p *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er
opts = append(opts, withTracing())
}

// set the pid metric option
if p.metricEnabled.Load() {
opts = append(opts, withMetric())
}

// create the child pid
cid, err := newPID(spanCtx,
childActorPath,
Expand Down Expand Up @@ -588,6 +617,11 @@ func (p *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er

// StashSize returns the stash buffer size
func (p *pid) StashSize() uint64 {
// avoid panic in case stash is not enabled
if p.stashBuffer == nil {
return 0
}
// return the stash buffer size
return p.stashBuffer.Size()
}

Expand Down Expand Up @@ -1138,6 +1172,13 @@ func (p *pid) reset() {
p.mailbox.Reset()
// reset the behavior stack
p.resetBehavior()
// re-register metric when metric is enabled
if p.metricEnabled.Load() {
// log the error but don't panic
if err := p.registerMetrics(); err != nil {
p.logger.Error(errors.Wrapf(err, "failed to register actor=%s metrics", p.ActorPath().String()))
}
}
}

func (p *pid) freeWatchers(ctx context.Context) {
Expand Down Expand Up @@ -1468,17 +1509,17 @@ func (p *pid) removeChild(pid PID) {
}

// emitDeadletter emit the given message to the deadletters queue
func (p *pid) emitDeadletter(recvCtx ReceiveContext, err error) {
func (p *pid) emitDeadletter(receiveCtx ReceiveContext, err error) {
// only send to the stream when defined
if p.eventsStream == nil {
return
}
// marshal the message
msg, _ := anypb.New(recvCtx.Message())
msg, _ := anypb.New(receiveCtx.Message())
// grab the sender
var senderAddr *addresspb.Address
if recvCtx.Sender() != nil || recvCtx.Sender() != NoSender {
senderAddr = recvCtx.Sender().ActorPath().RemoteAddress()
if receiveCtx.Sender() != nil || receiveCtx.Sender() != NoSender {
senderAddr = receiveCtx.Sender().ActorPath().RemoteAddress()
}

// define the receiver
Expand All @@ -1494,3 +1535,27 @@ func (p *pid) emitDeadletter(recvCtx ReceiveContext, err error) {
// add to the events stream
p.eventsStream.Publish(deadlettersTopic, deadletter)
}

// registerMetrics register the PID metrics with OTel instrumentation.
func (p *pid) registerMetrics() error {
// grab the OTel meter
meter := p.telemetry.Meter
// create an instance of the ActorMetrics
metrics, err := metric.NewActorMetric(meter)
// handle the error
if err != nil {
return err
}

// register the metrics
_, err = meter.RegisterCallback(func(ctx context.Context, observer otelmetric.Observer) error {
observer.ObserveInt64(metrics.ChildrenCount(), p.childrenCount.Load())
observer.ObserveInt64(metrics.StashCount(), int64(p.StashSize()))
observer.ObserveInt64(metrics.RestartCount(), p.restartCount.Load())
return nil
}, metrics.ChildrenCount(),
metrics.StashCount(),
metrics.RestartCount())

return err
}
Loading

0 comments on commit f51555f

Please sign in to comment.