From f6242d46b1fd25e5d5bd02d77a6ed2f4e260e1a6 Mon Sep 17 00:00:00 2001 From: Noah Hsu Date: Tue, 21 Jun 2022 17:37:02 +0800 Subject: [PATCH] feat: add uri to aria2 --- internal/aria2/add.go | 106 +++++++++++++++++++++++++++++- internal/aria2/aria2.go | 2 +- internal/aria2/notify.go | 3 +- internal/aria2/offlinedownload.go | 7 ++ internal/aria2/task.go | 12 ---- internal/fs/put.go | 4 +- pkg/task/task.go | 3 + 7 files changed, 119 insertions(+), 18 deletions(-) create mode 100644 internal/aria2/offlinedownload.go delete mode 100644 internal/aria2/task.go diff --git a/internal/aria2/add.go b/internal/aria2/add.go index 716ee576e14..5e73d7feb81 100644 --- a/internal/aria2/add.go +++ b/internal/aria2/add.go @@ -2,13 +2,21 @@ package aria2 import ( "context" + "fmt" "github.com/alist-org/alist/v3/conf" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/fs" + "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/operations" + "github.com/alist-org/alist/v3/pkg/task" "github.com/google/uuid" "github.com/pkg/errors" + "mime" + "os" + "path" "path/filepath" + "strconv" + "time" ) func AddURI(ctx context.Context, uri string, dstPath string, parentPath string) error { @@ -34,13 +42,109 @@ func AddURI(ctx context.Context, uri string, dstPath string, parentPath string) } } // call aria2 rpc + tempDir := filepath.Join(conf.Conf.TempDir, "aria2", uuid.NewString()) options := map[string]interface{}{ - "dir": filepath.Join(conf.Conf.TempDir, "aria2", uuid.NewString()), + "dir": tempDir, } gid, err := client.AddURI([]string{uri}, options) if err != nil { return errors.Wrapf(err, "failed to add uri %s", uri) } // TODO add to task manager + Aria2TaskManager.Submit(task.WithCancelCtx(&task.Task[string, OfflineDownload]{ + ID: gid, + Name: fmt.Sprintf("download %s to [%s](%s)", uri, account.GetAccount().VirtualPath, actualParentPath), + Func: func(tsk *task.Task[string, OfflineDownload]) error { + defer func() { + notify.Signals.Delete(gid) + // clear temp dir + _ = os.RemoveAll(tempDir) + }() + c := make(chan int) + notify.Signals.Store(gid, c) + retried := 0 + for { + select { + case <-tsk.Ctx.Done(): + _, err := client.Remove(gid) + if err != nil { + return err + } + case status := <-c: + switch status { + case Completed: + return nil + default: + info, err := client.TellStatus(gid) + if err != nil { + retried++ + } + if retried > 5 { + return errors.Errorf("failed to get status of %s, retried %d times", gid, retried) + } + retried = 0 + if len(info.FollowedBy) != 0 { + gid = info.FollowedBy[0] + + } + // update download status + total, err := strconv.ParseUint(info.TotalLength, 10, 64) + if err != nil { + total = 0 + } + downloaded, err := strconv.ParseUint(info.CompletedLength, 10, 64) + if err != nil { + downloaded = 0 + } + tsk.SetProgress(int(float64(downloaded) / float64(total))) + switch info.Status { + case "complete": + // get files + files, err := client.GetFiles(gid) + if err != nil { + return errors.Wrapf(err, "failed to get files of %s", gid) + } + // upload files + for _, file := range files { + size, _ := strconv.ParseUint(file.Length, 10, 64) + f, err := os.Open(file.Path) + mimetype := mime.TypeByExtension(path.Ext(file.Path)) + if mimetype == "" { + mimetype = "application/octet-stream" + } + if err != nil { + return errors.Wrapf(err, "failed to open file %s", file.Path) + } + stream := model.FileStream{ + Obj: model.Object{ + Name: path.Base(file.Path), + Size: size, + Modified: time.Now(), + IsFolder: false, + }, + ReadCloser: f, + Mimetype: "", + } + return operations.Put(tsk.Ctx, account, actualParentPath, stream, tsk.SetProgress) + } + case "error": + return errors.Errorf("failed to download %s, error: %s", gid, info.ErrorMessage) + case "active", "waiting", "paused": + // do nothing + case "removed": + return errors.Errorf("failed to download %s, removed", gid) + default: + return errors.Errorf("failed to download %s, unknown status %s", gid, info.Status) + } + } + } + } + }, + Data: OfflineDownload{ + Gid: gid, + URI: uri, + DstPath: dstPath, + }, + })) return nil } diff --git a/internal/aria2/aria2.go b/internal/aria2/aria2.go index a07de404357..9bbb03aba09 100644 --- a/internal/aria2/aria2.go +++ b/internal/aria2/aria2.go @@ -8,7 +8,7 @@ import ( "time" ) -var Aria2TaskManager = task.NewTaskManager() +var Aria2TaskManager = task.NewTaskManager[string, OfflineDownload](3) var notify = NewNotify() var client rpc.Client diff --git a/internal/aria2/notify.go b/internal/aria2/notify.go index a4241894e38..056fe5147b4 100644 --- a/internal/aria2/notify.go +++ b/internal/aria2/notify.go @@ -6,8 +6,7 @@ import ( ) const ( - Ready = iota - Downloading + Downloading = iota Paused Stopped Completed diff --git a/internal/aria2/offlinedownload.go b/internal/aria2/offlinedownload.go new file mode 100644 index 00000000000..2cbd0153c8b --- /dev/null +++ b/internal/aria2/offlinedownload.go @@ -0,0 +1,7 @@ +package aria2 + +type OfflineDownload struct { + Gid string + DstPath string + URI string +} diff --git a/internal/aria2/task.go b/internal/aria2/task.go deleted file mode 100644 index aed28fc9879..00000000000 --- a/internal/aria2/task.go +++ /dev/null @@ -1,12 +0,0 @@ -package aria2 - -import ( - "github.com/alist-org/alist/v3/internal/driver" - "github.com/alist-org/alist/v3/pkg/task" -) - -type Task struct { - Account driver.Driver - ParentDir string - T task.Task -} diff --git a/internal/fs/put.go b/internal/fs/put.go index 9f26822ccd3..7c9589df445 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -16,8 +16,8 @@ var UploadTaskManager = task.NewTaskManager[uint64, struct{}](3, func(tid *uint6 }) // Put add as a put task -func Put(ctx context.Context, account driver.Driver, dstDirPath string, file model.FileStreamer) error { - account, actualParentPath, err := operations.GetAccountAndActualPath(dstDirPath) +func Put(ctx context.Context, account driver.Driver, parentPath string, file model.FileStreamer) error { + account, actualParentPath, err := operations.GetAccountAndActualPath(parentPath) if account.Config().NoUpload { return errors.WithStack(ErrUploadNotSupported) } diff --git a/pkg/task/task.go b/pkg/task/task.go index 5c69b381b00..1a532c1970e 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -53,6 +53,9 @@ func (t *Task[K, V]) run() { } }() t.Error = t.Func(t) + if t.Error != nil { + log.Errorf("error [%+v] while run task [%s]", t.Error, t.Name) + } if errors.Is(t.Ctx.Err(), context.Canceled) { t.Status = CANCELED } else if t.Error != nil {