-
-
Notifications
You must be signed in to change notification settings - Fork 121
/
Copy patholric.go
484 lines (408 loc) · 13.3 KB
/
olric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
// Copyright 2018-2024 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package olric provides a distributed cache and in-memory key/value data store.
It can be used both as an embedded Go library and as a language-independent
service.
With Olric, you can instantly create a fast, scalable, shared pool of RAM across
a cluster of computers.
Olric is designed to be a distributed cache, but it also provides Publish/Subscribe,
data replication, failure detection and simple anti-entropy services.
So it can also be used as an ordinary key/value data store to scale your cloud
application.
*/
package olric
import (
"context"
"fmt"
"net"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/hasher"
"github.com/buraksezer/olric/internal/checkpoint"
"github.com/buraksezer/olric/internal/cluster/balancer"
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/cluster/routingtable"
"github.com/buraksezer/olric/internal/dmap"
"github.com/buraksezer/olric/internal/environment"
"github.com/buraksezer/olric/internal/locker"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/pubsub"
"github.com/buraksezer/olric/internal/server"
"github.com/buraksezer/olric/pkg/flog"
"github.com/hashicorp/logutils"
"github.com/pkg/errors"
"github.com/tidwall/redcon"
"golang.org/x/sync/errgroup"
)
// ReleaseVersion is the version string of this Olric release. Note that it
// may denote a pre-release (for example an alpha), not necessarily a stable one.
const ReleaseVersion string = "0.6.0-alpha.1"

var (
	// ErrOperationTimeout is returned when an operation times out.
	ErrOperationTimeout = errors.New("operation timeout")

	// ErrServerGone means that a cluster member was closed unexpectedly.
	ErrServerGone = errors.New("server is gone")

	// ErrKeyNotFound is returned when a key could not be found. It is also
	// returned when the requested DMap does not exist (see convertDMapError).
	ErrKeyNotFound = errors.New("key not found")

	// ErrKeyFound means that the requested key was found in the cluster.
	ErrKeyFound = errors.New("key found")

	// ErrWriteQuorum means that the write quorum could not be reached.
	ErrWriteQuorum = errors.New("write quorum cannot be reached")

	// ErrReadQuorum means that the read quorum could not be reached.
	ErrReadQuorum = errors.New("read quorum cannot be reached")

	// ErrLockNotAcquired is returned when the requested lock could not be acquired.
	ErrLockNotAcquired = errors.New("lock not acquired")

	// ErrNoSuchLock is returned when the requested lock does not exist.
	ErrNoSuchLock = errors.New("no such lock")

	// ErrClusterQuorum means that the cluster could not reach a healthy number
	// of members to operate.
	ErrClusterQuorum = errors.New("failed to find enough peers to create quorum")

	// ErrKeyTooLarge means that the given key is too large to process.
	// Maximum length of a key is 256 bytes.
	ErrKeyTooLarge = errors.New("key too large")

	// ErrEntryTooLarge is returned if the space required for an entry is
	// bigger than the configured table size.
	ErrEntryTooLarge = errors.New("entry too large for the configured table size")

	// ErrConnRefused is returned if the target node refused a connection request.
	// It is good to call RefreshMetadata to update the underlying data structures.
	ErrConnRefused = errors.New("connection refused")
)
// Olric implements a distributed cache and in-memory key/value data store.
// It can be used both as an embedded Go library and as a language-independent
// service. Instances are created with New, started with Start and stopped
// with Shutdown.
type Olric struct {
	// name is BindAddr:BindPort. It defines the server's unique name in the
	// cluster; the same value is used as the memberlist node name.
	name string

	// env is the environment shared with the subsystems. New stores the
	// config, logger, client, partitions, locker and RESP server in it, and
	// initializeServices adds the routing table.
	env *environment.Environment

	config   *config.Config
	log      *flog.Logger
	hashFunc hasher.Hasher

	// Logical units to store data: the primary and the backup partition tables.
	primary *partitions.Partitions
	backup  *partitions.Partitions

	// RESP server and clients.
	server *server.Server
	client *server.Client

	// Routing table (cluster membership and quorum checks), partition balancer
	// and the Publish-Subscribe / Distributed Map services.
	rt       *routingtable.RoutingTable
	balancer *balancer.Balancer
	pubsub   *pubsub.Service
	dmap     *dmap.Service

	// Structures for flow control. ctx is cancelled by Shutdown; wg tracks the
	// background goroutine that runs the started callback, which Shutdown
	// waits for.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	// Callback function (taken from config.Started). Olric calls this after
	// the server is ready to accept new connections, i.e. once
	// checkpoint.AllPassed reports true.
	started func()
}
// prepareConfig sanitizes, validates and finalizes c so that New can use it.
// It sets the memberlist node name to "BindAddr:BindPort" and wraps the
// configured logger's writer in a level filter driven by c.LogLevel.
//
// It returns an error if c is nil or if Sanitize, Validate or
// SetupNetworkConfig fails. On success the same (mutated) config is returned.
func prepareConfig(c *config.Config) (*config.Config, error) {
	if c == nil {
		return nil, fmt.Errorf("config cannot be nil")
	}

	if err := c.Sanitize(); err != nil {
		return nil, err
	}
	if err := c.Validate(); err != nil {
		return nil, err
	}
	if err := c.SetupNetworkConfig(); err != nil {
		return nil, err
	}

	c.MemberlistConfig.Name = net.JoinHostPort(c.BindAddr, strconv.Itoa(c.BindPort))

	// logutils.LevelFilter requires Levels in increasing order of severity and
	// drops every line whose level sorts before MinLevel. The previous order
	// (DEBUG, WARN, ERROR, INFO) ranked INFO as the most severe level, so
	// LogLevel "INFO" silently suppressed all WARN and ERROR messages.
	filter := &logutils.LevelFilter{
		Levels:   []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"},
		MinLevel: logutils.LogLevel(strings.ToUpper(c.LogLevel)),
		Writer:   c.Logger.Writer(),
	}
	c.Logger.SetOutput(filter)
	return c, nil
}
// initializeServices builds the routing table and the balancer, then the
// Publish-Subscribe and Distributed Map services, and stores them on db.
// The routing table is registered in db.env under "routingtable" before the
// balancer and the services are constructed. The first error returned by a
// service constructor is returned unchanged.
func initializeServices(db *Olric) error {
	env := db.env

	db.rt = routingtable.New(env)
	env.Set("routingtable", db.rt)
	db.balancer = balancer.New(env)

	// Add Services
	pubsubSvc, err := pubsub.NewService(env)
	if err != nil {
		return err
	}
	db.pubsub = pubsubSvc.(*pubsub.Service)

	dmapSvc, err := dmap.NewService(env)
	if err != nil {
		return err
	}
	db.dmap = dmapSvc.(*dmap.Service)

	return nil
}
// New creates a new Olric instance from c, otherwise returns an error.
//
// It prepares the configuration (see prepareConfig), builds the shared
// environment (config, logger, client, primary/backup partitions, locker and
// RESP server), initializes the routing table, balancer and services, and
// registers the node-level command handlers. The returned instance is not
// running yet; call Start to join the cluster.
func New(c *config.Config) (*Olric, error) {
	c, err := prepareConfig(c)
	if err != nil {
		return nil, err
	}

	e := environment.New()
	e.Set("config", c)

	// Set the hash function. Olric distributes keys over partitions by hashing.
	partitions.SetHashFunc(c.Hasher)

	flogger := flog.New(c.Logger)
	flogger.SetLevel(c.LogVerbosity)
	// prepareConfig filters log lines by the upper-cased LogLevel, so match
	// DEBUG case-insensitively here as well.
	if strings.EqualFold(c.LogLevel, "DEBUG") {
		flogger.ShowLineNumber(1)
	}
	e.Set("logger", flogger)

	client := server.NewClient(c.Client)
	e.Set("client", client)
	e.Set("primary", partitions.New(c.PartitionCount, partitions.PRIMARY))
	e.Set("backup", partitions.New(c.PartitionCount, partitions.BACKUP))
	e.Set("locker", locker.New())

	ctx, cancel := context.WithCancel(context.Background())
	db := &Olric{
		name:     c.MemberlistConfig.Name,
		env:      e,
		log:      flogger,
		config:   c,
		hashFunc: c.Hasher,
		client:   client,
		primary:  e.Get("primary").(*partitions.Partitions),
		backup:   e.Get("backup").(*partitions.Partitions),
		started:  c.Started,
		ctx:      ctx,
		cancel:   cancel,
	}

	// Create a Redcon server instance
	rc := &server.Config{
		BindAddr:        c.BindAddr,
		BindPort:        c.BindPort,
		KeepAlivePeriod: c.KeepAlivePeriod,
	}
	srv := server.New(rc, flogger)
	srv.SetPreConditionFunc(db.preconditionFunc)
	db.server = srv
	e.Set("server", srv)

	if err = initializeServices(db); err != nil {
		// The instance is discarded, so release the context created for it.
		cancel()
		return nil, err
	}

	db.registerCommandHandlers()
	return db, nil
}
// preconditionFunc is installed on the RESP server via SetPreConditionFunc;
// presumably the server consults it before dispatching a command. It returns
// true when isOperable reports no error. Otherwise it writes that error to
// conn and returns false.
func (db *Olric) preconditionFunc(conn redcon.Conn, _ redcon.Command) bool {
	err := db.isOperable()
	if err == nil {
		return true
	}
	protocol.WriteError(conn, err)
	return false
}
// registerCommandHandlers binds the node-level commands (ping, cluster routing
// table, stats and cluster members) to their handlers on the RESP server's mux.
func (db *Olric) registerCommandHandlers() {
	mux := db.server.ServeMux()

	mux.HandleFunc(protocol.Generic.Ping, db.pingCommandHandler)
	mux.HandleFunc(protocol.Cluster.RoutingTable, db.clusterRoutingTableCommandHandler)
	mux.HandleFunc(protocol.Generic.Stats, db.statsCommandHandler)
	mux.HandleFunc(protocol.Cluster.Members, db.clusterMembersCommandHandler)
}
// callStartedCallback polls checkpoint.AllPassed every 10 milliseconds. Once it
// reports true, it invokes the user-supplied started callback (if any) and
// returns. If db.ctx is cancelled first (Shutdown does this), it returns
// without calling the callback.
//
// It is meant to run in its own goroutine after db.wg.Add(1); it calls
// db.wg.Done on return.
func (db *Olric) callStartedCallback() {
	defer db.wg.Done()

	const pollInterval = 10 * time.Millisecond
	timer := time.NewTimer(pollInterval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if checkpoint.AllPassed() {
				if db.started != nil {
					db.started()
				}
				return
			}
			// The timer has fired and its channel was drained by the receive
			// above, so Reset is safe here on every Go version. The original
			// code reset a still-active timer on the first iteration.
			timer.Reset(pollInterval)
		case <-db.ctx.Done():
			return
		}
	}
}
// convertClusterError translates routing table errors into the exported errors
// of this package: ErrClusterQuorum, ErrServerGone and ErrOperationTimeout.
// Matching uses errors.Is, so wrapped errors are recognized too. Any other
// error (including nil) is returned unchanged.
func convertClusterError(err error) error {
	if errors.Is(err, routingtable.ErrClusterQuorum) {
		return ErrClusterQuorum
	}
	if errors.Is(err, routingtable.ErrServerGone) {
		return ErrServerGone
	}
	if errors.Is(err, routingtable.ErrOperationTimeout) {
		return ErrOperationTimeout
	}
	return err
}
// isOperable checks the member-count quorum and the bootstrapping status to
// prevent split-brain syndrome. A quorum failure is returned after
// translation by convertClusterError. Otherwise the result of the routing
// table's bootstrap check is returned as is.
func (db *Olric) isOperable() error {
	quorumErr := db.rt.CheckMemberCountQuorum()
	if quorumErr != nil {
		return convertClusterError(quorumErr)
	}

	// An Olric node has to be bootstrapped to function properly.
	return db.rt.CheckBootstrap()
}
// Start starts background servers and joins the cluster. It blocks until the
// RESP server stops and then returns the server's error.
//
// Start returns early with an error if any of the following fails, checked in
// this order:
//   - the RESP server cannot be started;
//   - the balancer fails to start;
//   - joining the cluster fails;
//   - the routing table fails to start;
//   - the Publish-Subscribe or Distributed Map service fails to start.
//
// You still must call the Shutdown method if Start returns an early error.
func (db *Olric) Start() error {
	db.log.V(1).Printf("[INFO] Olric %s on %s/%s %s", ReleaseVersion, runtime.GOOS, runtime.GOARCH, runtime.Version())

	// This error group is responsible to run the TCP server at background and report errors.
	errGr, ctx := errgroup.WithContext(context.Background())
	errGr.Go(func() error {
		return db.server.ListenAndServe()
	})

	select {
	case <-db.server.StartedCtx.Done():
		// TCP server has been started
	case <-ctx.Done():
		// TCP server could not be started due to an error. There is no need to run
		// Olric.Shutdown here because we could not start anything.
		return errGr.Wait()
	}

	// Balancer works periodically to balance partition data across the cluster.
	if err := db.balancer.Start(); err != nil {
		db.log.V(2).Printf("[ERROR] Failed to run the balancer subsystem: %v", err)
		return err
	}

	// First, we need to join the cluster. Then, the routing table can be started.
	if err := db.rt.Join(); err != nil {
		db.log.V(2).Printf("[ERROR] Failed to join the Olric cluster: %v", err)
		return err
	}

	// Start routing table service and member discovery subsystem.
	if err := db.rt.Start(); err != nil {
		db.log.V(2).Printf("[ERROR] Failed to run the routing table subsystem: %v", err)
		return err
	}

	// Start publish-subscribe service
	if err := db.pubsub.Start(); err != nil {
		db.log.V(2).Printf("[ERROR] Failed to run the Publish-Subscribe service: %v", err)
		return err
	}

	// Start distributed map service
	if err := db.dmap.Start(); err != nil {
		db.log.V(2).Printf("[ERROR] Failed to run the Distributed Map service: %v", err)
		return err
	}

	// Warn the user about their choice of configuration: in async replication
	// mode writes do not wait for replicas, so WriteQuorum has no effect.
	if db.config.ReplicationMode == config.AsyncReplicationMode && db.config.WriteQuorum > 1 {
		db.log.V(2).
			Printf("[WARN] Olric is running in async replication mode. WriteQuorum (%d) is ineffective",
				db.config.WriteQuorum)
	}

	if db.started != nil {
		db.wg.Add(1)
		go db.callStartedCallback()
	}

	db.log.V(2).Printf("[INFO] Node name in the cluster: %s",
		db.name)
	if db.config.Interface != "" {
		db.log.V(2).Printf("[INFO] Olric uses interface: %s",
			db.config.Interface)
	}
	db.log.V(2).Printf("[INFO] Olric bindAddr: %s, bindPort: %d",
		db.config.BindAddr, db.config.BindPort)
	db.log.V(2).Printf("[INFO] Replication count is %d", db.config.ReplicaCount)

	// Wait for the TCP server.
	return errGr.Wait()
}
// Shutdown stops background servers and leaves the cluster.
//
// Only the first call does any work: once db.ctx has been cancelled,
// Shutdown returns nil immediately. Otherwise it cancels db.ctx and stops the
// components in this order:
//   - the Publish-Subscribe service;
//   - the Distributed Map service;
//   - the balancer;
//   - the routing table;
//   - the RESP server.
//
// Every component is shut down even if an earlier one fails. Each failure is
// logged, and the error of the last failing component is returned. Finally
// it waits for the goroutines tracked by db.wg, or until ctx is done,
// whichever happens first.
func (db *Olric) Shutdown(ctx context.Context) error {
	select {
	case <-db.ctx.Done():
		// Shutdown only once.
		return nil
	default:
	}
	db.cancel()

	var latestError error
	// record logs a failed shutdown step and remembers its error; a later
	// failure overwrites an earlier one.
	record := func(format string, err error) {
		if err == nil {
			return
		}
		db.log.V(2).Printf(format, err)
		latestError = err
	}

	record("[ERROR] Failed to shutdown PubSub service: %v", db.pubsub.Shutdown(ctx))
	record("[ERROR] Failed to shutdown DMap service: %v", db.dmap.Shutdown(ctx))
	record("[ERROR] Failed to shutdown balancer service: %v", db.balancer.Shutdown(ctx))
	record("[ERROR] Failed to shutdown routing table service: %v", db.rt.Shutdown(ctx))
	// Shutdown Redcon server
	record("[ERROR] Failed to shutdown RESP server: %v", db.server.Shutdown(ctx))

	// Wait for background goroutines (the started callback), but give up once
	// ctx is done.
	waitDone := make(chan struct{})
	go func() {
		defer close(waitDone)
		db.wg.Wait()
	}()
	select {
	case <-waitDone:
	case <-ctx.Done():
	}

	// db.name will be shown as empty string, if the program is killed before
	// bootstrapping.
	db.log.V(2).Printf("[INFO] %s is gone", db.name)
	return latestError
}
func convertDMapError(err error) error {
switch {
case errors.Is(err, dmap.ErrKeyFound):
return ErrKeyFound
case errors.Is(err, dmap.ErrKeyNotFound):
return ErrKeyNotFound
case errors.Is(err, dmap.ErrDMapNotFound):
return ErrKeyNotFound
case errors.Is(err, dmap.ErrLockNotAcquired):
return ErrLockNotAcquired
case errors.Is(err, dmap.ErrNoSuchLock):
return ErrNoSuchLock
case errors.Is(err, dmap.ErrReadQuorum):
return ErrReadQuorum
case errors.Is(err, dmap.ErrWriteQuorum):
return ErrWriteQuorum
case errors.Is(err, dmap.ErrServerGone):
return ErrServerGone
case errors.Is(err, dmap.ErrKeyTooLarge):
return ErrKeyTooLarge
case errors.Is(err, dmap.ErrEntryTooLarge):
return ErrEntryTooLarge
default:
return convertClusterError(err)
}
}