diff --git a/actor/actor_system.go b/actor/actor_system.go index 22017ea5..c3a7dcd2 100644 --- a/actor/actor_system.go +++ b/actor/actor_system.go @@ -285,7 +285,7 @@ type actorSystem struct { reflection *reflection peersStateLoopInterval time.Duration - clusterStore *clusterStore + clusterStore *cluster.Store clusterConfig *ClusterConfig rebalancingQueue chan *internalpb.PeerState rebalancedNodes goset.Set[string] @@ -1392,7 +1392,7 @@ func (x *actorSystem) completeRebalancing() { // removePeerStateFromStore removes the peer state from the cluster store func (x *actorSystem) removePeerStateFromStore(address string) error { x.locker.Lock() - if err := x.clusterStore.remove(address); err != nil { + if err := x.clusterStore.DeletePeerState(address); err != nil { x.locker.Unlock() return err } @@ -1404,7 +1404,7 @@ func (x *actorSystem) removePeerStateFromStore(address string) error { // getPeerStateFromStore returns the peer state from the cluster store func (x *actorSystem) getPeerStateFromStore(address string) (*internalpb.PeerState, error) { x.locker.Lock() - peerState, ok := x.clusterStore.get(address) + peerState, ok := x.clusterStore.GetPeerState(address) x.locker.Unlock() if !ok { return nil, ErrPeerNotFound @@ -1441,7 +1441,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error { return errors.New("clustering needs remoting to be enabled") } - clusterStore, err := newClusterStore(x.clusterConfig.WAL(), x.logger) + clusterStore, err := cluster.NewStore(x.clusterConfig.WAL(), x.logger) if err != nil { x.logger.Errorf("failed to initialize peers cache: %v", err) x.locker.Unlock() @@ -1469,7 +1469,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error { cluster.WithReadQuorum(x.clusterConfig.ReadQuorum()), cluster.WithReplicaCount(x.clusterConfig.ReplicaCount()), cluster.WithTLS(x.serverTLS, x.clientTLS), - cluster.WithKVStoreSize(x.clusterConfig.KVStoreSize()), + cluster.WithTableSize(x.clusterConfig.TableSize()), ) if err != nil { x.logger.Errorf("failed to initialize cluster engine: %v", err) @@ -1692,7 +1692,7 @@ func (x *actorSystem) clusterEventsLoop() { // only leader can start rebalancing. Just remove from the peer state from your cluster state // to free up resources if !x.cluster.IsLeader(ctx) { - if err := x.clusterStore.remove(nodeLeft.GetAddress()); err != nil { + if err := x.clusterStore.DeletePeerState(nodeLeft.GetAddress()); err != nil { x.logger.Errorf("%s failed to remove left node=(%s) from cluster store: %w", x.name, nodeLeft.GetAddress(), err) } continue @@ -1703,7 +1703,7 @@ func (x *actorSystem) clusterEventsLoop() { } x.rebalancedNodes.Add(nodeLeft.GetAddress()) - if peerState, ok := x.clusterStore.get(nodeLeft.GetAddress()); ok { + if peerState, ok := x.clusterStore.GetPeerState(nodeLeft.GetAddress()); ok { x.rebalanceLocker.Lock() x.rebalancingQueue <- peerState x.rebalanceLocker.Unlock() @@ -1714,7 +1714,7 @@ func (x *actorSystem) clusterEventsLoop() { } } -// peersStateLoop fetches the cluster peers' PeerState and update the node clusterStore +// peersStateLoop fetches the cluster peers' PeerState and update the node Store func (x *actorSystem) peersStateLoop() { x.logger.Info("peers state synchronization has started...") ticker := ticker.New(x.peersStateLoopInterval) @@ -1824,7 +1824,7 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer) } x.logger.Debugf("peer (%s) actors count (%d)", peerAddress, len(peerState.GetActors())) - if err := x.clusterStore.set(peerState); err != nil { + if err := x.clusterStore.PersistPeerState(peerState); err != nil { x.logger.Error(err) return err } @@ -2225,7 +2225,7 @@ func (x *actorSystem) shutdownCluster(ctx context.Context, actorRefs []ActorRef) x.rebalanceLocker.Unlock() if x.clusterStore != nil { - x.clusterStore.close() + x.clusterStore.Close() } } return nil diff --git a/actor/cluster_config.go b/actor/cluster_config.go index d5261b8f..3f05a280 100644 --- a/actor/cluster_config.go +++ b/actor/cluster_config.go @@ -49,7 +49,7 @@ type ClusterConfig struct { discoveryPort int peersPort int kinds []Actor - kvStoreSize uint64 + tableSize uint64 wal string } @@ -68,7 +68,7 @@ func NewClusterConfig() *ClusterConfig { readQuorum: 1, replicaCount: 1, partitionCount: 271, - kvStoreSize: 20 * size.MB, + tableSize: 20 * size.MB, wal: filepath.Join(homedir, "goakt", "data"), } } @@ -186,16 +186,16 @@ func (x *ClusterConfig) WithReadQuorum(count uint32) *ClusterConfig { return x } -// WithKVStoreSize sets the key/value in-memory storage size +// WithTableSize sets the key/value in-memory storage size // The default values is 20MB -func (x *ClusterConfig) WithKVStoreSize(size uint64) *ClusterConfig { - x.kvStoreSize = size +func (x *ClusterConfig) WithTableSize(size uint64) *ClusterConfig { + x.tableSize = size return x } -// KVStoreSize returns the cluster storage size -func (x *ClusterConfig) KVStoreSize() uint64 { - return x.kvStoreSize +// TableSize returns the cluster storage size +func (x *ClusterConfig) TableSize() uint64 { + return x.tableSize } // WAL returns the WAL directory diff --git a/actor/cluster_config_test.go b/actor/cluster_config_test.go index 9d32ed8d..f89ed9bb 100644 --- a/actor/cluster_config_test.go +++ b/actor/cluster_config_test.go @@ -52,7 +52,7 @@ func TestClusterConfig(t *testing.T) { WithWriteQuorum(1). WithReadQuorum(1). WithPartitionCount(3). - WithKVStoreSize(10 * size.MB). + WithTableSize(10 * size.MB). WithWAL(tempdir). WithDiscovery(disco) @@ -64,7 +64,7 @@ func TestClusterConfig(t *testing.T) { assert.EqualValues(t, 1, config.ReadQuorum()) assert.EqualValues(t, 1, config.WriteQuorum()) assert.EqualValues(t, 3, config.PartitionCount()) - assert.Exactly(t, uint64(10*size.MB), config.KVStoreSize()) + assert.Exactly(t, uint64(10*size.MB), config.TableSize()) assert.Exactly(t, tempdir, config.WAL()) assert.True(t, disco == config.Discovery()) assert.Len(t, config.Kinds(), 3) diff --git a/go.mod b/go.mod index 342fa597..95a0b71e 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/reugn/go-quartz v0.14.0 github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.10.0 - github.com/tochemey/olric v0.2.0 + github.com/tochemey/olric v0.2.1 github.com/travisjeffery/go-dynaport v1.0.0 github.com/zeebo/xxh3 v1.0.2 go.uber.org/atomic v1.11.0 @@ -52,7 +52,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/gnostic-models v0.6.9 // indirect - github.com/google/go-cmp v0.6.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect @@ -84,7 +84,7 @@ require ( github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.34.0 // indirect golang.org/x/mod v0.23.0 // indirect golang.org/x/oauth2 v0.26.0 // indirect golang.org/x/sys v0.30.0 // indirect diff --git a/go.sum b/go.sum index 26302833..3762776c 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -248,8 +248,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT github.com/tidwall/redcon v1.6.2 h1:5qfvrrybgtO85jnhSravmkZyC0D+7WstbfCs3MmPhow= github.com/tidwall/redcon v1.6.2/go.mod h1:p5Wbsgeyi2VSTBWOcA5vRXrOb9arFTcU2+ZzFjqV75Y= github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= -github.com/tochemey/olric v0.2.0 h1:y/uoVNNVdMn+pbUI01i6TqeD48+VgjRcoRGOzVzkdVY= -github.com/tochemey/olric v0.2.0/go.mod h1:vRpB3Xb0L4J7acugeAfrIXcwLDlCwRsC10s1jCYND1Y= +github.com/tochemey/olric v0.2.1 h1:6RhHx5lBNgRj5IXatuu5nAaHviOmgukxpZDFtUfL52A= +github.com/tochemey/olric v0.2.1/go.mod h1:3V8kqOJwN4eQj6M0BbLCMTDSdDhZJitVY4aUuOJUA28= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= github.com/travisjeffery/go-dynaport v1.0.0/go.mod h1:0LHuDS4QAx+mAc4ri3WkQdavgVoBIZ7cE9ob17KIAJk= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= @@ -278,8 +278,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.34.0 h1:+/C6tk6rf/+t5DhUketUbD1aNGqiSX3j15Z6xuIDlBA= +golang.org/x/crypto v0.34.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= diff --git a/internal/cluster/engine.go b/internal/cluster/engine.go index 1e6521ab..c51507fd 100644 --- a/internal/cluster/engine.go +++ b/internal/cluster/engine.go @@ -159,7 +159,7 @@ type Engine struct { statesMap olric.DMap jobKeysMap olric.DMap actorKindsMap olric.DMap - kvStoreSize uint64 + tableSize uint64 // specifies the discovery node node *discovery.Node @@ -217,7 +217,7 @@ func NewEngine(name string, disco discovery.Provider, host *discovery.Node, opts Mutex: new(sync.Mutex), nodeJoinedEventsFilter: goset.NewSet[string](), nodeLeftEventsFilter: goset.NewSet[string](), - kvStoreSize: 20 * size.MB, + tableSize: 20 * size.MB, running: atomic.NewBool(false), } // apply the various options @@ -946,7 +946,7 @@ func (x *Engine) buildConfig() (*config.Config, error) { // set the cluster storage tableSize options := storage.NewConfig(nil) - options.Add("tableSize", x.kvStoreSize) + options.Add("tableSize", x.tableSize) // create the config and return it conf := &config.Config{ diff --git a/internal/cluster/option.go b/internal/cluster/option.go index e9e87743..a4594b50 100644 --- a/internal/cluster/option.go +++ b/internal/cluster/option.go @@ -130,9 +130,9 @@ func WithTLS(serverConfig, clientConfig *tls.Config) Option { }) } -// WithKVStoreSize sets the cluster table storage size -func WithKVStoreSize(size uint64) Option { +// WithTableSize sets the cluster table storage size +func WithTableSize(size uint64) Option { return OptionFunc(func(eng *Engine) { - eng.kvStoreSize = size + eng.tableSize = size }) } diff --git a/internal/cluster/option_test.go b/internal/cluster/option_test.go index dc457226..cabff8e3 100644 --- a/internal/cluster/option_test.go +++ b/internal/cluster/option_test.go @@ -103,8 +103,8 @@ func TestOptions(t *testing.T) { }, { name: "WithStorageSize", - option: WithKVStoreSize(size), - expected: Engine{kvStoreSize: size}, + option: WithTableSize(size), + expected: Engine{tableSize: size}, }, } diff --git a/actor/cluster_store.go b/internal/cluster/store.go similarity index 81% rename from actor/cluster_store.go rename to internal/cluster/store.go index 02e08dd6..41311d7e 100644 --- a/actor/cluster_store.go +++ b/internal/cluster/store.go @@ -22,7 +22,7 @@ * SOFTWARE. */ -package actors +package cluster import ( "net" @@ -36,14 +36,14 @@ import ( "github.com/tochemey/goakt/v3/log" ) -type clusterStore struct { +type Store struct { store *logsm.LogSM logger log.Logger } -// newClusterStore creates an instance of clusterStore +// NewStore creates an instance of Store // TODO: add custom options. At the moment the default should be enough -func newClusterStore(dir string, logger log.Logger) (*clusterStore, error) { +func NewStore(dir string, logger log.Logger) (*Store, error) { store, err := logsm.Open(dir, logsm.WithLogger(logger), logsm.WithL0TargetNum(5), @@ -58,21 +58,21 @@ func newClusterStore(dir string, logger log.Logger) (*clusterStore, error) { return nil, err } - return &clusterStore{ + return &Store{ store: store, logger: logger, }, nil } -// set adds a peer to the cache -func (s *clusterStore) set(peer *internalpb.PeerState) error { +// PersistPeerState adds a peer to the cache +func (s *Store) PersistPeerState(peer *internalpb.PeerState) error { peerAddress := net.JoinHostPort(peer.GetHost(), strconv.Itoa(int(peer.GetPeersPort()))) value, _ := proto.Marshal(peer) return s.store.Set(peerAddress, value) } -// get retrieve a peer from the cache -func (s *clusterStore) get(peerAddress string) (*internalpb.PeerState, bool) { +// GetPeerState retrieve a peer from the cache +func (s *Store) GetPeerState(peerAddress string) (*internalpb.PeerState, bool) { value, ok := s.store.Get(peerAddress) if !ok { return nil, false @@ -89,12 +89,12 @@ func (s *clusterStore) get(peerAddress string) (*internalpb.PeerState, bool) { return peer, ok } -// remove deletes a peer from the cache -func (s *clusterStore) remove(peerAddress string) error { +// DeletePeerState deletes a peer from the cache +func (s *Store) DeletePeerState(peerAddress string) error { return s.store.Delete(peerAddress) } -// close resets the cache -func (s *clusterStore) close() { +// Close resets the cache +func (s *Store) Close() { s.store.Close() } diff --git a/actor/cluster_store_test.go b/internal/cluster/store_test.go similarity index 84% rename from actor/cluster_store_test.go rename to internal/cluster/store_test.go index b2a21b2d..0ca34547 100644 --- a/actor/cluster_store_test.go +++ b/internal/cluster/store_test.go @@ -22,7 +22,7 @@ * SOFTWARE. */ -package actors +package cluster import ( "testing" @@ -39,16 +39,16 @@ func TestClusterStore(t *testing.T) { t.Run("Open when directory exists", func(t *testing.T) { dir := t.TempDir() logger := log.DiscardLogger - store, err := newClusterStore(dir, logger) + store, err := NewStore(dir, logger) require.NoError(t, err) require.NotNil(t, store) - store.close() + store.Close() }) t.Run("Open when directory is invalid", func(t *testing.T) { dir := "/" logger := log.DiscardLogger - store, err := newClusterStore(dir, logger) + store, err := NewStore(dir, logger) require.Error(t, err) require.Nil(t, store) }) @@ -56,7 +56,7 @@ func TestClusterStore(t *testing.T) { t.Run("SetAndGet", func(t *testing.T) { dir := t.TempDir() logger := log.DiscardLogger - store, err := newClusterStore(dir, logger) + store, err := NewStore(dir, logger) require.NoError(t, err) require.NotNil(t, store) @@ -68,33 +68,33 @@ func TestClusterStore(t *testing.T) { } // Set successful - err = store.set(peerState) + err = store.PersistPeerState(peerState) require.NoError(t, err) // Get found key := "127.0.0.1:2281" - actual, ok := store.get(key) + actual, ok := store.GetPeerState(key) require.True(t, ok) assert.True(t, proto.Equal(peerState, actual)) // Get not found - _, ok = store.get("127.0.0.1:2282") + _, ok = store.GetPeerState("127.0.0.1:2282") require.False(t, ok) // Get unmarshalling failed // Assume that wrong bytes array in kept in the LogSM err = store.store.Set(key, []byte("hello")) require.NoError(t, err) - _, ok = store.get(key) + _, ok = store.GetPeerState(key) require.False(t, ok) - store.close() + store.Close() }) t.Run("Remove", func(t *testing.T) { dir := t.TempDir() logger := log.DiscardLogger - store, err := newClusterStore(dir, logger) + store, err := NewStore(dir, logger) require.NoError(t, err) require.NotNil(t, store) @@ -106,22 +106,22 @@ func TestClusterStore(t *testing.T) { } // Set successful - err = store.set(peerState) + err = store.PersistPeerState(peerState) require.NoError(t, err) // Get found key := "127.0.0.1:2281" - actual, ok := store.get(key) + actual, ok := store.GetPeerState(key) require.True(t, ok) assert.True(t, proto.Equal(peerState, actual)) // Remove - err = store.remove(key) + err = store.DeletePeerState(key) require.NoError(t, err) - _, ok = store.get(key) + _, ok = store.GetPeerState(key) require.False(t, ok) - store.close() + store.Close() }) }