From e2f9777c63165ebacb90e921fd1deca57b510dba Mon Sep 17 00:00:00 2001 From: Vincent Boutour Date: Fri, 30 Dec 2022 17:40:30 +0100 Subject: [PATCH] refactor(exclusive)!: Using redis for distributed lock instead of amqp Signed-off-by: Vincent Boutour --- README.md | 10 +---- cmd/fibr/config.go | 3 -- cmd/fibr/fibr.go | 17 ++++++--- pkg/crud/crud.go | 54 +++++++++----------------- pkg/exclusive/exclusive.go | 47 +++++++++++++++++++++++ pkg/exif/exif.go | 42 +++++++++----------- pkg/exif/exif_test.go | 2 +- pkg/exif/metadata.go | 2 +- pkg/provider/amqp.go | 31 --------------- pkg/search/crud.go | 2 +- pkg/search/search.go | 44 +++++---------------- pkg/share/crud.go | 5 ++- pkg/share/event.go | 5 ++- pkg/share/share.go | 78 ++++++++++++-------------------------- pkg/webhook/crud.go | 4 +- pkg/webhook/event.go | 2 +- pkg/webhook/webhook.go | 72 ++++++++++++++--------------------- 17 files changed, 168 insertions(+), 252 deletions(-) create mode 100644 pkg/exclusive/exclusive.go delete mode 100644 pkg/provider/amqp.go diff --git a/README.md b/README.md index 7dc8e6bd..c27f9af0 100644 --- a/README.md +++ b/README.md @@ -220,8 +220,6 @@ Be careful when using the CLI, if someone list the processes on the system, they Usage of fibr: -address string [server] Listen address {FIBR_ADDRESS} - -amqpExclusiveRoutingKey string - [crud] AMQP Routing Key for exclusive lock on default exchange {FIBR_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.start") -amqpExifExchange string [amqpExif] Exchange name {FIBR_AMQP_EXIF_EXCHANGE} (default "fibr") -amqpExifExclusive @@ -288,8 +286,6 @@ Usage of fibr: [owasp] Content-Security-Policy {FIBR_CSP} (default "default-src 'self'; base-uri 'self'; script-src 'self' 'httputils-nonce' unpkg.com/webp-hero@0.0.2/dist-cjs/ unpkg.com/leaflet@1.9.3/dist/ unpkg.com/leaflet.markercluster@1.5.1/; style-src 'self' 'httputils-nonce' unpkg.com/leaflet@1.9.3/dist/ unpkg.com/leaflet.markercluster@1.5.1/; img-src 'self' data: a.tile.openstreetmap.org b.tile.openstreetmap.org c.tile.openstreetmap.org") -exifAmqpExchange string [exif] AMQP Exchange Name {FIBR_EXIF_AMQP_EXCHANGE} (default "fibr") - -exifAmqpExclusiveRoutingKey string - [exif] AMQP Routing Key for exclusive lock on default exchange {FIBR_EXIF_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.exif") -exifAmqpRoutingKey string [exif] AMQP Routing Key for exif {FIBR_EXIF_AMQP_ROUTING_KEY} (default "exif_input") -exifDirectAccess @@ -370,8 +366,6 @@ Usage of fibr: [redis] Redis Username, if any {FIBR_REDIS_USERNAME} -sanitizeOnStart [crud] Sanitize name on start {FIBR_SANITIZE_ON_START} - -searchAmqpExclusiveRoutingKey string - [search] AMQP Routing Key for exclusive lock on default exchange {FIBR_SEARCH_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.search") -shareAmqpExchange string [share] AMQP Exchange Name {FIBR_SHARE_AMQP_EXCHANGE} (default "fibr.shares") -shareAmqpExclusiveRoutingKey string @@ -432,8 +426,6 @@ Usage of fibr: [alcotest] User-Agent for check {FIBR_USER_AGENT} (default "Alcotest") -webhookAmqpExchange string [webhook] AMQP Exchange Name {FIBR_WEBHOOK_AMQP_EXCHANGE} (default "fibr.webhooks") - -webhookAmqpExclusiveRoutingKey string - [webhook] AMQP Routing Key for exclusive lock on default exchange {FIBR_WEBHOOK_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.webhooks") -webhookAmqpRoutingKey string [webhook] AMQP Routing Key for webhook {FIBR_WEBHOOK_AMQP_ROUTING_KEY} (default "webhook") -webhookSecret string @@ -450,4 +442,4 @@ Fibr doesn't handle multiple instances running at the same time on the same `roo Shares' metadatas are stored in a file, loaded at the start of the application. If an _instance A_ adds a share, _instance B_ can't see it. If they are both behind the same load-balancer, it can leads to an erratic behavior. Fibr has also an internal cron that purge expired shares and write the new metadatas to the file. If _instance A_ adds a share and _instance B_ runs the cron, the share added in _instance A_ is lost. -If you enable AMQP, it can handle thoses behaviours by using an exclusive lock with an AMQP semaphore mechanism. +If you enable Redis caching, it can handle thoses behaviours by using an exclusive lock with the Redis semaphore mechanism. diff --git a/cmd/fibr/config.go b/cmd/fibr/config.go index 5b1f1dcc..d2a19071 100644 --- a/cmd/fibr/config.go +++ b/cmd/fibr/config.go @@ -9,7 +9,6 @@ import ( basicMemory "github.com/ViBiOh/auth/v2/pkg/store/memory" "github.com/ViBiOh/fibr/pkg/crud" "github.com/ViBiOh/fibr/pkg/exif" - "github.com/ViBiOh/fibr/pkg/search" "github.com/ViBiOh/fibr/pkg/share" "github.com/ViBiOh/fibr/pkg/storage" "github.com/ViBiOh/fibr/pkg/thumbnail" @@ -52,7 +51,6 @@ type configuration struct { amqpShare amqphandler.Config amqpWebhook amqphandler.Config redis redis.Config - search search.Config disableAuth *bool } @@ -84,7 +82,6 @@ func newConfig() (configuration, error) { amqpShare: amqphandler.Flags(fs, "amqpShare", flags.NewOverride("Exchange", "fibr.shares"), flags.NewOverride("Queue", "fibr.share-"+generateIdentityName()), flags.NewOverride("RoutingKey", "share"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0))), amqpWebhook: amqphandler.Flags(fs, "amqpWebhook", flags.NewOverride("Exchange", "fibr.webhooks"), flags.NewOverride("Queue", "fibr.webhook-"+generateIdentityName()), flags.NewOverride("RoutingKey", "webhook"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0))), redis: redis.Flags(fs, "redis", flags.NewOverride("Address", "")), - search: search.Flags(fs, "search"), disableAuth: flags.Bool(fs, "", "auth", "NoAuth", "Disable basic authentification", false, nil), }, fs.Parse(os.Args[1:]) } diff --git a/cmd/fibr/fibr.go b/cmd/fibr/fibr.go index 86186483..5c019f75 100644 --- a/cmd/fibr/fibr.go +++ b/cmd/fibr/fibr.go @@ -14,6 +14,7 @@ import ( authMiddleware "github.com/ViBiOh/auth/v2/pkg/middleware" basicMemory "github.com/ViBiOh/auth/v2/pkg/store/memory" "github.com/ViBiOh/fibr/pkg/crud" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/exif" "github.com/ViBiOh/fibr/pkg/fibr" "github.com/ViBiOh/fibr/pkg/provider" @@ -93,13 +94,18 @@ func main() { rendererApp, err := renderer.New(config.renderer, content, fibr.FuncMap, client.tracer.GetTracer("renderer")) logger.Fatal(err) - exifApp, err := exif.New(config.exif, storageApp, prometheusRegisterer, client.tracer, client.amqp, client.redis) + var exclusiveApp exclusive.App + if client.redis.Enabled() { + exclusiveApp = exclusive.New(client.redis) + } + + exifApp, err := exif.New(config.exif, storageApp, prometheusRegisterer, client.tracer, client.amqp, client.redis, exclusiveApp) logger.Fatal(err) - webhookApp, err := webhook.New(config.webhook, storageApp, prometheusRegisterer, client.amqp, rendererApp, thumbnailApp) + webhookApp, err := webhook.New(config.webhook, storageApp, prometheusRegisterer, client.amqp, rendererApp, thumbnailApp, exclusiveApp) logger.Fatal(err) - shareApp, err := share.New(config.share, storageApp, client.amqp) + shareApp, err := share.New(config.share, storageApp, client.amqp, exclusiveApp) logger.Fatal(err) amqpThumbnailApp, err := amqphandler.New(config.amqpThumbnail, client.amqp, client.tracer.GetTracer("amqp_handler_thumbnail"), thumbnailApp.AMQPHandler) @@ -114,10 +120,9 @@ func main() { amqpWebhookApp, err := amqphandler.New(config.amqpWebhook, client.amqp, client.tracer.GetTracer("amqp_handler_webhook"), webhookApp.AMQPHandler) logger.Fatal(err) - searchApp, err := search.New(config.search, filteredStorage, thumbnailApp, exifApp, client.amqp, client.tracer.GetTracer("search")) - logger.Fatal(err) + searchApp := search.New(filteredStorage, thumbnailApp, exifApp, exclusiveApp, client.tracer.GetTracer("search")) - crudApp, err := crud.New(config.crud, storageApp, filteredStorage, rendererApp, shareApp, webhookApp, thumbnailApp, exifApp, searchApp, eventBus.Push, client.amqp, client.tracer.GetTracer("crud")) + crudApp, err := crud.New(config.crud, storageApp, filteredStorage, rendererApp, shareApp, webhookApp, thumbnailApp, exifApp, searchApp, eventBus.Push, exclusiveApp, client.tracer.GetTracer("crud")) logger.Fatal(err) var middlewareApp provider.Auth diff --git a/pkg/crud/crud.go b/pkg/crud/crud.go index 7fe03c36..d352c760 100644 --- a/pkg/crud/crud.go +++ b/pkg/crud/crud.go @@ -10,12 +10,12 @@ import ( "time" absto "github.com/ViBiOh/absto/pkg/model" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/exif" "github.com/ViBiOh/fibr/pkg/provider" "github.com/ViBiOh/fibr/pkg/search" "github.com/ViBiOh/fibr/pkg/thumbnail" "github.com/ViBiOh/flags" - "github.com/ViBiOh/httputils/v4/pkg/amqp" "github.com/ViBiOh/httputils/v4/pkg/logger" "github.com/ViBiOh/httputils/v4/pkg/renderer" "go.opentelemetry.io/otel/trace" @@ -30,18 +30,15 @@ var ( ) type App struct { - tracer trace.Tracer - rawStorageApp absto.Storage - storageApp absto.Storage - shareApp provider.ShareManager - webhookApp provider.WebhookManager - exifApp provider.ExifManager - searchApp search.App - pushEvent provider.EventProducer - - amqpClient *amqp.Client - amqpExclusiveRoutingKey string - + tracer trace.Tracer + rawStorageApp absto.Storage + storageApp absto.Storage + shareApp provider.ShareManager + webhookApp provider.WebhookManager + exifApp provider.ExifManager + searchApp search.App + pushEvent provider.EventProducer + exclusiveApp exclusive.App temporaryFolder string rendererApp renderer.App thumbnailApp thumbnail.App @@ -51,11 +48,10 @@ type App struct { } type Config struct { - amqpExclusiveRoutingKey *string - bcryptDuration *string - temporaryFolder *string - sanitizeOnStart *bool - chunkUpload *bool + bcryptDuration *string + temporaryFolder *string + sanitizeOnStart *bool + chunkUpload *bool } func Flags(fs *flag.FlagSet, prefix string) Config { @@ -65,12 +61,10 @@ func Flags(fs *flag.FlagSet, prefix string) Config { chunkUpload: flags.Bool(fs, prefix, "crud", "ChunkUpload", "Use chunk upload in browser", false, nil), temporaryFolder: flags.String(fs, prefix, "crud", "TemporaryFolder", "Temporary folder for chunk upload", "/tmp", nil), - - amqpExclusiveRoutingKey: flags.String(fs, prefix, "crud", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.start", nil), } } -func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage, rendererApp renderer.App, shareApp provider.ShareManager, webhookApp provider.WebhookManager, thumbnailApp thumbnail.App, exifApp exif.App, searchApp search.App, eventProducer provider.EventProducer, amqpClient *amqp.Client, tracer trace.Tracer) (App, error) { +func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage, rendererApp renderer.App, shareApp provider.ShareManager, webhookApp provider.WebhookManager, thumbnailApp thumbnail.App, exifApp exif.App, searchApp search.App, eventProducer provider.EventProducer, exclusiveApp exclusive.App, tracer trace.Tracer) (App, error) { app := App{ sanitizeOnStart: *config.sanitizeOnStart, @@ -89,14 +83,7 @@ func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage, webhookApp: webhookApp, searchApp: searchApp, - amqpClient: amqpClient, - amqpExclusiveRoutingKey: strings.TrimSpace(*config.amqpExclusiveRoutingKey), - } - - if amqpClient != nil { - if err := amqpClient.SetupExclusive(app.amqpExclusiveRoutingKey); err != nil { - return app, fmt.Errorf("setup amqp exclusive: %w", err) - } + exclusiveApp: exclusiveApp, } bcryptDuration, err := time.ParseDuration(strings.TrimSpace(*config.bcryptDuration)) @@ -117,16 +104,11 @@ func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage, } func (a App) Start(ctx context.Context) { - if a.amqpClient == nil { - a.start(ctx) - return - } - - if _, err := a.amqpClient.Exclusive(context.Background(), a.amqpExclusiveRoutingKey, time.Hour, func(ctx context.Context) error { + if err := a.exclusiveApp.Execute(ctx, "fibr:mutex:start", func(ctx context.Context) error { a.start(ctx) return nil }); err != nil { - logger.Error("get exclusive semaphore: %s", err) + logger.Error("start: %s", err) } } diff --git a/pkg/exclusive/exclusive.go b/pkg/exclusive/exclusive.go new file mode 100644 index 00000000..6cfc5e63 --- /dev/null +++ b/pkg/exclusive/exclusive.go @@ -0,0 +1,47 @@ +package exclusive + +import ( + "context" + "time" + + "github.com/ViBiOh/httputils/v4/pkg/redis" +) + +const SemaphoreDuration = time.Second * 10 + +type App struct { + redisClient redis.App +} + +func New(redisClient redis.App) App { + return App{ + redisClient: redisClient, + } +} + +func (a App) Execute(ctx context.Context, name string, action func(context.Context) error) error { + if !a.redisClient.Enabled() { + return action(ctx) + } + +exclusive: + acquired, err := a.redisClient.Exclusive(ctx, name, SemaphoreDuration, action) + if err != nil { + return err + } + + if !acquired { + time.Sleep(time.Second) + goto exclusive + } + + return nil +} + +func (a App) Try(ctx context.Context, name string, action func(context.Context) error) (bool, error) { + if !a.redisClient.Enabled() { + return true, action(ctx) + } + + return a.redisClient.Exclusive(ctx, name, SemaphoreDuration, action) +} diff --git a/pkg/exif/exif.go b/pkg/exif/exif.go index d14e8b12..f01db7f0 100644 --- a/pkg/exif/exif.go +++ b/pkg/exif/exif.go @@ -10,6 +10,7 @@ import ( absto "github.com/ViBiOh/absto/pkg/model" exas "github.com/ViBiOh/exas/pkg/model" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/provider" "github.com/ViBiOh/flags" amqpclient "github.com/ViBiOh/httputils/v4/pkg/amqp" @@ -34,12 +35,12 @@ type App struct { exifCacheApp cache.App[absto.Item, provider.Metadata] aggregateCacheApp cache.App[absto.Item, provider.Aggregate] - redisClient redis.App + exclusiveApp exclusive.App + redisClient redis.App - amqpClient *amqpclient.Client - amqpExchange string - amqpRoutingKey string - amqpExclusiveRoutingKey string + amqpClient *amqpclient.Client + amqpExchange string + amqpRoutingKey string exifRequest request.Request @@ -52,9 +53,8 @@ type Config struct { exifUser *string exifPass *string - amqpExchange *string - amqpRoutingKey *string - amqpExclusiveRoutingKey *string + amqpExchange *string + amqpRoutingKey *string maxSize *int directAccess *bool @@ -69,15 +69,13 @@ func Flags(fs *flag.FlagSet, prefix string) Config { directAccess: flags.Bool(fs, prefix, "exif", "DirectAccess", "Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended)", false, nil), maxSize: flags.Int(fs, prefix, "exif", "MaxSize", "Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled.", 1024*1024*200, nil), - amqpExchange: flags.String(fs, prefix, "exif", "AmqpExchange", "AMQP Exchange Name", "fibr", nil), - amqpRoutingKey: flags.String(fs, prefix, "exif", "AmqpRoutingKey", "AMQP Routing Key for exif", "exif_input", nil), - amqpExclusiveRoutingKey: flags.String(fs, prefix, "exif", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.exif", nil), + amqpExchange: flags.String(fs, prefix, "exif", "AmqpExchange", "AMQP Exchange Name", "fibr", nil), + amqpRoutingKey: flags.String(fs, prefix, "exif", "AmqpRoutingKey", "AMQP Routing Key for exif", "exif_input", nil), } } -func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheus.Registerer, tracerApp tracer.App, amqpClient *amqpclient.Client, redisClient redis.App) (App, error) { +func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheus.Registerer, tracerApp tracer.App, amqpClient *amqpclient.Client, redisClient redis.App, exclusiveApp exclusive.App) (App, error) { var amqpExchange string - var amqpExclusiveRoutingKey string if amqpClient != nil { amqpExchange = strings.TrimSpace(*config.amqpExchange) @@ -85,12 +83,6 @@ func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheu if err := amqpClient.Publisher(amqpExchange, "direct", nil); err != nil { return App{}, fmt.Errorf("configure amqp: %w", err) } - - amqpExclusiveRoutingKey = strings.TrimSpace(*config.amqpExclusiveRoutingKey) - - if err := amqpClient.SetupExclusive(amqpExclusiveRoutingKey); err != nil { - return App{}, fmt.Errorf("setup amqp exclusive: %w", err) - } } app := App{ @@ -100,13 +92,13 @@ func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheu redisClient: redisClient, - amqpClient: amqpClient, - amqpExchange: amqpExchange, - amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey), - amqpExclusiveRoutingKey: amqpExclusiveRoutingKey, + amqpClient: amqpClient, + amqpExchange: amqpExchange, + amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey), - tracer: tracerApp.GetTracer("exif"), - storageApp: storageApp, + tracer: tracerApp.GetTracer("exif"), + exclusiveApp: exclusiveApp, + storageApp: storageApp, listStorageApp: storageApp.WithIgnoreFn(func(item absto.Item) bool { return !strings.HasSuffix(item.Name, ".json") }), diff --git a/pkg/exif/exif_test.go b/pkg/exif/exif_test.go index ec82f74e..8e9ce97a 100644 --- a/pkg/exif/exif_test.go +++ b/pkg/exif/exif_test.go @@ -13,7 +13,7 @@ func TestFlags(t *testing.T) { want string }{ "simple": { - "Usage of simple:\n -amqpExchange string\n \t[exif] AMQP Exchange Name {SIMPLE_AMQP_EXCHANGE} (default \"fibr\")\n -amqpExclusiveRoutingKey string\n \t[exif] AMQP Routing Key for exclusive lock on default exchange {SIMPLE_AMQP_EXCLUSIVE_ROUTING_KEY} (default \"fibr.semaphore.exif\")\n -amqpRoutingKey string\n \t[exif] AMQP Routing Key for exif {SIMPLE_AMQP_ROUTING_KEY} (default \"exif_input\")\n -directAccess\n \t[exif] Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended) {SIMPLE_DIRECT_ACCESS}\n -maxSize int\n \t[exif] Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled. {SIMPLE_MAX_SIZE} (default 209715200)\n -password string\n \t[exif] Exif Tool URL Basic Password {SIMPLE_PASSWORD}\n -uRL string\n \t[exif] Exif Tool URL (exas) {SIMPLE_URL} (default \"http://exas:1080\")\n -user string\n \t[exif] Exif Tool URL Basic User {SIMPLE_USER}\n", + "Usage of simple:\n -amqpExchange string\n \t[exif] AMQP Exchange Name {SIMPLE_AMQP_EXCHANGE} (default \"fibr\")\n -amqpRoutingKey string\n \t[exif] AMQP Routing Key for exif {SIMPLE_AMQP_ROUTING_KEY} (default \"exif_input\")\n -directAccess\n \t[exif] Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended) {SIMPLE_DIRECT_ACCESS}\n -maxSize int\n \t[exif] Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled. {SIMPLE_MAX_SIZE} (default 209715200)\n -password string\n \t[exif] Exif Tool URL Basic Password {SIMPLE_PASSWORD}\n -uRL string\n \t[exif] Exif Tool URL (exas) {SIMPLE_URL} (default \"http://exas:1080\")\n -user string\n \t[exif] Exif Tool URL Basic User {SIMPLE_USER}\n", }, } diff --git a/pkg/exif/metadata.go b/pkg/exif/metadata.go index 1a02b471..8e436548 100644 --- a/pkg/exif/metadata.go +++ b/pkg/exif/metadata.go @@ -31,7 +31,7 @@ func WithDescription(description string) MetadataOption { func (a App) update(ctx context.Context, item absto.Item, opts ...MetadataOption) (provider.Metadata, error) { var metadata provider.Metadata - return metadata, provider.Exclusive(ctx, a.amqpClient, a.amqpExclusiveRoutingKey, func(ctx context.Context) error { + return metadata, a.exclusiveApp.Execute(ctx, "fibr:mutex:"+item.ID, func(ctx context.Context) error { var err error metadata, err := a.GetMetadataFor(ctx, item) diff --git a/pkg/provider/amqp.go b/pkg/provider/amqp.go deleted file mode 100644 index 5a4773e7..00000000 --- a/pkg/provider/amqp.go +++ /dev/null @@ -1,31 +0,0 @@ -package provider - -import ( - "context" - "time" - - "github.com/ViBiOh/httputils/v4/pkg/amqp" -) - -const SemaphoreDuration = time.Second * 10 - -func Exclusive(ctx context.Context, amqpClient *amqp.Client, name string, action func(ctx context.Context) error) error { - if amqpClient == nil { - return action(ctx) - } - -exclusive: - acquired, err := amqpClient.Exclusive(ctx, name, SemaphoreDuration, func(ctx context.Context) error { - return action(ctx) - }) - if err != nil { - return err - } - - if !acquired { - time.Sleep(time.Second) - goto exclusive - } - - return nil -} diff --git a/pkg/search/crud.go b/pkg/search/crud.go index 0fdf08d7..fedb5f3d 100644 --- a/pkg/search/crud.go +++ b/pkg/search/crud.go @@ -64,7 +64,7 @@ func (a App) Delete(ctx context.Context, item absto.Item, name string) error { } func (a App) update(ctx context.Context, item absto.Item, opts ...SearchesOption) error { - return provider.Exclusive(ctx, a.amqpClient, a.amqpExclusiveRoutingKey, func(ctx context.Context) error { + return a.exclusiveApp.Execute(ctx, "fibr:mutex:"+item.ID, func(ctx context.Context) error { searches, err := a.load(ctx, item) if err != nil { return fmt.Errorf("load: %w", err) diff --git a/pkg/search/search.go b/pkg/search/search.go index a0c0cbc6..ee64e2cf 100644 --- a/pkg/search/search.go +++ b/pkg/search/search.go @@ -1,61 +1,35 @@ package search import ( - "flag" - "fmt" "net/http" - "strings" "time" absto "github.com/ViBiOh/absto/pkg/model" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/exif" "github.com/ViBiOh/fibr/pkg/provider" "github.com/ViBiOh/fibr/pkg/thumbnail" - "github.com/ViBiOh/flags" - "github.com/ViBiOh/httputils/v4/pkg/amqp" httpModel "github.com/ViBiOh/httputils/v4/pkg/model" "github.com/ViBiOh/httputils/v4/pkg/tracer" "go.opentelemetry.io/otel/trace" ) -type Config struct { - amqpExclusiveRoutingKey *string -} - -func Flags(fs *flag.FlagSet, prefix string) Config { - return Config{ - amqpExclusiveRoutingKey: flags.String(fs, prefix, "search", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.search", nil), - } -} - type App struct { - tracer trace.Tracer - storageApp absto.Storage - exifApp provider.ExifManager - amqpClient *amqp.Client - amqpExclusiveRoutingKey string - thumbnailApp thumbnail.App + tracer trace.Tracer + storageApp absto.Storage + exifApp provider.ExifManager + exclusiveApp exclusive.App + thumbnailApp thumbnail.App } -func New(config Config, storageApp absto.Storage, thumbnailApp thumbnail.App, exifApp exif.App, amqpClient *amqp.Client, tracer trace.Tracer) (App, error) { - var amqpExclusiveRoutingKey string - - if amqpClient != nil { - amqpExclusiveRoutingKey = strings.TrimSpace(*config.amqpExclusiveRoutingKey) - - if err := amqpClient.SetupExclusive(amqpExclusiveRoutingKey); err != nil { - return App{}, fmt.Errorf("setup amqp exclusive: %w", err) - } - } - +func New(storageApp absto.Storage, thumbnailApp thumbnail.App, exifApp exif.App, exclusiveApp exclusive.App, tracer trace.Tracer) App { return App{ tracer: tracer, storageApp: storageApp, thumbnailApp: thumbnailApp, exifApp: exifApp, - - amqpExclusiveRoutingKey: amqpExclusiveRoutingKey, - }, nil + exclusiveApp: exclusiveApp, + } } func (a App) Files(r *http.Request, request provider.Request) (items []absto.Item, err error) { diff --git a/pkg/share/crud.go b/pkg/share/crud.go index e389dc2d..7cf64039 100644 --- a/pkg/share/crud.go +++ b/pkg/share/crud.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/provider" "github.com/ViBiOh/httputils/v4/pkg/sha" "github.com/ViBiOh/httputils/v4/pkg/uuid" @@ -48,7 +49,7 @@ func (a *App) List() (output []provider.Share) { func (a *App) Create(ctx context.Context, filepath string, edit, story bool, password string, isDir bool, duration time.Duration) (string, error) { var id string - _, err := a.Exclusive(ctx, a.amqpExclusiveRoutingKey, provider.SemaphoreDuration, func(ctx context.Context) error { + _, err := a.Exclusive(ctx, "create", exclusive.SemaphoreDuration, func(ctx context.Context) error { var err error id, err = a.generateID() if err != nil { @@ -86,7 +87,7 @@ func (a *App) Create(ctx context.Context, filepath string, edit, story bool, pas } func (a *App) Delete(ctx context.Context, id string) error { - _, err := a.Exclusive(ctx, a.amqpExclusiveRoutingKey, provider.SemaphoreDuration, func(_ context.Context) error { + _, err := a.Exclusive(ctx, id, exclusive.SemaphoreDuration, func(_ context.Context) error { return a.delete(ctx, id) }) diff --git a/pkg/share/event.go b/pkg/share/event.go index a5465fed..2f21f500 100644 --- a/pkg/share/event.go +++ b/pkg/share/event.go @@ -6,6 +6,7 @@ import ( "strings" absto "github.com/ViBiOh/absto/pkg/model" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/provider" "github.com/ViBiOh/httputils/v4/pkg/logger" ) @@ -25,7 +26,7 @@ func (a *App) EventConsumer(ctx context.Context, e provider.Event) { } func (a *App) renameItem(ctx context.Context, old, new absto.Item) error { - _, err := a.Exclusive(ctx, a.amqpExclusiveRoutingKey, provider.SemaphoreDuration, func(_ context.Context) error { + _, err := a.Exclusive(ctx, old.ID, exclusive.SemaphoreDuration, func(ctx context.Context) error { for id, share := range a.shares { if strings.HasPrefix(share.Path, old.Pathname) { share.Path = strings.Replace(share.Path, old.Pathname, new.Pathname, 1) @@ -46,7 +47,7 @@ func (a *App) renameItem(ctx context.Context, old, new absto.Item) error { } func (a *App) deleteItem(ctx context.Context, item absto.Item) error { - _, err := a.Exclusive(ctx, a.amqpExclusiveRoutingKey, provider.SemaphoreDuration, func(_ context.Context) error { + _, err := a.Exclusive(ctx, item.ID, exclusive.SemaphoreDuration, func(_ context.Context) error { for id, share := range a.shares { if strings.HasPrefix(share.Path, item.Pathname) { if err := a.delete(ctx, id); err != nil { diff --git a/pkg/share/share.go b/pkg/share/share.go index b0ea4f9b..a831377e 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -10,6 +10,7 @@ import ( "time" absto "github.com/ViBiOh/absto/pkg/model" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/provider" "github.com/ViBiOh/flags" "github.com/ViBiOh/httputils/v4/pkg/amqp" @@ -22,65 +23,52 @@ type GetNow func() time.Time var shareFilename = provider.MetadataDirectoryName + "/shares.json" type App struct { - storageApp absto.Storage - shares map[string]provider.Share - clock GetNow - - amqpClient *amqp.Client - amqpExchange string - amqpRoutingKey string - amqpExclusiveRoutingKey string - - mutex sync.RWMutex + exclusiveApp exclusive.App + storageApp absto.Storage + shares map[string]provider.Share + clock GetNow + amqpClient *amqp.Client + amqpExchange string + amqpRoutingKey string + mutex sync.RWMutex } type Config struct { - amqpExchange *string - amqpRoutingKey *string - amqpExclusiveRoutingKey *string + amqpExchange *string + amqpRoutingKey *string } func Flags(fs *flag.FlagSet, prefix string) Config { return Config{ - amqpExchange: flags.String(fs, prefix, "share", "AmqpExchange", "AMQP Exchange Name", "fibr.shares", nil), - amqpRoutingKey: flags.String(fs, prefix, "share", "AmqpRoutingKey", "AMQP Routing Key for share", "share", nil), - amqpExclusiveRoutingKey: flags.String(fs, prefix, "share", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.shares", nil), + amqpExchange: flags.String(fs, prefix, "share", "AmqpExchange", "AMQP Exchange Name", "fibr.shares", nil), + amqpRoutingKey: flags.String(fs, prefix, "share", "AmqpRoutingKey", "AMQP Routing Key for share", "share", nil), } } -func New(config Config, storageApp absto.Storage, amqpClient *amqp.Client) (*App, error) { +func New(config Config, storageApp absto.Storage, amqpClient *amqp.Client, exclusiveApp exclusive.App) (*App, error) { var amqpExchange string - var amqpExclusiveRoutingKey string if amqpClient != nil { amqpExchange = strings.TrimSpace(*config.amqpExchange) - amqpExclusiveRoutingKey = strings.TrimSpace(*config.amqpExclusiveRoutingKey) if err := amqpClient.Publisher(amqpExchange, "fanout", nil); err != nil { return &App{}, fmt.Errorf("configure amqp: %w", err) } - - amqpExchange = strings.TrimSpace(*config.amqpExchange) - if err := amqpClient.SetupExclusive(amqpExclusiveRoutingKey); err != nil { - return &App{}, fmt.Errorf("setup amqp exclusive: %w", err) - } } return &App{ - clock: time.Now, - - shares: make(map[string]provider.Share), - storageApp: storageApp, - - amqpClient: amqpClient, - amqpExchange: amqpExchange, - amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey), - amqpExclusiveRoutingKey: amqpExclusiveRoutingKey, + clock: time.Now, + shares: make(map[string]provider.Share), + storageApp: storageApp, + exclusiveApp: exclusiveApp, + amqpClient: amqpClient, + amqpExchange: amqpExchange, + amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey), }, nil } -func (a *App) Exclusive(ctx context.Context, name string, duration time.Duration, action func(ctx context.Context) error) (bool, error) { - fn := func() error { +func (a *App) Exclusive(ctx context.Context, name string, _ time.Duration, action func(ctx context.Context) error) (bool, error) { + return a.exclusiveApp.Try(ctx, "fibr:mutex:"+name, func(ctx context.Context) error { a.mutex.Lock() defer a.mutex.Unlock() @@ -89,25 +77,7 @@ func (a *App) Exclusive(ctx context.Context, name string, duration time.Duration } return action(ctx) - } - - if a.amqpClient == nil { - return true, fn() - } - -exclusive: - acquired, err := a.amqpClient.Exclusive(ctx, name, duration, func(ctx context.Context) error { - return fn() }) - if err != nil { - return true, err - } - if !acquired { - time.Sleep(time.Second) - goto exclusive - } - - return true, nil } func (a *App) Get(requestPath string) provider.Share { @@ -136,7 +106,7 @@ func (a *App) Start(ctx context.Context) { }).OnSignal(syscall.SIGUSR1) if a.amqpClient != nil { - purgeCron.Exclusive(a, a.amqpExclusiveRoutingKey, provider.SemaphoreDuration) + purgeCron.Exclusive(a, "purge", exclusive.SemaphoreDuration) } purgeCron.Start(ctx, a.cleanShares) diff --git a/pkg/webhook/crud.go b/pkg/webhook/crud.go index dd6b74ea..6472d654 100644 --- a/pkg/webhook/crud.go +++ b/pkg/webhook/crud.go @@ -46,7 +46,7 @@ func (a *App) List() (output []provider.Webhook) { func (a *App) Create(ctx context.Context, pathname string, recursive bool, kind provider.WebhookKind, url string, types []provider.EventType) (string, error) { var id string - return id, a.Exclusive(ctx, a.amqpExclusiveRoutingKey, func(ctx context.Context) (err error) { + return id, a.Exclusive(ctx, "create", func(ctx context.Context) (err error) { id, err = a.generateID() if err != nil { return fmt.Errorf("generate id: %w", err) @@ -78,7 +78,7 @@ func (a *App) Create(ctx context.Context, pathname string, recursive bool, kind } func (a *App) Delete(ctx context.Context, id string) error { - return a.Exclusive(ctx, a.amqpExclusiveRoutingKey, func(_ context.Context) error { + return a.Exclusive(ctx, id, func(_ context.Context) error { return a.delete(ctx, id) }) } diff --git a/pkg/webhook/event.go b/pkg/webhook/event.go index 256f2e9b..169acc66 100644 --- a/pkg/webhook/event.go +++ b/pkg/webhook/event.go @@ -210,7 +210,7 @@ func (a *App) accessEvent(event provider.Event) string { } func (a *App) deleteItem(ctx context.Context, item absto.Item) error { - return a.Exclusive(ctx, a.amqpExclusiveRoutingKey, func(_ context.Context) error { + return a.Exclusive(ctx, item.ID, func(_ context.Context) error { for id, webhook := range a.webhooks { if webhook.Pathname == item.Pathname { if err := a.delete(ctx, id); err != nil { diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 32366181..7d95339f 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -8,6 +8,7 @@ import ( "sync" absto "github.com/ViBiOh/absto/pkg/model" + "github.com/ViBiOh/fibr/pkg/exclusive" "github.com/ViBiOh/fibr/pkg/provider" "github.com/ViBiOh/fibr/pkg/thumbnail" "github.com/ViBiOh/flags" @@ -21,76 +22,61 @@ import ( var webhookFilename = provider.MetadataDirectoryName + "/webhooks.json" type App struct { - storageApp absto.Storage - webhooks map[string]provider.Webhook - counter *prometheus.CounterVec - - amqpClient *amqp.Client - amqpExchange string - amqpExclusiveRoutingKey string - amqpRoutingKey string - - hmacSecret []byte - - rendererApp renderer.App - thumbnailApp thumbnail.App - - mutex sync.RWMutex + exclusiveApp exclusive.App + storageApp absto.Storage + webhooks map[string]provider.Webhook + counter *prometheus.CounterVec + amqpClient *amqp.Client + amqpExchange string + amqpRoutingKey string + rendererApp renderer.App + hmacSecret []byte + thumbnailApp thumbnail.App + mutex sync.RWMutex } type Config struct { hmacSecret *string - amqpExchange *string - amqpRoutingKey *string - amqpExclusiveRoutingKey *string + amqpExchange *string + amqpRoutingKey *string } func Flags(fs *flag.FlagSet, prefix string) Config { return Config{ - hmacSecret: flags.String(fs, prefix, "webhook", "Secret", "Secret for HMAC Signature", "", nil), - - amqpExchange: flags.String(fs, prefix, "webhook", "AmqpExchange", "AMQP Exchange Name", "fibr.webhooks", nil), - amqpRoutingKey: flags.String(fs, prefix, "webhook", "AmqpRoutingKey", "AMQP Routing Key for webhook", "webhook", nil), - amqpExclusiveRoutingKey: flags.String(fs, prefix, "webhook", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.webhooks", nil), + hmacSecret: flags.String(fs, prefix, "webhook", "Secret", "Secret for HMAC Signature", "", nil), + amqpExchange: flags.String(fs, prefix, "webhook", "AmqpExchange", "AMQP Exchange Name", "fibr.webhooks", nil), + amqpRoutingKey: flags.String(fs, prefix, "webhook", "AmqpRoutingKey", "AMQP Routing Key for webhook", "webhook", nil), } } -func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheus.Registerer, amqpClient *amqp.Client, rendererApp renderer.App, thumbnailApp thumbnail.App) (*App, error) { +func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheus.Registerer, amqpClient *amqp.Client, rendererApp renderer.App, thumbnailApp thumbnail.App, exclusiveApp exclusive.App) (*App, error) { var amqpExchange string - var amqpExclusiveRoutingKey string if amqpClient != nil { amqpExchange = strings.TrimSpace(*config.amqpExchange) - amqpExclusiveRoutingKey = strings.TrimSpace(*config.amqpExclusiveRoutingKey) if err := amqpClient.Publisher(amqpExchange, "fanout", nil); err != nil { return &App{}, fmt.Errorf("configure amqp: %w", err) } - - amqpExchange = strings.TrimSpace(*config.amqpExchange) - if err := amqpClient.SetupExclusive(amqpExclusiveRoutingKey); err != nil { - return &App{}, fmt.Errorf("setup amqp exclusive: %w", err) - } } return &App{ - storageApp: storageApp, - rendererApp: rendererApp, - thumbnailApp: thumbnailApp, - webhooks: make(map[string]provider.Webhook), - counter: prom.CounterVec(prometheusRegisterer, "fibr", "webhook", "item", "code"), - hmacSecret: []byte(*config.hmacSecret), - - amqpClient: amqpClient, - amqpExchange: amqpExchange, - amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey), - amqpExclusiveRoutingKey: amqpExclusiveRoutingKey, + storageApp: storageApp, + rendererApp: rendererApp, + thumbnailApp: thumbnailApp, + exclusiveApp: exclusiveApp, + webhooks: make(map[string]provider.Webhook), + counter: prom.CounterVec(prometheusRegisterer, "fibr", "webhook", "item", "code"), + hmacSecret: []byte(*config.hmacSecret), + amqpClient: amqpClient, + amqpExchange: amqpExchange, + amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey), }, nil } func (a *App) Exclusive(ctx context.Context, name string, action func(ctx context.Context) error) error { - return provider.Exclusive(ctx, a.amqpClient, a.amqpExclusiveRoutingKey, func(ctx context.Context) error { + return a.exclusiveApp.Execute(ctx, "fibr:mutex:"+name, func(ctx context.Context) error { a.mutex.Lock() defer a.mutex.Unlock()