-
Notifications
You must be signed in to change notification settings - Fork 0
/
future.go
108 lines (86 loc) · 2.2 KB
/
future.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package async
import (
"sync/atomic"
)
const (
PanicSetOnImmediateFuture = "you cannot set a value on an immediate future"
)
type immediateFuture[T any] struct {
value T
}
func (f immediateFuture[T]) Get() <-chan T {
ch := make(chan T, 1)
ch <- f.value
return ch
}
func (f immediateFuture[T]) Set(_ T) bool {
panic(PanicSetOnImmediateFuture)
}
// NewFutureImmediate creates a future of type T that has a value that is already set.
func NewFutureImmediate[T any](value T) Future[T] {
return immediateFuture[T]{value: value}
}
type future[T any] struct {
value atomic.Pointer[T]
consumers atomic.Pointer[[]chan T]
publishedIdx atomic.Int32
}
// NewFuture creates a new future of type T.
func NewFuture[T any]() Future[T] {
f := future[T]{}
f.consumers.Store(&[]chan T{}) // init the consumers to an empty list
return &f
}
func (f *future[T]) Get() <-chan T {
defer f.tryNotifyConsumers()
return f.addConsumer()
}
func (f *future[T]) addConsumer() <-chan T {
// we do not want rendezvous here as it will block the caller of tryNotifyConsumers()
ch := make(chan T, 1)
for {
oldListenersPtr := f.consumers.Load()
oldListeners := *oldListenersPtr
newListeners := make([]chan T, len(oldListeners)+1)
copy(newListeners, oldListeners)
newListeners[len(oldListeners)] = ch
if f.consumers.CompareAndSwap(oldListenersPtr, &newListeners) {
break
}
}
return ch
}
func (f *future[T]) tryNotifyConsumers() {
value := f.value.Load()
// check if the value has been set
if value == nil {
return
}
// determine if there are any new consumers that have not been notified
consumers := *f.consumers.Load()
consumerCount := len(consumers)
publishedIdx := f.publishedIdx.Load()
newConsumers := consumers[publishedIdx:consumerCount]
if len(newConsumers) == 0 {
// no new consumers
return
}
if !f.publishedIdx.CompareAndSwap(publishedIdx, int32(consumerCount)) {
// concurrency guard to prevent duplicate publications
return
}
// notify the new consumers
go func() {
for _, consumer := range newConsumers {
consumer <- *value
close(consumer)
}
}()
}
func (f *future[T]) Set(value T) bool {
ok := f.value.CompareAndSwap(nil, &value)
if ok {
f.tryNotifyConsumers()
}
return ok
}