Skip to content

Commit

Permalink
once: add package 'once'
Browse files Browse the repository at this point in the history
Provides for a construct like a sync.Once, but caring about whether
the function succeeded.
  • Loading branch information
hdonnay committed May 11, 2017
1 parent bd21952 commit cec8caa
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 0 deletions.
26 changes: 26 additions & 0 deletions once/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Copyright (c) 2017, Vimeo, LLC.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

The views and conclusions contained in the software and documentation are those
of the authors and should not be interpreted as representing official policies,
either expressed or implied, of Vimeo, LLC.
94 changes: 94 additions & 0 deletions once/once.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Package once provides for running a running a function once, until successful.
package once

import (
"context"
"fmt"
"sync"
"sync/atomic"
)

// Success is an object that will perform exactly one action if successful.
type Success struct {
// This is an atomic instead of, say, a bool so that callers can hot-path without acquiring a lock.
done uint32
// This Cond protects the running bool, so that only one Do() execution is happening at once.
sync.Cond
// Running is set to true when a goroutine is calling the provided function.
running bool
}

// New returns a Success, ready to use.
func New() *Success {
o := &Success{}
o.L = &sync.Mutex{}
return o
}

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Success and previous calls were not successful.
// In other words, given
// var once *Success = New()
// if once.Do(ctx, f) is called multiple times, f will be invoked until it returns
// a non-nil error, even if f has a different value in each invocation.
// A new instance of Success is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once if successful.
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned with an error, so future calls
// of Do will invoke f again.
//
// If the context has been canceled before f is called successfully,
// context.Canceled will be returned. Callers are responsible to gracefully handle
// this event.
func (o *Success) Do(ctx context.Context, f func() error) error {
if atomic.LoadUint32(&o.done) != 0 {
return nil
}

o.L.Lock()
defer o.L.Unlock()

for o.running {
if err := ctx.Err(); err != nil {
return err
}
o.Wait()
}
o.running = true
defer func() {
o.running = false
}()

if err := ctx.Err(); err != nil {
return err
}
if atomic.LoadUint32(&o.done) != 0 {
return nil
}

if err := o.invoke(f); err != nil {
// Wake up just one goroutine to make the next attempt.
o.Signal()
return err
}

atomic.StoreUint32(&o.done, 1)
o.Broadcast()
return nil
}

func (o *Success) invoke(f func() error) (err error) {
// This does a pointer to an interface so that the deferred func can change the
// error when the stack gets unwound.
defer func(e *error) {
if r := recover(); r != nil {
*e = fmt.Errorf("recovered: %v", r)
}
}(&err)
err = f()
return err
}
53 changes: 53 additions & 0 deletions once/once_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package once_test

import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/vimeo/go-util/once"
)

func TestThunder(t *testing.T) {
var tgt uint32 = 2
var calls uint32
var wg sync.WaitGroup
add := func() error {
t.Log("called")
time.Sleep(time.Millisecond)
if atomic.AddUint32(&calls, 1) != tgt {
return errors.New("errored")
}
return nil
}
o := once.New()

for i := 0; i < 20; i++ {
wg.Add(1)
go func(i int) {
err := o.Do(context.Background(), add)
t.Logf("%d:\t%v", i, err)
wg.Done()
}(i)
}
wg.Wait()

if calls != tgt {
t.Fatalf("calls = %d", calls)
}
}

// Make sure that panicing actually returns an error.
func TestPanic(t *testing.T) {
o := once.New()
err := o.Do(context.Background(), func() error {
panic("panic'd")
})
t.Log(err)
if err == nil {
t.Fatalf("wanted error, got %v", err)
}
}

0 comments on commit cec8caa

Please sign in to comment.