Skip to content

Commit

Permalink
storage: Add generic Watcher (#1442)
Browse files Browse the repository at this point in the history
* add generic watcher

* migrate rest of dhcp

* migrate dns

* fix dhcp

* fix missing trace

* fix missing logger

* use go 1.23 iterator

* add stop function, migrate discovery

* migrate dns records

* add metrics

* add GetOK

* fix partially

* use prefix everywhere

* fix wrong prefix

* fix dns bugs

* add iterator with relative keys

* split

* fix missing opt

* add key func, key parse

* fix lint

* fix final

* pass direction

* re-add old metric
  • Loading branch information
BeryJu authored Jan 17, 2025
1 parent e030575 commit 9c07a2e
Show file tree
Hide file tree
Showing 25 changed files with 424 additions and 420 deletions.
4 changes: 1 addition & 3 deletions pkg/roles/dhcp/api_leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,7 @@ type APILeasesWOLInput struct {

func (r *Role) APILeasesWOL() usecase.Interactor {
u := usecase.NewInteractor(func(ctx context.Context, input APILeasesWOLInput, output *struct{}) error {
r.leasesM.RLock()
l, ok := r.leases[input.Identifier]
r.leasesM.RUnlock()
l, ok := r.leases.GetPrefix(input.Identifier)
if !ok {
return status.InvalidArgument
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/roles/dhcp/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ func (r *Role) eventCreateLease(ev *roles.Event) {
address := ev.Payload.Data["address"].(string)
scopeName := ev.Payload.Data["scope"].(string)

r.scopesM.RLock()
scope := r.scopes[scopeName]
r.scopesM.RUnlock()
if scope == nil {
scope, ok := r.scopes.GetPrefix(scopeName)
if !ok {
r.log.Warn("event to create lease with missing scope", zap.String("scopeName", scopeName))
return
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/roles/dhcp/ipam_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ func (i *InternalIPAM) IsIPFree(ip netip.Addr, identifier *string) bool {
return false
}
// check for existing leases
i.role.leasesM.RLock()
defer i.role.leasesM.RUnlock()
for _, l := range i.role.leases {
for _, l := range i.role.leases.Iter() {
// Ignore leases from other scopes
if l.ScopeKey != i.scope.Name {
continue
Expand Down
8 changes: 2 additions & 6 deletions pkg/roles/dhcp/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ type Lease struct {
}

func (r *Role) FindLease(req *Request4) *Lease {
r.leasesM.RLock()
defer r.leasesM.RUnlock()
lease, ok := r.leases[r.DeviceIdentifier(req.DHCPv4)]
lease, ok := r.leases.GetPrefix(r.DeviceIdentifier(req.DHCPv4))
if !ok {
return nil
}
Expand Down Expand Up @@ -110,9 +108,7 @@ func (r *Role) leaseFromKV(raw *mvccpb.KeyValue) (*Lease, error) {
}
l.etcdKey = string(raw.Key)

r.scopesM.RLock()
scope, ok := r.scopes[l.ScopeKey]
r.scopesM.RUnlock()
scope, ok := r.scopes.GetPrefix(l.ScopeKey)
if !ok {
return l, fmt.Errorf("DHCP lease with invalid scope key: %s", l.ScopeKey)
}
Expand Down
70 changes: 0 additions & 70 deletions pkg/roles/dhcp/leases_watch.go

This file was deleted.

4 changes: 1 addition & 3 deletions pkg/roles/dhcp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func (s *Scope) calculateUsage() {
usable := s.ipam.UsableSize()
dhcpScopeSize.WithLabelValues(s.Name).Set(float64(usable.Uint64()))
used := big.NewInt(0)
s.role.leasesM.RLock()
defer s.role.leasesM.RUnlock()
for _, lease := range s.role.leases {
for _, lease := range s.role.leases.Iter() {
if lease.ScopeKey != s.Name {
continue
}
Expand Down
71 changes: 48 additions & 23 deletions pkg/roles/dhcp/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"fmt"
"net"
"strings"
"sync"

"beryju.io/gravity/pkg/extconfig"
"beryju.io/gravity/pkg/roles"
apitypes "beryju.io/gravity/pkg/roles/api/types"
"beryju.io/gravity/pkg/roles/dhcp/oui"
"beryju.io/gravity/pkg/roles/dhcp/types"
"beryju.io/gravity/pkg/storage/watcher"
"github.com/getsentry/sentry-go"
"github.com/swaggest/rest/web"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.uber.org/zap"
"golang.org/x/net/ipv4"

Expand All @@ -25,17 +26,15 @@ type Role struct {
i roles.Instance
ctx context.Context

scopes map[string]*Scope
leases map[string]*Lease
scopes *watcher.Watcher[*Scope]
leases *watcher.Watcher[*Lease]

cfg *RoleConfig

s4 *handler4
log *zap.Logger

oui *oui.OuiDb
scopesM sync.RWMutex
leasesM sync.RWMutex
oui *oui.OuiDb
}

func init() {
Expand All @@ -46,14 +45,46 @@ func init() {

func New(instance roles.Instance) *Role {
r := &Role{
log: instance.Log(),
i: instance,
scopes: make(map[string]*Scope),
scopesM: sync.RWMutex{},
leases: make(map[string]*Lease),
leasesM: sync.RWMutex{},
ctx: instance.Context(),
log: instance.Log(),
i: instance,
ctx: instance.Context(),
}
r.scopes = watcher.New(
func(kv *mvccpb.KeyValue) (*Scope, error) {
s, err := r.scopeFromKV(kv)
if err != nil {
return nil, err
}
s.calculateUsage()
return s, nil
},
instance.KV(),
instance.KV().Key(
types.KeyRole,
types.KeyScopes,
).Prefix(true),
)

r.leases = watcher.New(
func(kv *mvccpb.KeyValue) (*Lease, error) {
s, err := r.leaseFromKV(kv)
if err != nil {
return nil, err
}
return s, nil
},
instance.KV(),
r.i.KV().Key(
types.KeyRole,
types.KeyLeases,
).Prefix(true), watcher.WithAfterInitialLoad[*Lease](func() {
// Re-calculate scope usage after all leases are loaded
for _, s := range r.scopes.Iter() {
s.calculateUsage()
}
}),
)

r.s4 = &handler4{
role: r,
}
Expand Down Expand Up @@ -89,17 +120,9 @@ func (r *Role) Start(ctx context.Context, config []byte) error {

start := sentry.TransactionFromContext(ctx).StartChild("gravity.dhcp.start")
defer start.Finish()
r.loadInitialScopes(start.Context())
r.loadInitialLeases(start.Context())

// Since scope usage relies on r.leases, but r.leases is loaded after the scopes,
// manually update the usage
for _, s := range r.scopes {
s.calculateUsage()
}

go r.startWatchScopes()
go r.startWatchLeases()
r.scopes.Start(r.ctx)
r.leases.Start(r.ctx)

if r.cfg.Port < 1 {
return nil
Expand Down Expand Up @@ -176,6 +199,8 @@ func (r *Role) startServer4() error {
}

func (r *Role) Stop() {
r.scopes.Stop()
r.leases.Stop()
if r.s4 != nil && r.s4.pc != nil {
r.s4.pc.Close()
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/roles/dhcp/scopes.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (r *Role) scopeFromKV(raw *mvccpb.KeyValue) (*Scope, error) {

s.etcdKey = string(raw.Key)

previous := r.scopes[s.Name]
previous := r.scopes.Get(s.Name)

ipamInst, err := s.ipamType(previous)
if err != nil {
Expand All @@ -99,13 +99,11 @@ func (s *Scope) ipamType(previous *Scope) (IPAM, error) {
func (r *Role) findScopeForRequest(req *Request4) *Scope {
var match *Scope
longestBits := 0
r.scopesM.RLock()
defer r.scopesM.RUnlock()
// To prioritise requests from a DHCP relay being matched correctly, give their subnet
// match a 1 bit more priority
const dhcpRelayBias = 1
const clientIPBias = 2
for _, scope := range r.scopes {
for _, scope := range r.scopes.Iter() {
// Check based on Client IP Address (highest priority)
clientIPMatchBits := scope.match(req.ClientIPAddr)
if clientIPMatchBits > -1 && clientIPMatchBits+clientIPBias > longestBits {
Expand Down
77 changes: 0 additions & 77 deletions pkg/roles/dhcp/scopes_watch.go

This file was deleted.

Loading

0 comments on commit 9c07a2e

Please sign in to comment.