Skip to content

Commit

Permalink
feat(chunk): Adding chunk upload from the browser, on a flag
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Jul 16, 2022
1 parent 4fcb131 commit 331ae68
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 72 deletions.
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ Fibr has a special treatment for videos, that can be very large sometimes. With
- `vith` is configured with direct access to the filesystem (see [`vith`documentation about configuring `WorkDir`](https://github.com/vibioh/vith#usage) and [`fibr` configuration](#usage) for enabling it). Direct access disable large file transfer in the network.
- the video bitrate is above [`thumbnailMinBitrate (default 80000000)`](#usage)

### Chunk upload

Fibr supports uploading file by chunks or in one single request. This behavior is managed by the [`-chunkUpload`](#usage) option. In both cases, the file are written directly to the disk without buffering in memory. If you have a load-balancer in front of your Fibr instances, chunk upload requires that you enable sticky sessions because file are written locally to the `-temporaryFolder` before being written to the destination folder. On the other hand, when using one single request, you may need to tune the `-readTimeout` option to ensure that a slow connection with a big file can fullfil the request within the allowed timeout window.

In case of failure, when using one single request, all the upload is started from the beginning. In case of a chunk upload, the upload restarts from the failed chunk.

### Security

Authentication is made with [Basic Auth](https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication), compatible with all browsers and CLI tools such as `curl`. I **strongly recommend configuring HTTPS** in order to avoid exposing your credentials in plain text.
Expand Down Expand Up @@ -237,7 +243,7 @@ Usage of fibr:
-amqpShareMaxRetry uint
[amqpShare] Max send retries {FIBR_AMQP_SHARE_MAX_RETRY} (default 3)
-amqpShareQueue string
[amqpShare] Queue name {FIBR_AMQP_SHARE_QUEUE} (default "fibr.share-6db91c9c")
[amqpShare] Queue name {FIBR_AMQP_SHARE_QUEUE} (default "fibr.share-<random>")
-amqpShareRetryInterval duration
[amqpShare] Interval duration when send fails {FIBR_AMQP_SHARE_RETRY_INTERVAL}
-amqpShareRoutingKey string
Expand All @@ -251,7 +257,7 @@ Usage of fibr:
-amqpWebhookMaxRetry uint
[amqpWebhook] Max send retries {FIBR_AMQP_WEBHOOK_MAX_RETRY} (default 3)
-amqpWebhookQueue string
[amqpWebhook] Queue name {FIBR_AMQP_WEBHOOK_QUEUE} (default "fibr.webhook-94e48846")
[amqpWebhook] Queue name {FIBR_AMQP_WEBHOOK_QUEUE} (default "fibr.webhook-<random>")
-amqpWebhookRetryInterval duration
[amqpWebhook] Interval duration when send fails {FIBR_AMQP_WEBHOOK_RETRY_INTERVAL}
-amqpWebhookRoutingKey string
Expand All @@ -264,6 +270,8 @@ Usage of fibr:
[crud] Wanted bcrypt duration for calculating effective cost {FIBR_BCRYPT_DURATION} (default "0.25s")
-cert string
[server] Certificate file {FIBR_CERT}
-chunkUpload
[crud] Use chunk upload in browser {FIBR_CHUNK_UPLOAD}
-csp string
[owasp] Content-Security-Policy {FIBR_CSP} (default "default-src 'self'; base-uri 'self'; script-src 'httputils-nonce' unpkg.com/[email protected]/dist/ unpkg.com/[email protected]/; style-src 'httputils-nonce' unpkg.com/[email protected]/dist/ unpkg.com/[email protected]/; img-src 'self' data: a.tile.openstreetmap.org b.tile.openstreetmap.org c.tile.openstreetmap.org")
-exifAmqpExchange string
Expand Down Expand Up @@ -360,6 +368,8 @@ Usage of fibr:
[storage] Storage Object Secret Access {FIBR_STORAGE_OBJECT_SECRET_ACCESS}
-storagePartSize uint
[storage] PartSize configuration {FIBR_STORAGE_PART_SIZE} (default 5242880)
-temporaryFolder string
[crud] Temporary folder for chunk upload {FIBR_TEMPORARY_FOLDER} (default "/tmp")
-thumbnailAmqpExchange string
[thumbnail] AMQP Exchange Name {FIBR_THUMBNAIL_AMQP_EXCHANGE} (default "fibr")
-thumbnailAmqpStreamRoutingKey string
Expand Down
120 changes: 114 additions & 6 deletions cmd/fibr/templates/upload-form.html
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,108 @@
statusContainer.classList.add(style);
}

let xhr;
let aborter;

const chunkSize = 1024 * 1024;
let currentUpload = {};

/**
* Upload file by chunks.
* @param {String} method Method for uploading
* @param {File} file File to upload
* @param {Boolean} shared Shared option
* @param {Number} duration Duration of share
* @return {Promise} Promise of upload
*/
async function uploadFileByChunks(method, file, shared, duration) {
const messageId = await fileMessageId(file);

const container = document.getElementById(messageId);
let progress;
if (container) {
progress = container.querySelector('progress');
}

if (file.name !== currentUpload.filename) {
currentUpload.filename = file.name;
currentUpload.chunks = [];

for (let cur = 0; cur < file.size; cur += chunkSize) {
currentUpload.chunks.push({
content: file.slice(cur, cur + chunkSize),
done: false,
});
}
}

const filenameInput = document.getElementById(`${messageId}-filename`);
let fileName;
if (filenameInput && filenameInput.value) {
fileName = filenameInput.value;
}

if (typeof AbortController !== 'undefined') {
aborter = new AbortController();
}

for (var i = 0; i < currentUpload.chunks.length; i++) {
if (currentUpload.chunks[i].done) {
continue;
}

const formData = new FormData();
formData.append('method', method);
formData.append('filename', fileName);
formData.append('file', currentUpload.chunks[i].content);

const response = await fetch('', {
method: 'POST',
credentials: 'same-origin',
signal: aborter.signal,
headers: {
'X-Chunk-Upload': true,
'X-Chunk-Number': i + 1,
Accept: 'text/plain',
},
body: formData,
});

if (response.status >= 400) {
const error = await response.text();
return Promise.reject(error);
}

currentUpload.chunks[i].done = true;
if (progress) {
progress.value = ((chunkSize * (i + 1)) / file.size) * 100;
}
}

const formData = new FormData();
formData.append('method', method);
formData.append('filename', fileName);
formData.append('share', shared);
formData.append('duration', duration);
formData.append('size', file.size);

const response = await fetch('', {
method: 'POST',
credentials: 'same-origin',
headers: {
'X-Chunk-Upload': true,
Accept: 'text/plain',
},
body: formData,
});

const error = await response.text();
if (response.status >= 400) {
return Promise.reject(error);
} else {
currentUpload = {};
return Promise.resolve(error);
}
}

/**
* Upload file with updating progress indicator.
Expand All @@ -217,7 +318,7 @@
* @param {Number} duration Duration of share
* @return {Promise} Promise of upload
*/
async function uploadFile(method, file, shared, duration) {
async function uploadFileByXHR(method, file, shared, duration) {
const messageId = await fileMessageId(file);

const container = document.getElementById(messageId);
Expand All @@ -239,7 +340,8 @@
formData.append('file', file);

return new Promise((resolve, reject) => {
xhr = new XMLHttpRequest();
let xhr = new XMLHttpRequest();
aborter = xhr;

if (progress) {
xhr.upload.addEventListener(
Expand Down Expand Up @@ -278,6 +380,12 @@
});
}

{{- if .ChunkUpload }}
const uploadFile = uploadFileByChunks
{{- else }}
const uploadFile = uploadFileByXHR
{{- end }}

/**
* Slice FileList from given index.
* @param {String} name Name of the input file element
Expand Down Expand Up @@ -373,9 +481,9 @@
function abort(e) {
e.preventDefault();

if (xhr) {
xhr.abort();
xhr = undefined;
if (aborter) {
aborter.abort();
aborter = undefined;
} else {
window.location.hash = '';
}
Expand Down
1 change: 1 addition & 0 deletions infra/web.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ spec:
- name: data
mountPath: /data
config:
FIBR_CHUNK_UPLOAD: 'true'
FIBR_EXIF_DIRECT_ACCESS: 'true'
FIBR_EXIF_URL: http://exas
FIBR_IGNORE_PATTERN: '.st(folder|ignore)'
Expand Down
32 changes: 23 additions & 9 deletions pkg/crud/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package crud

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"mime/multipart"
"net/http"
"os"
"path/filepath"
Expand All @@ -17,11 +17,16 @@ import (
)

// UploadChunk save chunk file to a temp file
func (a App) UploadChunk(w http.ResponseWriter, r *http.Request, request provider.Request, values map[string]string, file *multipart.Part) {
func (a App) UploadChunk(w http.ResponseWriter, r *http.Request, request provider.Request, fileName, chunkNumber string, file io.Reader) {
if file == nil {
a.error(w, r, request, model.WrapInvalid(errors.New("no file provided for save")))
return
}

var err error

tempDestination := filepath.Join(temporaryFolder, sha.New(values["filename"]))
tempFile := filepath.Join(tempDestination, values["chunkNumber"])
tempDestination := filepath.Join(a.temporaryFolder, sha.New(fileName))
tempFile := filepath.Join(tempDestination, chunkNumber)

if err = os.MkdirAll(tempDestination, 0o700); err != nil {
a.error(w, r, request, model.WrapInternal(err))
Expand Down Expand Up @@ -60,8 +65,9 @@ func (a App) UploadChunk(w http.ResponseWriter, r *http.Request, request provide
func (a App) MergeChunk(w http.ResponseWriter, r *http.Request, request provider.Request, values map[string]string) {
var err error

tempFolder := filepath.Join(temporaryFolder, sha.New(values["filename"]))
tempFile := filepath.Join(tempFolder, values["filename"])
fileName := values["filename"]
tempFolder := filepath.Join(a.temporaryFolder, sha.New(fileName))
tempFile := filepath.Join(tempFolder, fileName)

if err := a.mergeChunkFiles(tempFolder, tempFile); err != nil {
a.error(w, r, request, model.WrapInternal(err))
Expand All @@ -81,7 +87,7 @@ func (a App) MergeChunk(w http.ResponseWriter, r *http.Request, request provider
return
}

filePath := request.SubPath(values["filename"])
filePath := request.SubPath(fileName)
err = provider.WriteToStorage(r.Context(), a.storageApp, filePath, size, file)

if err == nil {
Expand All @@ -94,7 +100,11 @@ func (a App) MergeChunk(w http.ResponseWriter, r *http.Request, request provider
}()
}

w.WriteHeader(http.StatusCreated)
if err = os.RemoveAll(tempFolder); err != nil {
logger.Error("unable to delete chunk folder `%s`: %s", tempFolder, err)
}

a.postUpload(r.Context(), w, r, request, fileName, values)
}

func (a App) mergeChunkFiles(directory, destination string) error {
Expand All @@ -117,11 +127,15 @@ func (a App) mergeChunkFiles(directory, destination string) error {
}
}()

if err = filepath.WalkDir(directory, func(path string, _ fs.DirEntry, err error) error {
if err = filepath.WalkDir(directory, func(path string, info fs.DirEntry, err error) error {
if err != nil {
return err
}

if info.IsDir() || path == destination {
return nil
}

reader, err := os.Open(path)
if err != nil {
return fmt.Errorf("unable to open chunk `%s`: %s", path, err)
Expand Down
10 changes: 10 additions & 0 deletions pkg/crud/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,20 @@ type App struct {
rendererApp renderer.App
thumbnailApp thumbnail.App

temporaryFolder string
bcryptCost int
sanitizeOnStart bool
chunkUpload bool
}

// Config of package
type Config struct {
ignore *string
amqpExclusiveRoutingKey *string
bcryptDuration *string
temporaryFolder *string
sanitizeOnStart *bool
chunkUpload *bool
}

// Flags adds flags for configuring package
Expand All @@ -72,6 +76,9 @@ func Flags(fs *flag.FlagSet, prefix string) Config {
sanitizeOnStart: flags.Bool(fs, prefix, "crud", "SanitizeOnStart", "Sanitize name on start", false, nil),
bcryptDuration: flags.String(fs, prefix, "crud", "BcryptDuration", "Wanted bcrypt duration for calculating effective cost", "0.25s", nil),

chunkUpload: flags.Bool(fs, prefix, "crud", "ChunkUpload", "Use chunk upload in browser", false, nil),
temporaryFolder: flags.String(fs, prefix, "crud", "TemporaryFolder", "Temporary folder for chunk upload", "/tmp", nil),

amqpExclusiveRoutingKey: flags.String(fs, prefix, "crud", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.start", nil),
}
}
Expand All @@ -81,6 +88,9 @@ func New(config Config, storage absto.Storage, rendererApp renderer.App, shareAp
app := App{
sanitizeOnStart: *config.sanitizeOnStart,

chunkUpload: *config.chunkUpload,
temporaryFolder: strings.TrimSpace(*config.temporaryFolder),

tracer: tracerApp.GetTracer("crud"),
pushEvent: eventProducer,

Expand Down
1 change: 1 addition & 0 deletions pkg/crud/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (a App) List(ctx context.Context, request provider.Request, message rendere
"HasMap": hasMap,
"HasThumbnail": hasThumbnail,
"HasStory": hasStory,
"ChunkUpload": a.chunkUpload,
}

if request.CanShare {
Expand Down
Loading

0 comments on commit 331ae68

Please sign in to comment.