-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathstream.go
108 lines (97 loc) · 2.54 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package kuberesolver
import (
"context"
"encoding/json"
"fmt"
"io"
"sync"
"google.golang.org/grpc/grpclog"
)
// watchInterface can be implemented by anything that knows how to watch and
// report changes.
type watchInterface interface {
// Stop stops watching. It will close the channel returned by ResultChan() and
// release any resources used by the watch.
Stop()
// ResultChan returns a channel which will receive all the events. If an error
// occurs or Stop() is called, this channel will be closed, in which case the
// watch should be completely cleaned up.
ResultChan() <-chan Event
}
// streamWatcher turns a stream of JSON-encoded events read from an
// io.ReadCloser into a watchInterface.
type streamWatcher struct {
	result  chan Event
	r       io.ReadCloser
	decoder *json.Decoder
	// done is closed by Stop. receive selects on it so that a pending send on
	// result is abandoned once the watcher has been stopped, instead of
	// blocking forever when nobody reads from result any more.
	done chan struct{}
	// The embedded mutex guards stopped.
	sync.Mutex
	stopped bool
}

// newStreamWatcher creates a streamWatcher that decodes events from r and
// starts the goroutine (receive) that forwards them to the result channel.
// The returned watcher closes r when it is stopped or when decoding fails.
func newStreamWatcher(r io.ReadCloser) watchInterface {
	sw := &streamWatcher{
		r:       r,
		decoder: json.NewDecoder(r),
		result:  make(chan Event),
		done:    make(chan struct{}),
	}
	go sw.receive()
	return sw
}

// ResultChan implements watchInterface. The returned channel is closed when
// the receive goroutine exits.
func (sw *streamWatcher) ResultChan() <-chan Event {
	return sw.result
}

// Stop implements watchInterface. It marks the watcher as stopped, signals the
// receive goroutine through done, and closes the underlying reader. Calling
// Stop more than once is a no-op after the first call.
func (sw *streamWatcher) Stop() {
	sw.Lock()
	defer sw.Unlock()
	if sw.stopped {
		return
	}
	sw.stopped = true
	// Wake receive if it is parked on a send that nobody is going to read.
	close(sw.done)
	// Best-effort close: the watcher is shutting down either way, so a close
	// error is deliberately not reported.
	_ = sw.r.Close()
}

// stopping returns true if Stop() was called previously.
func (sw *streamWatcher) stopping() bool {
	sw.Lock()
	defer sw.Unlock()
	return sw.stopped
}

// receive reads events from the decoder in a loop and sends them down the
// result channel. It returns on the first decode error or as soon as the
// watcher is stopped; on return it calls Stop and then closes result.
func (sw *streamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop()
	for {
		obj, err := sw.Decode()
		if err != nil {
			// Ignore expected error: closing the reader in Stop makes the
			// pending Decode fail.
			if sw.stopping() {
				return
			}
			switch err {
			case io.EOF:
				// watch closed normally
			case context.Canceled:
				// canceled normally
			case io.ErrUnexpectedEOF:
				grpclog.Infof("kuberesolver: Unexpected EOF during watch stream event decoding: %v", err)
			default:
				grpclog.Infof("kuberesolver: Unable to decode an event from the watch stream: %v", err)
			}
			return
		}
		// result is unbuffered, so a bare send would block forever if the
		// consumer stopped reading; give up on the event once Stop is called.
		select {
		case <-sw.done:
			return
		case sw.result <- obj:
		}
	}
}
// Decode blocks until the next event can be read from the underlying JSON
// stream. It returns the decoder's error unchanged when decoding fails, and an
// error naming the offending type when the decoded event's Type is not one of
// Added, Modified, Deleted or Error. On any error the returned Event is the
// zero value.
func (sw *streamWatcher) Decode() (Event, error) {
	var ev Event
	err := sw.decoder.Decode(&ev)
	if err != nil {
		return Event{}, err
	}

	// Reject anything that is not a known event type before handing it on.
	switch ev.Type {
	case Added, Modified, Deleted, Error:
	default:
		return Event{}, fmt.Errorf("got invalid watch event type: %v", ev.Type)
	}
	return ev, nil
}