Skip to content

Commit

Permalink
refactor(exclusive)!: Using redis for distributed lock instead of amqp
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Dec 30, 2022
1 parent bf10acb commit e2f9777
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 252 deletions.
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ Be careful when using the CLI, if someone list the processes on the system, they
Usage of fibr:
-address string
[server] Listen address {FIBR_ADDRESS}
-amqpExclusiveRoutingKey string
[crud] AMQP Routing Key for exclusive lock on default exchange {FIBR_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.start")
-amqpExifExchange string
[amqpExif] Exchange name {FIBR_AMQP_EXIF_EXCHANGE} (default "fibr")
-amqpExifExclusive
Expand Down Expand Up @@ -288,8 +286,6 @@ Usage of fibr:
[owasp] Content-Security-Policy {FIBR_CSP} (default "default-src 'self'; base-uri 'self'; script-src 'self' 'httputils-nonce' unpkg.com/[email protected]/dist-cjs/ unpkg.com/[email protected]/dist/ unpkg.com/[email protected]/; style-src 'self' 'httputils-nonce' unpkg.com/[email protected]/dist/ unpkg.com/[email protected]/; img-src 'self' data: a.tile.openstreetmap.org b.tile.openstreetmap.org c.tile.openstreetmap.org")
-exifAmqpExchange string
[exif] AMQP Exchange Name {FIBR_EXIF_AMQP_EXCHANGE} (default "fibr")
-exifAmqpExclusiveRoutingKey string
[exif] AMQP Routing Key for exclusive lock on default exchange {FIBR_EXIF_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.exif")
-exifAmqpRoutingKey string
[exif] AMQP Routing Key for exif {FIBR_EXIF_AMQP_ROUTING_KEY} (default "exif_input")
-exifDirectAccess
Expand Down Expand Up @@ -370,8 +366,6 @@ Usage of fibr:
[redis] Redis Username, if any {FIBR_REDIS_USERNAME}
-sanitizeOnStart
[crud] Sanitize name on start {FIBR_SANITIZE_ON_START}
-searchAmqpExclusiveRoutingKey string
[search] AMQP Routing Key for exclusive lock on default exchange {FIBR_SEARCH_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.search")
-shareAmqpExchange string
[share] AMQP Exchange Name {FIBR_SHARE_AMQP_EXCHANGE} (default "fibr.shares")
-shareAmqpExclusiveRoutingKey string
Expand Down Expand Up @@ -432,8 +426,6 @@ Usage of fibr:
[alcotest] User-Agent for check {FIBR_USER_AGENT} (default "Alcotest")
-webhookAmqpExchange string
[webhook] AMQP Exchange Name {FIBR_WEBHOOK_AMQP_EXCHANGE} (default "fibr.webhooks")
-webhookAmqpExclusiveRoutingKey string
[webhook] AMQP Routing Key for exclusive lock on default exchange {FIBR_WEBHOOK_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.webhooks")
-webhookAmqpRoutingKey string
[webhook] AMQP Routing Key for webhook {FIBR_WEBHOOK_AMQP_ROUTING_KEY} (default "webhook")
-webhookSecret string
Expand All @@ -450,4 +442,4 @@ Fibr doesn't handle multiple instances running at the same time on the same `roo
Shares' metadatas are stored in a file, loaded at the start of the application. If an _instance A_ adds a share, _instance B_ can't see it. If they are both behind the same load-balancer, it can leads to an erratic behavior. Fibr has also an internal cron that purge expired shares and write the new metadatas to the file. If _instance A_ adds a share and _instance B_ runs the cron, the share added in _instance A_ is lost.
If you enable AMQP, it can handle thoses behaviours by using an exclusive lock with an AMQP semaphore mechanism.
If you enable Redis caching, it can handle thoses behaviours by using an exclusive lock with the Redis semaphore mechanism.
3 changes: 0 additions & 3 deletions cmd/fibr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
basicMemory "github.com/ViBiOh/auth/v2/pkg/store/memory"
"github.com/ViBiOh/fibr/pkg/crud"
"github.com/ViBiOh/fibr/pkg/exif"
"github.com/ViBiOh/fibr/pkg/search"
"github.com/ViBiOh/fibr/pkg/share"
"github.com/ViBiOh/fibr/pkg/storage"
"github.com/ViBiOh/fibr/pkg/thumbnail"
Expand Down Expand Up @@ -52,7 +51,6 @@ type configuration struct {
amqpShare amqphandler.Config
amqpWebhook amqphandler.Config
redis redis.Config
search search.Config
disableAuth *bool
}

Expand Down Expand Up @@ -84,7 +82,6 @@ func newConfig() (configuration, error) {
amqpShare: amqphandler.Flags(fs, "amqpShare", flags.NewOverride("Exchange", "fibr.shares"), flags.NewOverride("Queue", "fibr.share-"+generateIdentityName()), flags.NewOverride("RoutingKey", "share"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0))),
amqpWebhook: amqphandler.Flags(fs, "amqpWebhook", flags.NewOverride("Exchange", "fibr.webhooks"), flags.NewOverride("Queue", "fibr.webhook-"+generateIdentityName()), flags.NewOverride("RoutingKey", "webhook"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0))),
redis: redis.Flags(fs, "redis", flags.NewOverride("Address", "")),
search: search.Flags(fs, "search"),
disableAuth: flags.Bool(fs, "", "auth", "NoAuth", "Disable basic authentification", false, nil),
}, fs.Parse(os.Args[1:])
}
17 changes: 11 additions & 6 deletions cmd/fibr/fibr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
authMiddleware "github.com/ViBiOh/auth/v2/pkg/middleware"
basicMemory "github.com/ViBiOh/auth/v2/pkg/store/memory"
"github.com/ViBiOh/fibr/pkg/crud"
"github.com/ViBiOh/fibr/pkg/exclusive"
"github.com/ViBiOh/fibr/pkg/exif"
"github.com/ViBiOh/fibr/pkg/fibr"
"github.com/ViBiOh/fibr/pkg/provider"
Expand Down Expand Up @@ -93,13 +94,18 @@ func main() {
rendererApp, err := renderer.New(config.renderer, content, fibr.FuncMap, client.tracer.GetTracer("renderer"))
logger.Fatal(err)

exifApp, err := exif.New(config.exif, storageApp, prometheusRegisterer, client.tracer, client.amqp, client.redis)
var exclusiveApp exclusive.App
if client.redis.Enabled() {
exclusiveApp = exclusive.New(client.redis)
}

exifApp, err := exif.New(config.exif, storageApp, prometheusRegisterer, client.tracer, client.amqp, client.redis, exclusiveApp)
logger.Fatal(err)

webhookApp, err := webhook.New(config.webhook, storageApp, prometheusRegisterer, client.amqp, rendererApp, thumbnailApp)
webhookApp, err := webhook.New(config.webhook, storageApp, prometheusRegisterer, client.amqp, rendererApp, thumbnailApp, exclusiveApp)
logger.Fatal(err)

shareApp, err := share.New(config.share, storageApp, client.amqp)
shareApp, err := share.New(config.share, storageApp, client.amqp, exclusiveApp)
logger.Fatal(err)

amqpThumbnailApp, err := amqphandler.New(config.amqpThumbnail, client.amqp, client.tracer.GetTracer("amqp_handler_thumbnail"), thumbnailApp.AMQPHandler)
Expand All @@ -114,10 +120,9 @@ func main() {
amqpWebhookApp, err := amqphandler.New(config.amqpWebhook, client.amqp, client.tracer.GetTracer("amqp_handler_webhook"), webhookApp.AMQPHandler)
logger.Fatal(err)

searchApp, err := search.New(config.search, filteredStorage, thumbnailApp, exifApp, client.amqp, client.tracer.GetTracer("search"))
logger.Fatal(err)
searchApp := search.New(filteredStorage, thumbnailApp, exifApp, exclusiveApp, client.tracer.GetTracer("search"))

crudApp, err := crud.New(config.crud, storageApp, filteredStorage, rendererApp, shareApp, webhookApp, thumbnailApp, exifApp, searchApp, eventBus.Push, client.amqp, client.tracer.GetTracer("crud"))
crudApp, err := crud.New(config.crud, storageApp, filteredStorage, rendererApp, shareApp, webhookApp, thumbnailApp, exifApp, searchApp, eventBus.Push, exclusiveApp, client.tracer.GetTracer("crud"))
logger.Fatal(err)

var middlewareApp provider.Auth
Expand Down
54 changes: 18 additions & 36 deletions pkg/crud/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"time"

absto "github.com/ViBiOh/absto/pkg/model"
"github.com/ViBiOh/fibr/pkg/exclusive"
"github.com/ViBiOh/fibr/pkg/exif"
"github.com/ViBiOh/fibr/pkg/provider"
"github.com/ViBiOh/fibr/pkg/search"
"github.com/ViBiOh/fibr/pkg/thumbnail"
"github.com/ViBiOh/flags"
"github.com/ViBiOh/httputils/v4/pkg/amqp"
"github.com/ViBiOh/httputils/v4/pkg/logger"
"github.com/ViBiOh/httputils/v4/pkg/renderer"
"go.opentelemetry.io/otel/trace"
Expand All @@ -30,18 +30,15 @@ var (
)

type App struct {
tracer trace.Tracer
rawStorageApp absto.Storage
storageApp absto.Storage
shareApp provider.ShareManager
webhookApp provider.WebhookManager
exifApp provider.ExifManager
searchApp search.App
pushEvent provider.EventProducer

amqpClient *amqp.Client
amqpExclusiveRoutingKey string

tracer trace.Tracer
rawStorageApp absto.Storage
storageApp absto.Storage
shareApp provider.ShareManager
webhookApp provider.WebhookManager
exifApp provider.ExifManager
searchApp search.App
pushEvent provider.EventProducer
exclusiveApp exclusive.App
temporaryFolder string
rendererApp renderer.App
thumbnailApp thumbnail.App
Expand All @@ -51,11 +48,10 @@ type App struct {
}

type Config struct {
amqpExclusiveRoutingKey *string
bcryptDuration *string
temporaryFolder *string
sanitizeOnStart *bool
chunkUpload *bool
bcryptDuration *string
temporaryFolder *string
sanitizeOnStart *bool
chunkUpload *bool
}

func Flags(fs *flag.FlagSet, prefix string) Config {
Expand All @@ -65,12 +61,10 @@ func Flags(fs *flag.FlagSet, prefix string) Config {

chunkUpload: flags.Bool(fs, prefix, "crud", "ChunkUpload", "Use chunk upload in browser", false, nil),
temporaryFolder: flags.String(fs, prefix, "crud", "TemporaryFolder", "Temporary folder for chunk upload", "/tmp", nil),

amqpExclusiveRoutingKey: flags.String(fs, prefix, "crud", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.start", nil),
}
}

func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage, rendererApp renderer.App, shareApp provider.ShareManager, webhookApp provider.WebhookManager, thumbnailApp thumbnail.App, exifApp exif.App, searchApp search.App, eventProducer provider.EventProducer, amqpClient *amqp.Client, tracer trace.Tracer) (App, error) {
func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage, rendererApp renderer.App, shareApp provider.ShareManager, webhookApp provider.WebhookManager, thumbnailApp thumbnail.App, exifApp exif.App, searchApp search.App, eventProducer provider.EventProducer, exclusiveApp exclusive.App, tracer trace.Tracer) (App, error) {
app := App{
sanitizeOnStart: *config.sanitizeOnStart,

Expand All @@ -89,14 +83,7 @@ func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage,
webhookApp: webhookApp,
searchApp: searchApp,

amqpClient: amqpClient,
amqpExclusiveRoutingKey: strings.TrimSpace(*config.amqpExclusiveRoutingKey),
}

if amqpClient != nil {
if err := amqpClient.SetupExclusive(app.amqpExclusiveRoutingKey); err != nil {
return app, fmt.Errorf("setup amqp exclusive: %w", err)
}
exclusiveApp: exclusiveApp,
}

bcryptDuration, err := time.ParseDuration(strings.TrimSpace(*config.bcryptDuration))
Expand All @@ -117,16 +104,11 @@ func New(config Config, storageApp absto.Storage, filteredStorage absto.Storage,
}

func (a App) Start(ctx context.Context) {
if a.amqpClient == nil {
a.start(ctx)
return
}

if _, err := a.amqpClient.Exclusive(context.Background(), a.amqpExclusiveRoutingKey, time.Hour, func(ctx context.Context) error {
if err := a.exclusiveApp.Execute(ctx, "fibr:mutex:start", func(ctx context.Context) error {
a.start(ctx)
return nil
}); err != nil {
logger.Error("get exclusive semaphore: %s", err)
logger.Error("start: %s", err)
}
}

Expand Down
47 changes: 47 additions & 0 deletions pkg/exclusive/exclusive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package exclusive

import (
"context"
"time"

"github.com/ViBiOh/httputils/v4/pkg/redis"
)

const SemaphoreDuration = time.Second * 10

type App struct {
redisClient redis.App
}

func New(redisClient redis.App) App {
return App{
redisClient: redisClient,
}
}

func (a App) Execute(ctx context.Context, name string, action func(context.Context) error) error {
if !a.redisClient.Enabled() {
return action(ctx)
}

exclusive:
acquired, err := a.redisClient.Exclusive(ctx, name, SemaphoreDuration, action)
if err != nil {
return err
}

if !acquired {
time.Sleep(time.Second)
goto exclusive
}

return nil
}

func (a App) Try(ctx context.Context, name string, action func(context.Context) error) (bool, error) {
if !a.redisClient.Enabled() {
return true, action(ctx)
}

return a.redisClient.Exclusive(ctx, name, SemaphoreDuration, action)
}
42 changes: 17 additions & 25 deletions pkg/exif/exif.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

absto "github.com/ViBiOh/absto/pkg/model"
exas "github.com/ViBiOh/exas/pkg/model"
"github.com/ViBiOh/fibr/pkg/exclusive"
"github.com/ViBiOh/fibr/pkg/provider"
"github.com/ViBiOh/flags"
amqpclient "github.com/ViBiOh/httputils/v4/pkg/amqp"
Expand All @@ -34,12 +35,12 @@ type App struct {
exifCacheApp cache.App[absto.Item, provider.Metadata]
aggregateCacheApp cache.App[absto.Item, provider.Aggregate]

redisClient redis.App
exclusiveApp exclusive.App
redisClient redis.App

amqpClient *amqpclient.Client
amqpExchange string
amqpRoutingKey string
amqpExclusiveRoutingKey string
amqpClient *amqpclient.Client
amqpExchange string
amqpRoutingKey string

exifRequest request.Request

Expand All @@ -52,9 +53,8 @@ type Config struct {
exifUser *string
exifPass *string

amqpExchange *string
amqpRoutingKey *string
amqpExclusiveRoutingKey *string
amqpExchange *string
amqpRoutingKey *string

maxSize *int
directAccess *bool
Expand All @@ -69,28 +69,20 @@ func Flags(fs *flag.FlagSet, prefix string) Config {
directAccess: flags.Bool(fs, prefix, "exif", "DirectAccess", "Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended)", false, nil),
maxSize: flags.Int(fs, prefix, "exif", "MaxSize", "Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled.", 1024*1024*200, nil),

amqpExchange: flags.String(fs, prefix, "exif", "AmqpExchange", "AMQP Exchange Name", "fibr", nil),
amqpRoutingKey: flags.String(fs, prefix, "exif", "AmqpRoutingKey", "AMQP Routing Key for exif", "exif_input", nil),
amqpExclusiveRoutingKey: flags.String(fs, prefix, "exif", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.exif", nil),
amqpExchange: flags.String(fs, prefix, "exif", "AmqpExchange", "AMQP Exchange Name", "fibr", nil),
amqpRoutingKey: flags.String(fs, prefix, "exif", "AmqpRoutingKey", "AMQP Routing Key for exif", "exif_input", nil),
}
}

func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheus.Registerer, tracerApp tracer.App, amqpClient *amqpclient.Client, redisClient redis.App) (App, error) {
func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheus.Registerer, tracerApp tracer.App, amqpClient *amqpclient.Client, redisClient redis.App, exclusiveApp exclusive.App) (App, error) {
var amqpExchange string
var amqpExclusiveRoutingKey string

if amqpClient != nil {
amqpExchange = strings.TrimSpace(*config.amqpExchange)

if err := amqpClient.Publisher(amqpExchange, "direct", nil); err != nil {
return App{}, fmt.Errorf("configure amqp: %w", err)
}

amqpExclusiveRoutingKey = strings.TrimSpace(*config.amqpExclusiveRoutingKey)

if err := amqpClient.SetupExclusive(amqpExclusiveRoutingKey); err != nil {
return App{}, fmt.Errorf("setup amqp exclusive: %w", err)
}
}

app := App{
Expand All @@ -100,13 +92,13 @@ func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheu

redisClient: redisClient,

amqpClient: amqpClient,
amqpExchange: amqpExchange,
amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey),
amqpExclusiveRoutingKey: amqpExclusiveRoutingKey,
amqpClient: amqpClient,
amqpExchange: amqpExchange,
amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey),

tracer: tracerApp.GetTracer("exif"),
storageApp: storageApp,
tracer: tracerApp.GetTracer("exif"),
exclusiveApp: exclusiveApp,
storageApp: storageApp,
listStorageApp: storageApp.WithIgnoreFn(func(item absto.Item) bool {
return !strings.HasSuffix(item.Name, ".json")
}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/exif/exif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestFlags(t *testing.T) {
want string
}{
"simple": {
"Usage of simple:\n -amqpExchange string\n \t[exif] AMQP Exchange Name {SIMPLE_AMQP_EXCHANGE} (default \"fibr\")\n -amqpExclusiveRoutingKey string\n \t[exif] AMQP Routing Key for exclusive lock on default exchange {SIMPLE_AMQP_EXCLUSIVE_ROUTING_KEY} (default \"fibr.semaphore.exif\")\n -amqpRoutingKey string\n \t[exif] AMQP Routing Key for exif {SIMPLE_AMQP_ROUTING_KEY} (default \"exif_input\")\n -directAccess\n \t[exif] Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended) {SIMPLE_DIRECT_ACCESS}\n -maxSize int\n \t[exif] Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled. {SIMPLE_MAX_SIZE} (default 209715200)\n -password string\n \t[exif] Exif Tool URL Basic Password {SIMPLE_PASSWORD}\n -uRL string\n \t[exif] Exif Tool URL (exas) {SIMPLE_URL} (default \"http://exas:1080\")\n -user string\n \t[exif] Exif Tool URL Basic User {SIMPLE_USER}\n",
"Usage of simple:\n -amqpExchange string\n \t[exif] AMQP Exchange Name {SIMPLE_AMQP_EXCHANGE} (default \"fibr\")\n -amqpRoutingKey string\n \t[exif] AMQP Routing Key for exif {SIMPLE_AMQP_ROUTING_KEY} (default \"exif_input\")\n -directAccess\n \t[exif] Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended) {SIMPLE_DIRECT_ACCESS}\n -maxSize int\n \t[exif] Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled. {SIMPLE_MAX_SIZE} (default 209715200)\n -password string\n \t[exif] Exif Tool URL Basic Password {SIMPLE_PASSWORD}\n -uRL string\n \t[exif] Exif Tool URL (exas) {SIMPLE_URL} (default \"http://exas:1080\")\n -user string\n \t[exif] Exif Tool URL Basic User {SIMPLE_USER}\n",
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/exif/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func WithDescription(description string) MetadataOption {
func (a App) update(ctx context.Context, item absto.Item, opts ...MetadataOption) (provider.Metadata, error) {
var metadata provider.Metadata

return metadata, provider.Exclusive(ctx, a.amqpClient, a.amqpExclusiveRoutingKey, func(ctx context.Context) error {
return metadata, a.exclusiveApp.Execute(ctx, "fibr:mutex:"+item.ID, func(ctx context.Context) error {
var err error

metadata, err := a.GetMetadataFor(ctx, item)
Expand Down
Loading

0 comments on commit e2f9777

Please sign in to comment.