diff --git a/cmd/fibr/fibr.go b/cmd/fibr/fibr.go index 9b8c921c..fc807cbe 100644 --- a/cmd/fibr/fibr.go +++ b/cmd/fibr/fibr.go @@ -61,7 +61,9 @@ func main() { os.Exit(1) } - services, err := newServices(config, clients, adapters) + endCtx := clients.health.End(ctx) + + services, err := newServices(endCtx, config, clients, adapters) if err != nil { slog.Error("services", "err", err) os.Exit(1) @@ -69,8 +71,6 @@ func main() { ports := newPorts(config, clients, services) - endCtx := clients.health.End(ctx) - stopOnDone := Starters{services.amqpThumbnail, services.amqpExif, services.sanitizer} stopOnDone.Start(clients.health.Done(ctx)) defer stopOnDone.GracefulWait() diff --git a/cmd/fibr/service.go b/cmd/fibr/service.go index a98e707f..a6c8f975 100644 --- a/cmd/fibr/service.go +++ b/cmd/fibr/service.go @@ -1,6 +1,8 @@ package main import ( + "context" + "github.com/ViBiOh/fibr/pkg/crud" "github.com/ViBiOh/fibr/pkg/fibr" "github.com/ViBiOh/fibr/pkg/metadata" @@ -26,7 +28,7 @@ type services struct { thumbnail thumbnail.Service } -func newServices(config configuration, clients client, adapters adapters) (services, error) { +func newServices(ctx context.Context, config configuration, clients client, adapters adapters) (services, error) { thumbnailService, err := thumbnail.New(config.thumbnail, adapters.storage, clients.redis, clients.telemetry.MeterProvider(), clients.telemetry.TracerProvider(), clients.amqp) if err != nil { return services{}, err @@ -37,7 +39,7 @@ func newServices(config configuration, clients client, adapters adapters) (servi return services{}, err } - metadataService, err := metadata.New(config.metadata, adapters.storage, clients.telemetry.MeterProvider(), clients.telemetry.TracerProvider(), clients.amqp, clients.redis, adapters.exclusiveService) + metadataService, err := metadata.New(ctx, config.metadata, adapters.storage, clients.telemetry.MeterProvider(), clients.telemetry.TracerProvider(), clients.amqp, clients.redis, adapters.exclusiveService) if err != nil { return services{}, err } diff --git a/pkg/metadata/exif.go b/pkg/metadata/exif.go index b63e9163..0a63fcf3 100644 --- a/pkg/metadata/exif.go +++ b/pkg/metadata/exif.go @@ -75,7 +75,7 @@ func Flags(fs *flag.FlagSet, prefix string) *Config { return &config } -func New(config *Config, storageService absto.Storage, meterProvider metric.MeterProvider, traceProvider trace.TracerProvider, amqpClient *amqpclient.Client, redisClient redis.Client, exclusiveService exclusive.Service) (Service, error) { +func New(ctx context.Context, config *Config, storageService absto.Storage, meterProvider metric.MeterProvider, traceProvider trace.TracerProvider, amqpClient *amqpclient.Client, redisClient redis.Client, exclusiveService exclusive.Service) (Service, error) { var amqpExchange string if amqpClient != nil { @@ -127,7 +127,7 @@ func New(config *Config, storageService absto.Storage, meterProvider metric.Mete } return service.loadExif(ctx, item) - }, traceProvider).WithMaxConcurrency(provider.MaxConcurrency) + }, traceProvider).WithMaxConcurrency(provider.MaxConcurrency).WithClientSideCaching(ctx, "fibr_exif") service.aggregateCache = cache.New(redisClient, redisKey, func(ctx context.Context, item absto.Item) (provider.Aggregate, error) { if !item.IsDir() { @@ -135,7 +135,7 @@ func New(config *Config, storageService absto.Storage, meterProvider metric.Mete } return service.loadAggregate(ctx, item) - }, traceProvider).WithMaxConcurrency(provider.MaxConcurrency) + }, traceProvider).WithMaxConcurrency(provider.MaxConcurrency).WithClientSideCaching(ctx, "fibr_aggregate") return service, nil }