Skip to content

Commit 1d96591

Browse files
copy rewrite-2 to master
1 parent b154893 commit 1d96591

File tree

9 files changed

+331
-68
lines changed

9 files changed

+331
-68
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
2+
*.mp4

README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ go get github.com/husseinelguindi/mtd
1212
- [x] HTTP/HTTPS download support
1313
- [x] Offset write to file
1414
- [x] Synchronized writing to file (never write 2 things at once)
15-
- [x] Go module interface to download within your program
15+
- [x] Go module/library interface
16+
- [x] Multiple downloads in a single instance
1617
- [ ] Pause/resume support
17-
- [ ] Multiple downloads in a single instance
18-
- [ ] Command line interface (coming soon)
18+
- [ ] Command line interface
1919
- [ ] Args
2020
- [ ] Console input
2121

@@ -24,3 +24,4 @@ go get github.com/husseinelguindi/mtd
2424
A writer goroutine only writes one chunk at a time. This was done for a simple reason:
2525
- Seeking to a location in a file, physically moves the write head of a hard drive, which slows the writing to a file.
2626
- This means that writing chunks that are near each other yields better performance by avoiding the hard drive seek delay.
27+
- This also means that using a bigger chunk/buffer size is beneficial to write speeds (fewer system calls, as well).

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/husseinelguindi/mtd
2+
3+
go 1.16

http.go

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package mtd
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"strings"
9+
"sync"
10+
)
11+
12+
// ErrNoContentLength is returned by Download when the HEAD response reports
// a content length that is zero or negative.
var ErrNoContentLength = errors.New("content length is unset or zero")
15+
16+
// metadata holds the response details gathered by the HEAD request that
// precedes a download.
type metadata struct {
	// contentLength is the Content-Length reported by the response.
	contentLength int64
	// contentType is the raw Content-Type response header.
	contentType string
	// acceptByteRanges is true when the Accept-Ranges response header is
	// "bytes" (compared case-insensitively).
	acceptByteRanges bool
}
21+
22+
// TODO: implement download retries
23+
// TODO: progress bars
24+
// TODO: background program that connects to terminal command (like Docker cli)
25+
26+
func (t Task) httpHEAD() (metadata, error) {
27+
// Create request
28+
req, _ := http.NewRequest(http.MethodHead, t.URL, nil)
29+
// Set task's request headers
30+
for k, v := range t.Headers {
31+
req.Header.Set(k, v)
32+
}
33+
34+
// Perform the HEAD request with the task's client
35+
resp, err := t.Client.Do(req)
36+
if err != nil {
37+
return metadata{}, err
38+
}
39+
defer resp.Body.Close()
40+
41+
// Parse headers and create metadata
42+
return metadata{
43+
contentLength: resp.ContentLength,
44+
contentType: resp.Header.Get("Content-Type"),
45+
acceptByteRanges: strings.EqualFold(resp.Header.Get("Accept-Ranges"), "Bytes"),
46+
}, err
47+
}
48+
49+
func (t Task) Download() error {
50+
// Get metadata from a HEAD request
51+
meta, err := t.httpHEAD()
52+
if err != nil {
53+
return err
54+
}
55+
// Ensure the content has a positive, non-zero length
56+
if meta.contentLength <= 0 {
57+
return ErrNoContentLength
58+
}
59+
60+
// Determine the chunk size taking into consideration the content length, number of chunks,
61+
// and whether or not the file server allows for byte ranges
62+
chunkSize := meta.contentLength
63+
if meta.acceptByteRanges && t.Chunks > 1 {
64+
chunkSize = meta.contentLength / int64(t.Chunks) // Floored
65+
}
66+
67+
// Prepare waitgroup with the expected number of goroutines
68+
wg := &sync.WaitGroup{}
69+
wg.Add(int(t.Chunks))
70+
71+
// Initialize the error channel
72+
errc := make(chan error)
73+
74+
// Distibute the byte ranges between t.Chunks number of goroutines
75+
var start, end int64
76+
for i := uint(1); i < t.Chunks; i++ {
77+
end += chunkSize
78+
go t.httpWorker(byteRange{start, end - 1}, wg, errc)
79+
start = end
80+
}
81+
// Handle remaining bytes (or all bytes if t.Chunks is 1)
82+
end += meta.contentLength - start
83+
go t.httpWorker(byteRange{start, end}, wg, errc)
84+
85+
// Listen for errors
86+
go func() {
87+
for {
88+
err := <-errc
89+
// TODO: handle errors
90+
fmt.Println("error!", err)
91+
}
92+
}()
93+
94+
// Wait for worker goroutines to finish
95+
wg.Wait()
96+
return nil
97+
}
98+
99+
func (t Task) httpWorker(bRange byteRange, wg *sync.WaitGroup, errc chan<- error) {
100+
defer wg.Done()
101+
102+
// Get body
103+
rc, err := t.httpGET(bRange)
104+
if err != nil {
105+
errc <- err
106+
return
107+
}
108+
defer rc.Close()
109+
110+
// Write the body
111+
_, err = t.write(rc, bRange)
112+
if err != nil {
113+
errc <- err
114+
return
115+
}
116+
}
117+
118+
// byteRange is a span of bytes identified by its first and last offsets.
type byteRange struct {
	start int64
	end   int64
}

// Header formats the range as the value of an HTTP Range header,
// e.g. "bytes=0-99".
func (b byteRange) Header() string {
	return fmt.Sprintf("bytes=%d-%d", b.start, b.end)
}

// Valid reports whether the range has a positive end that lies strictly
// after its start.
func (b byteRange) Valid() bool {
	if b.end <= 0 {
		return false
	}
	return b.start < b.end
}
122+
123+
func (t Task) httpGET(bRange byteRange) (io.ReadCloser, error) {
124+
req, _ := http.NewRequest(http.MethodGet, t.URL, nil)
125+
// Set task's request headers
126+
for k, v := range t.Headers {
127+
req.Header.Set(k, v)
128+
}
129+
130+
// Set the byte range header if a range is passed
131+
if bRange.Valid() {
132+
req.Header.Set("Range", bRange.Header())
133+
}
134+
135+
// Perform the GET request with the task's client
136+
resp, err := t.Client.Do(req)
137+
return resp.Body, err
138+
}

http_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package mtd
2+
3+
import (
4+
"context"
5+
"os"
6+
"runtime"
7+
"testing"
8+
)
9+
10+
func TestHTTPDownload(t *testing.T) {
11+
f, err := os.OpenFile("./vid.mp4", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(0666))
12+
if err != nil {
13+
t.FailNow()
14+
}
15+
defer f.Close()
16+
17+
writer := Writer{}
18+
19+
ctx, cancel := context.WithCancel(context.Background())
20+
defer cancel()
21+
22+
task := Task{
23+
URL: "https://file-examples-com.github.io/uploads/2017/04/file_example_MP4_1920_18MG.mp4",
24+
Chunks: uint(runtime.NumCPU()),
25+
BufSize: 7 * 1024 * 1024, // ~7mb
26+
27+
Dst: f,
28+
Writer: &writer,
29+
30+
Ctx: ctx,
31+
}
32+
33+
if err := task.Download(); err != nil {
34+
t.FailNow()
35+
}
36+
}

task.go

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package mtd
2+
3+
import (
4+
"context"
5+
"io"
6+
"net/http"
7+
)
8+
9+
type Task struct {
10+
URL string
11+
// Threads uint
12+
// ChunkSize int64
13+
Chunks uint
14+
BufSize int64
15+
16+
Client http.Client
17+
Headers map[string]string
18+
19+
Dst io.WriterAt
20+
Writer *Writer
21+
22+
Ctx context.Context
23+
}
24+
25+
func (t Task) write(rc io.ReadCloser, bRange byteRange) (written int, err error) {
26+
// Determine the smallest useable byte buffer size and allocate the space for it
27+
bufSize := minInt64(t.BufSize, bRange.end-bRange.start+1)
28+
buf := make([]byte, bufSize)
29+
30+
// Prepare backet for reusing
31+
packet := packet{
32+
buf: buf,
33+
dst: t.Dst,
34+
}
35+
36+
// Set offset for chunked downloads
37+
if bRange.Valid() {
38+
packet.off = bRange.start
39+
}
40+
41+
for {
42+
// Handle cancel without blocking
43+
select {
44+
case <-t.Ctx.Done():
45+
return
46+
default:
47+
}
48+
49+
// Read up to bufSize bytes
50+
var n int
51+
n, err = io.ReadFull(rc, buf[:])
52+
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
53+
return
54+
}
55+
56+
// Break if no bytes were read (no bytes left to write)
57+
if n == 0 {
58+
break
59+
}
60+
61+
// Set the packet's buffer as a slice of the newly read bytes
62+
packet.buf = buf[:n]
63+
64+
// Write the packet
65+
n, err = t.Writer.Write(packet)
66+
67+
// Update offset for next write
68+
packet.off += int64(n)
69+
// Update the total number of bytes written
70+
written += n
71+
72+
// Handle any write errors
73+
if err != nil {
74+
return
75+
}
76+
}
77+
78+
return written, nil
79+
}

utils.go

+8-13
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
1-
package mtd
2-
3-
import "net/http"
4-
5-
func setReqHeaders(req *http.Request, headers http.Header) {
6-
if headers == nil {
7-
return
8-
}
9-
10-
for k, v := range headers {
11-
req.Header.Set(k, v[0])
12-
}
13-
}
1+
package mtd
2+
3+
// minInt64 returns the smaller of x and y.
func minInt64(x, y int64) int64 {
	smallest := y
	if x < smallest {
		smallest = x
	}
	return smallest
}

writer.go

+12-52
Original file line numberDiff line numberDiff line change
@@ -2,64 +2,24 @@ package mtd
22

33
import (
44
"io"
5+
"sync"
56
)
67

7-
type writeObj struct {
8-
dst io.WriterAt
9-
buf []byte
10-
offset uint64
11-
errCh chan error
12-
}
13-
148
type Writer struct {
15-
writeCh chan writeObj
16-
closeCh chan int
17-
}
18-
19-
// NewWriter initializes a new writer
20-
func NewWriter() Writer {
21-
w := Writer{
22-
writeCh: make(chan writeObj),
23-
closeCh: make(chan int, 1),
24-
}
25-
w.listen()
26-
return w
9+
sync.Mutex
2710
}
2811

29-
func (w Writer) listen() {
30-
go func() {
31-
for {
32-
select {
33-
case writeObj := <-w.writeCh:
34-
writeObj.write()
35-
case code := <-w.closeCh:
36-
if code == 0 {
37-
w.closeCh <- 0
38-
}
39-
40-
break
41-
}
42-
}
43-
}()
12+
type packet struct {
13+
dst io.WriterAt
14+
buf []byte
15+
off int64
4416
}
4517

46-
func (wo writeObj) write() {
47-
// if _, err := wo.dest.Seek(int64(wo.offset), io.SeekStart); err != nil {
48-
// wo.errCh <- err
49-
// return
50-
// }
51-
52-
wrote := 0
53-
for wrote < len(wo.buf) {
54-
// n, err := wo.dest.Write(wo.buf)
55-
n, err := wo.dst.WriteAt(wo.buf, int64(wo.offset+uint64(wrote)))
56-
if err != nil {
57-
wo.errCh <- err
58-
return
59-
}
60-
wrote += n
61-
}
18+
func (w *Writer) Write(p packet) (int, error) {
19+
// Block until free
20+
w.Lock()
21+
defer w.Unlock()
6222

63-
wo.errCh <- nil
64-
// close(wo.errCh)
23+
// Write buf at offset
24+
return p.dst.WriteAt(p.buf[:], p.off)
6525
}

0 commit comments

Comments
 (0)