Skip to content

Commit

Permalink
Allow configuring processor concurrency (#281)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgdigital authored Jun 14, 2024
1 parent bbd8a10 commit 3968c8b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 2 deletions.
3 changes: 2 additions & 1 deletion bitmagnet.io/setup/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ nav_order: 2
- `log.level` (default: `info`): If you're developing or just curious then you may want to set this to `debug`; note that `debug` output will be very verbose.
- `log.development` (default: `false`): If you're developing you may want to enable this flag to enable more verbose output such as stack traces.
- `log.json` (default: `false`): By default logs are output in a pretty format with colors; enable this flag if you'd prefer plain JSON.
- `log.file_rotator.enabled` (default: `false`): If true, logs will be output to rotating log files at level `log.file_rotator.level` in the `log.file_rotator.path` directory, allowing forwarding to a logs aggregator (see [the observability guide](/internals-development/observability-telemetry.html)).
- `log.file_rotator.enabled` (default: `false`): If true, logs will be output to rotating log files at level `log.file_rotator.level` in the `log.file_rotator.path` directory, allowing forwarding to a logs aggregator (see [the observability guide](/guides/observability-telemetry.html)).
- `http_server.options` (default `["*"]`): A list of enabled HTTP server components. By default all are enabled. Components include: `cors`, `pprof`, `graphql`, `import`, `prometheus`, `torznab`, `status`, `webui`.
- `dht_crawler.scaling_factor` (default: `10`): There are various rate and concurrency limits associated with the DHT crawler. This parameter is a rough proxy for resource usage of the crawler; concurrency and buffer size of the various pipeline channels are multiplied by this value. Diminishing returns may result from exceeding the default value of 10. Since the software has not been tested on a wide variety of hardware and network conditions your mileage may vary here...
- `processor.concurrency` (default: `3`): Defines the maximum number of torrents to be processed/classified simultaneously. If you experience slowdowns when the queue is working, try decreasing this to 1; conversely, if running on more powerful hardware and you'd like to work through the queue more quickly, try increasing the value.

To see a full list of available configuration options using the CLI, run:

Expand Down
11 changes: 11 additions & 0 deletions internal/processor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package processor

type Config struct {
Concurrency uint
}

func NewDefaultConfig() Config {
return Config{
Concurrency: 3,
}
}
2 changes: 2 additions & 0 deletions internal/processor/processorfx/module.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package processorfx

import (
"github.com/bitmagnet-io/bitmagnet/internal/boilerplate/config/configfx"
"github.com/bitmagnet-io/bitmagnet/internal/processor"
batchqueue "github.com/bitmagnet-io/bitmagnet/internal/processor/batch/queue"
"github.com/bitmagnet-io/bitmagnet/internal/processor/hook_0_9_0"
Expand All @@ -11,6 +12,7 @@ import (
func New() fx.Option {
return fx.Module(
"processor",
configfx.NewConfigModule[processor.Config]("processor", processor.NewDefaultConfig()),
fx.Provide(
processor.New,
processorqueue.New,
Expand Down
3 changes: 2 additions & 1 deletion internal/processor/queue/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

type Params struct {
fx.In
Config processor.Config
Processor lazy.Lazy[processor.Processor]
}

Expand All @@ -34,7 +35,7 @@ func New(p Params) Result {
return err
}
return pr.Process(ctx, *msg)
}, handler.JobTimeout(time.Second*60*10), handler.Concurrency(3)), nil
}, handler.JobTimeout(time.Second*60*10), handler.Concurrency(int(p.Config.Concurrency))), nil
}),
}
}

0 comments on commit 3968c8b

Please sign in to comment.