Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch reprocessing queue job and 0.9.0 upgrade hook #267

Merged
merged 4 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ require (
github.com/prometheus/client_golang v1.19.0
github.com/rs/cors v1.11.0
github.com/rs/cors/wrapper/gin v0.0.0-20240429123903-3d336ea9b505
github.com/schollz/progressbar/v3 v3.14.2
github.com/stretchr/testify v1.9.0
github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43
github.com/urfave/cli/v2 v2.27.2
Expand Down Expand Up @@ -118,7 +117,6 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mattn/goveralls v0.0.12 // indirect
github.com/mfridman/interpolate v0.0.2 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
Expand Down Expand Up @@ -326,8 +325,6 @@ github.com/mgdigital/gorm-cache/v2 v2.0.0-20230912113927-f2a8dd92a386 h1:DQflskA
github.com/mgdigital/gorm-cache/v2 v2.0.0-20230912113927-f2a8dd92a386/go.mod h1:JgSdjEKvmt182iLDU4HWIL6qH6Lk3AZIO3tm39KUHYM=
github.com/microsoft/go-mssqldb v1.7.0 h1:sgMPW0HA6Ihd37Yx0MzHyKD726C2kY/8KJsQtXHNaAs=
github.com/microsoft/go-mssqldb v1.7.0/go.mod h1:kOvZKUdrhhFQmxLZqbwUV0rHkNkZpthMITIb2Ko1IoA=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
Expand Down Expand Up @@ -419,8 +416,6 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/schollz/progressbar/v3 v3.14.2 h1:EducH6uNLIWsr560zSV1KrTeUb/wZGAHqyMFIEa99ks=
github.com/schollz/progressbar/v3 v3.14.2/go.mod h1:aQAZQnhF4JGFtRJiw/eobaXpsqpVQAftEQ+hLGXaRc4=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
Expand Down
18 changes: 13 additions & 5 deletions internal/app/cmd/processcmd/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,29 @@ func New(p Params) (Result, error) {
Name: "infoHash",
},
&cli.StringFlag{
Name: "flags",
Name: "classifierFlags",
Value: "{}",
Usage: "optional JSON-encoded runtime flags to pass to the classifier",
},
&cli.BoolFlag{
Name: "apisDisabled",
Value: false,
Usage: "disable API calls for the classifier workflow",
},
},
Action: func(ctx *cli.Context) error {
pr, err := p.Processor.Get()
if err != nil {
return err
}
var flags classifier.Flags
strFlags := ctx.String("flags")
strFlags := ctx.String("classifierFlags")
if err := json.Unmarshal([]byte(strFlags), &flags); err != nil {
return cli.Exit("invalid flags", 1)
}
if ctx.Bool("apisDisabled") {
flags["apis_enabled"] = false
}
var infoHashes []protocol.ID
for _, infoHash := range ctx.StringSlice("infoHash") {
id, err := protocol.ParseID(infoHash)
Expand All @@ -57,9 +65,9 @@ func New(p Params) (Result, error) {
return err
}
return pr.Process(ctx.Context, processor.MessageParams{
ClassifyMode: processor.ClassifyModeRematch,
Flags: flags,
InfoHashes: infoHashes,
ClassifyMode: processor.ClassifyModeRematch,
ClassifierFlags: flags,
InfoHashes: infoHashes,
})
},
},
Expand Down
98 changes: 36 additions & 62 deletions internal/app/cmd/reprocesscmd/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"github.com/bitmagnet-io/bitmagnet/internal/database/dao"
"github.com/bitmagnet-io/bitmagnet/internal/model"
"github.com/bitmagnet-io/bitmagnet/internal/processor"
"github.com/bitmagnet-io/bitmagnet/internal/protocol"
"github.com/schollz/progressbar/v3"
"github.com/bitmagnet-io/bitmagnet/internal/processor/batch"
"github.com/urfave/cli/v2"
"go.uber.org/fx"
"go.uber.org/zap"
"gorm.io/gen"
"strings"
"time"
)

type Params struct {
Expand All @@ -32,10 +31,14 @@ func New(p Params) (Result, error) {
Name: "reprocess",
Usage: "Queue all torrents for reprocessing",
Flags: []cli.Flag{
&cli.IntFlag{
&cli.UintFlag{
Name: "batchSize",
Value: 100,
},
&cli.UintFlag{
Name: "chunkSize",
Value: 10_000,
},
&cli.StringSliceFlag{
Name: "contentType",
Aliases: []string{"contentTypes"},
Expand All @@ -54,10 +57,15 @@ func New(p Params) (Result, error) {
"rematch (ignore any pre-existing match and always classify from scratch)",
},
&cli.StringFlag{
Name: "flags",
Name: "classifierFlags",
Value: "{}",
Usage: "optional JSON-encoded runtime flags to pass to the classifier",
},
&cli.BoolFlag{
Name: "apisDisabled",
Value: false,
Usage: "disable API calls for the classifier workflow",
},
},
Action: func(ctx *cli.Context) error {
var classifyMode processor.ClassifyMode
Expand All @@ -70,82 +78,48 @@ func New(p Params) (Result, error) {
return cli.Exit("invalid classifyMode", 1)
}
var flags classifier.Flags
strFlags := ctx.String("flags")
strFlags := ctx.String("classifierFlags")
if err := json.Unmarshal([]byte(strFlags), &flags); err != nil {
return cli.Exit("invalid flags", 1)
}
var contentTypes []string
var unknownContentType bool
if ctx.Bool("apisDisabled") {
flags["apis_enabled"] = false
}
var contentTypes []model.NullContentType
for _, contentType := range ctx.StringSlice("contentType") {
if contentType == "null" {
unknownContentType = true
contentTypes = append(contentTypes, model.NullContentType{})
} else {
ct, err := model.ParseContentType(contentType)
if err != nil {
return err
}
contentTypes = append(contentTypes, ct.String())
contentTypes = append(contentTypes, model.NullContentType{
ContentType: ct,
Valid: true,
})
}
}
d, err := p.Dao.Get()
job, err := batch.NewQueueJob(batch.MessageParams{
ClassifyMode: classifyMode,
ClassifierFlags: flags,
ChunkSize: ctx.Uint("chunkSize"),
BatchSize: ctx.Uint("batchSize"),
ContentTypes: contentTypes,
Orphans: ctx.Bool("orphans"),
UpdatedBefore: time.Now(),
})
if err != nil {
return err
}
println("queueing reprocess...")
var scopes []func(gen.Dao) gen.Dao
if len(contentTypes) > 0 || unknownContentType {
scopes = append(scopes, func(tx gen.Dao) gen.Dao {
sq := d.TorrentContent.Where(
d.TorrentContent.InfoHash.EqCol(d.Torrent.InfoHash),
).Where(d.TorrentContent.ContentType.In(contentTypes...))
if unknownContentType {
sq = sq.Or(d.TorrentContent.ContentType.IsNull())
}
return tx.Where(gen.Exists(sq))
})
}
if ctx.Bool("orphans") {
scopes = append(scopes, func(tx gen.Dao) gen.Dao {
return tx.Not(
gen.Exists(
d.TorrentContent.Where(
d.TorrentContent.InfoHash.EqCol(d.Torrent.InfoHash),
),
),
)
})
}
batchSize := ctx.Int("batchSize")
torrentCount := int64(0)
if result, err := dao.BudgetedCount(d.Torrent.WithContext(ctx.Context).Scopes(scopes...).UnderlyingDB(), 10_000); err != nil {
d, err := p.Dao.Get()
if err != nil {
return err
} else {
torrentCount = result.Count
}
bar := progressbar.Default(torrentCount, "queuing torrents")
var torrentResult []*model.Torrent
if err := d.Torrent.WithContext(ctx.Context).Scopes(scopes...).Select(d.Torrent.InfoHash).FindInBatches(&torrentResult, batchSize, func(tx gen.Dao, _ int) error {
infoHashes := make([]protocol.ID, 0, len(torrentResult))
for _, c := range torrentResult {
infoHashes = append(infoHashes, c.InfoHash)
}
job, err := processor.NewQueueJob(processor.MessageParams{
ClassifyMode: classifyMode,
Flags: flags,
InfoHashes: infoHashes,
}, model.QueueJobPriority(10))
if err != nil {
return err
}
if err := tx.Create(&job); err != nil {
return err
}
_ = bar.Add(len(torrentResult))
return nil
}); err != nil {
if err := d.QueueJob.WithContext(ctx.Context).Create(&job); err != nil {
return err
}
_ = bar.Finish()
_, _ = ctx.App.Writer.Write([]byte("Reprocess queued!\n"))
return nil
},
}}, nil
Expand Down
43 changes: 43 additions & 0 deletions internal/processor/batch/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package batch

import (
"github.com/bitmagnet-io/bitmagnet/internal/classifier"
"github.com/bitmagnet-io/bitmagnet/internal/model"
"github.com/bitmagnet-io/bitmagnet/internal/processor"
"github.com/bitmagnet-io/bitmagnet/internal/protocol"
"time"
)

const MessageName = "process_torrent_batch"

type MessageParams struct {
InfoHashGreaterThan protocol.ID
UpdatedBefore time.Time
ClassifyMode processor.ClassifyMode
ClassifierWorkflow string
ClassifierFlags classifier.Flags
ChunkSize uint
BatchSize uint
ContentTypes []model.NullContentType
Orphans bool
}

func (p MessageParams) ApisDisabled() bool {
if p.ClassifierFlags == nil {
return false
}
enabledAny, ok := p.ClassifierFlags["apis_enabled"]
if !ok {
return false
}
enabled, ok := enabledAny.(bool)
return ok && !enabled
}

func NewQueueJob(msg MessageParams, options ...model.QueueJobOption) (model.QueueJob, error) {
return model.NewQueueJob(
MessageName,
msg,
append([]model.QueueJobOption{model.QueueJobMaxRetries(2)}, options...)...,
)
}
Loading
Loading