Skip to content

Commit b7db344

Browse files
authored
feat: add a metric to ActorSystem and PID (#594)
1 parent 86b15aa commit b7db344

File tree

8 files changed

+290
-22
lines changed

8 files changed

+290
-22
lines changed

actors/actor_system.go

+51-8
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ import (
6767

6868
// ActorSystem defines the contract of an actor system
6969
type ActorSystem interface {
70+
// Metric returns the actor system metric.
71+
// The metric does not include any cluster data
72+
Metric(ctx context.Context) *Metric
7073
// Name returns the actor system name
7174
Name() string
7275
// Actors returns the list of Actors that are alive on a given running node.
@@ -286,6 +289,9 @@ type actorSystem struct {
286289
rebalancing *atomic.Bool
287290
rebalanceLocker *sync.Mutex
288291
shutdownHooks []ShutdownHook
292+
293+
actorsCounter *atomic.Uint64
294+
deadlettersCounter *atomic.Uint64
289295
}
290296

291297
var (
@@ -332,6 +338,8 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
332338
shutdownHooks: make([]ShutdownHook, 0),
333339
nodesInprocess: goset.NewSet[string](),
334340
rebalanceLocker: &sync.Mutex{},
341+
actorsCounter: atomic.NewUint64(0),
342+
deadlettersCounter: atomic.NewUint64(0),
335343
}
336344

337345
system.started.Store(false)
@@ -456,6 +464,20 @@ func (x *actorSystem) Stop(ctx context.Context) error {
456464
return x.shutdown(ctx)
457465
}
458466

467+
// Metrics returns the actor system metrics.
468+
// The metrics does not include any cluster data
469+
func (x *actorSystem) Metric(ctx context.Context) *Metric {
470+
if x.started.Load() {
471+
x.getSetDeadlettersCount(ctx)
472+
return &Metric{
473+
deadlettersCount: int64(x.deadlettersCounter.Load()),
474+
actorsCount: int64(x.actorsCounter.Load()),
475+
uptime: x.Uptime(),
476+
}
477+
}
478+
return nil
479+
}
480+
459481
// Running returns true when the actor system is running
460482
func (x *actorSystem) Running() bool {
461483
return x.started.Load()
@@ -597,7 +619,7 @@ func (x *actorSystem) InCluster() bool {
597619
// NumActors returns the total number of active actors on a given running node.
598620
// This does not account for the total number of actors in the cluster
599621
func (x *actorSystem) NumActors() uint64 {
600-
return uint64(len(x.Actors()))
622+
return x.actorsCounter.Load()
601623
}
602624

603625
// Spawn creates or returns the instance of a given actor in the system
@@ -625,6 +647,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor, opts
625647
return nil, err
626648
}
627649

650+
x.actorsCounter.Inc()
628651
// add the given actor to the tree and supervise it
629652
_ = x.actors.AddNode(x.userGuardian, pid)
630653
x.actors.AddWatcher(pid, x.janitor)
@@ -661,6 +684,7 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
661684
return nil, err
662685
}
663686

687+
x.actorsCounter.Inc()
664688
_ = x.actors.AddNode(x.userGuardian, pid)
665689
x.actors.AddWatcher(pid, x.janitor)
666690
x.broadcastActor(pid)
@@ -691,8 +715,8 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error {
691715
pidNode, exist := x.actors.GetNode(actorAddress.String())
692716
if exist {
693717
pid := pidNode.GetValue()
694-
// stop the given actor. No need to record error in the span context
695-
// because the shutdown method is taking care of that
718+
// decrement the actors count since we are stopping the actor
719+
x.actorsCounter.Dec()
696720
return pid.Shutdown(ctx)
697721
}
698722

@@ -1226,7 +1250,7 @@ func (x *actorSystem) handleRemoteTell(ctx context.Context, to *PID, message pro
12261250
return Tell(ctx, to, message)
12271251
}
12281252

1229-
// getRootGuardian returns the system root guardian
1253+
// getRootGuardian returns the system rootGuardian guardian
12301254
func (x *actorSystem) getRootGuardian() *PID {
12311255
x.locker.Lock()
12321256
rootGuardian := x.rootGuardian
@@ -1779,16 +1803,16 @@ func (x *actorSystem) setHostPort() error {
17791803
return nil
17801804
}
17811805

1782-
// spawnRootGuardian creates the root guardian
1806+
// spawnRootGuardian creates the rootGuardian guardian
17831807
func (x *actorSystem) spawnRootGuardian(ctx context.Context) error {
17841808
var err error
17851809
actorName := x.reservedName(rootGuardianType)
17861810
x.rootGuardian, err = x.configPID(ctx, actorName, newRootGuardian())
17871811
if err != nil {
1788-
return fmt.Errorf("actor=%s failed to start root guardian: %w", actorName, err)
1812+
return fmt.Errorf("actor=%s failed to start rootGuardian guardian: %w", actorName, err)
17891813
}
17901814

1791-
// rootGuardian is the root node of the actors tree
1815+
// rootGuardian is the rootGuardian node of the actors tree
17921816
_ = x.actors.AddNode(NoSender, x.rootGuardian)
17931817
return nil
17941818
}
@@ -1880,7 +1904,7 @@ func (x *actorSystem) spawnRebalancer(ctx context.Context) error {
18801904
// spawnDeadletters creates the deadletters synthetic actor
18811905
func (x *actorSystem) spawnDeadletters(ctx context.Context) error {
18821906
var err error
1883-
actorName := x.reservedName(deadletters)
1907+
actorName := x.reservedName(deadlettersType)
18841908
x.deadletters, err = x.configPID(ctx,
18851909
actorName,
18861910
newDeadLetters(),
@@ -1935,6 +1959,25 @@ func (x *actorSystem) cleanupCluster(ctx context.Context, actorNames []string) e
19351959
return eg.Wait()
19361960
}
19371961

1962+
// getSetDeadlettersCount gets and sets the deadletter count
1963+
func (x *actorSystem) getSetDeadlettersCount(ctx context.Context) {
1964+
var (
1965+
to = x.getDeadletters()
1966+
from = x.getSystemGuardian()
1967+
message = new(internalpb.GetDeadlettersCount)
1968+
)
1969+
if to.IsRunning() {
1970+
// ask the deadletter actor for the count
1971+
// using the default ask timeout
1972+
// note: no need to check for error because this call is internal
1973+
message, _ := from.Ask(ctx, to, message, DefaultAskTimeout)
1974+
// cast the response received from the deadletters
1975+
deadlettersCount := message.(*internalpb.DeadlettersCount)
1976+
// set the counter
1977+
x.deadlettersCounter.Store(uint64(deadlettersCount.GetTotalCount()))
1978+
}
1979+
}
1980+
19381981
func isReservedName(name string) bool {
19391982
return strings.HasPrefix(name, systemNamePrefix)
19401983
}

actors/actor_system_test.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func TestActorSystem(t *testing.T) {
7979
actorRef, err := sys.Spawn(ctx, "Test", actor)
8080
assert.Error(t, err)
8181
assert.EqualError(t, err, ErrActorSystemNotStarted.Error())
82+
assert.Nil(t, sys.Metric(ctx))
8283
assert.Nil(t, actorRef)
8384
assert.Zero(t, sys.Uptime())
8485
})
@@ -952,12 +953,14 @@ func TestActorSystem(t *testing.T) {
952953
err = sys.Unsubscribe(consumer)
953954
require.NoError(t, err)
954955

955-
t.Cleanup(
956-
func() {
957-
err = sys.Stop(ctx)
958-
assert.NoError(t, err)
959-
},
960-
)
956+
metric := sys.Metric(ctx)
957+
require.NotNil(t, metric)
958+
require.EqualValues(t, 1, metric.ActorsCount())
959+
require.EqualValues(t, 5, metric.DeadlettersCount())
960+
require.NotZero(t, metric.Uptime())
961+
962+
err = sys.Stop(ctx)
963+
assert.NoError(t, err)
961964
})
962965
t.Run("With deadletters subscription when not started", func(t *testing.T) {
963966
sys, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))
@@ -1738,4 +1741,5 @@ func TestActorSystem(t *testing.T) {
17381741
// shutdown the nats server gracefully
17391742
srv.Shutdown()
17401743
})
1744+
t.Run("With Metrics", func(t *testing.T) {})
17411745
}

actors/metric.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2022-2025 Arsene Tochemey Gandote
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package actors
26+
27+
import "time"
28+
29+
// Metric defines the actor system metric
30+
type Metric struct {
31+
// DeadlettersCount returns the total number of deadletters
32+
deadlettersCount int64
33+
// ActorsCount returns the total number of actors in the system
34+
actorsCount int64
35+
// Uptime returns the number of seconds since the actor/system started
36+
uptime int64
37+
}
38+
39+
// DeadlettersCount returns the total number of deadletters
40+
func (m Metric) DeadlettersCount() int64 {
41+
return m.deadlettersCount
42+
}
43+
44+
// ActorsCount returns the total number of actors either in the system
45+
// or the total number of child actor given a specific PID
46+
func (m Metric) ActorsCount() int64 {
47+
return m.actorsCount
48+
}
49+
50+
// Uptime returns the number of seconds since the actor/system started
51+
func (m Metric) Uptime() int64 {
52+
return m.uptime
53+
}
54+
55+
// ActorMetric defines actor specific metrics
56+
type ActorMetric struct {
57+
// DeadlettersCount returns the total number of deadletters
58+
deadlettersCount uint64
59+
// ActorsCount returns the total number of child actor given a specific PID
60+
childrenCount uint64
61+
// Uptime returns the number of seconds since the actor/system started
62+
uptime int64
63+
// LastProcessingDuration returns the duration of the latest message processed
64+
latestProcessedDuration time.Duration
65+
// RestartCount returns the total number of re-starts by the given PID
66+
restartCount uint64
67+
// ProcessedCount returns the total number of messages processed at a given time
68+
processedCount uint64
69+
// StashSize returns the stash size at a given time
70+
stashSize uint64
71+
}
72+
73+
// LatestProcessedDuration returns the duration of the latest message processed duration
74+
func (x ActorMetric) LatestProcessedDuration() time.Duration {
75+
return x.latestProcessedDuration
76+
}
77+
78+
// RestartCount returns the total number of re-starts by the given PID
79+
func (x ActorMetric) RestartCount() uint64 {
80+
return x.restartCount
81+
}
82+
83+
// DeadlettersCount returns the total number of deadletters
84+
func (x ActorMetric) DeadlettersCount() uint64 {
85+
return x.deadlettersCount
86+
}
87+
88+
// ChidrenCount returns the total number of child actor given a specific PID
89+
func (x ActorMetric) ChidrenCount() uint64 {
90+
return x.childrenCount
91+
}
92+
93+
// Uptime returns the number of seconds since the actor/system started
94+
func (x ActorMetric) Uptime() int64 {
95+
return x.uptime
96+
}
97+
98+
// ProcessedCount returns the total number of messages processed at a given time
99+
func (x ActorMetric) ProcessedCount() uint64 {
100+
return x.processedCount
101+
}
102+
103+
// StashSize returns the stash size at a given time
104+
func (x ActorMetric) StashSize() uint64 {
105+
return x.stashSize
106+
}

actors/pid.go

+48
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,32 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
220220
return pid, nil
221221
}
222222

223+
// Metric returns the actor system metrics.
224+
// The metric does not include any cluster data
225+
func (pid *PID) Metric(ctx context.Context) *ActorMetric {
226+
if pid.IsRunning() {
227+
var (
228+
uptime = pid.Uptime()
229+
latestProcessedDuration = pid.LatestProcessedDuration()
230+
childrenCount = pid.ChildrenCount()
231+
deadlettersCount = pid.getDeadlettersCount(ctx)
232+
restartCount = pid.RestartCount()
233+
processedCount = pid.ProcessedCount() - 1 // 1 because of the PostStart message
234+
stashSize = pid.StashSize()
235+
)
236+
return &ActorMetric{
237+
deadlettersCount: uint64(deadlettersCount),
238+
childrenCount: uint64(childrenCount),
239+
uptime: uptime,
240+
latestProcessedDuration: latestProcessedDuration,
241+
restartCount: uint64(restartCount),
242+
processedCount: uint64(processedCount),
243+
stashSize: stashSize,
244+
}
245+
}
246+
return nil
247+
}
248+
223249
// Uptime returns the number of seconds since the actor started
224250
func (pid *PID) Uptime() int64 {
225251
if pid.IsRunning() {
@@ -1768,3 +1794,25 @@ func (pid *PID) suspend(reason string) {
17681794
Reason: reason,
17691795
})
17701796
}
1797+
1798+
// getDeadlettersCount gets deadletters
1799+
func (pid *PID) getDeadlettersCount(ctx context.Context) int64 {
1800+
var (
1801+
name = pid.Name()
1802+
to = pid.ActorSystem().getDeadletters()
1803+
from = pid.ActorSystem().getSystemGuardian()
1804+
message = &internalpb.GetDeadlettersCount{
1805+
ActorId: &name,
1806+
}
1807+
)
1808+
if to.IsRunning() {
1809+
// ask the deadletter actor for the count
1810+
// using the default ask timeout
1811+
// note: no need to check for error because this call is internal
1812+
message, _ := from.Ask(ctx, to, message, DefaultAskTimeout)
1813+
// cast the response received from the deadletters
1814+
deadlettersCount := message.(*internalpb.DeadlettersCount)
1815+
return deadlettersCount.GetTotalCount()
1816+
}
1817+
return 0
1818+
}

actors/pid_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ func TestReceive(t *testing.T) {
7272
require.NoError(t, err)
7373
assert.NotNil(t, pid)
7474

75+
lib.Pause(time.Second)
76+
7577
// let us send 10 messages to the actor
7678
count := 10
7779
for i := 0; i < count; i++ {
@@ -89,6 +91,16 @@ func TestReceive(t *testing.T) {
8991
assert.Zero(t, pid.ChildrenCount())
9092
assert.NotZero(t, pid.LatestProcessedDuration())
9193
assert.EqualValues(t, 10, pid.ProcessedCount()-1) // 1 because of the PostStart message
94+
metric := pid.Metric(ctx)
95+
require.NotNil(t, metric)
96+
assert.NotZero(t, metric.LatestProcessedDuration())
97+
assert.Zero(t, metric.ChidrenCount())
98+
assert.EqualValues(t, 10, metric.ProcessedCount())
99+
assert.NotZero(t, metric.Uptime())
100+
assert.Zero(t, metric.RestartCount())
101+
assert.Zero(t, metric.StashSize())
102+
assert.Zero(t, metric.DeadlettersCount())
103+
92104
// stop the actor
93105
err = pid.Shutdown(ctx)
94106
assert.NoError(t, err)
@@ -1514,6 +1526,8 @@ func TestSpawnChild(t *testing.T) {
15141526
lib.Pause(time.Second)
15151527

15161528
require.Len(t, parent.Children(), 1)
1529+
metric := parent.Metric(ctx)
1530+
require.EqualValues(t, 1, metric.ChidrenCount())
15171531

15181532
// stop the child actor
15191533
require.NoError(t, child.Shutdown(ctx))

0 commit comments

Comments
 (0)