Skip to content

Commit

Permalink
feat: customize workers and retry of task (close #5493 fix #5274)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhofe committed Nov 21, 2023
1 parent 11a30c5 commit 7583c4d
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 31 deletions.
8 changes: 5 additions & 3 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"errors"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -36,6 +37,7 @@ the address is defined in config file`,
}
bootstrap.InitOfflineDownloadTools()
bootstrap.LoadStorages()
bootstrap.InitTaskManager()
if !flags.Debug && !flags.Dev {
gin.SetMode(gin.ReleaseMode)
}
Expand All @@ -49,7 +51,7 @@ the address is defined in config file`,
httpSrv = &http.Server{Addr: httpBase, Handler: r}
go func() {
err := httpSrv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalf("failed to start http: %s", err.Error())
}
}()
Expand All @@ -60,7 +62,7 @@ the address is defined in config file`,
httpsSrv = &http.Server{Addr: httpsBase, Handler: r}
go func() {
err := httpsSrv.ListenAndServeTLS(conf.Conf.Scheme.CertFile, conf.Conf.Scheme.KeyFile)
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalf("failed to start https: %s", err.Error())
}
}()
Expand All @@ -84,7 +86,7 @@ the address is defined in config file`,
}
}
err = unixSrv.Serve(listener)
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
utils.Log.Fatalf("failed to start unix: %s", err.Error())
}
}()
Expand Down
15 changes: 15 additions & 0 deletions internal/bootstrap/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package bootstrap

import (
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/offline_download/tool"
"github.com/xhofe/tache"
)

func InitTaskManager() {
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry))
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
}
76 changes: 53 additions & 23 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
)

type Database struct {
Type string `json:"type" env:"DB_TYPE"`
Host string `json:"host" env:"DB_HOST"`
Port int `json:"port" env:"DB_PORT"`
User string `json:"user" env:"DB_USER"`
Password string `json:"password" env:"DB_PASS"`
Name string `json:"name" env:"DB_NAME"`
DBFile string `json:"db_file" env:"DB_FILE"`
TablePrefix string `json:"table_prefix" env:"DB_TABLE_PREFIX"`
SSLMode string `json:"ssl_mode" env:"DB_SSL_MODE"`
Type string `json:"type" env:"TYPE"`
Host string `json:"host" env:"HOST"`
Port int `json:"port" env:"PORT"`
User string `json:"user" env:"USER"`
Password string `json:"password" env:"PASS"`
Name string `json:"name" env:"NAME"`
DBFile string `json:"db_file" env:"FILE"`
TablePrefix string `json:"table_prefix" env:"TABLE_PREFIX"`
SSLMode string `json:"ssl_mode" env:"SSL_MODE"`
}

type Scheme struct {
Expand All @@ -39,21 +39,34 @@ type LogConfig struct {
Compress bool `json:"compress" env:"COMPRESS"`
}

type TaskConfig struct {
Workers int `json:"workers" env:"WORKERS"`
MaxRetry int `json:"max_retry" env:"MAX_RETRY"`
}

type TasksConfig struct {
Download TaskConfig `json:"download" envPrefix:"DOWNLOAD_"`
Transfer TaskConfig `json:"transfer" envPrefix:"TRANSFER_"`
Upload TaskConfig `json:"upload" envPrefix:"UPLOAD_"`
Copy TaskConfig `json:"copy" envPrefix:"COPY_"`
}

type Config struct {
Force bool `json:"force" env:"FORCE"`
SiteURL string `json:"site_url" env:"SITE_URL"`
Cdn string `json:"cdn" env:"CDN"`
JwtSecret string `json:"jwt_secret" env:"JWT_SECRET"`
TokenExpiresIn int `json:"token_expires_in" env:"TOKEN_EXPIRES_IN"`
Database Database `json:"database"`
Scheme Scheme `json:"scheme"`
TempDir string `json:"temp_dir" env:"TEMP_DIR"`
BleveDir string `json:"bleve_dir" env:"BLEVE_DIR"`
DistDir string `json:"dist_dir"`
Log LogConfig `json:"log"`
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
Force bool `json:"force" env:"FORCE"`
SiteURL string `json:"site_url" env:"SITE_URL"`
Cdn string `json:"cdn" env:"CDN"`
JwtSecret string `json:"jwt_secret" env:"JWT_SECRET"`
TokenExpiresIn int `json:"token_expires_in" env:"TOKEN_EXPIRES_IN"`
Database Database `json:"database" envPrefix:"DB_"`
Scheme Scheme `json:"scheme"`
TempDir string `json:"temp_dir" env:"TEMP_DIR"`
BleveDir string `json:"bleve_dir" env:"BLEVE_DIR"`
DistDir string `json:"dist_dir"`
Log LogConfig `json:"log"`
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
Tasks TasksConfig `json:"tasks" envPrefix:"TASKS_"`
}

func DefaultConfig() *Config {
Expand Down Expand Up @@ -90,5 +103,22 @@ func DefaultConfig() *Config {
},
MaxConnections: 0,
TlsInsecureSkipVerify: true,
Tasks: TasksConfig{
Download: TaskConfig{
Workers: 5,
MaxRetry: 1,
},
Transfer: TaskConfig{
Workers: 5,
MaxRetry: 2,
},
Upload: TaskConfig{
Workers: 5,
},
Copy: TaskConfig{
Workers: 5,
MaxRetry: 2,
},
},
}
}
2 changes: 1 addition & 1 deletion internal/fs/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (t *CopyTask) Run() error {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath)
}

var CopyTaskManager = tache.NewManager[*CopyTask]()
var CopyTaskManager *tache.Manager[*CopyTask]

// Copy if in the same storage, call move method
// if not, add copy task
Expand Down
2 changes: 1 addition & 1 deletion internal/fs/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (t *UploadTask) Run() error {
return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true)
}

var UploadTaskManager = tache.NewManager[*UploadTask]()
var UploadTaskManager *tache.Manager[*UploadTask]

// putAsTask add as a put task and return immediately
func putAsTask(dstDirPath string, file model.FileStreamer) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/offline_download/tool/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,5 @@ func (t *DownloadTask) GetStatus() string {
}

var (
DownloadTaskManager *tache.Manager[*DownloadTask] = tache.NewManager[*DownloadTask]()
DownloadTaskManager *tache.Manager[*DownloadTask]
)
2 changes: 1 addition & 1 deletion internal/offline_download/tool/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ func (t *TransferTask) GetStatus() string {
}

var (
TransferTaskManager *tache.Manager[*TransferTask] = tache.NewManager[*TransferTask]()
TransferTaskManager *tache.Manager[*TransferTask]
)
6 changes: 5 additions & 1 deletion server/handles/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ type TaskInfo struct {
}

func getTaskInfo[T tache.TaskWithInfo](task T) TaskInfo {
errMsg := ""
if task.GetErr() != nil {
errMsg = task.GetErr().Error()
}
return TaskInfo{
ID: task.GetID(),
Name: task.GetName(),
State: task.GetState(),
Status: task.GetStatus(),
Progress: task.GetProgress(),
Error: task.GetErr().Error(),
Error: errMsg,
}
}

Expand Down

0 comments on commit 7583c4d

Please sign in to comment.