Skip to content

Commit

Permalink
refactor: Using redis for pubsub
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Jan 21, 2023
1 parent fb799d9 commit b84b1f6
Show file tree
Hide file tree
Showing 16 changed files with 364 additions and 245 deletions.
18 changes: 2 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,6 @@ Usage of fibr:
[amqpExif] RoutingKey name {FIBR_AMQP_EXIF_ROUTING_KEY} (default "exif_output")
-amqpPrefetch int
[amqp] Prefetch count for QoS {FIBR_AMQP_PREFETCH} (default 1)
-amqpShareExchange string
[amqpShare] Exchange name {FIBR_AMQP_SHARE_EXCHANGE} (default "fibr.shares")
-amqpShareExclusive
[amqpShare] Queue exclusive mode (for fanout exchange) {FIBR_AMQP_SHARE_EXCLUSIVE} (default true)
-amqpShareMaxRetry uint
[amqpShare] Max send retries {FIBR_AMQP_SHARE_MAX_RETRY} (default 3)
-amqpShareQueue string
[amqpShare] Queue name {FIBR_AMQP_SHARE_QUEUE} (default "fibr.share-<random>")
-amqpShareRetryInterval duration
[amqpShare] Interval duration when send fails {FIBR_AMQP_SHARE_RETRY_INTERVAL}
-amqpShareRoutingKey string
[amqpShare] RoutingKey name {FIBR_AMQP_SHARE_ROUTING_KEY} (default "share")
-amqpThumbnailExchange string
[amqpThumbnail] Exchange name {FIBR_AMQP_THUMBNAIL_EXCHANGE} (default "fibr")
-amqpThumbnailExclusive
Expand Down Expand Up @@ -366,10 +354,8 @@ Usage of fibr:
[redis] Redis Username, if any {FIBR_REDIS_USERNAME}
-sanitizeOnStart
[crud] Sanitize name on start {FIBR_SANITIZE_ON_START}
-shareAmqpExchange string
[share] AMQP Exchange Name {FIBR_SHARE_AMQP_EXCHANGE} (default "fibr.shares")
-shareAmqpRoutingKey string
[share] AMQP Routing Key for share {FIBR_SHARE_AMQP_ROUTING_KEY} (default "share")
-sharePubSubChannel string
[share] Channel name {FIBR_SHARE_PUB_SUB_CHANNEL} (default "fibr:shares-channel")
-shutdownTimeout duration
[server] Shutdown Timeout {FIBR_SHUTDOWN_TIMEOUT} (default 10s)
-storageFileSystemDirectory /data
Expand Down
2 changes: 0 additions & 2 deletions cmd/fibr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type configuration struct {
amqp amqp.Config
amqpThumbnail amqphandler.Config
amqpExif amqphandler.Config
amqpShare amqphandler.Config
amqpWebhook amqphandler.Config
redis redis.Config
disableAuth *bool
Expand Down Expand Up @@ -79,7 +78,6 @@ func newConfig() (configuration, error) {
amqp: amqp.Flags(fs, "amqp"),
amqpThumbnail: amqphandler.Flags(fs, "amqpThumbnail", flags.NewOverride("Exchange", "fibr"), flags.NewOverride("Queue", "fibr.thumbnail"), flags.NewOverride("RoutingKey", "thumbnail_output")),
amqpExif: amqphandler.Flags(fs, "amqpExif", flags.NewOverride("Exchange", "fibr"), flags.NewOverride("Queue", "fibr.exif"), flags.NewOverride("RoutingKey", "exif_output")),
amqpShare: amqphandler.Flags(fs, "amqpShare", flags.NewOverride("Exchange", "fibr.shares"), flags.NewOverride("Queue", "fibr.share-"+generateIdentityName()), flags.NewOverride("RoutingKey", "share"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0))),
amqpWebhook: amqphandler.Flags(fs, "amqpWebhook", flags.NewOverride("Exchange", "fibr.webhooks"), flags.NewOverride("Queue", "fibr.webhook-"+generateIdentityName()), flags.NewOverride("RoutingKey", "webhook"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0))),
redis: redis.Flags(fs, "redis", flags.NewOverride("Address", "")),
disableAuth: flags.Bool(fs, "", "auth", "NoAuth", "Disable basic authentification", false, nil),
Expand Down
8 changes: 2 additions & 6 deletions cmd/fibr/fibr.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {
webhookApp, err := webhook.New(config.webhook, storageApp, prometheusRegisterer, client.amqp, rendererApp, thumbnailApp, exclusiveApp)
logger.Fatal(err)

shareApp, err := share.New(config.share, storageApp, client.amqp, exclusiveApp)
shareApp, err := share.New(config.share, storageApp, client.redis, exclusiveApp)
logger.Fatal(err)

amqpThumbnailApp, err := amqphandler.New(config.amqpThumbnail, client.amqp, client.tracer.GetTracer("amqp_handler_thumbnail"), thumbnailApp.AMQPHandler)
Expand All @@ -116,9 +116,6 @@ func main() {
amqpExifApp, err := amqphandler.New(config.amqpExif, client.amqp, client.tracer.GetTracer("amqp_handler_exif"), metadataApp.AMQPHandler)
logger.Fatal(err)

amqpShareApp, err := amqphandler.New(config.amqpShare, client.amqp, client.tracer.GetTracer("amqp_handler_share"), shareApp.AMQPHandler)
logger.Fatal(err)

amqpWebhookApp, err := amqphandler.New(config.amqpWebhook, client.amqp, client.tracer.GetTracer("amqp_handler_webhook"), webhookApp.AMQPHandler)
logger.Fatal(err)

Expand All @@ -140,7 +137,6 @@ func main() {

go amqpThumbnailApp.Start(doneCtx)
go amqpExifApp.Start(doneCtx)
go amqpShareApp.Start(doneCtx)
go amqpWebhookApp.Start(doneCtx)
go webhookApp.Start(endCtx)
go shareApp.Start(endCtx)
Expand All @@ -151,5 +147,5 @@ func main() {
go appServer.Start(endCtx, "http", httputils.Handler(handler, client.health, recoverer.Middleware, client.prometheus.Middleware, client.tracer.Middleware, owasp.New(config.owasp).Middleware))

client.health.WaitForTermination(appServer.Done())
server.GracefulWait(appServer.Done(), promServer.Done(), amqpExifApp.Done(), amqpShareApp.Done(), amqpWebhookApp.Done(), eventBus.Done())
server.GracefulWait(appServer.Done(), promServer.Done(), amqpExifApp.Done(), amqpWebhookApp.Done(), eventBus.Done(), shareApp.Done())
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/ViBiOh/auth/v2 v2.14.20
github.com/ViBiOh/exas v0.6.0
github.com/ViBiOh/flags v1.2.0
github.com/ViBiOh/httputils/v4 v4.52.6
github.com/ViBiOh/httputils/v4 v4.52.9
github.com/ViBiOh/vith v0.5.10
github.com/golang/mock v1.6.0
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/ViBiOh/exas v0.6.0 h1:UWQuzesHShBSMkQw7mWAj4NBwL1Hb/fDcgzk/UpacYQ=
github.com/ViBiOh/exas v0.6.0/go.mod h1:w6aibKNptsqVwIIIeuifr/1JM551/idiiKhHgDzf7sk=
github.com/ViBiOh/flags v1.2.0 h1:DaujjNXzD29KxKyp4eZdn7c9+uBN5DokWgDAe7DcUmc=
github.com/ViBiOh/flags v1.2.0/go.mod h1:UyMB5zeD/aId7Xw3x7577ZNU298JmukzOcV8p/H2W1s=
github.com/ViBiOh/httputils/v4 v4.52.6 h1:/d848oQoxwBpwO7VpvxsmTDaHNuDOZaBGoTeqRhRdEs=
github.com/ViBiOh/httputils/v4 v4.52.6/go.mod h1:YA+JJC5yrILqjk+OI9L0emleE2rXD3Q/z6gcS22IZlE=
github.com/ViBiOh/httputils/v4 v4.52.9 h1:gagMLSxpngff7uCambsa49GgHgdeIiwaQvNS7ssaO8I=
github.com/ViBiOh/httputils/v4 v4.52.9/go.mod h1:YA+JJC5yrILqjk+OI9L0emleE2rXD3Q/z6gcS22IZlE=
github.com/ViBiOh/vith v0.5.10 h1:BriyDGDFmBRpJpbQhsFxIlq2bT9HmuJ/q903Y+0w0S0=
github.com/ViBiOh/vith v0.5.10/go.mod h1:JcGIWVjvXA0zYHz/picn+xMIdk7ljbYjDp7d16BufzM=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
8 changes: 6 additions & 2 deletions pkg/exclusive/exclusive.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ func New(redisClient redis.Client) App {
}
}

func (a App) Enabled() bool {
return a.redisClient != nil && a.redisClient.Enabled()
}

func (a App) Execute(ctx context.Context, name string, duration time.Duration, action func(context.Context) error) error {
if !a.redisClient.Enabled() {
if !a.Enabled() {
return action(ctx)
}

Expand All @@ -39,7 +43,7 @@ exclusive:
}

func (a App) Try(ctx context.Context, name string, duration time.Duration, action func(context.Context) error) (bool, error) {
if !a.redisClient.Enabled() {
if !a.Enabled() {
return true, action(ctx)
}

Expand Down
132 changes: 0 additions & 132 deletions pkg/mocks/cache.go

This file was deleted.

Loading

0 comments on commit b84b1f6

Please sign in to comment.