Processor optimization (#278)
mgdigital authored Jun 13, 2024
1 parent 8a2d3b4 commit bbd8a10
Showing 1 changed file with 34 additions and 73 deletions.
internal/processor/processor.go (34 additions, 73 deletions)
@@ -14,7 +14,6 @@ import (
 	"github.com/bitmagnet-io/bitmagnet/internal/protocol"
 	"golang.org/x/sync/semaphore"
 	"gorm.io/gen/field"
-	"sync"
 )
 
 type Processor interface {
@@ -88,83 +87,45 @@ func (c processor) Process(ctx context.Context, params MessageParams) error {
 	var idsToDelete []string
 	var infoHashesToDelete []protocol.ID
 	tagsToAdd := make(map[protocol.ID]map[string]struct{})
-	var mtx sync.Mutex
-	addDeleteId := func(id string) {
-		mtx.Lock()
-		defer mtx.Unlock()
-		idsToDelete = append(idsToDelete, id)
-	}
-	addDeleteInfoHash := func(id protocol.ID) {
-		mtx.Lock()
-		defer mtx.Unlock()
-		infoHashesToDelete = append(infoHashesToDelete, id)
-	}
-	addTags := func(id protocol.ID, tags map[string]struct{}) {
-		mtx.Lock()
-		defer mtx.Unlock()
-		tagsToAdd[id] = tags
-	}
-	addFailedHash := func(id protocol.ID, err error) {
-		mtx.Lock()
-		defer mtx.Unlock()
-		failedHashes = append(failedHashes, id)
-		errs = append(errs, err)
-	}
-	addTorrentContent := func(tc model.TorrentContent) {
-		mtx.Lock()
-		defer mtx.Unlock()
-		tcs = append(tcs, tc)
-	}
-	var wg sync.WaitGroup
-	sem := semaphore.NewWeighted(3)
 	for _, torrent := range searchResult.Torrents {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			if semErr := sem.Acquire(ctx, 1); semErr != nil {
-				addFailedHash(torrent.InfoHash, semErr)
-				return
-			}
-			defer sem.Release(1)
-			thisDeleteIds := make(map[string]struct{}, len(torrent.Contents))
-			foundMatch := false
-			for _, tc := range torrent.Contents {
-				thisDeleteIds[tc.ID] = struct{}{}
-				if !foundMatch &&
-					!torrent.Hint.ContentSource.Valid &&
-					params.ClassifyMode != ClassifyModeRematch &&
-					tc.ContentType.Valid &&
-					tc.ContentSource.Valid &&
-					(torrent.Hint.IsNil() || torrent.Hint.ContentType == tc.ContentType.ContentType) {
-					torrent.Hint.ContentType = tc.ContentType.ContentType
-					torrent.Hint.ContentSource = tc.ContentSource
-					torrent.Hint.ContentID = tc.ContentID
-					foundMatch = true
-				}
-			}
-			cl, classifyErr := c.runner.Run(ctx, workflowName, params.ClassifierFlags, torrent)
-			if classifyErr != nil {
-				if errors.Is(classifyErr, classification.ErrDeleteTorrent) {
-					addDeleteInfoHash(torrent.InfoHash)
-				} else {
-					addFailedHash(torrent.InfoHash, classifyErr)
-				}
-				return
-			}
-			torrentContent := newTorrentContent(torrent, cl)
-			tcId := torrentContent.InferID()
-			for id := range thisDeleteIds {
-				if id != tcId {
-					addDeleteId(id)
-				}
-			}
-			addTorrentContent(torrentContent)
-			if len(cl.Tags) > 0 {
-				addTags(torrent.InfoHash, cl.Tags)
-			}
-		}()
+		thisDeleteIds := make(map[string]struct{}, len(torrent.Contents))
+		foundMatch := false
+		for _, tc := range torrent.Contents {
+			thisDeleteIds[tc.ID] = struct{}{}
+			if !foundMatch &&
+				!torrent.Hint.ContentSource.Valid &&
+				params.ClassifyMode != ClassifyModeRematch &&
+				tc.ContentType.Valid &&
+				tc.ContentSource.Valid &&
+				(torrent.Hint.IsNil() || torrent.Hint.ContentType == tc.ContentType.ContentType) {
+				torrent.Hint.ContentType = tc.ContentType.ContentType
+				torrent.Hint.ContentSource = tc.ContentSource
+				torrent.Hint.ContentID = tc.ContentID
+				foundMatch = true
+			}
+		}
+		cl, classifyErr := c.runner.Run(ctx, workflowName, params.ClassifierFlags, torrent)
+		if classifyErr != nil {
+			if errors.Is(classifyErr, classification.ErrDeleteTorrent) {
+				infoHashesToDelete = append(infoHashesToDelete, torrent.InfoHash)
+			} else {
+				failedHashes = append(failedHashes, torrent.InfoHash)
+				errs = append(errs, classifyErr)
+			}
+			continue
+		}
+		torrentContent := newTorrentContent(torrent, cl)
+		tcId := torrentContent.InferID()
+		for id := range thisDeleteIds {
+			if id != tcId {
+				idsToDelete = append(idsToDelete, id)
+			}
+		}
+		tcs = append(tcs, torrentContent)
+		if len(cl.Tags) > 0 {
+			tagsToAdd[torrent.InfoHash] = cl.Tags
+		}
 	}
-	wg.Wait()
 	if len(failedHashes) > 0 {
 		if len(tcs) == 0 {
 			return errors.Join(errs...)
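
For context on the pattern being removed: the deleted code fanned each torrent out to its own goroutine, capped concurrent classifier runs at three with a weighted semaphore, and serialized every append to the shared result slices behind a single mutex. Here is a minimal, self-contained sketch of that shape, assuming a trivial squaring workload in place of the classifier (the item values and work done are illustrative, not from the repository):

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	var (
		mtx     sync.Mutex
		results []int
		wg      sync.WaitGroup
	)
	// At most 3 workers run at once, mirroring semaphore.NewWeighted(3) above.
	sem := semaphore.NewWeighted(3)
	for _, n := range []int{1, 2, 3, 4, 5} {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if err := sem.Acquire(ctx, 1); err != nil {
				return // context cancelled before a slot opened
			}
			defer sem.Release(1)
			// Shared state: every write must hold the mutex.
			mtx.Lock()
			results = append(results, n*n)
			mtx.Unlock()
		}(n)
	}
	wg.Wait()
	fmt.Println(results) // order is nondeterministic
}

The commit drops all of this machinery in favor of a straight loop over searchResult.Torrents, so the mutex, the helper closures, the WaitGroup, and the per-torrent goroutines disappear along with the "sync" import.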
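The replacement loop accumulates failures rather than aborting: a classification error either marks the torrent's info hash for deletion or records the hash and error, then continue moves on to the next torrent, and the collected errors are later folded into one with errors.Join, visible at the end of the hunk. A sketch of that accumulate-and-join shape, with a made-up process function standing in for the classifier run:

package main

import (
	"errors"
	"fmt"
)

// process stands in for the classifier; here it fails on odd inputs.
func process(n int) (int, error) {
	if n%2 != 0 {
		return 0, fmt.Errorf("item %d: classification failed", n)
	}
	return n * n, nil
}

func main() {
	var results []int
	var errs []error
	for _, n := range []int{1, 2, 3, 4} {
		v, err := process(n)
		if err != nil {
			// Record the failure and keep going, as the new loop does.
			errs = append(errs, err)
			continue
		}
		results = append(results, v)
	}
	fmt.Println("results:", results)
	// errors.Join returns nil for an empty slice, so this check is safe.
	if err := errors.Join(errs...); err != nil {
		fmt.Println("failures:", err)
	}
}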