Skip to content

Commit

Permalink
refactor(amqp): Using exclusive pattern around concurrent file access
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Dec 30, 2022
1 parent 2741fa4 commit 043695a
Show file tree
Hide file tree
Showing 25 changed files with 194 additions and 165 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ Usage of fibr:
[owasp] Content-Security-Policy {FIBR_CSP} (default "default-src 'self'; base-uri 'self'; script-src 'self' 'httputils-nonce' unpkg.com/[email protected]/dist-cjs/ unpkg.com/[email protected]/dist/ unpkg.com/[email protected]/; style-src 'self' 'httputils-nonce' unpkg.com/[email protected]/dist/ unpkg.com/[email protected]/; img-src 'self' data: a.tile.openstreetmap.org b.tile.openstreetmap.org c.tile.openstreetmap.org")
-exifAmqpExchange string
[exif] AMQP Exchange Name {FIBR_EXIF_AMQP_EXCHANGE} (default "fibr")
-exifAmqpExclusiveRoutingKey string
[exif] AMQP Routing Key for exclusive lock on default exchange {FIBR_EXIF_AMQP_EXCLUSIVE_ROUTING_KEY} (default "fibr.semaphore.exif")
-exifAmqpRoutingKey string
[exif] AMQP Routing Key for exif {FIBR_EXIF_AMQP_ROUTING_KEY} (default "exif_input")
-exifDirectAccess
Expand Down Expand Up @@ -355,7 +357,7 @@ Usage of fibr:
-publicURL string
Public URL {FIBR_PUBLIC_URL} (default "http://localhost:1080")
-readTimeout duration
[server] Read Timeout {FIBR_READ_TIMEOUT} (default 2m0s)
[server] Read Timeout {FIBR_READ_TIMEOUT} (default 1m0s)
-redisAddress string
[redis] Redis Address fqdn:port (blank to disable) {FIBR_REDIS_ADDRESS}
-redisAlias string
Expand Down Expand Up @@ -437,7 +439,7 @@ Usage of fibr:
-webhookSecret string
[webhook] Secret for HMAC Signature {FIBR_WEBHOOK_SECRET}
-writeTimeout duration
[server] Write Timeout {FIBR_WRITE_TIMEOUT} (default 2m0s)
[server] Write Timeout {FIBR_WRITE_TIMEOUT} (default 1m0s)
```
# Caveats
Expand Down
2 changes: 1 addition & 1 deletion cmd/fibr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newConfig() (configuration, error) {
fs := flag.NewFlagSet("fibr", flag.ExitOnError)

return configuration{
appServer: server.Flags(fs, "", flags.NewOverride("ReadTimeout", 2*time.Minute), flags.NewOverride("WriteTimeout", 2*time.Minute)),
appServer: server.Flags(fs, "", flags.NewOverride("ReadTimeout", time.Minute), flags.NewOverride("WriteTimeout", time.Minute)),
promServer: server.Flags(fs, "prometheus", flags.NewOverride("Port", uint(9090)), flags.NewOverride("IdleTimeout", 10*time.Second), flags.NewOverride("ShutdownTimeout", 5*time.Second)),
health: health.Flags(fs, ""),

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/ViBiOh/auth/v2 v2.14.20
github.com/ViBiOh/exas v0.6.0
github.com/ViBiOh/flags v1.2.0
github.com/ViBiOh/httputils/v4 v4.51.1
github.com/ViBiOh/httputils/v4 v4.51.2
github.com/ViBiOh/vith v0.5.10
github.com/golang/mock v1.6.0
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/ViBiOh/exas v0.6.0 h1:UWQuzesHShBSMkQw7mWAj4NBwL1Hb/fDcgzk/UpacYQ=
github.com/ViBiOh/exas v0.6.0/go.mod h1:w6aibKNptsqVwIIIeuifr/1JM551/idiiKhHgDzf7sk=
github.com/ViBiOh/flags v1.2.0 h1:DaujjNXzD29KxKyp4eZdn7c9+uBN5DokWgDAe7DcUmc=
github.com/ViBiOh/flags v1.2.0/go.mod h1:UyMB5zeD/aId7Xw3x7577ZNU298JmukzOcV8p/H2W1s=
github.com/ViBiOh/httputils/v4 v4.51.1 h1:kTklOFl5zgzweh6CNxtwRJhYC4/GNZ8oYHxQkhHGmxQ=
github.com/ViBiOh/httputils/v4 v4.51.1/go.mod h1:wFfDWFpelX/bwli5s1VLGOOFiT1sgIg9/D/ACyy7Lg8=
github.com/ViBiOh/httputils/v4 v4.51.2 h1:nLXltNr1nre8SNzUfkCjtGRGjMkTFwVUpWHB6LtRmrU=
github.com/ViBiOh/httputils/v4 v4.51.2/go.mod h1:wFfDWFpelX/bwli5s1VLGOOFiT1sgIg9/D/ACyy7Lg8=
github.com/ViBiOh/vith v0.5.10 h1:BriyDGDFmBRpJpbQhsFxIlq2bT9HmuJ/q903Y+0w0S0=
github.com/ViBiOh/vith v0.5.10/go.mod h1:JcGIWVjvXA0zYHz/picn+xMIdk7ljbYjDp7d16BufzM=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/crud/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (a App) CreateSavedSearch(w http.ResponseWriter, r *http.Request, request p
return
}

if err = a.searchApp.Update(ctx, item, provider.Search{
if err = a.searchApp.Add(ctx, item, provider.Search{
ID: sha.New(name),
Name: name,
Query: r.URL.RawQuery,
Expand Down
3 changes: 2 additions & 1 deletion pkg/crud/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

absto "github.com/ViBiOh/absto/pkg/model"
"github.com/ViBiOh/fibr/pkg/provider"
"github.com/ViBiOh/fibr/pkg/search"
"github.com/ViBiOh/fibr/pkg/thumbnail"
"github.com/ViBiOh/httputils/v4/pkg/concurrent"
"github.com/ViBiOh/httputils/v4/pkg/logger"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (a App) list(ctx context.Context, request provider.Request, message rendere
}
}()

var savedSearches map[string]provider.Search
var savedSearches search.Searches
savedSearchDone := make(chan struct{})
go func() {
defer close(savedSearchDone)
Expand Down
13 changes: 3 additions & 10 deletions pkg/crud/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strconv"
"strings"

absto "github.com/ViBiOh/absto/pkg/model"
"github.com/ViBiOh/fibr/pkg/provider"
"github.com/ViBiOh/httputils/v4/pkg/model"
"github.com/ViBiOh/httputils/v4/pkg/renderer"
Expand Down Expand Up @@ -169,20 +168,14 @@ func (a App) handlePostDescription(w http.ResponseWriter, r *http.Request, reque
return
}

exif, err := a.exifApp.GetMetadataFor(ctx, item)
if err != nil && !absto.IsNotExist(err) {
a.error(w, r, request, err)
return
}

exif.Description = r.FormValue("description")
description := r.FormValue("description")

if err = a.exifApp.SaveExifFor(ctx, item, exif); err != nil {
if err = a.exifApp.UpdateDescription(ctx, item, description); err != nil {
a.error(w, r, request, err)
return
}

go a.notify(tracer.CopyToBackground(ctx), provider.NewDescriptionEvent(item, a.bestSharePath(item.Pathname), exif.Description, a.rendererApp))
go a.notify(tracer.CopyToBackground(ctx), provider.NewDescriptionEvent(item, a.bestSharePath(item.Pathname), description, a.rendererApp))

a.rendererApp.Redirect(w, r, fmt.Sprintf("?d=%s#%s", request.Display, item.ID), renderer.NewSuccessMessage("Description successfully edited"))
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/exif/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (a App) ListAggregateFor(ctx context.Context, items ...absto.Item) (map[str
return output, nil
}

func (a App) SaveExifFor(ctx context.Context, item absto.Item, exif provider.Metadata) error {
return a.exifCacheApp.EvictOnSuccess(ctx, item, a.saveMetadata(ctx, item, exif))
}

func (a App) SaveAggregateFor(ctx context.Context, item absto.Item, aggregate provider.Aggregate) error {
return a.aggregateCacheApp.EvictOnSuccess(ctx, item, a.saveMetadata(ctx, item, aggregate))
}
Expand Down
15 changes: 2 additions & 13 deletions pkg/exif/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"

absto "github.com/ViBiOh/absto/pkg/model"
"github.com/ViBiOh/fibr/pkg/provider"
"github.com/ViBiOh/httputils/v4/pkg/logger"
"github.com/ViBiOh/httputils/v4/pkg/tracer"
Expand All @@ -27,16 +26,6 @@ func (a App) AMQPHandler(ctx context.Context, message amqp.Delivery) error {
return nil
}

exif, err := a.GetMetadataFor(ctx, resp.Item)
if err != nil && !absto.IsNotExist(err) {
logger.WithField("item", resp.Item.Pathname).Error("load exif: %s", err)
}

exif.Exif = resp.Exif

if err := a.SaveExifFor(ctx, resp.Item, exif); err != nil {
return fmt.Errorf("save: %w", err)
}

return a.processExif(ctx, resp.Item, exif, true)
_, err := a.update(ctx, resp.Item, WithExif(resp.Exif))
return err
}
8 changes: 4 additions & 4 deletions pkg/exif/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,19 @@ func (a App) handleUploadEvent(ctx context.Context, item absto.Item, aggregate b
return a.publishExifRequest(ctx, item)
}

exif, err := a.extractAndSaveExif(ctx, item)
metadata, err := a.extractAndSaveExif(ctx, item)
if err != nil {
return fmt.Errorf("extract and save exif: %w", err)
}

if exif.IsZero() {
if metadata.IsZero() {
return nil
}

return a.processExif(ctx, item, exif, aggregate)
return a.processMetadata(ctx, item, metadata, aggregate)
}

func (a App) processExif(ctx context.Context, item absto.Item, exif provider.Metadata, aggregate bool) error {
func (a App) processMetadata(ctx context.Context, item absto.Item, exif provider.Metadata, aggregate bool) error {
if err := a.updateDate(ctx, item, exif); err != nil {
return fmt.Errorf("update date: %w", err)
}
Expand Down
51 changes: 23 additions & 28 deletions pkg/exif/exif.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
amqpclient "github.com/ViBiOh/httputils/v4/pkg/amqp"
"github.com/ViBiOh/httputils/v4/pkg/cache"
"github.com/ViBiOh/httputils/v4/pkg/httpjson"
"github.com/ViBiOh/httputils/v4/pkg/logger"
prom "github.com/ViBiOh/httputils/v4/pkg/prometheus"
"github.com/ViBiOh/httputils/v4/pkg/redis"
"github.com/ViBiOh/httputils/v4/pkg/request"
Expand All @@ -37,9 +36,10 @@ type App struct {

redisClient redis.App

amqpClient *amqpclient.Client
amqpExchange string
amqpRoutingKey string
amqpClient *amqpclient.Client
amqpExchange string
amqpRoutingKey string
amqpExclusiveRoutingKey string

exifRequest request.Request

Expand All @@ -52,8 +52,9 @@ type Config struct {
exifUser *string
exifPass *string

amqpExchange *string
amqpRoutingKey *string
amqpExchange *string
amqpRoutingKey *string
amqpExclusiveRoutingKey *string

maxSize *int
directAccess *bool
Expand All @@ -68,19 +69,28 @@ func Flags(fs *flag.FlagSet, prefix string) Config {
directAccess: flags.Bool(fs, prefix, "exif", "DirectAccess", "Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended)", false, nil),
maxSize: flags.Int(fs, prefix, "exif", "MaxSize", "Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled.", 1024*1024*200, nil),

amqpExchange: flags.String(fs, prefix, "exif", "AmqpExchange", "AMQP Exchange Name", "fibr", nil),
amqpRoutingKey: flags.String(fs, prefix, "exif", "AmqpRoutingKey", "AMQP Routing Key for exif", "exif_input", nil),
amqpExchange: flags.String(fs, prefix, "exif", "AmqpExchange", "AMQP Exchange Name", "fibr", nil),
amqpRoutingKey: flags.String(fs, prefix, "exif", "AmqpRoutingKey", "AMQP Routing Key for exif", "exif_input", nil),
amqpExclusiveRoutingKey: flags.String(fs, prefix, "exif", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.exif", nil),
}
}

func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheus.Registerer, tracerApp tracer.App, amqpClient *amqpclient.Client, redisClient redis.App) (App, error) {
var amqpExchange string
var amqpExclusiveRoutingKey string

if amqpClient != nil {
amqpExchange = strings.TrimSpace(*config.amqpExchange)

if err := amqpClient.Publisher(amqpExchange, "direct", nil); err != nil {
return App{}, fmt.Errorf("configure amqp: %w", err)
}

amqpExclusiveRoutingKey = strings.TrimSpace(*config.amqpExclusiveRoutingKey)

if err := amqpClient.SetupExclusive(amqpExclusiveRoutingKey); err != nil {
return App{}, fmt.Errorf("setup amqp exclusive: %w", err)
}
}

app := App{
Expand All @@ -90,9 +100,10 @@ func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheu

redisClient: redisClient,

amqpClient: amqpClient,
amqpExchange: amqpExchange,
amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey),
amqpClient: amqpClient,
amqpExchange: amqpExchange,
amqpRoutingKey: strings.TrimSpace(*config.amqpRoutingKey),
amqpExclusiveRoutingKey: amqpExclusiveRoutingKey,

tracer: tracerApp.GetTracer("exif"),
storageApp: storageApp,
Expand Down Expand Up @@ -145,23 +156,7 @@ func (a App) extractAndSaveExif(ctx context.Context, item absto.Item) (provider.
return provider.Metadata{}, fmt.Errorf("extract exif: %w", err)
}

metadata, err := a.GetMetadataFor(ctx, item)
if err != nil && !absto.IsNotExist(err) {
logger.WithField("item", item.Pathname).Error("load exif: %s", err)
}

metadata.Exif = exif

if exif.IsZero() {
logger.WithField("item", item.Pathname).Debug("no exif")
return metadata, nil
}

if err = a.SaveExifFor(ctx, item, metadata); err != nil {
return metadata, fmt.Errorf("save exif: %w", err)
}

return metadata, nil
return a.update(ctx, item, WithExif(exif))
}

func (a App) extractExif(ctx context.Context, item absto.Item) (exif exas.Exif, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/exif/exif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestFlags(t *testing.T) {
want string
}{
"simple": {
"Usage of simple:\n -amqpExchange string\n \t[exif] AMQP Exchange Name {SIMPLE_AMQP_EXCHANGE} (default \"fibr\")\n -amqpRoutingKey string\n \t[exif] AMQP Routing Key for exif {SIMPLE_AMQP_ROUTING_KEY} (default \"exif_input\")\n -directAccess\n \t[exif] Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended) {SIMPLE_DIRECT_ACCESS}\n -maxSize int\n \t[exif] Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled. {SIMPLE_MAX_SIZE} (default 209715200)\n -password string\n \t[exif] Exif Tool URL Basic Password {SIMPLE_PASSWORD}\n -uRL string\n \t[exif] Exif Tool URL (exas) {SIMPLE_URL} (default \"http://exas:1080\")\n -user string\n \t[exif] Exif Tool URL Basic User {SIMPLE_USER}\n",
"Usage of simple:\n -amqpExchange string\n \t[exif] AMQP Exchange Name {SIMPLE_AMQP_EXCHANGE} (default \"fibr\")\n -amqpExclusiveRoutingKey string\n \t[exif] AMQP Routing Key for exclusive lock on default exchange {SIMPLE_AMQP_EXCLUSIVE_ROUTING_KEY} (default \"fibr.semaphore.exif\")\n -amqpRoutingKey string\n \t[exif] AMQP Routing Key for exif {SIMPLE_AMQP_ROUTING_KEY} (default \"exif_input\")\n -directAccess\n \t[exif] Use Exas with direct access to filesystem (no large file upload, send a GET request, Basic Auth recommended) {SIMPLE_DIRECT_ACCESS}\n -maxSize int\n \t[exif] Max file size (in bytes) for extracting exif (0 to no limit). Not used if DirectAccess enabled. {SIMPLE_MAX_SIZE} (default 209715200)\n -password string\n \t[exif] Exif Tool URL Basic Password {SIMPLE_PASSWORD}\n -uRL string\n \t[exif] Exif Tool URL (exas) {SIMPLE_URL} (default \"http://exas:1080\")\n -user string\n \t[exif] Exif Tool URL Basic User {SIMPLE_USER}\n",
},
}

Expand Down
57 changes: 57 additions & 0 deletions pkg/exif/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package exif

import (
"context"
"fmt"

absto "github.com/ViBiOh/absto/pkg/model"
exas "github.com/ViBiOh/exas/pkg/model"
"github.com/ViBiOh/fibr/pkg/provider"
"github.com/ViBiOh/httputils/v4/pkg/logger"
)

type MetadataOption func(provider.Metadata) provider.Metadata

func WithExif(exif exas.Exif) MetadataOption {
return func(instance provider.Metadata) provider.Metadata {
instance.Exif = exif

return instance
}
}

func WithDescription(description string) MetadataOption {
return func(instance provider.Metadata) provider.Metadata {
instance.Description = description

return instance
}
}

func (a App) update(ctx context.Context, item absto.Item, opts ...MetadataOption) (provider.Metadata, error) {
var metadata provider.Metadata

return metadata, provider.Exclusive(ctx, a.amqpClient, a.amqpExclusiveRoutingKey, func(ctx context.Context) error {
var err error

metadata, err := a.GetMetadataFor(ctx, item)
if err != nil && !absto.IsNotExist(err) {
logger.WithField("item", item.Pathname).Error("load exif: %s", err)
}

for _, opt := range opts {
metadata = opt(metadata)
}

if err = a.exifCacheApp.EvictOnSuccess(ctx, item, a.saveMetadata(ctx, item, metadata)); err != nil {
return fmt.Errorf("save exif: %w", err)
}

return nil
})
}

func (a App) UpdateDescription(ctx context.Context, item absto.Item, description string) error {
_, err := a.update(ctx, item, WithDescription(description))
return err
}
12 changes: 6 additions & 6 deletions pkg/mocks/interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 043695a

Please sign in to comment.