Skip to content

Commit 1c29b18

Browse files
authored
fix: fix wrong metric recording and cleanup code base (#440)
1 parent 7b070e1 commit 1c29b18

File tree

5 files changed

+145
-191
lines changed

5 files changed

+145
-191
lines changed

actors/actor_system_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1013,15 +1013,16 @@ func TestActorSystem(t *testing.T) {
10131013
got := &metricdata.ResourceMetrics{}
10141014
err = r.Collect(ctx, got)
10151015
require.NoError(t, err)
1016-
assert.Len(t, got.ScopeMetrics, 1)
1017-
assert.Len(t, got.ScopeMetrics[0].Metrics, 5)
1016+
require.Len(t, got.ScopeMetrics, 1)
1017+
require.Len(t, got.ScopeMetrics[0].Metrics, 6)
10181018

10191019
expected := []string{
10201020
"actor_child_count",
10211021
"actor_stash_count",
10221022
"actor_restart_count",
10231023
"actors_count",
10241024
"actor_processed_count",
1025+
"actor_received_duration",
10251026
}
10261027
// sort the array
10271028
sort.Strings(expected)

actors/api.go

+39-58
Original file line numberDiff line numberDiff line change
@@ -48,30 +48,22 @@ func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Durat
4848
return nil, ErrDead
4949
}
5050

51-
var messageContext *ReceiveContext
52-
53-
switch msg := message.(type) {
54-
case *internalpb.RemoteMessage:
55-
var actual proto.Message
56-
if actual, err = msg.GetMessage().UnmarshalNew(); err != nil {
57-
return nil, ErrInvalidRemoteMessage(err)
58-
}
59-
messageContext = newReceiveContext(ctx, NoSender, to, actual).WithRemoteSender(msg.GetSender())
60-
default:
61-
messageContext = newReceiveContext(ctx, NoSender, to, message).WithRemoteSender(RemoteNoSender)
51+
receiveContext, err := toReceiveContext(ctx, to, message)
52+
if err != nil {
53+
return nil, err
6254
}
6355

64-
to.doReceive(messageContext)
56+
to.doReceive(receiveContext)
6557

6658
// await patiently to receive the response from the actor
6759
select {
68-
case response = <-messageContext.response:
69-
to.setLastProcessingDuration(time.Since(to.getLastProcessingTime()))
60+
case response = <-receiveContext.response:
61+
to.recordLatestReceiveDurationMetric(ctx)
7062
return
7163
case <-time.After(timeout):
72-
to.setLastProcessingDuration(time.Since(to.getLastProcessingTime()))
64+
to.recordLatestReceiveDurationMetric(ctx)
7365
err = ErrRequestTimeout
74-
to.toDeadletterQueue(messageContext, err)
66+
to.toDeadletterQueue(receiveContext, err)
7567
return
7668
}
7769
}
@@ -82,66 +74,41 @@ func Tell(ctx context.Context, to *PID, message proto.Message) error {
8274
return ErrDead
8375
}
8476

85-
var messageContext *ReceiveContext
86-
87-
switch msg := message.(type) {
88-
case *internalpb.RemoteMessage:
89-
var (
90-
actual proto.Message
91-
err error
92-
)
93-
94-
if actual, err = msg.GetMessage().UnmarshalNew(); err != nil {
95-
return ErrInvalidRemoteMessage(err)
96-
}
97-
messageContext = newReceiveContext(ctx, NoSender, to, actual).WithRemoteSender(msg.GetSender())
98-
default:
99-
messageContext = newReceiveContext(ctx, NoSender, to, message).WithRemoteSender(RemoteNoSender)
77+
receiveContext, err := toReceiveContext(ctx, to, message)
78+
if err != nil {
79+
return err
10080
}
10181

102-
to.doReceive(messageContext)
103-
to.setLastProcessingDuration(time.Since(to.getLastProcessingTime()))
82+
to.doReceive(receiveContext)
83+
to.recordLatestReceiveDurationMetric(ctx)
10484
return nil
10585
}
10686

10787
// BatchTell sends bulk asynchronous messages to an actor
88+
// The messages will be processed one after the other in the order they are sent
89+
// This is a design choice to follow the simple principle of one message at a time processing by actors.
10890
func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error {
109-
if !to.IsRunning() {
110-
return ErrDead
111-
}
112-
113-
for i := 0; i < len(messages); i++ {
114-
message := messages[i]
115-
messageContext := newReceiveContext(ctx, NoSender, to, message).WithRemoteSender(RemoteNoSender)
116-
to.doReceive(messageContext)
91+
// messages are processed one after the other
92+
for _, mesage := range messages {
93+
if err := Tell(ctx, to, mesage); err != nil {
94+
return err
95+
}
11796
}
118-
to.setLastProcessingDuration(time.Since(to.getLastProcessingTime()))
11997
return nil
12098
}
12199

122100
// BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages.
123101
// The messages will be processed one after the other in the order they are sent
124102
// This is a design choice to follow the simple principle of one message at a time processing by actors.
125103
func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error) {
126-
if !to.IsRunning() {
127-
return nil, ErrDead
128-
}
129-
130104
responses = make(chan proto.Message, len(messages))
131105
defer close(responses)
132-
133-
for i := 0; i < len(messages); i++ {
134-
receiveContext := newReceiveContext(ctx, NoSender, to, messages[i])
135-
to.doReceive(receiveContext)
136-
select {
137-
case result := <-receiveContext.response:
138-
responses <- result
139-
to.setLastProcessingDuration(time.Since(to.getLastProcessingTime()))
140-
case <-time.After(timeout):
141-
to.setLastProcessingDuration(time.Since(to.getLastProcessingTime()))
142-
to.toDeadletterQueue(receiveContext, ErrRequestTimeout)
143-
return nil, ErrRequestTimeout
106+
for _, mesage := range messages {
107+
response, err := Ask(ctx, to, mesage, timeout)
108+
if err != nil {
109+
return nil, err
144110
}
111+
responses <- response
145112
}
146113
return
147114
}
@@ -499,3 +466,17 @@ func RemoteSpawn(ctx context.Context, host string, port int, name, actorType str
499466
}
500467
return nil
501468
}
469+
470+
// toReceiveContext creates a ReceiveContext provided a message and a receiver
471+
func toReceiveContext(ctx context.Context, to *PID, message proto.Message) (*ReceiveContext, error) {
472+
switch msg := message.(type) {
473+
case *internalpb.RemoteMessage:
474+
actual, err := msg.GetMessage().UnmarshalNew()
475+
if err != nil {
476+
return nil, ErrInvalidRemoteMessage(err)
477+
}
478+
return newReceiveContext(ctx, NoSender, to, actual).WithRemoteSender(msg.GetSender()), nil
479+
default:
480+
return newReceiveContext(ctx, NoSender, to, message).WithRemoteSender(RemoteNoSender), nil
481+
}
482+
}

0 commit comments

Comments
 (0)