Skip to content

Commit

Permalink
refactor scheduler logic to handle streams properly
Browse files Browse the repository at this point in the history
  • Loading branch information
jmillerv committed Sep 27, 2023
1 parent 76ba403 commit da96f9f
Showing 1 changed file with 38 additions and 70 deletions.
108 changes: 38 additions & 70 deletions content/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ type Scheduler struct {
PlayedPodcastTTL string
// Duration between the loop pausing and checking the content against the schedule.
CheckInterval string
Programs []*Program
// Determines if go-dj will all a podcast to finish playing or force the next scheduled program
StrictPodcastSchedule bool
Programs []*Program
}
}

Expand Down Expand Up @@ -78,35 +80,37 @@ func (s *Scheduler) Run() error { //nolint:godox,funlen,gocognit,cyclop,nolintli
// because these are streams rather than files they behave differently from local content.
switch p.getMediaType() { //nolint:exhaustive // TODO consider refactoring into function
case webRadioContent:
log.Info("webradio content detected")
log.Info("web radio content detected")

go func() {
duration := getDurationToEndTime(p.Timeslot.End) // might cause an index out of range issue
stopCountDown(content, duration, &wg)
}()
webRadio := content.(*WebRadio) //nolint:forcetypeassert // TODO: type checking
webRadio.Duration = getDurationToEndTime(p.Timeslot.End) // might cause an index out of range issue

log.Debug("scheduler.Run::add 1 to waitgroup")
wg.Add(1)

go func() {
defer wg.Done()
log.Info("playing web radio inside of a go routine")

err = content.Play()
log.Info("Run::playing web radio inside of a go routine")
err := content.Play()
if err != nil {
log.WithError(err).Error("Run::content.Play")
} // play will block until done
}()

case podcastContent:
log.Info("podcast content detected")

go func() {
podcast := content.(*Podcast) //nolint:forcetypeassert // TODO: type checking
log.Infof("podcast player duration %s", podcast.Player.duration)
stopCountDown(content, podcast.Player.duration, &wg)
}()
podcast := content.(*Podcast) //nolint:forcetypeassert // TODO: type checking

wg.Add(1)
// If the StrictPodcastSchedule is set to false, use the duration of the podcast to set the countdown.
if !s.Content.StrictPodcastSchedule {
podcast.Duration = podcast.Player.duration

} else {
podcast.Duration = getDurationToEndTime(p.Timeslot.End)
}

wg.Add(1)
go func() {
defer wg.Done()
log.Info("playing podcast inside of a go routine")
Expand All @@ -124,7 +128,7 @@ func (s *Scheduler) Run() error { //nolint:godox,funlen,gocognit,cyclop,nolintli
}
}

log.Info("paused while go routines are running")
log.Info("scheduler paused while go routines are running")
wg.Wait() // pause

if !p.Timeslot.IsScheduledNow(now) {
Expand Down Expand Up @@ -213,31 +217,36 @@ func (s *Scheduler) Shuffle() error { //nolint:godox,funlen,gocognit,cyclop,noli
//nolint:godox,nolintlint // TODO: consider refactoring switch into single function used in Run() & Shuffle()
switch p.getMediaType() { //nolint:dupl,exhaustive
case webRadioContent:
go func() {
duration := getDurationToEndTime(p.Timeslot.End) // might cause an index out of range issue
stopCountDown(content, duration, wg)
}()
log.Info("web radio content detected")

webRadio := content.(*WebRadio) //nolint:forcetypeassert // TODO: type checking
webRadio.Duration = getDurationToEndTime(p.Timeslot.End) // might cause an index out of range issue

log.Debug("scheduler.Run::add 1 to waitgroup")
wg.Add(1)

go func() {
defer wg.Done()
log.Info("playing web radio inside of a go routine")

err = content.Play()
log.Info("Run::playing web radio inside of a go routine")
err := content.Play()
if err != nil {
log.WithError(err).Error("Run::content.Play")
} // play will block until done
}()
case podcastContent:
go func() {
podcast := content.(*Podcast) //nolint:forcetypeassert // TODO: type checking
log.Infof("podcast player duration %s", podcast.Player.duration)
stopCountDown(content, podcast.Player.duration, wg)
}()
log.Info("podcast content detected")

wg.Add(1)
podcast := content.(*Podcast) //nolint:forcetypeassert // TODO: type checking

// If the StrictPodcastSchedule is set to false, use the duration of the podcast to set the countdown.
if !s.Content.StrictPodcastSchedule {
podcast.Duration = podcast.Player.duration

} else {
podcast.Duration = getDurationToEndTime(p.Timeslot.End)
}

wg.Add(1)
go func() {
defer wg.Done()
log.Info("playing podcast inside of a go routine")
Expand Down Expand Up @@ -334,47 +343,6 @@ func NewScheduler(file string) (*Scheduler, error) {
return scheduler, nil
}

// stopCountDown takes in a Media and duration and starts a ticker to stop the playing content.
func stopCountDown(content Media, period time.Duration, wg *sync.WaitGroup) {
log.Infof("remaining time playing this stream %v", period)

t := time.NewTicker(period)
defer t.Stop()

for { //nolint:gosimple // for select worked better here at the time of writing.
select {
case <-t.C: // call content.Stop
log.Info("stopping content")

err := content.Stop()
if err != nil {
log.WithError(err).Error("stopCountDown::error stopping content")
}

// typecast content as WebRadio
webRadio, ok := content.(*WebRadio)
if ok {
// only send a wg.Done() signal if the web radio has stopped playing.
if !webRadio.Player.isPlaying {
wg.Done()
}
}

// typecast content as Podcast
podcast, ok := content.(*Podcast)
if ok {
if !podcast.Player.isPlaying {
wg.Done()
}
}

log.Info("content stopped")

return
}
}
}

// getDurationToEndTime determines how much time in seconds needs to pass before the next program starts.
// TODO: examine function and timeslot.go's IsScheduleNow(), attempt to refactor to remove duplicate code.
//
Expand Down

0 comments on commit da96f9f

Please sign in to comment.