Skip to content

Commit

Permalink
Merge pull request #9620 from kobergj/FixNatsjsRegistry
Browse files Browse the repository at this point in the history
Repair Natsjskv Registry
  • Loading branch information
kobergj authored Jul 18, 2024
2 parents 775913c + 6d7d18a commit d75abae
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 35 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-natsjskv-registry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.

https://github.com/owncloud/ocis/pull/9620
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ replace github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-2

replace github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c

replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf

// exclude the v2 line of go-sqlite3 which was released accidentally and prevents pulling in newer versions of go-sqlite3
// see https://github.com/mattn/go-sqlite3/issues/965 for more details
exclude github.com/mattn/go-sqlite3 v2.0.3+incompatible
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1218,8 +1218,6 @@ github.com/go-micro/plugins/v4/server/http v1.2.2 h1:UK2/09AU0zV3wHELuR72TZzVU2v
github.com/go-micro/plugins/v4/server/http v1.2.2/go.mod h1:YuAjaSPxcn3LI8j2FUsqx0Rxunrj4YwDV41Ax76rLl0=
github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 h1:Qa1EBQ9UyCGecFAJQovl/MHGnvbcvDaM3qUoAG5Lnvk=
github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0/go.mod h1:aCRl8JQmqIaonOl88nFPY/BOQnHPVHY9ngStzLkXnYk=
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e h1:hwH0qXT0J3UFYRi0UD+e3ItL92oW+jdPFA+3o/j6ASg=
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e/go.mod h1:Goi4eJ9SrKkxE6NsAVqBVNxfQFbwb7UbyII6743ldgM=
github.com/go-micro/plugins/v4/store/redis v1.2.1 h1:d9kwr9bSpoK9vkHkqcv+isQUbgBCHpfwCV57pcAPS6c=
github.com/go-micro/plugins/v4/store/redis v1.2.1/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg=
github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8=
Expand Down Expand Up @@ -1613,6 +1611,8 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf h1:X4Hm7mZFAE+vJZ62mcXuH9BywmKiAr9B4V5LQLcTr70=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
82 changes: 51 additions & 31 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/store"
Expand All @@ -23,6 +24,8 @@ var (
_registryAddressEnv = "MICRO_REGISTRY_ADDRESS"
_registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME"
_registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD"

_serviceDelimiter = "/"
)

func init() {
Expand Down Expand Up @@ -80,76 +83,93 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti
if s == nil {
return errors.New("wont store nil service")
}

unique := uuid.New().String()
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Metadata["uuid"] = unique

b, err := json.Marshal(s)
if err != nil {
return err
}
return n.store.Write(&store.Record{
Key: s.Name,
Key: s.Name + _serviceDelimiter + unique,
Value: b,
Expiry: n.expiry,
})
}

// Deregister removes a service from the registry
// Deregister removes a service from the registry.
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

return n.store.Delete(s.Name)
var unique string
if s.Metadata != nil {
unique = s.Metadata["uuid"]
}

return n.store.Delete(s.Name + _serviceDelimiter + unique)
}

// GetService gets a specific service from the registry
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(recs))
for _, rec := range recs {
var s registry.Service
if err := json.Unmarshal(rec.Value, &s); err != nil {
return nil, err
}
svcs = append(svcs, &s)
}
return svcs, nil
// avoid listing e.g. `webfinger` when requesting `web` by adding the delimiter to the service name
return n.listServices(store.ListPrefix(s + _serviceDelimiter))
}

// ListServices lists all registered services
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
return n.listServices()
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return NewWatcher(n)
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
}

func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

keys, err := n.store.List()
keys, err := n.store.List(opts...)
if err != nil {
return nil, err
}

var svcs []*registry.Service
svcs := make([]*registry.Service, 0, len(keys))
for _, k := range keys {
s, err := n.GetService(k)
s, err := n.getService(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s...)
svcs = append(svcs, s)

}
return svcs, nil
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return nil, errors.New("watcher not implemented")
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
func (n *storeregistry) getService(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
if len(recs) == 0 {
return nil, registry.ErrNotFound
}
var svc registry.Service
if err := json.Unmarshal(recs[0].Value, &svc); err != nil {
return nil, err
}
return &svc, nil
}

func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
Expand Down
74 changes: 74 additions & 0 deletions ocis-pkg/natsjsregistry/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package natsjsregistry

import (
"errors"

"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
)

// NatsWatcher is the watcher of the nats interface
type NatsWatcher interface {
WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error)
}

// Watcher is used to keep track of changes in the registry
type Watcher struct {
watch nats.KeyWatcher
updates <-chan nats.KeyValueEntry
reg *storeregistry
}

// NewWatcher returns a new watcher
func NewWatcher(s *storeregistry) (*Watcher, error) {
w, ok := s.store.(NatsWatcher)
if !ok {
return nil, errors.New("store does not implement watcher interface")
}

watcher, err := w.WatchAll("service-registry")
if err != nil {
return nil, err
}

return &Watcher{
watch: watcher,
updates: watcher.Updates(),
reg: s,
}, nil
}

// Next returns the next result. It is a blocking call
func (w *Watcher) Next() (*registry.Result, error) {
kve := <-w.updates
if kve == nil {
return nil, errors.New("watcher stopped")
}

service, err := w.reg.getService(kve.Key())
if err != nil {
return nil, err
}

var action string
switch kve.Operation() {
default:
action = "create"
case nats.KeyValuePut:
action = "create"
case nats.KeyValueDelete:
action = "delete"
case nats.KeyValuePurge:
action = "delete"
}

return &registry.Result{
Service: service,
Action: action,
}, nil
}

// Stop stops the watcher
func (w *Watcher) Stop() {
_ = w.watch.Stop()
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ github.com/go-micro/plugins/v4/server/http
# github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0
## explicit; go 1.21
github.com/go-micro/plugins/v4/store/nats-js
# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e
# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf
## explicit; go 1.21
github.com/go-micro/plugins/v4/store/nats-js-kv
# github.com/go-micro/plugins/v4/store/redis v1.2.1
Expand Down Expand Up @@ -2435,3 +2435,4 @@ stash.kopano.io/kgol/rndm
# github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6
# github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-20240123094924-5af178158eaf
# github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c
# github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf

0 comments on commit d75abae

Please sign in to comment.