-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgordian.go
175 lines (157 loc) · 4.03 KB
/
gordian.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Package gordian provides a simple framework for building multi-client
// websocket applications.
package gordian
import (
"encoding/json"
"errors"
"log"
"net/http"
"github.com/gorilla/websocket"
)
// Control types. A Client's Ctrl field holds one of these values to describe
// where the client is in its connection lifecycle.
const (
// Connect is set by ServeHTTP on a freshly upgraded connection before the
// client is announced on the Control channel.
Connect = iota
// Register is the value ServeHTTP expects on the client it reads back from
// the Control channel, together with a non-nil Id; any other reply is aborted.
Register
// Establish is set by ServeHTTP once the registration reply has been
// accepted; Run registers clients that arrive in this state.
Establish
// Abort is set by ServeHTTP when the registration reply is rejected.
Abort
// Close is set by ServeHTTP after the read loop for the client has ended;
// Run closes the client's outgoing queue and unregisters it.
Close
)
var (
// upgrader performs the HTTP-to-websocket upgrade in ServeHTTP. It is a
// zero-value websocket.Upgrader, so no options are customized here.
upgrader = websocket.Upgrader{}
)
// ClientId identifies a client. The value is chosen by the application and is
// used as a map key, so its dynamic type must be hashable.
type ClientId interface{}

// MessageData is the application-defined payload carried by a Message.
type MessageData interface{}

// Message is the envelope Gordian uses to move data between clients and the
// application.
type Message struct {
	// From is the originating client.
	From ClientId
	// To is the destination client.
	To ClientId
	// Type is the type of message.
	Type string
	// Data is the message payload.
	Data MessageData
}

// Unmarshal decodes the JSON payload of an incoming message into data, which
// must be a pointer as required by json.Unmarshal.
//
// It only works on messages whose Data still holds the raw, undecoded JSON
// (a json.RawMessage); for any other payload type an error is returned and
// data is left untouched. Decoding failures from json.Unmarshal are returned
// as-is.
func (m *Message) Unmarshal(data interface{}) error {
	if raw, isRaw := m.Data.(json.RawMessage); isRaw {
		return json.Unmarshal(raw, data)
	}
	return errors.New("Data is not a json.RawMessage")
}
// Client stores state and control information for a single websocket
// connection. ServeHTTP creates one for each upgraded request and announces it
// on Gordian's Control channel.
type Client struct {
// ServeHTTP rejects a registration reply whose Id is nil; Run keys its client
// table by this value.
Id ClientId // Id is a unique identifier.
// Ctrl holds one of the control types: Connect, Register, Establish, Abort
// or Close.
Ctrl int // Ctrl is the current control type.
Conn *websocket.Conn // Conn is the websocket connection produced by the upgrade in ServeHTTP.
Request *http.Request // Request is the original HTTP request that was upgraded.
// outBox queues messages waiting to be written to Conn. ServeHTTP allocates
// it, Run fills and eventually closes it, and writeToWS drains it.
outBox chan Message
}
// Gordian processes and distributes messages and manages clients.
// Instances are built with New; Run must be started for outgoing messages and
// client registration to be processed.
type Gordian struct {
// ServeHTTP sends Connect, Abort, Establish and Close notifications on
// Control and also reads the registration reply from it.
Control chan *Client // Control is used to pass client control information between Gordian and the application.
InBox chan Message // InBox carries messages that readFromWS has received from clients.
OutBox chan Message // OutBox carries messages that Run delivers to the client named in Message.To.
// manage carries Establish and Close notifications from ServeHTTP to the
// goroutine started by Run.
manage chan *Client
// clients maps a registered client's Id to its Client. It is read and
// written by the goroutine started in Run.
clients map[ClientId]*Client
// bufSize is the capacity used for InBox, OutBox and each client's outBox.
bufSize int
}
// New builds a ready-to-use Gordian.
//
// bufSize is the capacity of the InBox and OutBox channels and is remembered
// so that every client's outgoing queue is created with the same capacity.
// The Control and manage channels are unbuffered. The returned instance does
// nothing until Run is called.
func New(bufSize int) *Gordian {
	return &Gordian{
		bufSize: bufSize,
		clients: make(map[ClientId]*Client),
		manage:  make(chan *Client),
		Control: make(chan *Client),
		InBox:   make(chan Message, bufSize),
		OutBox:  make(chan Message, bufSize),
	}
}
// Run starts Gordian's event loop in a new goroutine and returns immediately.
//
// The loop is the only code that touches the client table. It does two things:
// it forwards every message taken from OutBox to the outgoing queue of the
// client named by msg.To, and it applies the Establish and Close notifications
// that ServeHTTP sends on the manage channel.
//
// A message is silently dropped when its destination is not registered or when
// the destination's queue is full, so a slow client cannot stall the loop.
// The goroutine has no stop signal and runs until the process exits.
func (g *Gordian) Run() {
	go func() {
		for {
			select {
			case msg := <-g.OutBox:
				client, ok := g.clients[msg.To]
				if !ok {
					continue
				}
				// Non-blocking send: drop the message rather than wait on a
				// client whose queue is full.
				select {
				case client.outBox <- msg:
				default:
				}
			case client := <-g.manage:
				switch client.Ctrl {
				case Establish:
					g.clients[client.Id] = client
				case Close:
					// Closing the queue lets this client's writeToWS goroutine exit.
					close(client.outBox)
					// Unregister only if the table still refers to this very
					// connection. If a newer connection registered under the
					// same Id it has replaced the entry, and removing it here
					// would cut the live connection off from its messages.
					if current, ok := g.clients[client.Id]; ok && current == client {
						delete(g.clients, client.Id)
					}
				}
			}
		}
	}()
}
// ServeHTTP upgrades the request to a websocket and runs the connection until
// the client goes away.
//
// The lifecycle is driven over the Control channel:
//  1. The new connection is announced with Ctrl set to Connect.
//  2. A reply is read back from Control. It must carry a non-nil Id and Ctrl
//     set to Register; otherwise the client is sent back with Ctrl set to
//     Abort and the handler returns.
//  3. The client gets its outgoing queue, is registered with the event loop
//     and is announced again with Ctrl set to Establish.
//  4. A writer goroutine is started and the handler blocks reading messages.
//  5. When reading stops, the client is announced with Ctrl set to Close and
//     the event loop is told to unregister it.
//
// The websocket connection is closed when the handler returns, on every path.
//
// NOTE(review): the registration reply is read from the same unbuffered
// Control channel this handler sends on, so with several connections arriving
// at once a handler may be able to receive a value intended for a different
// receiver — confirm how the application is expected to serialize this.
func (g *Gordian) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Nothing to clean up: no connection was established.
		return
	}
	// The upgraded connection has been taken over from net/http, which will
	// not close it for us; without this the socket leaks on every exit path.
	// Closing a connection the application already closed is harmless.
	defer conn.Close()

	g.Control <- &Client{Ctrl: Connect, Conn: conn, Request: r}

	client := <-g.Control
	if client.Id == nil || client.Ctrl != Register {
		client.Ctrl = Abort
		g.Control <- client
		return
	}

	client.outBox = make(chan Message, g.bufSize)
	client.Ctrl = Establish
	g.manage <- client
	g.Control <- client

	go g.writeToWS(client)
	// Blocks until the connection can no longer be read.
	g.readFromWS(client)

	client.Ctrl = Close
	g.Control <- client
	g.manage <- client
}
// readFromWS pumps messages from a client's websocket into InBox until the
// connection can no longer be read.
//
// Every incoming message is expected to be a JSON object. Its "type" entry is
// decoded into the Message's Type; if that decoding fails the error is logged
// and the message is skipped. Its "data" entry is passed along undecoded as a
// json.RawMessage so the receiver can decode it later with Message.Unmarshal.
// The Message's From is set to the client's Id.
//
// Any read error ends the loop. The error is logged only when the websocket
// package classifies it as an unexpected close (CloseGoingAway is treated as
// expected).
func (g *Gordian) readFromWS(client *Client) {
	for {
		// A fresh map per message so keys from an earlier message cannot leak
		// into the next one.
		frame := make(map[string]json.RawMessage)
		if err := client.Conn.ReadJSON(&frame); err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Println(err)
			}
			return
		}

		var kind string
		if err := json.Unmarshal(frame["type"], &kind); err != nil {
			log.Println(err)
			continue
		}

		g.InBox <- Message{
			From: client.Id,
			Type: kind,
			Data: frame["data"],
		}
	}
}
// writeToWS drains a client's outgoing queue and writes each message to the
// client's websocket as a JSON object with "type" and "data" entries.
//
// It returns once the queue has been closed and emptied. A failed write is
// logged and the loop keeps going with the next queued message.
func (g *Gordian) writeToWS(client *Client) {
	for msg := range client.outBox {
		payload := map[string]interface{}{
			"type": msg.Type,
			"data": msg.Data,
		}
		// Use the Conn method, matching ReadJSON in readFromWS; the
		// package-level websocket.WriteJSON helper is deprecated.
		if err := client.Conn.WriteJSON(payload); err != nil {
			log.Println(err)
		}
	}
}