Skip to content

Commit

Permalink
refactor: maintenance changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Feb 22, 2025
1 parent 40fde94 commit d77d313
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 66 deletions.
20 changes: 10 additions & 10 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ type actorSystem struct {
reflection *reflection

peersStateLoopInterval time.Duration
clusterStore *clusterStore
clusterStore *cluster.Store
clusterConfig *ClusterConfig
rebalancingQueue chan *internalpb.PeerState
rebalancedNodes goset.Set[string]
Expand Down Expand Up @@ -1392,7 +1392,7 @@ func (x *actorSystem) completeRebalancing() {
// removePeerStateFromStore removes the peer state from the cluster store
func (x *actorSystem) removePeerStateFromStore(address string) error {
x.locker.Lock()
if err := x.clusterStore.remove(address); err != nil {
if err := x.clusterStore.DeletePeerState(address); err != nil {
x.locker.Unlock()
return err
}
Expand All @@ -1404,7 +1404,7 @@ func (x *actorSystem) removePeerStateFromStore(address string) error {
// getPeerStateFromStore returns the peer state from the cluster store
func (x *actorSystem) getPeerStateFromStore(address string) (*internalpb.PeerState, error) {
x.locker.Lock()
peerState, ok := x.clusterStore.get(address)
peerState, ok := x.clusterStore.GetPeerState(address)
x.locker.Unlock()
if !ok {
return nil, ErrPeerNotFound
Expand Down Expand Up @@ -1441,7 +1441,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
return errors.New("clustering needs remoting to be enabled")
}

clusterStore, err := newClusterStore(x.clusterConfig.WAL(), x.logger)
clusterStore, err := cluster.NewStore(x.clusterConfig.WAL(), x.logger)
if err != nil {
x.logger.Errorf("failed to initialize peers cache: %v", err)
x.locker.Unlock()
Expand Down Expand Up @@ -1469,7 +1469,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
cluster.WithReadQuorum(x.clusterConfig.ReadQuorum()),
cluster.WithReplicaCount(x.clusterConfig.ReplicaCount()),
cluster.WithTLS(x.serverTLS, x.clientTLS),
cluster.WithKVStoreSize(x.clusterConfig.KVStoreSize()),
cluster.WithTableSize(x.clusterConfig.TableSize()),
)
if err != nil {
x.logger.Errorf("failed to initialize cluster engine: %v", err)
Expand Down Expand Up @@ -1692,7 +1692,7 @@ func (x *actorSystem) clusterEventsLoop() {
// only leader can start rebalancing. Just remove from the peer state from your cluster state
// to free up resources
if !x.cluster.IsLeader(ctx) {
if err := x.clusterStore.remove(nodeLeft.GetAddress()); err != nil {
if err := x.clusterStore.DeletePeerState(nodeLeft.GetAddress()); err != nil {
x.logger.Errorf("%s failed to remove left node=(%s) from cluster store: %w", x.name, nodeLeft.GetAddress(), err)
}
continue
Expand All @@ -1703,7 +1703,7 @@ func (x *actorSystem) clusterEventsLoop() {
}

x.rebalancedNodes.Add(nodeLeft.GetAddress())
if peerState, ok := x.clusterStore.get(nodeLeft.GetAddress()); ok {
if peerState, ok := x.clusterStore.GetPeerState(nodeLeft.GetAddress()); ok {
x.rebalanceLocker.Lock()
x.rebalancingQueue <- peerState
x.rebalanceLocker.Unlock()
Expand All @@ -1714,7 +1714,7 @@ func (x *actorSystem) clusterEventsLoop() {
}
}

// peersStateLoop fetches the cluster peers' PeerState and update the node clusterStore
// peersStateLoop fetches the cluster peers' PeerState and update the node Store
func (x *actorSystem) peersStateLoop() {
x.logger.Info("peers state synchronization has started...")
ticker := ticker.New(x.peersStateLoopInterval)
Expand Down Expand Up @@ -1824,7 +1824,7 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer)
}

x.logger.Debugf("peer (%s) actors count (%d)", peerAddress, len(peerState.GetActors()))
if err := x.clusterStore.set(peerState); err != nil {
if err := x.clusterStore.PersistPeerState(peerState); err != nil {
x.logger.Error(err)
return err
}
Expand Down Expand Up @@ -2225,7 +2225,7 @@ func (x *actorSystem) shutdownCluster(ctx context.Context, actorRefs []ActorRef)

x.rebalanceLocker.Unlock()
if x.clusterStore != nil {
x.clusterStore.close()
x.clusterStore.Close()
}
}
return nil
Expand Down
16 changes: 8 additions & 8 deletions actor/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type ClusterConfig struct {
discoveryPort int
peersPort int
kinds []Actor
kvStoreSize uint64
tableSize uint64
wal string
}

Expand All @@ -68,7 +68,7 @@ func NewClusterConfig() *ClusterConfig {
readQuorum: 1,
replicaCount: 1,
partitionCount: 271,
kvStoreSize: 20 * size.MB,
tableSize: 20 * size.MB,
wal: filepath.Join(homedir, "goakt", "data"),
}
}
Expand Down Expand Up @@ -186,16 +186,16 @@ func (x *ClusterConfig) WithReadQuorum(count uint32) *ClusterConfig {
return x
}

// WithKVStoreSize sets the key/value in-memory storage size
// WithTableSize sets the key/value in-memory storage size
// The default values is 20MB
func (x *ClusterConfig) WithKVStoreSize(size uint64) *ClusterConfig {
x.kvStoreSize = size
func (x *ClusterConfig) WithTableSize(size uint64) *ClusterConfig {
x.tableSize = size
return x
}

// KVStoreSize returns the cluster storage size
func (x *ClusterConfig) KVStoreSize() uint64 {
return x.kvStoreSize
// TableSize returns the cluster storage size
func (x *ClusterConfig) TableSize() uint64 {
return x.tableSize
}

// WAL returns the WAL directory
Expand Down
4 changes: 2 additions & 2 deletions actor/cluster_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestClusterConfig(t *testing.T) {
WithWriteQuorum(1).
WithReadQuorum(1).
WithPartitionCount(3).
WithKVStoreSize(10 * size.MB).
WithTableSize(10 * size.MB).
WithWAL(tempdir).
WithDiscovery(disco)

Expand All @@ -64,7 +64,7 @@ func TestClusterConfig(t *testing.T) {
assert.EqualValues(t, 1, config.ReadQuorum())
assert.EqualValues(t, 1, config.WriteQuorum())
assert.EqualValues(t, 3, config.PartitionCount())
assert.Exactly(t, uint64(10*size.MB), config.KVStoreSize())
assert.Exactly(t, uint64(10*size.MB), config.TableSize())
assert.Exactly(t, tempdir, config.WAL())
assert.True(t, disco == config.Discovery())
assert.Len(t, config.Kinds(), 3)
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/reugn/go-quartz v0.14.0
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.10.0
github.com/tochemey/olric v0.2.0
github.com/tochemey/olric v0.2.1
github.com/travisjeffery/go-dynaport v1.0.0
github.com/zeebo/xxh3 v1.0.2
go.uber.org/atomic v1.11.0
Expand Down Expand Up @@ -52,7 +52,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
Expand Down Expand Up @@ -84,7 +84,7 @@ require (
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/crypto v0.33.0 // indirect
golang.org/x/crypto v0.34.0 // indirect
golang.org/x/mod v0.23.0 // indirect
golang.org/x/oauth2 v0.26.0 // indirect
golang.org/x/sys v0.30.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -248,8 +248,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/redcon v1.6.2 h1:5qfvrrybgtO85jnhSravmkZyC0D+7WstbfCs3MmPhow=
github.com/tidwall/redcon v1.6.2/go.mod h1:p5Wbsgeyi2VSTBWOcA5vRXrOb9arFTcU2+ZzFjqV75Y=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tochemey/olric v0.2.0 h1:y/uoVNNVdMn+pbUI01i6TqeD48+VgjRcoRGOzVzkdVY=
github.com/tochemey/olric v0.2.0/go.mod h1:vRpB3Xb0L4J7acugeAfrIXcwLDlCwRsC10s1jCYND1Y=
github.com/tochemey/olric v0.2.1 h1:6RhHx5lBNgRj5IXatuu5nAaHviOmgukxpZDFtUfL52A=
github.com/tochemey/olric v0.2.1/go.mod h1:3V8kqOJwN4eQj6M0BbLCMTDSdDhZJitVY4aUuOJUA28=
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=
github.com/travisjeffery/go-dynaport v1.0.0/go.mod h1:0LHuDS4QAx+mAc4ri3WkQdavgVoBIZ7cE9ob17KIAJk=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
Expand Down Expand Up @@ -278,8 +278,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/crypto v0.34.0 h1:+/C6tk6rf/+t5DhUketUbD1aNGqiSX3j15Z6xuIDlBA=
golang.org/x/crypto v0.34.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down
6 changes: 3 additions & 3 deletions internal/cluster/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type Engine struct {
statesMap olric.DMap
jobKeysMap olric.DMap
actorKindsMap olric.DMap
kvStoreSize uint64
tableSize uint64

// specifies the discovery node
node *discovery.Node
Expand Down Expand Up @@ -217,7 +217,7 @@ func NewEngine(name string, disco discovery.Provider, host *discovery.Node, opts
Mutex: new(sync.Mutex),
nodeJoinedEventsFilter: goset.NewSet[string](),
nodeLeftEventsFilter: goset.NewSet[string](),
kvStoreSize: 20 * size.MB,
tableSize: 20 * size.MB,
running: atomic.NewBool(false),
}
// apply the various options
Expand Down Expand Up @@ -946,7 +946,7 @@ func (x *Engine) buildConfig() (*config.Config, error) {

// set the cluster storage tableSize
options := storage.NewConfig(nil)
options.Add("tableSize", x.kvStoreSize)
options.Add("tableSize", x.tableSize)

// create the config and return it
conf := &config.Config{
Expand Down
6 changes: 3 additions & 3 deletions internal/cluster/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func WithTLS(serverConfig, clientConfig *tls.Config) Option {
})
}

// WithKVStoreSize sets the cluster table storage size
func WithKVStoreSize(size uint64) Option {
// WithTableSize sets the cluster table storage size
func WithTableSize(size uint64) Option {
return OptionFunc(func(eng *Engine) {
eng.kvStoreSize = size
eng.tableSize = size
})
}
4 changes: 2 additions & 2 deletions internal/cluster/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func TestOptions(t *testing.T) {
},
{
name: "WithStorageSize",
option: WithKVStoreSize(size),
expected: Engine{kvStoreSize: size},
option: WithTableSize(size),
expected: Engine{tableSize: size},
},
}

Expand Down
26 changes: 13 additions & 13 deletions actor/cluster_store.go → internal/cluster/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package actors
package cluster

import (
"net"
Expand All @@ -36,14 +36,14 @@ import (
"github.com/tochemey/goakt/v3/log"
)

type clusterStore struct {
type Store struct {
store *logsm.LogSM
logger log.Logger
}

// newClusterStore creates an instance of clusterStore
// NewStore creates an instance of Store
// TODO: add custom options. At the moment the default should be enough
func newClusterStore(dir string, logger log.Logger) (*clusterStore, error) {
func NewStore(dir string, logger log.Logger) (*Store, error) {
store, err := logsm.Open(dir,
logsm.WithLogger(logger),
logsm.WithL0TargetNum(5),
Expand All @@ -58,21 +58,21 @@ func newClusterStore(dir string, logger log.Logger) (*clusterStore, error) {
return nil, err
}

return &clusterStore{
return &Store{
store: store,
logger: logger,
}, nil
}

// set adds a peer to the cache
func (s *clusterStore) set(peer *internalpb.PeerState) error {
// PersistPeerState adds a peer to the cache
func (s *Store) PersistPeerState(peer *internalpb.PeerState) error {
peerAddress := net.JoinHostPort(peer.GetHost(), strconv.Itoa(int(peer.GetPeersPort())))
value, _ := proto.Marshal(peer)
return s.store.Set(peerAddress, value)
}

// get retrieve a peer from the cache
func (s *clusterStore) get(peerAddress string) (*internalpb.PeerState, bool) {
// GetPeerState retrieve a peer from the cache
func (s *Store) GetPeerState(peerAddress string) (*internalpb.PeerState, bool) {
value, ok := s.store.Get(peerAddress)
if !ok {
return nil, false
Expand All @@ -89,12 +89,12 @@ func (s *clusterStore) get(peerAddress string) (*internalpb.PeerState, bool) {
return peer, ok
}

// remove deletes a peer from the cache
func (s *clusterStore) remove(peerAddress string) error {
// DeletePeerState deletes a peer from the cache
func (s *Store) DeletePeerState(peerAddress string) error {
return s.store.Delete(peerAddress)
}

// close resets the cache
func (s *clusterStore) close() {
// Close resets the cache
func (s *Store) Close() {
s.store.Close()
}
Loading

0 comments on commit d77d313

Please sign in to comment.