Skip to content

Commit

Permalink
Use storage to track episodes
Browse files Browse the repository at this point in the history
  • Loading branch information
mxpv committed Feb 8, 2020
1 parent f2a8638 commit 7f24306
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 85 deletions.
12 changes: 11 additions & 1 deletion cmd/podsync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/mxpv/podsync/pkg/config"
"github.com/mxpv/podsync/pkg/storage"
"github.com/mxpv/podsync/pkg/ytdl"
)

Expand Down Expand Up @@ -85,13 +86,18 @@ func main() {
log.WithError(err).Fatal("failed to load configuration file")
}

db, err := storage.NewBadger(opts.ConfigPath)
if err != nil {
log.WithError(err).Fatal("failed to open database")
}

// Queue of feeds to update
updates := make(chan *config.Feed, 16)
defer close(updates)

// Run updater thread
log.Debug("creating updater")
updater, err := NewUpdater(cfg, downloader)
updater, err := NewUpdater(cfg, downloader, db)
if err != nil {
log.WithError(err).Fatal("failed to create updater")
}
Expand Down Expand Up @@ -165,5 +171,9 @@ func main() {
log.WithError(err).Error("wait error")
}

if err := db.Close(); err != nil {
log.WithError(err).Error("failed to close database")
}

log.Info("gracefully stopped")
}
185 changes: 132 additions & 53 deletions cmd/podsync/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mxpv/podsync/pkg/feed"
"github.com/mxpv/podsync/pkg/link"
"github.com/mxpv/podsync/pkg/model"
"github.com/mxpv/podsync/pkg/storage"
)

type Downloader interface {
Expand All @@ -27,10 +28,15 @@ type Downloader interface {
type Updater struct {
config *config.Config
downloader Downloader
db storage.Storage
}

func NewUpdater(config *config.Config, downloader Downloader) (*Updater, error) {
return &Updater{config: config, downloader: downloader}, nil
func NewUpdater(config *config.Config, downloader Downloader, db storage.Storage) (*Updater, error) {
return &Updater{
config: config,
downloader: downloader,
db: db,
}, nil
}

func (u *Updater) Update(ctx context.Context, feedConfig *config.Feed) error {
Expand All @@ -48,6 +54,26 @@ func (u *Updater) Update(ctx context.Context, feedConfig *config.Feed) error {
return errors.Wrapf(err, "failed to create directory for feed %q", feedConfig.ID)
}

if err := u.updateFeed(ctx, feedConfig); err != nil {
return err
}

if err := u.downloadEpisodes(ctx, feedConfig, feedPath); err != nil {
return err
}

if err := u.buildXML(ctx, feedConfig); err != nil {
return err
}

elapsed := time.Since(started)
nextUpdate := time.Now().Add(feedConfig.UpdatePeriod.Duration)
log.Infof("successfully updated feed in %s, next update at %s", elapsed, nextUpdate.Format(time.Kitchen))
return nil
}

// updateFeed pulls API for new episodes and saves them to database
func (u *Updater) updateFeed(ctx context.Context, feedConfig *config.Feed) error {
// Create an updater for this feed type
provider, err := u.makeBuilder(ctx, feedConfig)
if err != nil {
Expand All @@ -63,60 +89,126 @@ func (u *Updater) Update(ctx context.Context, feedConfig *config.Feed) error {

log.Debugf("received %d episode(s) for %q", len(result.Episodes), result.Title)

// Since there is no way to detect the size of an episode after download and encoding via API,
// we'll patch XML feed with values from this map
sizes := map[string]int64{}
if err := u.db.AddFeed(ctx, result); err != nil {
return err
}

log.Debug("successfully saved updates to storage")
return nil
}

func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed, targetDir string) error {
var (
feedID = feedConfig.ID
updateList []*model.Episode
)

// Build the list of files to download
if err := u.db.WalkFiles(ctx, feedID, func(file *model.File) error {
if file.Status != model.EpisodeNew && file.Status != model.EpisodeError {
// File already downloaded
return nil
}

episode, err := u.db.GetEpisode(ctx, feedID, file.EpisodeID)
if err != nil {
return errors.Wrapf(err, "failed to query episode %q from database", file.EpisodeID)
}

updateList = append(updateList, episode)
return nil
}); err != nil {
return errors.Wrapf(err, "failed to build update list")
}

// The number of episodes downloaded during this update
downloaded := 0
log.Debugf("update list: %+v", updateList)

// Download and encode episodes
for idx, episode := range result.Episodes {
// Download pending episodes
for idx, episode := range updateList {
logger := log.WithFields(log.Fields{
"index": idx,
"episode_id": episode.ID,
})

episodePath := filepath.Join(feedPath, u.episodeName(feedConfig, episode))
_, err := os.Stat(episodePath)
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "failed to check whether episode exists")
// Check whether episode exists on disk

episodePath := filepath.Join(targetDir, u.episodeName(feedConfig, episode))
stat, err := os.Stat(episodePath)
if err == nil {
logger.Infof("episode %q already exists on disk (%s)", episode.ID, episodePath)

// File already exists, update file status and disk size
if err := u.db.UpdateFile(feedID, episode.ID, func(file *model.File) error {
file.Size = stat.Size()
file.Status = model.EpisodeDownloaded
return nil
}); err != nil {
logger.WithError(err).Error("failed to update file info")
return err
}

return nil
} else if os.IsNotExist(err) {
// Will download, do nothing here
} else {
logger.WithError(err).Error("failed to stat file")
return err
}

if os.IsNotExist(err) {
// There is no file on disk, download episode
logger.Infof("! downloading episode %s", episode.VideoURL)
if output, err := u.downloader.Download(ctx, feedConfig, episode, feedPath); err == nil {
downloaded++
} else {
// YouTube might block host with HTTP Error 429: Too Many Requests
// We still need to generate XML, so just stop sending download requests and
// retry next time
if strings.Contains(output, "HTTP Error 429") {
logger.WithError(err).Warnf("got too many requests error, will retry download next time")
break
}

logger.WithError(err).Errorf("youtube-dl error: %s", output)
// Download episode to disk

logger.Infof("! downloading episode %s", episode.VideoURL)
output, err := u.downloader.Download(ctx, feedConfig, episode, episodePath)
if err != nil {
logger.WithError(err).Errorf("youtube-dl error: %s", output)

// YouTube might block host with HTTP Error 429: Too Many Requests
// We still need to generate XML, so just stop sending download requests and
// retry next time
if strings.Contains(output, "HTTP Error 429") {
break
}
} else {
// Episode already downloaded
logger.Debug("skipping download of episode")

if err := u.db.UpdateFile(feedID, episode.ID, func(file *model.File) error {
file.Status = model.EpisodeError
return nil
}); err != nil {
return err
}

continue
}

// Record file size
if size, err := u.fileSize(episodePath); err != nil {
// Don't return on error, use estimated file size provided by builders
logger.WithError(err).Error("failed to get episode file size")
} else { //nolint
logger.Debugf("file size %d", size)
sizes[episode.ID] = size
// Update file status in database

if err := u.db.UpdateFile(feedID, episode.ID, func(file *model.File) error {
// Record file size of newly downloaded file
size, err := u.fileSize(episodePath)
if err != nil {
logger.WithError(err).Error("failed to get episode file size")
} else {
file.Size = size
}

file.Status = model.EpisodeDownloaded
return nil
}); err != nil {
return err
}
}

return nil
}

func (u *Updater) buildXML(ctx context.Context, feedConfig *config.Feed) error {
feed, err := u.db.GetFeed(ctx, feedConfig.ID)
if err != nil {
return err
}

// Build iTunes XML feed with data received from builder
log.Debug("building iTunes podcast feed")
podcast, err := u.buildPodcast(result, feedConfig, sizes)
podcast, err := u.buildPodcast(feed, feedConfig)
if err != nil {
return err
}
Expand All @@ -129,18 +221,10 @@ func (u *Updater) Update(ctx context.Context, feedConfig *config.Feed) error {
return errors.Wrapf(err, "failed to write XML feed to disk")
}

elapsed := time.Since(started)
nextUpdate := time.Now().Add(feedConfig.UpdatePeriod.Duration)
log.Infof(
"successfully updated feed in %s, downloaded: %d episode(s), next update at %s",
elapsed,
downloaded,
nextUpdate.Format(time.Kitchen),
)
return nil
}

func (u *Updater) buildPodcast(feed *model.Feed, cfg *config.Feed, sizes map[string]int64) (*itunes.Podcast, error) {
func (u *Updater) buildPodcast(feed *model.Feed, cfg *config.Feed) (*itunes.Podcast, error) {
const (
podsyncGenerator = "Podsync generator (support us at https://github.com/mxpv/podsync)"
defaultCategory = "TV & Film"
Expand All @@ -167,11 +251,6 @@ func (u *Updater) buildPodcast(feed *model.Feed, cfg *config.Feed, sizes map[str
}

for i, episode := range feed.Episodes {
// Fixup episode size after downloading and encoding
if size, ok := sizes[episode.ID]; ok {
episode.Size = size
}

item := itunes.Item{
GUID: episode.ID,
Link: episode.VideoURL,
Expand Down
1 change: 1 addition & 0 deletions pkg/model/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type EpisodeStatus string
const (
EpisodeNew = EpisodeStatus("new") // New episode received via API
EpisodeDownloaded = EpisodeStatus("downloaded") // Downloaded, encoded and available for download
EpisodeError = EpisodeStatus("error") // Could not download, will retry
EpisodeCleaned = EpisodeStatus("cleaned") // Downloaded and later removed from disk due to update strategy
)

Expand Down
Loading

0 comments on commit 7f24306

Please sign in to comment.