Skip to content

Commit

Permalink
feature: 增加多线程分片上传和断续上传
Browse files Browse the repository at this point in the history
  • Loading branch information
arrebole committed May 24, 2023
1 parent 33b2b3a commit 96547a6
Show file tree
Hide file tree
Showing 12 changed files with 594 additions and 46 deletions.
55 changes: 55 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cache

import (
"os"
"path/filepath"
"runtime"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)

// 存储本地数据库连接
var db *leveldb.DB

func GetDBName() string {
if runtime.GOOS == "windows" {
return filepath.Join(os.Getenv("USERPROFILE"), ".upx.db")
}
return filepath.Join(os.Getenv("HOME"), ".upx.db")
}

func GetClient() (*leveldb.DB, error) {
var err error
if db == nil {
db, err = leveldb.OpenFile(GetDBName(), nil)
}
return db, err
}

func Delete(key string) error {
db, err := GetClient()
if err != nil {
return err
}
return db.Delete([]byte(key), nil)
}

func Range(scoop string, fn func(key []byte, data []byte)) error {
db, err := GetClient()
if err != nil {
return err
}

iter := db.NewIterator(
util.BytesPrefix([]byte(scoop)),
nil,
)

for iter.Next() {
fn(iter.Key(), iter.Value())
}

iter.Release()
return iter.Error()
}
145 changes: 145 additions & 0 deletions cache/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package cache

import (
"crypto/md5"
"encoding/json"
"fmt"
"time"
)

// 分片上传任务
type MutUpload struct {
UploadID string

// 文件总计大小
Size int64

// 分块大小
PartSize int64

// 本都文件路径
Path string

// 云端文件路径
UpPath string

// 上传时间
CreateAt time.Time
}

func (p *MutUpload) Key() string {
fingerprint := fmt.Sprintf(
"%s:%s:%d:%d",
p.Path,
p.UpPath,
p.Size,
p.PartSize,
)

return fmt.Sprintf(
"mutupload-%x",
md5.Sum([]byte(fingerprint)),
)
}

// 查询分片上传任务
func FindMutUpload(fn func(key string, entity *MutUpload) bool) ([]*MutUpload, error) {
var result []*MutUpload
err := Range("mutupload-", func(key []byte, value []byte) {
var item = &MutUpload{}
if err := json.Unmarshal(value, item); err != nil {
db.Delete(key, nil)
return
}

// 删除过期的分片上传记录
if time.Since(item.CreateAt).Hours() > 12 {
FindMutUploadPart(func(key string, part *MutUploadPart) bool {
if part.UploadID == item.UploadID {
db.Delete([]byte(key), nil)
}
return false
})
db.Delete(key, nil)
}

if fn(string(key), item) {
result = append(result, item)
}
})
return result, err
}

// 添加分片上传
func AddMutUpload(entity *MutUpload) error {
db, err := GetClient()
if err != nil {
return err
}

data, err := json.Marshal(entity)
if err != nil {
return err
}

return db.Put([]byte(entity.Key()), data, nil)
}

// 分片上传任务下的具体分片信息
type MutUploadPart struct {
UploadID string
PartId int64
Len int64
}

func (p *MutUploadPart) Key() string {
return fmt.Sprintf("part-%s-%d", p.UploadID, p.PartId)
}

// 获取已经上传的分片
func FindMutUploadPart(fn func(key string, entity *MutUploadPart) bool) ([]*MutUploadPart, error) {
var result []*MutUploadPart
err := Range("part-", func(key []byte, value []byte) {
var item = &MutUploadPart{}
if err := json.Unmarshal(value, item); err != nil {
db.Delete(key, nil)
return
}

if fn(string(key), item) {
result = append(result, item)
}
})
return result, err
}

// 记录已经上传的分片
func AddMutUploadPart(entity *MutUploadPart) error {
db, err := GetClient()
if err != nil {
return err
}

data, err := json.Marshal(entity)
if err != nil {
return err
}

return db.Put([]byte(entity.Key()), data, nil)
}

func DeleteByUploadID(uploadID string) error {
FindMutUpload(func(key string, entity *MutUpload) bool {
if entity.UploadID == uploadID {
Delete(key)
}
return false
})
FindMutUploadPart(func(key string, entity *MutUploadPart) bool {
if entity.UploadID == uploadID {
Delete(key)
}
return false
})
return nil
}
66 changes: 66 additions & 0 deletions cache/upload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestMutUpload(t *testing.T) {
mutUpload := &MutUpload{
UploadID: "1",
Size: 100 * 12,
PartSize: 100,
Path: "a.jpg",
UpPath: "b.jpg",
CreateAt: time.Now(),
}
assert.NoError(t, AddMutUpload(mutUpload))
assert.NoError(t, AddMutUpload(&MutUpload{
UploadID: "2",
Size: 100 * 12,
PartSize: 100,
Path: "/c/a.jpg",
UpPath: "b.jpg",
CreateAt: time.Now(),
}))
results, err := FindMutUpload(func(key string, entity *MutUpload) bool {
return key == mutUpload.Key()
})

assert.NoError(t, err)
assert.Equal(t, len(results), 1)
assert.Equal(
t,
results[0].Key(),
mutUpload.Key(),
)
}

func TestMutUploadPart(t *testing.T) {
part1s := []int64{}
for i := 0; i < 100; i++ {
part1s = append(part1s, int64(i))
}

for _, v := range part1s {
err := AddMutUploadPart(&MutUploadPart{
UploadID: "1",
PartId: v,
Len: 100,
})
assert.NoError(t, err)
}

part2s := []int64{}
records, err := FindMutUploadPart(func(key string, entity *MutUploadPart) bool {
return entity.UploadID == "1"
})
assert.NoError(t, err)
for _, v := range records {
part2s = append(part2s, v.PartId)
}

assert.ElementsMatch(t, part1s, part2s)
}
4 changes: 4 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,12 @@ func NewPutCommand() cli.Command {
upPath,
c.Int("w"),
c.Bool("all"),
c.Bool("c"),
)
return nil
},
Flags: []cli.Flag{
cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"},
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"},
},
Expand All @@ -373,10 +375,12 @@ func NewUploadCommand() cli.Command {
c.String("remote"),
c.Int("w"),
c.Bool("all"),
c.Bool("c"),
)
return nil
},
Flags: []cli.Flag{
cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"},
cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"},
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
cli.StringFlag{Name: "remote", Usage: "remote path", Value: "./"},
Expand Down
13 changes: 3 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"strings"

"github.com/syndtr/goleveldb/leveldb"
"github.com/upyun/upx/cache"
)

var db *leveldb.DB
Expand All @@ -31,13 +31,6 @@ type dbValue struct {
Items []*fileMeta `json:"items"`
}

func getDBName() string {
if runtime.GOOS == "windows" {
return filepath.Join(os.Getenv("USERPROFILE"), ".upx.db")
}
return filepath.Join(os.Getenv("HOME"), ".upx.db")
}

func makeDBKey(src, dst string) ([]byte, error) {
return json.Marshal(&dbKey{
SrcPath: src,
Expand Down Expand Up @@ -182,9 +175,9 @@ func diffFileMetas(src []*fileMeta, dst []*fileMeta) []*fileMeta {
}

func initDB() (err error) {
db, err = leveldb.OpenFile(getDBName(), nil)
db, err = cache.GetClient()
if err != nil {
Print("db %v %s", err, getDBName())
Print("db %v %s", err, cache.GetDBName())
}
return err
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/rivo/uniseg v0.4.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
9 changes: 0 additions & 9 deletions partial/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ type Chunk struct {
buffer []byte
}

func NewChunk(index, start, end int64) *Chunk {
chunk := &Chunk{
start: start,
end: end,
index: index,
}
return chunk
}

func (p *Chunk) SetData(bytes []byte) {
p.buffer = bytes
}
Expand Down
13 changes: 10 additions & 3 deletions partial/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"sync"
)

const DefaultChunkSize = 1024 * 1024 * 10
type Downloader interface {
Download() error
}

type ChunkDownFunc func(start, end int64) ([]byte, error)

Expand All @@ -31,7 +33,7 @@ type MultiPartialDownloader struct {
downFunc ChunkDownFunc
}

func NewMultiPartialDownloader(filePath string, finalSize, chunkSize int64, writer io.Writer, works int, fn ChunkDownFunc) *MultiPartialDownloader {
func NewMultiPartialDownloader(filePath string, finalSize, chunkSize int64, writer io.Writer, works int, fn ChunkDownFunc) Downloader {
return &MultiPartialDownloader{
filePath: filePath,
finalSize: finalSize,
Expand Down Expand Up @@ -100,7 +102,12 @@ func (p *MultiPartialDownloader) Download() error {
if end > p.finalSize {
end = p.finalSize
}
chunk := NewChunk(int64(j), start, end)

chunk := &Chunk{
index: int64(j),
start: start,
end: end,
}

// 重试三次
for t := 0; t < 3; t++ {
Expand Down
Loading

0 comments on commit 96547a6

Please sign in to comment.