Skip to content

Commit

Permalink
Merge pull request #223 from yywing/issue_222
Browse files Browse the repository at this point in the history
Closes 222
  • Loading branch information
Zerpet authored Sep 28, 2023
2 parents 831d90b + f4ea4b2 commit fbae907
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 35 deletions.
40 changes: 24 additions & 16 deletions allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ const (

// allocator maintains a bitset of allocated numbers.
type allocator struct {
pool *big.Int
last int
low int
high int
pool *big.Int
follow int
low int
high int
}

// NewAllocator reserves and frees integers out of a range between low and
Expand All @@ -31,10 +31,10 @@ type allocator struct {
// sizeof(big.Word)
func newAllocator(low, high int) *allocator {
return &allocator{
pool: big.NewInt(0),
last: low,
low: low,
high: high,
pool: big.NewInt(0),
follow: low,
low: low,
high: high,
}
}

Expand Down Expand Up @@ -69,21 +69,29 @@ func (a allocator) String() string {
// O(N) worst case runtime where N is allocated, but usually O(1) due to a
// rolling index into the oldest allocation.
func (a *allocator) next() (int, bool) {
wrapped := a.last
wrapped := a.follow
defer func() {
// make a.follow point to next value
if a.follow == a.high {
a.follow = a.low
} else {
a.follow += 1
}
}()

// Find trailing bit
for ; a.last <= a.high; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow <= a.high; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
}
}

// Find preceding free'd pool
a.last = a.low
a.follow = a.low

for ; a.last < wrapped; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow < wrapped; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
}
}

Expand Down
22 changes: 22 additions & 0 deletions allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ func TestAllocatorShouldReuseReleased(t *testing.T) {
}
}

func TestAllocatorShouldNotReuseEarly(t *testing.T) {
a := newAllocator(1, 2)

first, _ := a.next()
if want, got := 1, first; want != got {
t.Fatalf("expected allocation to be %d, got: %d", want, got)
}

a.release(first)

second, _ := a.next()
if want, got := 2, second; want != got {
t.Fatalf("expected second allocation to be %d, got: %d", want, got)
}

third, _ := a.next()
if want, got := first, third; want != got {
t.Fatalf("expected third allocation to be %d, got: %d", want, got)
}

}

func TestAllocatorReleasesKeepUpWithAllocationsForAllSizes(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down
13 changes: 12 additions & 1 deletion channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Channel struct {

// closed is set to 1 when the channel has been closed - see Channel.send()
closed int32
close chan struct{}

// true when we will never notify again
noNotify bool
Expand Down Expand Up @@ -86,6 +87,7 @@ func newChannel(c *Connection, id uint16) *Channel {
confirms: newConfirms(),
recv: (*Channel).recvMethod,
errors: make(chan *Error, 1),
close: make(chan struct{}),
}
}

Expand Down Expand Up @@ -146,6 +148,7 @@ func (ch *Channel) shutdown(e *Error) {
}

close(ch.errors)
close(ch.close)
ch.noNotify = true
})
}
Expand Down Expand Up @@ -368,7 +371,11 @@ func (ch *Channel) dispatch(msg message) {
// deliveries are in flight and a no-wait cancel has happened

default:
ch.rpc <- msg
select {
case <-ch.close:
return
case ch.rpc <- msg:
}
}
}

Expand Down Expand Up @@ -468,6 +475,10 @@ code set to '200'.
It is safe to call this method multiple times.
*/
func (ch *Channel) Close() error {
if ch.IsClosed() {
return nil
}

defer ch.connection.closeChannel(ch, nil)
return ch.call(
&channelClose{ReplyCode: replySuccess},
Expand Down
56 changes: 38 additions & 18 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type Connection struct {
blocks []chan Blocking

errors chan *Error
// if connection is closed should close this chan
close chan struct{}

Config Config // The negotiated Config after connection.open

Expand Down Expand Up @@ -263,6 +265,7 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
rpc: make(chan message),
sends: make(chan time.Time),
errors: make(chan *Error, 1),
close: make(chan struct{}),
deadlines: make(chan readDeadliner, 1),
}
go c.reader(conn)
Expand Down Expand Up @@ -597,6 +600,8 @@ func (c *Connection) shutdown(err *Error) {
}

c.conn.Close()
// reader exit
close(c.close)

c.channels = nil
c.allocator = nil
Expand Down Expand Up @@ -634,15 +639,23 @@ func (c *Connection) dispatch0(f frame) {
c <- Blocking{Active: false}
}
default:
c.rpc <- m
select {
case <-c.close:
return
case c.rpc <- m:
}

}
case *heartbeatFrame:
// kthx - all reads reset our deadline. so we can drop this
default:
// lolwat - channel0 only responds to methods and heartbeats
if err := c.closeWith(ErrUnexpectedFrame); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err)
}
// closeWith use call don't block reader
go func() {
if err := c.closeWith(ErrUnexpectedFrame); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err)
}
}()
}
}

Expand Down Expand Up @@ -689,9 +702,12 @@ func (c *Connection) dispatchClosed(f frame) {
// we are already closed, so do nothing
default:
// unexpected method on closed channel
if err := c.closeWith(ErrClosed); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err)
}
// closeWith use call don't block reader
go func() {
if err := c.closeWith(ErrClosed); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err)
}
}()
}
}
}
Expand Down Expand Up @@ -813,13 +829,16 @@ func (c *Connection) allocateChannel() (*Channel, error) {

// releaseChannel removes a channel from the registry as the final part of the
// channel lifecycle
func (c *Connection) releaseChannel(id uint16) {
func (c *Connection) releaseChannel(ch *Channel) {
c.m.Lock()
defer c.m.Unlock()

if !c.IsClosed() {
delete(c.channels, id)
c.allocator.release(int(id))
got, ok := c.channels[ch.id]
if ok && got == ch {
delete(c.channels, ch.id)
c.allocator.release(int(ch.id))
}
}
}

Expand All @@ -831,7 +850,7 @@ func (c *Connection) openChannel() (*Channel, error) {
}

if err := ch.open(); err != nil {
c.releaseChannel(ch.id)
c.releaseChannel(ch)
return nil, err
}
return ch, nil
Expand All @@ -842,7 +861,7 @@ func (c *Connection) openChannel() (*Channel, error) {
// this connection.
func (c *Connection) closeChannel(ch *Channel, e *Error) {
ch.shutdown(e)
c.releaseChannel(ch.id)
c.releaseChannel(ch)
}

/*
Expand All @@ -863,13 +882,14 @@ func (c *Connection) call(req message, res ...message) error {
}
}

msg, ok := <-c.rpc
if !ok {
err, errorsChanIsOpen := <-c.errors
if !errorsChanIsOpen {
return ErrClosed
var msg message
select {
case e, ok := <-c.errors:
if ok {
return e
}
return err
return ErrClosed
case msg = <-c.rpc:
}

// Try to match one of the result types
Expand Down

0 comments on commit fbae907

Please sign in to comment.