Skip to content

Commit

Permalink
fix worker concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinSJ committed Aug 3, 2023
1 parent 3bdd4de commit df5024a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
36 changes: 20 additions & 16 deletions internal/pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"context"
"log"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -33,13 +34,15 @@ type WorkerRequest struct {
}

type WorkerGroup struct {
config *config.Config
channel *chan *WorkerRequest
client *texttospeech.Client
config *config.Config
channel chan *WorkerRequest
client *texttospeech.Client
waitGroup *sync.WaitGroup
}

func (w *WorkerGroup) Close() {
close(*w.channel)
defer log.Printf("Closing channel")
close(w.channel)
}

func (w *WorkerGroup) CreateSpeechFromItems(feed *gofeed.Feed, direcory *string) {
Expand All @@ -60,25 +63,23 @@ func (w *WorkerGroup) CreateSpeechFromItems(feed *gofeed.Feed, direcory *string)
for _, item := range feed.Items[:itemSize] {
if isInRange(item.PublishedParsed) {
log.Printf("Adding item... title: %s", item.Title)
*w.channel <- &WorkerRequest{
w.channel <- &WorkerRequest{
Item: item,
LanguageCode: feed.Language,
Directory: *direcory,
UseNaturalVoice: w.config.UseNaturalVoice,
SpeechSpeed: w.config.SpeechSpeed,
}
continue
}
log.Printf("Skipping item... title: %s", item.Title)
}
}

// This code is taken from sample google TTS code with some modification
// Source: https://cloud.google.com/text-to-speech/docs/libraries
func processSpeechGeneration(wg *sync.WaitGroup, client *texttospeech.Client, workerItems *chan *WorkerRequest, ctx context.Context) error {
func processSpeechGeneration(wg *sync.WaitGroup, client *texttospeech.Client, workerItems chan *WorkerRequest, ctx context.Context) error {
defer wg.Done()

for workerItem := range *workerItems {
for workerItem := range workerItems {
feedItem := workerItem.Item

log.Printf("Start procesing %v ", feedItem.Title)
Expand All @@ -88,7 +89,7 @@ func processSpeechGeneration(wg *sync.WaitGroup, client *texttospeech.Client, wo

if _, err := os.Stat(filepath); err == nil {
log.Printf("File exists at path: %s\n, skip generating", filepath)
return nil
continue
}

speechRequests := rss.GetSynthesizeSpeechRequests(feedItem, workerItem.LanguageCode, workerItem.UseNaturalVoice, workerItem.SpeechSpeed)
Expand Down Expand Up @@ -133,15 +134,18 @@ func processSpeechGeneration(wg *sync.WaitGroup, client *texttospeech.Client, wo
func NewWorkerGroup(config *config.Config, wg *sync.WaitGroup, client *texttospeech.Client, ctx context.Context) *WorkerGroup {
channelSize := config.MaxItemPerFeed * len(config.Feeds)
work := make(chan *WorkerRequest, channelSize)
wg.Add(channelSize)

for i := 0; i < config.ConcurrentWorkers; i++ {
go processSpeechGeneration(wg, client, &work, ctx)
workerSize := int(math.Min(float64(config.ConcurrentWorkers), float64(channelSize)))
wg.Add(workerSize)

for i := 0; i < workerSize; i++ {
go processSpeechGeneration(wg, client, work, ctx)
}

return &WorkerGroup{
config: config,
channel: &work,
client: client,
config: config,
channel: work,
client: client,
waitGroup: wg,
}
}
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
)

func main() {
defer log.Printf("Done processing all feeds")
configPath, _ := filepath.Abs("./config.yaml")
config, err := config.NewConfig(configPath)
if err != nil {
Expand All @@ -61,7 +62,7 @@ func main() {

var wg sync.WaitGroup

workerGroup := *worker.NewWorkerGroup(config, &wg, client, ctx)
workerGroup := worker.NewWorkerGroup(config, &wg, client, ctx)

for _, _v := range config.Feeds {
v := _v
Expand Down Expand Up @@ -99,6 +100,4 @@ func main() {

workerGroup.Close()
wg.Wait()

log.Printf("Done processing all feeds")
}

0 comments on commit df5024a

Please sign in to comment.