Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/internal/expose/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *Manager) Expose(ctx context.Context, req Request) (*Response, error) {
}

func (m *Manager) KeepAlive(ctx context.Context, domain string) error {
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
defer m.stop(domain)

Expand Down
8 changes: 4 additions & 4 deletions management/internals/modules/reverseproxy/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type Manager interface {
GetServiceByID(ctx context.Context, accountID, serviceID string) (*Service, error)
GetAccountServices(ctx context.Context, accountID string) ([]*Service, error)
GetServiceIDByTargetID(ctx context.Context, accountID string, resourceID string) (string, error)
ValidateExposePermission(ctx context.Context, accountID, peerID string) error
CreateServiceFromPeer(ctx context.Context, accountID, peerID string, service *Service) (*Service, error)
DeleteServiceFromPeer(ctx context.Context, accountID, peerID, serviceID string) error
ExpireServiceFromPeer(ctx context.Context, accountID, peerID, serviceID string) error
CreateServiceFromPeer(ctx context.Context, accountID, peerID string, req *ExposeServiceRequest) (*ExposeServiceResponse, error)
RenewServiceFromPeer(ctx context.Context, accountID, peerID, domain string) error
StopServiceFromPeer(ctx context.Context, accountID, peerID, domain string) error
StartExposeReaper(ctx context.Context)
}
112 changes: 55 additions & 57 deletions management/internals/modules/reverseproxy/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

137 changes: 137 additions & 0 deletions management/internals/modules/reverseproxy/manager/expose_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package manager

import (
"context"
"sync"
"time"

log "github.com/sirupsen/logrus"
)

const (
exposeTTL = 90 * time.Second
exposeReapInterval = 30 * time.Second
maxExposesPerPeer = 10
)

type trackedExpose struct {
mu sync.Mutex
domain string
accountID string
peerID string
lastRenewed time.Time
}

type exposeTracker struct {
activeExposes sync.Map
exposeCreateMu sync.Mutex
manager *managerImpl
}

func exposeKey(peerID, domain string) string {
return peerID + ":" + domain
}

// TrackExpose registers a new active expose session. Returns true if the expose
// was already tracked (duplicate).
func (t *exposeTracker) TrackExpose(peerID, domain, accountID string) bool {
key := exposeKey(peerID, domain)
_, loaded := t.activeExposes.LoadOrStore(key, &trackedExpose{
domain: domain,
accountID: accountID,
peerID: peerID,
lastRenewed: time.Now(),
})
return loaded
}

// UntrackExpose removes an active expose session from tracking.
func (t *exposeTracker) UntrackExpose(peerID, domain string) {
t.activeExposes.Delete(exposeKey(peerID, domain))
}

// CountPeerExposes returns the number of active expose sessions for a peer.
func (t *exposeTracker) CountPeerExposes(peerID string) int {
count := 0
t.activeExposes.Range(func(_, val any) bool {
if expose := val.(*trackedExpose); expose.peerID == peerID {
count++
}
return true
})
return count
}

// MaxExposesPerPeer returns the maximum number of concurrent exposes allowed per peer.
func (t *exposeTracker) MaxExposesPerPeer() int {
return maxExposesPerPeer
}

// RenewTrackedExpose updates the in-memory lastRenewed timestamp for a tracked expose.
// Returns false if the expose is not tracked.
func (t *exposeTracker) RenewTrackedExpose(peerID, domain string) bool {
key := exposeKey(peerID, domain)
val, ok := t.activeExposes.Load(key)
if !ok {
return false
}

expose := val.(*trackedExpose)
expose.mu.Lock()
expose.lastRenewed = time.Now()
expose.mu.Unlock()

return true
}

// StopTrackedExpose removes an active expose session from tracking.
// Returns false if the expose was not tracked.
func (t *exposeTracker) StopTrackedExpose(peerID, domain string) bool {
key := exposeKey(peerID, domain)
_, ok := t.activeExposes.LoadAndDelete(key)
return ok
}

// CheckPeerExposeLimitWithLock atomically checks whether the peer can create a new expose.
// Returns true if the peer is within the limit.
func (t *exposeTracker) CheckPeerExposeLimitWithLock(peerID string) bool {
t.exposeCreateMu.Lock()
defer t.exposeCreateMu.Unlock()
return t.CountPeerExposes(peerID) < maxExposesPerPeer
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

// StartExposeReaper starts a background goroutine that reaps expired expose sessions.
func (t *exposeTracker) StartExposeReaper(ctx context.Context) {
go func() {
ticker := time.NewTicker(exposeReapInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
t.reapExpiredExposes()
}
}
}()
}

func (t *exposeTracker) reapExpiredExposes() {
t.activeExposes.Range(func(key, val any) bool {
expose := val.(*trackedExpose)
expose.mu.Lock()
expired := time.Since(expose.lastRenewed) > exposeTTL
expose.mu.Unlock()

if expired {
if _, deleted := t.activeExposes.LoadAndDelete(key); deleted {
log.Infof("reaping expired expose session for peer %s, domain %s", expose.peerID, expose.domain)
if err := t.manager.deleteServiceFromPeer(context.Background(), expose.accountID, expose.peerID, expose.domain, true); err != nil {
log.Errorf("failed to delete expired peer-exposed service for domain %s: %v", expose.domain, err)
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
return true
})
}
Loading
Loading