diff --git a/go.mod b/go.mod index 1d3ca39..b619def 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/bep/debounce v1.2.1 github.com/dhowden/tag v0.0.0-20220618230019-adf36e896086 - github.com/egfanboy/mediapire-common v0.0.0-20231219000342-fbb6228cf11c + github.com/egfanboy/mediapire-common v0.0.0-20240522004433-cbd1b5041bc7 github.com/fsnotify/fsnotify v1.5.4 github.com/google/go-cmp v0.5.8 // indirect github.com/google/uuid v1.4.0 diff --git a/go.sum b/go.sum index 1aca885..37212cf 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/dhowden/tag v0.0.0-20220618230019-adf36e896086 h1:ORubSQoKnncsBnR4zD9 github.com/dhowden/tag v0.0.0-20220618230019-adf36e896086/go.mod h1:Z3Lomva4pyMWYezjMAU5QWRh0p1VvO4199OHlFnyKkM= github.com/egfanboy/mediapire-common v0.0.0-20231219000342-fbb6228cf11c h1:9eHPsRctkeGZGeEoKwqTDpishk/JHvq+p9AMzJ2mkZk= github.com/egfanboy/mediapire-common v0.0.0-20231219000342-fbb6228cf11c/go.mod h1:fkbcyi+5inobqkjLF9wtVdDioekNiFixiXgHBi0UEpQ= +github.com/egfanboy/mediapire-common v0.0.0-20240522004433-cbd1b5041bc7 h1:2icXJQ+GwLQwHD8ikzMfT2WvV6lgIcs4g4VX2y+QN6w= +github.com/egfanboy/mediapire-common v0.0.0-20240522004433-cbd1b5041bc7/go.mod h1:fkbcyi+5inobqkjLF9wtVdDioekNiFixiXgHBi0UEpQ= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= diff --git a/internal/media/async.go b/internal/media/async.go index dc2bb0a..03b4991 100644 --- a/internal/media/async.go +++ b/internal/media/async.go @@ -95,6 +95,41 @@ func handleTransferMessage(ctx context.Context, msg amqp091.Delivery) error { return nil } +func handleDeleteMessage(ctx context.Context, msg amqp091.Delivery) error { + var deleteMsg messaging.DeleteMediaMessage + + // acknowledge the message + msg.Ack(false) + + err := json.Unmarshal(msg.Body, &deleteMsg) + if err != nil { + msg := "failed to unmarshal delete message" + log.Err(err).Msg(msg) + + return err + } + + appInstance := app.GetApp() + + // Get the media for this node + input, ok := deleteMsg.MediaToDelete[appInstance.NodeId] + if !ok { + log.Info().Msg("Delete request has no inputs from this host") + + return nil + } + + mediaService := NewMediaService() + + err = mediaService.DeleteMedia(ctx, input) + if err != nil { + log.Err(err).Msg("Failed to delete all requested media") + } + + return err +} + func init() { rabbitmq.RegisterConsumer(handleTransferMessage, messaging.TopicTransfer) + rabbitmq.RegisterConsumer(handleDeleteMessage, messaging.TopicDeleteMedia) } diff --git a/internal/media/media-api.go b/internal/media/media-api.go index 4a64d0d..6cfbc4e 100644 --- a/internal/media/media-api.go +++ b/internal/media/media-api.go @@ -14,4 +14,5 @@ type MediaApi interface { StreamMedia(ctx context.Context, id uuid.UUID) ([]byte, error) UnsetDirectory(directory string) error DownloadMedia(ctx context.Context, ids []uuid.UUID) ([]byte, error) + DeleteMedia(ctx context.Context, ids []uuid.UUID) error } diff --git a/internal/media/media-service.go b/internal/media/media-service.go index 4706e0e..f6ed0d1 100644 --- a/internal/media/media-service.go +++ b/internal/media/media-service.go @@ -134,6 +134,7 @@ func (s *mediaService) ScanDirectory(directory string) (err error) { item.Path = path item.Id = uuid.New() + item.ParentDir = directory items = append(items, item) }() @@ -268,6 +269,60 @@ func (s *mediaService) DownloadMedia(ctx context.Context, ids []uuid.UUID) ([]by return buf.Bytes(), err } +func (s *mediaService) DeleteMedia(ctx context.Context, ids []uuid.UUID) error { + failedToDelete := make([]string, 0) + + for _, itemId := range ids { + item, err := s.getMediaItemFromId(ctx, itemId) + if err != nil { + failedToDelete = append(failedToDelete, fmt.Sprintf("Failed to get item with id %q", itemId)) + + continue + } + + err = os.Remove(item.Path) + if err != nil { + failedToDelete = append(failedToDelete, fmt.Sprintf("Failed to delete item with id %q: %s", itemId, err.Error())) + } + + err = s.removeItemFromCache(item) + if err != nil { + log.Err(err).Msg("Failed to remove item from the cache") + } + + } + + if len(failedToDelete) > 0 { + return fmt.Errorf("encountered the following errors during delete: %s", strings.Join(failedToDelete, "\n")) + } + + return nil +} + +func (s *mediaService) removeItemFromCache(item types.MediaItem) error { + log.Info().Msgf("Removing item with id %q from the media cache", item.Id) + + // remove the item from the lookup + delete(mediaLookup, item.Id) + + if _, ok := mediaCache[item.ParentDir]; !ok { + return fmt.Errorf("parent dir for item %q is not in the cache", item.Id) + } + + newCache := make([]types.MediaItem, len(mediaCache)) + + for _, cachedItem := range mediaCache[item.ParentDir] { + // different item, add it to the new cache + if cachedItem.Id != item.Id { + newCache = append(newCache, cachedItem) + } + } + + mediaCache[item.ParentDir] = newCache + + return nil +} + func NewMediaService() MediaApi { return &mediaService{app: app.GetApp()} } diff --git a/pkg/types/media.go b/pkg/types/media.go index a428ca1..a8164d6 100644 --- a/pkg/types/media.go +++ b/pkg/types/media.go @@ -8,6 +8,8 @@ type MediaItem struct { Path string `json:"-"` Id uuid.UUID `json:"id"` Metadata interface{} `json:"metadata"` + // Top level directory this item belongs to + ParentDir string `json:"-"` } type DownloadRequest []uuid.UUID