Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove system ask timeout and pass timeout to asks #515

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor: remove system ask timeout and pass timeout to asks
BREAKING CHANGE: - WithReplyTimeout removed
  - All ask-based methods on PID and context accept timeout argument
  - No more system-wide ask timeout
Tochemey committed Nov 11, 2024
commit 376e07070250f5be85c7c88c3a540f5e806305a5
2 changes: 0 additions & 2 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
@@ -245,7 +245,6 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
name: name,
logger: log.New(log.ErrorLevel, os.Stderr),
expireActorAfter: DefaultPassivationTimeout,
askTimeout: DefaultAskTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorDirective: DefaultSupervisoryStrategy,
locker: sync.Mutex{},
@@ -1358,7 +1357,6 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
// pid inherit the actor system settings defined during instantiation
pidOpts := []pidOption{
withInitMaxRetries(x.actorInitMaxRetries),
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorDirective(x.supervisorDirective),
17 changes: 3 additions & 14 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
@@ -196,7 +196,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)),
)
@@ -239,7 +238,7 @@ func TestActorSystem(t *testing.T) {
require.True(t, proto.Equal(remoteAddr, addr))

remoting := NewRemoting()
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)

@@ -285,7 +284,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
@@ -384,7 +382,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
@@ -561,7 +558,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
@@ -641,7 +637,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
@@ -667,7 +662,7 @@ func TestActorSystem(t *testing.T) {
Id: "",
},
)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
require.Error(t, err)
require.Nil(t, reply)

@@ -695,7 +690,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
@@ -719,7 +713,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
@@ -741,7 +734,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
@@ -1074,7 +1066,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithExpireActorAfter(passivateAfter),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)),
)
@@ -1249,7 +1240,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)),
)
@@ -1545,7 +1535,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(exchanger)),
)
@@ -1583,7 +1572,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, addr)

// send the message to exchanger actor one using remote messaging
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)

require.NoError(t, err)
require.NotNil(t, reply)
3 changes: 0 additions & 3 deletions actors/api_test.go
Original file line number Diff line number Diff line change
@@ -136,7 +136,6 @@ func TestAsk(t *testing.T) {
sys, err := NewActorSystem(
"test",
WithLogger(logger),
WithReplyTimeout(replyTimeout),
WithPassivationDisabled(),
)
// assert there are no error
@@ -265,7 +264,6 @@ func TestAsk(t *testing.T) {
sys, err := NewActorSystem(
"test",
WithLogger(logger),
WithReplyTimeout(replyTimeout),
WithPassivationDisabled(),
)
// assert there are no error
@@ -309,7 +307,6 @@ func TestAsk(t *testing.T) {
sys, err := NewActorSystem(
"test",
WithLogger(logger),
WithReplyTimeout(replyTimeout),
WithPassivationDisabled(),
)
// assert there are no error
2 changes: 2 additions & 0 deletions actors/errors.go
Original file line number Diff line number Diff line change
@@ -87,6 +87,8 @@ var (
ErrSchedulerNotStarted = errors.New("scheduler has not started")
// ErrInvalidMessage is returned when an invalid remote message is sent
ErrInvalidMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) }
// ErrInvalidTimeout is returned when a given timeout is negative or zero
ErrInvalidTimeout = errors.New("invalid timeout")
)

// eof returns true if the given error is an EOF error
1 change: 0 additions & 1 deletion actors/helper_test.go
Original file line number Diff line number Diff line change
@@ -450,7 +450,6 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
actorSystemName,
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithPeerStateLoopInterval(500*time.Millisecond),
WithCluster(
10 changes: 0 additions & 10 deletions actors/option.go
Original file line number Diff line number Diff line change
@@ -67,16 +67,6 @@ func WithLogger(logger log.Logger) Option {
)
}

// WithReplyTimeout sets how long in seconds an actor should reply a command
// in a receive-reply pattern
func WithReplyTimeout(timeout time.Duration) Option {
return OptionFunc(
func(a *actorSystem) {
a.askTimeout = timeout
},
)
}

// WithActorInitMaxRetries sets the number of times to retry an actor init process
func WithActorInitMaxRetries(max int) Option {
return OptionFunc(
5 changes: 0 additions & 5 deletions actors/option_test.go
Original file line number Diff line number Diff line change
@@ -52,11 +52,6 @@ func TestOption(t *testing.T) {
option: WithExpireActorAfter(2 * time.Second),
expected: actorSystem{expireActorAfter: 2. * time.Second},
},
{
name: "WithReplyTimeout",
option: WithReplyTimeout(2 * time.Second),
expected: actorSystem{askTimeout: 2. * time.Second},
},
{
name: "WithActorInitMaxRetries",
option: WithActorInitMaxRetries(2),
36 changes: 19 additions & 17 deletions actors/pid.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ import (
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/tochemey/goakt/v2/address"
@@ -97,10 +98,6 @@ type PID struct {
// any further resources like memory and cpu. The default value is 120 seconds
passivateAfter atomic.Duration

// specifies how long the sender of a mail should wait to receiveLoop a reply
// when using Ask. The default value is 5s
askTimeout atomic.Duration

// specifies the maximum of retries to attempt when the actor
// initialization fails. The default value is 5
initMaxRetries atomic.Int32
@@ -214,7 +211,6 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
pid.latestReceiveDuration.Store(0)
pid.running.Store(false)
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.askTimeout.Store(DefaultAskTimeout)
pid.initTimeout.Store(DefaultInitTimeout)

for _, opt := range opts {
@@ -461,7 +457,6 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts .
pidOptions := []pidOption{
withInitMaxRetries(int(pid.initMaxRetries.Load())),
withPassivationAfter(pid.passivateAfter.Load()),
withAskTimeout(pid.askTimeout.Load()),
withCustomLogger(pid.logger),
withActorSystem(pid.system),
withSupervisorDirective(pid.supervisorDirective),
@@ -546,16 +541,18 @@ func (pid *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error {

// Ask sends a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message) (response proto.Message, err error) {
func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
if !to.IsRunning() {
return nil, ErrDead
}

if timeout <= 0 {
return nil, ErrInvalidTimeout
}

receiveContext := contextFromPool()
receiveContext.build(ctx, pid, to, message, false)

to.doReceive(receiveContext)
timeout := pid.askTimeout.Load()

select {
case result := <-receiveContext.response:
@@ -602,7 +599,7 @@ func (pid *PID) SendAsync(ctx context.Context, actorName string, message proto.M
// SendSync sends a synchronous message to another actor and expect a response.
// The location of the given actor is transparent to the caller.
// This block until a response is received or timed out.
func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message) (response proto.Message, err error) {
func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
if !pid.IsRunning() {
return nil, ErrDead
}
@@ -613,10 +610,10 @@ func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Me
}

if cid != nil {
return pid.Ask(ctx, cid, message)
return pid.Ask(ctx, cid, message, timeout)
}

reply, err := pid.RemoteAsk(ctx, addr, message)
reply, err := pid.RemoteAsk(ctx, addr, message, timeout)
if err != nil {
return nil, err
}
@@ -639,12 +636,12 @@ func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Messag
// BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages.
// The messages will be processed one after the other in the order they are sent.
// This is a design choice to follow the simple principle of one message at a time processing by actors.
func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message) (responses chan proto.Message, err error) {
func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message, err error) {
responses = make(chan proto.Message, len(messages))
defer close(responses)

for i := 0; i < len(messages); i++ {
response, err := pid.Ask(ctx, to, messages[i])
response, err := pid.Ask(ctx, to, messages[i], timeout)
if err != nil {
return nil, err
}
@@ -733,11 +730,15 @@ func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message pro
}

// RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message) (response *anypb.Any, err error) {
func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
if pid.remoting == nil {
return nil, ErrRemotingDisabled
}

if timeout <= 0 {
return nil, ErrInvalidTimeout
}

marshaled, err := anypb.New(message)
if err != nil {
return nil, err
@@ -759,6 +760,7 @@ func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message prot
Receiver: to.Address,
Message: marshaled,
},
Timeout: durationpb.New(timeout),
}

stream := remoteService.RemoteAsk(ctx)
@@ -860,7 +862,7 @@ func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messag
// RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages.
// Messages are processed one after the other in the order they are sent.
// This can hinder performance if it is not properly used.
func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message) (responses []*anypb.Any, err error) {
func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) {
if pid.remoting == nil {
return nil, ErrRemotingDisabled
}
@@ -886,6 +888,7 @@ func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, message
Receiver: to.Address,
Message: packed,
},
Timeout: durationpb.New(timeout),
},
)
}
@@ -1210,7 +1213,6 @@ func (pid *PID) init(ctx context.Context) error {
func (pid *PID) reset() {
pid.latestReceiveTime.Store(time.Time{})
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.askTimeout.Store(DefaultAskTimeout)
pid.shutdownTimeout.Store(DefaultShutdownTimeout)
pid.initMaxRetries.Store(DefaultInitMaxRetries)
pid.latestReceiveDuration.Store(0)
8 changes: 0 additions & 8 deletions actors/pid_option.go
Original file line number Diff line number Diff line change
@@ -41,14 +41,6 @@ func withPassivationAfter(duration time.Duration) pidOption {
}
}

// withAskTimeout sets how long in seconds an actor should reply a command
// in a receive-reply pattern
func withAskTimeout(timeout time.Duration) pidOption {
return func(pid *PID) {
pid.askTimeout.Store(timeout)
}
}

// withInitMaxRetries sets the number of times to retry an actor init process
func withInitMaxRetries(max int) pidOption {
return func(pid *PID) {
5 changes: 0 additions & 5 deletions actors/pid_option_test.go
Original file line number Diff line number Diff line change
@@ -62,11 +62,6 @@ func TestPIDOptions(t *testing.T) {
option: withPassivationAfter(time.Second),
expected: &PID{passivateAfter: atomicDuration},
},
{
name: "WithAskTimeout",
option: withAskTimeout(time.Second),
expected: &PID{askTimeout: atomicDuration},
},
{
name: "WithInitMaxRetries",
option: withInitMaxRetries(5),
Loading

Unchanged files with check annotations Beta

actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithReplyTimeout(receivingTimeout))

Check failure on line 50 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout

Check failure on line 50 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout
// start the actor system
_ = actorSystem.Start(ctx)
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithReplyTimeout(receivingTimeout))

Check failure on line 95 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout

Check failure on line 95 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout
// start the actor system
_ = actorSystem.Start(ctx)
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithReplyTimeout(receivingTimeout))

Check failure on line 134 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout

Check failure on line 134 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout
// start the actor system
_ = actorSystem.Start(ctx)
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithReplyTimeout(receivingTimeout))

Check failure on line 173 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout
// start the actor system
_ = actorSystem.Start(ctx)
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithExpireActorAfter(5*time.Second),
actors.WithReplyTimeout(receivingTimeout))

Check failure on line 212 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout
// start the actor system
_ = actorSystem.Start(ctx)
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithReplyTimeout(receivingTimeout))

Check failure on line 255 in bench/benchmark_test.go

GitHub Actions / lint

undefined: actors.WithReplyTimeout
// start the actor system
_ = actorSystem.Start(ctx)
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if _, err := sender.Ask(ctx, receiver, new(benchmarkpb.BenchRequest)); err != nil {

Check failure on line 275 in bench/benchmark_test.go

GitHub Actions / lint

not enough arguments in call to sender.Ask
b.Fatal(err)
}
atomic.AddInt64(&counter, 1)