Skip to content

Commit

Permalink
refactor: enhance pidMap (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Jan 1, 2024
1 parent b5d0522 commit 6638364
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
18 changes: 9 additions & 9 deletions actors/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ type receiveContextBuffer struct {
// specifies the number of messages to stash
capacity *atomic.Uint64
buffer chan ReceiveContext
mu sync.Mutex
mu *sync.RWMutex
}

// newReceiveContextBuffer creates a Mailbox with a fixed capacity
func newReceiveContextBuffer(capacity uint64) Mailbox {
return &receiveContextBuffer{
capacity: atomic.NewUint64(capacity),
buffer: make(chan ReceiveContext, capacity),
mu: sync.Mutex{},
mu: &sync.RWMutex{},
}
}

Expand All @@ -89,8 +89,8 @@ func (x *receiveContextBuffer) Push(msg ReceiveContext) error {

// Pop fetches a message from the mailbox
func (x *receiveContextBuffer) Pop() (msg ReceiveContext, err error) {
x.mu.Lock()
defer x.mu.Unlock()
x.mu.RLock()
defer x.mu.RUnlock()
select {
case msg := <-x.buffer:
return msg, nil
Expand All @@ -101,15 +101,15 @@ func (x *receiveContextBuffer) Pop() (msg ReceiveContext, err error) {

// Iterator returns a channel that can be used to iterate over the mailbox
func (x *receiveContextBuffer) Iterator() <-chan ReceiveContext {
x.mu.Lock()
defer x.mu.Unlock()
x.mu.RLock()
defer x.mu.RUnlock()
return x.buffer
}

// IsEmpty returns true when the buffer is empty
func (x *receiveContextBuffer) IsEmpty() bool {
x.mu.Lock()
defer x.mu.Unlock()
x.mu.RLock()
defer x.mu.RUnlock()
return len(x.buffer) == 0
}

Expand All @@ -123,7 +123,7 @@ func (x *receiveContextBuffer) Clone() Mailbox {
return &receiveContextBuffer{
capacity: x.capacity,
buffer: make(chan ReceiveContext, x.capacity.Load()),
mu: sync.Mutex{},
mu: &sync.RWMutex{},
}
}

Expand Down
22 changes: 12 additions & 10 deletions actors/pid_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,31 @@ package actors

import (
"sync"
"sync/atomic"
)

type pidMap struct {
mu sync.Mutex
mu *sync.RWMutex
size atomic.Int64
pids map[string]PID
}

func newPIDMap(cap int) *pidMap {
return &pidMap{
mu: sync.Mutex{},
mu: &sync.RWMutex{},
pids: make(map[string]PID, cap),
}
}

// Len returns the number of PIDs
func (m *pidMap) Len() int {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.pids)
return int(m.size.Load())
}

// Get retrieves a pid by its address
func (m *pidMap) Get(path *Path) (pid PID, ok bool) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.RLock()
defer m.mu.RUnlock()
pid, ok = m.pids[path.String()]
return
}
Expand All @@ -59,23 +59,25 @@ func (m *pidMap) Get(path *Path) (pid PID, ok bool) {
func (m *pidMap) Set(pid PID) {
m.mu.Lock()
m.pids[pid.ActorPath().String()] = pid
m.size.Add(1)
m.mu.Unlock()
}

// Delete removes a pid from the map
func (m *pidMap) Delete(addr *Path) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.pids, addr.String())
m.size.Add(-1)
m.mu.Unlock()
}

// List returns all actors as a slice
func (m *pidMap) List() []PID {
m.mu.Lock()
defer m.mu.Unlock()
out := make([]PID, 0, len(m.pids))
out := make([]PID, 0, m.size.Load())
for _, actor := range m.pids {
out = append(out, actor)
}
m.mu.Unlock()
return out
}
1 change: 1 addition & 0 deletions actors/pid_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ func TestPIDMap(t *testing.T) {
// list the map
lst = pidMap.List()
assert.Len(t, lst, 0)
assert.EqualValues(t, 0, pidMap.Len())
}

0 comments on commit 6638364

Please sign in to comment.