Skip to content

Commit

Permalink
#32 Add cron for cleanup of keys from DiceDB (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucifercr07 authored Oct 22, 2024
1 parent 0d419f6 commit d8d50c8
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 59 deletions.
8 changes: 6 additions & 2 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
DICEDB_ADDR=localhost:7379
DICEDB_ADMIN_ADDR=localhost:7379
DICEDB_ADMIN_USERNAME=diceadmin
DICEDB_ADMIN_PASSWORD=
DICEDB_ADDR=localhost:7380
DICEDB_USERNAME=dice
DICEDB_PASSWORD=
SERVER_PORT=:8080
IS_TEST_ENVIRONMENT=false
REQUEST_LIMIT_PER_MIN=1000
REQUEST_WINDOW_SEC=60
ALLOWED_ORIGINS=http://localhost:3000
ALLOWED_ORIGINS=http://localhost:3000
CRON_CLEANUP_FREQUENCY_MINS=15
58 changes: 40 additions & 18 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,81 @@
package config

import (
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"

"github.com/joho/godotenv"
)

// Config holds the application configuration
type Config struct {
// Config for DiceDBAdmin instance. This instance holds internal keys
// and is separate from DiceDB hosting global key pool i.e. user facing.
DiceDBAdmin struct {
Addr string // Field for the Dice address
Username string // Field for the username
Password string // Field for the password
}
// Config for DiceDB User instance. This instance holds internal keys
// and is separate from DiceDB hosting global key pool i.e. user facing.
DiceDB struct {
Addr string // Field for the Dice address
Username string // Field for the username
Password string // Field for the password
}
Server struct {
Port string // Field for the server port
IsTestEnv bool
RequestLimitPerMin int64 // Field for the request limit
RequestWindowSec float64 // Field for the time window in float64
AllowedOrigins []string // Field for the allowed origins
Port string // Field for the server port
IsTestEnv bool
RequestLimitPerMin int64 // Field for the request limit
RequestWindowSec float64 // Field for the time window in float64
AllowedOrigins []string // Field for the allowed origins
CronCleanupFrequency time.Duration // Field for configuring key cleanup cron
}
}

// LoadConfig loads the application configuration from environment variables or defaults
func LoadConfig() *Config {
err := godotenv.Load()
if err != nil {
fmt.Println("Warning: .env file not found, falling back to system environment variables.")
slog.Debug("Warning: .env file not found, falling back to system environment variables.")
}

return &Config{
DiceDBAdmin: struct {
Addr string
Username string
Password string
}{
Addr: getEnv("DICEDB_ADMIN_ADDR", "localhost:7379"), // Default DiceDB Admin address
Username: getEnv("DICEDB_ADMIN_USERNAME", "diceadmin"), // Default DiceDB Admin username
Password: getEnv("DICEDB_ADMIN_PASSWORD", ""), // Default DiceDB Admin password
},
DiceDB: struct {
Addr string
Username string
Password string
}{
Addr: getEnv("DICEDB_ADDR", "localhost:7379"), // Default Dice address
Addr: getEnv("DICEDB_ADDR", "localhost:7380"), // Default DiceDB address
Username: getEnv("DICEDB_USERNAME", "dice"), // Default username
Password: getEnv("DICEDB_PASSWORD", ""), // Default password
},
Server: struct {
Port string
IsTestEnv bool
RequestLimitPerMin int64
RequestWindowSec float64
AllowedOrigins []string
Port string
IsTestEnv bool
RequestLimitPerMin int64
RequestWindowSec float64
AllowedOrigins []string
CronCleanupFrequency time.Duration
}{
Port: getEnv("SERVER_PORT", ":8080"),
IsTestEnv: getEnvBool("IS_TEST_ENVIRONMENT", false), // Default server port
RequestLimitPerMin: getEnvInt("REQUEST_LIMIT_PER_MIN", 1000), // Default request limit
RequestWindowSec: getEnvFloat64("REQUEST_WINDOW_SEC", 60), // Default request window in float64
AllowedOrigins: getEnvArray("ALLOWED_ORIGINS", []string{"http://localhost:3000"}), // Default allowed origins
Port: getEnv("SERVER_PORT", ":8080"),
IsTestEnv: getEnvBool("IS_TEST_ENVIRONMENT", false), // Default server port
RequestLimitPerMin: getEnvInt("REQUEST_LIMIT_PER_MIN", 1000), // Default request limit
RequestWindowSec: getEnvFloat64("REQUEST_WINDOW_SEC", 60), // Default request window in float64
AllowedOrigins: getEnvArray("ALLOWED_ORIGINS", []string{"http://localhost:3000"}), // Default allowed origins
CronCleanupFrequency: time.Duration(getEnvInt("CRON_CLEANUP_FREQUENCY_MINS", 15)) * time.Minute, // Default cron cleanup frequency
},
}
}
Expand Down
20 changes: 18 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
services:
dicedb:
dicedbadmin:
image: dicedb/dicedb:latest
ports:
- "7379:7379"
Expand All @@ -11,15 +11,31 @@ services:
networks:
- dice-network

dicedb:
image: dicedb/dicedb:latest
ports:
- "7380:7379"
healthcheck:
test: [ "CMD", "PING" ]
interval: 10s
timeout: 3s
retries: 3
networks:
- dice-network

backend:
build:
context: .
ports:
- "8080:8080"
depends_on:
- dicedbadmin
- dicedb
environment:
- DICEDB_ADDR=localhost:7379
- DICEDB_ADMIN_ADDR=localhost:7379
- DICEDB_ADMIN_USERNAME=${DICEDB_ADMIN_USERNAME}
- DICEDB_ADMIN_PASSWORD=${DICEDB_ADMIN_PASSWORD}
- DICEDB_ADDR=localhost:7380
- DICEDB_USERNAME=${DICEDB_USERNAME}
- DICEDB_PASSWORD=${DICEDB_PASSWORD}
networks:
Expand Down
30 changes: 21 additions & 9 deletions internal/db/dicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,27 @@ func (db *DiceDB) CloseDiceDB() {
}
}

func InitDiceClient(configValue *config.Config) (*DiceDB, error) {
diceClient := dicedb.NewClient(&dicedb.Options{
Addr: configValue.DiceDB.Addr,
Username: configValue.DiceDB.Username,
Password: configValue.DiceDB.Password,
DialTimeout: 10 * time.Second,
MaxRetries: 10,
EnablePrettyResponse: true,
})
func InitDiceClient(configValue *config.Config, isAdmin bool) (*DiceDB, error) {
var diceClient *dicedb.Client
if isAdmin {
diceClient = dicedb.NewClient(&dicedb.Options{
Addr: configValue.DiceDBAdmin.Addr,
Username: configValue.DiceDBAdmin.Username,
Password: configValue.DiceDBAdmin.Password,
DialTimeout: 10 * time.Second,
MaxRetries: 10,
EnablePrettyResponse: true,
})
} else {
diceClient = dicedb.NewClient(&dicedb.Options{
Addr: configValue.DiceDB.Addr,
Username: configValue.DiceDB.Username,
Password: configValue.DiceDB.Password,
DialTimeout: 10 * time.Second,
MaxRetries: 10,
EnablePrettyResponse: true,
})
}

// Ping the dicedb client to verify the connection
err := diceClient.Ping(context.Background()).Err()
Expand Down
30 changes: 24 additions & 6 deletions internal/middleware/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"net/http"
"server/internal/db"
"server/internal/server/utils"
mock "server/internal/tests/dbmocks"
"strconv"
"strings"
Expand Down Expand Up @@ -58,7 +59,7 @@ func RateLimiter(client *db.DiceDB, next http.Handler, limit int64, window float
// Check if the request count exceeds the limit
if requestCount >= limit {
slog.Warn("Request limit exceeded", "count", requestCount)
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0)
http.Error(w, "429 - Too Many Requests", http.StatusTooManyRequests)
return
}
Expand All @@ -77,7 +78,22 @@ func RateLimiter(client *db.DiceDB, next http.Handler, limit int64, window float
}
}

addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
// Get the cron last cleanup run time
var lastCronCleanupTime int64
resp := client.Client.Get(ctx, utils.LastCronCleanupTimeUnixMs)
if resp.Err() != nil && !errors.Is(resp.Err(), dicedb.Nil) {
slog.Error("Failed to get last cron cleanup time for headers", slog.Any("err", resp.Err().Error()))
}

if resp.Val() != "" {
lastCronCleanupTime, err = strconv.ParseInt(resp.Val(), 10, 64)
if err != nil {
slog.Error("Error converting last cron cleanup time", "error", err)
}
}

addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window),
lastCronCleanupTime)

slog.Info("Request processed", "count", requestCount+1)
next.ServeHTTP(w, r)
Expand Down Expand Up @@ -126,7 +142,7 @@ func MockRateLimiter(client *mock.DiceDBMock, next http.Handler, limit int64, wi
// Check if the request limit has been exceeded
if requestCount >= limit {
slog.Warn("Request limit exceeded", "count", requestCount)
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0)
http.Error(w, "429 - Too Many Requests", http.StatusTooManyRequests)
return
}
Expand All @@ -147,19 +163,21 @@ func MockRateLimiter(client *mock.DiceDBMock, next http.Handler, limit int64, wi
}
}

addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0)

slog.Info("Request processed", "count", requestCount)
next.ServeHTTP(w, r)
})
}

func addRateLimitHeaders(w http.ResponseWriter, limit, remaining, used, resetTime int64) {
func addRateLimitHeaders(w http.ResponseWriter, limit, remaining, used, resetTime, cronLastCleanupTime int64) {
w.Header().Set("x-ratelimit-limit", strconv.FormatInt(limit, 10))
w.Header().Set("x-ratelimit-remaining", strconv.FormatInt(remaining, 10))
w.Header().Set("x-ratelimit-used", strconv.FormatInt(used, 10))
w.Header().Set("x-ratelimit-reset", strconv.FormatInt(resetTime, 10))
w.Header().Set("x-last-cleanup-time", strconv.FormatInt(cronLastCleanupTime, 10))

// Expose the rate limit headers to the client
w.Header().Set("Access-Control-Expose-Headers", "x-ratelimit-limit, x-ratelimit-remaining, x-ratelimit-used, x-ratelimit-reset")
w.Header().Set("Access-Control-Expose-Headers", "x-ratelimit-limit, x-ratelimit-remaining,"+
"x-ratelimit-used, x-ratelimit-reset, x-last-cleanup-time")
}
83 changes: 83 additions & 0 deletions internal/server/cleanup_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package server

import (
"context"
"errors"
"log/slog"
"server/internal/db"
"server/internal/server/utils"
"strconv"
"sync"
"time"

"github.com/dicedb/dicedb-go"
)

type CleanupManager struct {
diceDBAdminClient *db.DiceDB
diceDBClient *db.DiceDB
cronFrequency time.Duration
}

func NewCleanupManager(diceDBAdminClient *db.DiceDB,
diceDBClient *db.DiceDB, cronFrequency time.Duration) *CleanupManager {
return &CleanupManager{
diceDBAdminClient: diceDBAdminClient,
diceDBClient: diceDBClient,
cronFrequency: cronFrequency,
}
}

func (c *CleanupManager) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
c.start(ctx)
}

func (c *CleanupManager) start(ctx context.Context) {
ticker := time.NewTicker(c.cronFrequency)
defer ticker.Stop()

// Get the last cron run time
resp := c.diceDBAdminClient.Client.Get(ctx, utils.LastCronCleanupTimeUnixMs)
if resp.Err() != nil {
if errors.Is(resp.Err(), dicedb.Nil) {
// Default to current time
cleanupTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
slog.Debug("Defaulting last cron cleanup time key since not set", slog.Any("cleanupTime", cleanupTime))
resp := c.diceDBAdminClient.Client.Set(ctx, utils.LastCronCleanupTimeUnixMs, cleanupTime, -1)
if resp.Err() != nil {
slog.Error("Failed to set default value for last cron cleanup time key",
slog.Any("err", resp.Err().Error()))
}
} else {
slog.Error("Failed to get last cron cleanup time", slog.Any("err", resp.Err().Error()))
}
}

for {
select {
case <-ticker.C:
c.runCronTasks()
case <-ctx.Done():
slog.Info("Shutting down cleanup manager")
return
}
}
}

func (c *CleanupManager) runCronTasks() {
// Flush the user DiceDB instance
resp := c.diceDBClient.Client.FlushDB(c.diceDBClient.Ctx)
if resp.Err() != nil {
slog.Error("Failed to flush keys from DiceDB user instance.")
}

// Update last cron run time on DiceDB instance
cleanupTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
resp = c.diceDBAdminClient.Client.Set(c.diceDBClient.Ctx, utils.LastCronCleanupTimeUnixMs,
cleanupTime, -1)
slog.Debug("Updating last cron cleanup time key", slog.Any("cleanupTime", cleanupTime))
if resp.Err() != nil {
slog.Error("Failed to set LastCronCleanupTimeUnixMs")
}
}
Loading

0 comments on commit d8d50c8

Please sign in to comment.