Skip to content

Commit

Permalink
feat: Adding amqp listener for thumbnail generation
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Aug 28, 2022
1 parent bdc785b commit 878fcb8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cmd/fibr/fibr.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func main() {
exifConfig := exif.Flags(fs, "exif")

amqpConfig := amqp.Flags(fs, "amqp")
amqpThumbnailConfig := amqphandler.Flags(fs, "amqpThumbnail", flags.NewOverride("Exchange", "fibr"), flags.NewOverride("Queue", "fibr.thumbnail"), flags.NewOverride("RoutingKey", "thumbnail_output"))
amqpExifConfig := amqphandler.Flags(fs, "amqpExif", flags.NewOverride("Exchange", "fibr"), flags.NewOverride("Queue", "fibr.exif"), flags.NewOverride("RoutingKey", "exif_output"))
amqpShareConfig := amqphandler.Flags(fs, "amqpShare", flags.NewOverride("Exchange", "fibr.shares"), flags.NewOverride("Queue", "fibr.share-"+generateIdentityName()), flags.NewOverride("RoutingKey", "share"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0)))
amqpWebhookConfig := amqphandler.Flags(fs, "amqpWebhook", flags.NewOverride("Exchange", "fibr.webhooks"), flags.NewOverride("Queue", "fibr.webhook-"+generateIdentityName()), flags.NewOverride("RoutingKey", "webhook"), flags.NewOverride("Exclusive", true), flags.NewOverride("RetryInterval", time.Duration(0)))
Expand Down Expand Up @@ -148,6 +149,9 @@ func main() {
shareApp, err := share.New(shareConfig, storageProvider, amqpClient)
logger.Fatal(err)

amqpThumbnailApp, err := amqphandler.New(amqpThumbnailConfig, amqpClient, tracerApp.GetTracer("amqp_handler_thumbnail"), thumbnailApp.AMQPHandler)
logger.Fatal(err)

amqpExifApp, err := amqphandler.New(amqpExifConfig, amqpClient, tracerApp.GetTracer("amqp_handler_exif"), exifApp.AMQPHandler)
logger.Fatal(err)

Expand All @@ -170,6 +174,7 @@ func main() {

ctx := context.Background()

go amqpThumbnailApp.Start(ctx, healthApp.Done())
go amqpExifApp.Start(ctx, healthApp.Done())
go amqpShareApp.Start(ctx, healthApp.Done())
go amqpWebhookApp.Start(ctx, healthApp.Done())
Expand Down
4 changes: 4 additions & 0 deletions pkg/exif/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
absto "github.com/ViBiOh/absto/pkg/model"
"github.com/ViBiOh/fibr/pkg/provider"
"github.com/ViBiOh/httputils/v4/pkg/logger"
"github.com/ViBiOh/httputils/v4/pkg/tracer"
"github.com/streadway/amqp"
)

func (a App) AMQPHandler(ctx context.Context, message amqp.Delivery) error {
ctx, end := tracer.StartSpan(ctx, a.tracer, "amqp")
defer end()

var resp provider.ExifResponse

if err := json.Unmarshal(message.Body, &resp); err != nil {
Expand Down
29 changes: 29 additions & 0 deletions pkg/thumbnail/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package thumbnail

import (
"context"
"encoding/json"
"fmt"

absto "github.com/ViBiOh/absto/pkg/model"
"github.com/ViBiOh/httputils/v4/pkg/tracer"
vith "github.com/ViBiOh/vith/pkg/model"
"github.com/streadway/amqp"
)

func (a App) AMQPHandler(ctx context.Context, message amqp.Delivery) error {
ctx, end := tracer.StartSpan(ctx, a.tracer, "amqp")
defer end()

var req vith.Request
if err := json.Unmarshal(message.Body, &req); err != nil {
return fmt.Errorf("decode: %w", err)
}

key := redisKey(a.PathForScale(absto.Item{
ID: absto.ID(req.Input),
Pathname: req.Input,
}, req.Scale))

return a.redisClient.Delete(ctx, key)
}

0 comments on commit 878fcb8

Please sign in to comment.