Skip to content

Commit

Permalink
feat(handlers): hash based service updates (#43)
Browse files Browse the repository at this point in the history
* feat(handlers): add hash-based change detection for service updates

* feat(handlers): add hash-based change detection for service updates

* perf(arr): optimize health checks with singleflight and background refresh

* refactor(services): synchronize version fetching

* ci: add redis service container to GitHub Actions workflow

* fix: prevent double close in cache tests
  • Loading branch information
s0up4200 authored Nov 15, 2024
1 parent e6f2067 commit 9dce121
Show file tree
Hide file tree
Showing 23 changed files with 1,266 additions and 579 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ jobs:
POSTGRES_PASSWORD: dashbrr
POSTGRES_DB: dashbrr_test
options: --health-cmd pg_isready --health-interval 1s --health-timeout 5s --health-retries 60
test_redis:
image: redis:7-alpine
ports:
- "6379:6379"
options: --health-cmd "redis-cli ping" --health-interval 1s --health-timeout 5s --health-retries 60
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -84,7 +89,20 @@ jobs:
go-version: ${{ env.GO_VERSION }}
cache: true

- name: Initialize test database
run: |
PGPASSWORD=dashbrr psql -h localhost -U dashbrr -d dashbrr_test -f docker-compose/init.sql
- name: Test
env:
DASHBRR__DB_TYPE: postgres
DASHBRR__DB_HOST: localhost
DASHBRR__DB_PORT: 5432
DASHBRR__DB_USER: dashbrr
DASHBRR__DB_PASSWORD: dashbrr
DASHBRR__DB_NAME: dashbrr_test
REDIS_HOST: localhost
REDIS_PORT: 6379
run: go run gotest.tools/gotestsum@latest --junitfile unit-tests.xml --format pkgname -- ./... -tags=integration

- name: Test Summary
Expand Down
214 changes: 179 additions & 35 deletions internal/api/handlers/autobrr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -35,13 +37,23 @@ type AutobrrHandler struct {
db *database.DB
store cache.Store
sf *singleflight.Group

lastReleasesHash map[string]string
lastStatsHash map[string]string
lastIRCStatusHash map[string]string
hashMu sync.Mutex
}

func NewAutobrrHandler(db *database.DB, store cache.Store) *AutobrrHandler {
return &AutobrrHandler{
db: db,
store: store,
sf: &singleflight.Group{},

// Initialize the new maps
lastReleasesHash: make(map[string]string),
lastStatsHash: make(map[string]string),
lastIRCStatusHash: make(map[string]string),
}
}

Expand All @@ -59,9 +71,9 @@ func (h *AutobrrHandler) GetAutobrrReleases(c *gin.Context) {
return
}

log.Debug().
Str("instanceId", instanceId).
Msg("GetAutobrrReleases called")
//log.Debug().
// Str("instanceId", instanceId).
// Msg("GetAutobrrReleases called")

cacheKey := releasesPrefix + instanceId
ctx := context.Background()
Expand Down Expand Up @@ -95,19 +107,34 @@ func (h *AutobrrHandler) GetAutobrrReleases(c *gin.Context) {
status := http.StatusInternalServerError
if err == context.DeadlineExceeded || err == context.Canceled {
status = http.StatusGatewayTimeout
log.Error().Err(err).Str("instanceId", instanceId).Msg("Request timeout while fetching Autobrr releases")
log.Error().Err(err).Str("instanceId", instanceId).Msg("[Autobrr] Request timeout while fetching releases")
} else {
log.Error().Err(err).Str("instanceId", instanceId).Msg("Failed to fetch Autobrr releases")
log.Error().Err(err).Str("instanceId", instanceId).Msg("[Autobrr] Failed to fetch releases")
}
c.JSON(status, gin.H{"error": err.Error()})
return
}

releases = result.(types.ReleasesResponse)

log.Debug().
Str("instanceId", instanceId).
Msg("Successfully retrieved and cached Autobrr releases")
h.hashMu.Lock()
currentHash := createAutobrrReleaseHash(releases)
lastHash := h.lastReleasesHash[instanceId]

// Only log when there are releases and the hash has changed
if (lastHash == "" || currentHash != lastHash) && len(releases.Data) > 0 {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Successfully refreshed releases cache")
}

if currentHash != lastHash {
log.Debug().
Str("instanceId", instanceId).
Msg("Autobrr releases changed")
h.lastReleasesHash[instanceId] = currentHash
}
h.hashMu.Unlock()

// Broadcast releases update via SSE
h.broadcastReleases(instanceId, releases)
Expand All @@ -129,9 +156,9 @@ func (h *AutobrrHandler) GetAutobrrReleaseStats(c *gin.Context) {
return
}

log.Debug().
Str("instanceId", instanceId).
Msg("GetAutobrrReleaseStats called")
//log.Debug().
// Str("instanceId", instanceId).
// Msg("GetAutobrrReleaseStats called")

cacheKey := statsPrefix + instanceId
ctx := context.Background()
Expand Down Expand Up @@ -177,10 +204,24 @@ func (h *AutobrrHandler) GetAutobrrReleaseStats(c *gin.Context) {

stats = result.(types.AutobrrStats)

log.Debug().
Str("instanceId", instanceId).
Interface("stats", stats).
Msg("Successfully retrieved and cached autobrr release stats")
h.hashMu.Lock()
currentHash := createAutobrrStatsHash(stats)
lastHash := h.lastStatsHash[instanceId]

// Only log when there are stats and the hash has changed
if (lastHash == "" || currentHash != lastHash) && stats.TotalCount > 0 {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Successfully refreshed release stats cache")
}

if currentHash != lastHash {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Stats updated")
h.lastStatsHash[instanceId] = currentHash
}
h.hashMu.Unlock()

// Broadcast stats update via SSE
h.broadcastStats(instanceId, stats)
Expand Down Expand Up @@ -244,13 +285,28 @@ func (h *AutobrrHandler) GetAutobrrIRCStatus(c *gin.Context) {

status = result.([]types.IRCStatus)

log.Debug().
Str("instanceId", instanceId).
Msg("Successfully retrieved and cached Autobrr IRC status")

// Broadcast IRC status update via SSE
h.broadcastIRCStatus(instanceId, status)

h.hashMu.Lock()
currentHash := createIRCStatusHash(status)
lastHash := h.lastIRCStatusHash[instanceId]

// Only log when there are status entries and the hash has changed
if (lastHash == "" || currentHash != lastHash) && len(status) > 0 {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Successfully refreshed IRC status cache")
}

if currentHash != lastHash {
log.Debug().
Str("instanceId", instanceId).
Msg("Autobrr IRC status changed")
h.lastIRCStatusHash[instanceId] = currentHash
}
h.hashMu.Unlock()

c.JSON(http.StatusOK, status)
}

Expand Down Expand Up @@ -391,14 +447,13 @@ func (h *AutobrrHandler) fetchAndCacheIRC(ctx context.Context, instanceId, cache
log.Warn().
Err(err).
Str("instanceId", instanceId).
Msg("Failed to cache Autobrr IRC status")
Msg("[Autobrr] Failed to cache IRC status")
}

return status, nil
}

func (h *AutobrrHandler) refreshStatsCache(instanceId, cacheKey string) {
// Use singleflight for refresh operations
sfKey := fmt.Sprintf("stats_refresh:%s", instanceId)
result, err, _ := h.sf.Do(sfKey, func() (interface{}, error) {
ctx := context.Background()
Expand All @@ -409,23 +464,38 @@ func (h *AutobrrHandler) refreshStatsCache(instanceId, cacheKey string) {
log.Error().
Err(err).
Str("instanceId", instanceId).
Msg("Failed to refresh Autobrr release stats cache")
Msg("[Autobrr] Failed to refresh release stats cache")
return
}

if err == nil {
stats := result.(types.AutobrrStats)
log.Debug().
Str("instanceId", instanceId).
Msg("Successfully refreshed Autobrr release stats cache")

h.hashMu.Lock()
currentHash := createAutobrrStatsHash(stats)
lastHash := h.lastStatsHash[instanceId]

// Only log when there are stats and the hash has changed
if (lastHash == "" || currentHash != lastHash) && stats.TotalCount > 0 {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Successfully refreshed release stats cache")
}

if currentHash != lastHash {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Stats updated")
h.lastStatsHash[instanceId] = currentHash
}
h.hashMu.Unlock()

// Broadcast stats update via SSE
h.broadcastStats(instanceId, stats)
}
}

func (h *AutobrrHandler) refreshIRCCache(instanceId, cacheKey string) {
// Use singleflight for refresh operations
sfKey := fmt.Sprintf("irc_refresh:%s", instanceId)
result, err, _ := h.sf.Do(sfKey, func() (interface{}, error) {
ctx := context.Background()
Expand All @@ -436,23 +506,37 @@ func (h *AutobrrHandler) refreshIRCCache(instanceId, cacheKey string) {
log.Error().
Err(err).
Str("instanceId", instanceId).
Msg("Failed to refresh autobrr IRC status cache")
Msg("[Autobrr] Failed to refresh IRC status cache")
return
}

if err == nil {
status := result.([]types.IRCStatus)
log.Debug().
Str("instanceId", instanceId).
Msg("Successfully refreshed autobrr IRC status cache")

h.hashMu.Lock()
currentHash := createIRCStatusHash(status)
lastHash := h.lastIRCStatusHash[instanceId]

if (lastHash == "" || currentHash != lastHash) && len(status) > 0 {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Successfully refreshed IRC status cache")
}

if currentHash != lastHash {
log.Debug().
Str("instanceId", instanceId).
Msg("Autobrr IRC status changed")
h.lastIRCStatusHash[instanceId] = currentHash
}
h.hashMu.Unlock()

// Broadcast IRC status update via SSE
h.broadcastIRCStatus(instanceId, status)
}
}

func (h *AutobrrHandler) refreshReleasesCache(instanceId, cacheKey string) {
// Use singleflight for refresh operations
sfKey := fmt.Sprintf("releases_refresh:%s", instanceId)
result, err, _ := h.sf.Do(sfKey, func() (interface{}, error) {
ctx := context.Background()
Expand All @@ -463,17 +547,77 @@ func (h *AutobrrHandler) refreshReleasesCache(instanceId, cacheKey string) {
log.Error().
Err(err).
Str("instanceId", instanceId).
Msg("Failed to refresh autobrr releases cache")
Msg("[Autobrr] Failed to refresh releases cache")
return
}

if err == nil {
releases := result.(types.ReleasesResponse)
log.Debug().
Str("instanceId", instanceId).
Msg("Successfully refreshed autobrr releases cache")

h.hashMu.Lock()
currentHash := createAutobrrReleaseHash(releases)
lastHash := h.lastReleasesHash[instanceId]

if (lastHash == "" || currentHash != lastHash) && len(releases.Data) > 0 {
log.Debug().
Str("instanceId", instanceId).
Msg("[Autobrr] Successfully refreshed releases cache")
}

if currentHash != lastHash {
log.Debug().
Str("instanceId", instanceId).
Msg("Autobrr releases changed")
h.lastReleasesHash[instanceId] = currentHash
}
h.hashMu.Unlock()

// Broadcast releases update via SSE
h.broadcastReleases(instanceId, releases)
}
}

// createAutobrrReleaseHash generates a unique hash representing the current state of Autobrr releases
// The hash includes key release details like title, protocol, and filter status
// This allows for efficient detection of release changes without deep comparison
func createAutobrrReleaseHash(releases types.ReleasesResponse) string {
if len(releases.Data) == 0 {
return ""
}

var sb strings.Builder
for _, release := range releases.Data {
fmt.Fprintf(&sb, "%s:%s:%s,",
release.Title,
release.Protocol,
release.FilterStatus)
}
return sb.String()
}

// createAutobrrStatsHash generates a hash representing the current Autobrr statistics
// The hash includes total counts, filtered, rejected, and push-related statistics
// Useful for detecting changes in overall release processing statistics
func createAutobrrStatsHash(stats types.AutobrrStats) string {
return fmt.Sprintf("%d:%d:%d:%d:%d",
stats.TotalCount,
stats.FilteredCount,
stats.FilterRejectedCount,
stats.PushApprovedCount,
stats.PushRejectedCount)
}

// createIRCStatusHash generates a unique hash representing the current IRC connection statuses
// The hash includes the name, health status, and enabled state of each IRC connection
// Helps in detecting changes in IRC connection states efficiently
func createIRCStatusHash(status []types.IRCStatus) string {
if len(status) == 0 {
return ""
}

var sb strings.Builder
for _, s := range status {
fmt.Fprintf(&sb, "%s:%v:%v,", s.Name, s.Healthy, s.Enabled)
}
return sb.String()
}
Loading

0 comments on commit 9dce121

Please sign in to comment.