Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
redis: add service pool configuration (#230)
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman authored May 13, 2022
1 parent 2d7bfce commit f6f6e1c
Show file tree
Hide file tree
Showing 9 changed files with 609 additions and 24 deletions.
3 changes: 2 additions & 1 deletion config/example.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
# cache:
# type: redis
# redis:
# address: localhost:6379
# addresses:
# - localhost:6379

#cluster:
# type: kv
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Masterminds/squirrel v1.1.0
github.com/bgentry/speakeasy v0.1.0
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cockroachdb/errors v1.8.4
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKz
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/squirrel v1.1.0 h1:baP1qLdoQCeTw3ifCdOq2dkYc6vGcmRdaociKLbEJXs=
github.com/Masterminds/squirrel v1.1.0/go.mod h1:yaPeOnPG5ZRwL9oKdTsO/prlkPbXWZlRVMQ/gGlzIuA=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
Expand All @@ -45,6 +46,7 @@ github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQ
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand Down Expand Up @@ -371,6 +373,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/cached/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func New(cfg Config, rep repository.Repository, logger kitlog.Logger) (repositor
if cfg.Type != rediscache.Type {
return nil, fmt.Errorf("unrecognized repository cache type: %s", cfg.Type)
}
c := rediscache.New(cfg.Redis)
c := rediscache.New(cfg.Redis, logger)

return &CachedRepository{
User: &cachedUserRep{c: c, rep: rep, logger: logger},
Expand Down
52 changes: 30 additions & 22 deletions pkg/storage/cached/redis/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"errors"
"time"

"github.com/go-kit/log"

"github.com/go-redis/redis/v8"
)

Expand All @@ -27,7 +29,8 @@ const Type = "redis"

// Config contains Redis cache configuration.
type Config struct {
Address string `fig:"address"`
SRV string `fig:"srv"`
Addresses []string `fig:"addresses"`
Username string `fig:"username"`
Password string `fig:"password"`
DB int `fig:"db"`
Expand All @@ -40,21 +43,26 @@ type Config struct {
// Cache is Redis cache implementation.
type Cache struct {
cfg Config
client *redis.Client
sp servicePool
ttl time.Duration
logger log.Logger
}

// New creates and returns an initialized Redis Cache instance.
func New(cfg Config) *Cache {
return &Cache{cfg: cfg}
func New(cfg Config, logger log.Logger) *Cache {
return &Cache{
cfg: cfg,
logger: log.With(logger, "cache", "redis"),
}
}

// Type satisfies Cache interface.
func (c *Cache) Type() string { return Type }

// Get satisfies Cache interface.
func (c *Cache) Get(ctx context.Context, ns, key string) ([]byte, error) {
val, err := c.client.HGet(ctx, ns, key).Result()
cl := c.sp.getClient(ns)
val, err := cl.HGet(ctx, ns, key).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, nil
Expand All @@ -66,25 +74,29 @@ func (c *Cache) Get(ctx context.Context, ns, key string) ([]byte, error) {

// Put satisfies Cache interface.
func (c *Cache) Put(ctx context.Context, ns, key string, val []byte) error {
if err := c.client.HSet(ctx, ns, key, val).Err(); err != nil {
cl := c.sp.getClient(ns)
if err := cl.HSet(ctx, ns, key, val).Err(); err != nil {
return err
}
return c.client.Expire(ctx, ns, c.ttl).Err()
return cl.Expire(ctx, ns, c.ttl).Err()
}

// Del satisfies Cache interface.
func (c *Cache) Del(ctx context.Context, ns string, keys ...string) error {
return c.client.HDel(ctx, ns, keys...).Err()
cl := c.sp.getClient(ns)
return cl.HDel(ctx, ns, keys...).Err()
}

// DelNS removes all keys contained under a given namespace from the cache store.
func (c *Cache) DelNS(ctx context.Context, ns string) error {
return c.client.Del(ctx, ns).Err()
cl := c.sp.getClient(ns)
return cl.Del(ctx, ns).Err()
}

// HasKey satisfies Cache interface.
func (c *Cache) HasKey(ctx context.Context, ns, key string) (bool, error) {
res := c.client.HExists(ctx, ns, key)
cl := c.sp.getClient(ns)
res := cl.HExists(ctx, ns, key)
if err := res.Err(); err != nil {
return false, err
}
Expand All @@ -93,19 +105,15 @@ func (c *Cache) HasKey(ctx context.Context, ns, key string) (bool, error) {

// Start satisfies Cache interface.
func (c *Cache) Start(ctx context.Context) error {
c.client = redis.NewClient(&redis.Options{
Addr: c.cfg.Address,
Username: c.cfg.Username,
Password: c.cfg.Password,
DB: c.cfg.DB,
DialTimeout: c.cfg.DialTimeout,
ReadTimeout: c.cfg.ReadTimeout,
WriteTimeout: c.cfg.WriteTimeout,
})
return c.client.Ping(ctx).Err()
if len(c.cfg.SRV) > 0 {
c.sp = newSRVServicePool(c.cfg, c.logger)
} else {
c.sp = newStaticServicePool(c.cfg)
}
return c.sp.start(ctx)
}

// Stop satisfies Cache interface.
func (c *Cache) Stop(_ context.Context) error {
return c.client.Close()
func (c *Cache) Stop(ctx context.Context) error {
return c.sp.stop(ctx)
}
33 changes: 33 additions & 0 deletions pkg/storage/cached/redis/jumphash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 The jackal Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rediscache

// jumpHash consistently chooses a hash bucket number in the range
// [0, numBuckets) for the given key. numBuckets must be >= 1.
//
// Copied from github.com/dgryski/go-jump/blob/master/jump.go (MIT license).
func jumpHash(key uint64, numBuckets int) int32 {

var b int64 = -1
var j int64

for j < int64(numBuckets) {
b = j
key = key*2862933555777941757 + 1
j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
}

return int32(b)
}
Loading

0 comments on commit f6f6e1c

Please sign in to comment.