From 331ae684d41c4e194f8fc1385e55835d11dd08fa Mon Sep 17 00:00:00 2001 From: Vincent Boutour Date: Sat, 16 Jul 2022 15:19:42 +0200 Subject: [PATCH] feat(chunk): Adding chunk upload from the browser, on a flag Signed-off-by: Vincent Boutour --- README.md | 14 +++- cmd/fibr/templates/upload-form.html | 120 ++++++++++++++++++++++++++-- infra/web.yaml | 1 + pkg/crud/chunk.go | 32 +++++--- pkg/crud/crud.go | 10 +++ pkg/crud/list.go | 1 + pkg/crud/post.go | 68 ++++++++++------ pkg/crud/upload.go | 65 ++++++++------- 8 files changed, 239 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index e230555c..e0d771cd 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,12 @@ Fibr has a special treatment for videos, that can be very large sometimes. With - `vith` is configured with direct access to the filesystem (see [`vith`documentation about configuring `WorkDir`](https://github.com/vibioh/vith#usage) and [`fibr` configuration](#usage) for enabling it). Direct access disable large file transfer in the network. - the video bitrate is above [`thumbnailMinBitrate (default 80000000)`](#usage) +### Chunk upload + +Fibr supports uploading files in chunks or in a single request. This behavior is managed by the [`-chunkUpload`](#usage) option. In both cases, the files are written directly to the disk without buffering in memory. If you have a load-balancer in front of your Fibr instances, chunk upload requires that you enable sticky sessions because files are written locally to the `-temporaryFolder` before being written to the destination folder. On the other hand, when using a single request, you may need to tune the `-readTimeout` option to ensure that a slow connection with a big file can fulfill the request within the allowed timeout window. + +In case of failure, when using a single request, the whole upload restarts from the beginning. In case of a chunk upload, the upload restarts from the failed chunk.
+ ### Security Authentication is made with [Basic Auth](https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication), compatible with all browsers and CLI tools such as `curl`. I **strongly recommend configuring HTTPS** in order to avoid exposing your credentials in plain text. @@ -237,7 +243,7 @@ Usage of fibr: -amqpShareMaxRetry uint [amqpShare] Max send retries {FIBR_AMQP_SHARE_MAX_RETRY} (default 3) -amqpShareQueue string - [amqpShare] Queue name {FIBR_AMQP_SHARE_QUEUE} (default "fibr.share-6db91c9c") + [amqpShare] Queue name {FIBR_AMQP_SHARE_QUEUE} (default "fibr.share-") -amqpShareRetryInterval duration [amqpShare] Interval duration when send fails {FIBR_AMQP_SHARE_RETRY_INTERVAL} -amqpShareRoutingKey string @@ -251,7 +257,7 @@ Usage of fibr: -amqpWebhookMaxRetry uint [amqpWebhook] Max send retries {FIBR_AMQP_WEBHOOK_MAX_RETRY} (default 3) -amqpWebhookQueue string - [amqpWebhook] Queue name {FIBR_AMQP_WEBHOOK_QUEUE} (default "fibr.webhook-94e48846") + [amqpWebhook] Queue name {FIBR_AMQP_WEBHOOK_QUEUE} (default "fibr.webhook-") -amqpWebhookRetryInterval duration [amqpWebhook] Interval duration when send fails {FIBR_AMQP_WEBHOOK_RETRY_INTERVAL} -amqpWebhookRoutingKey string @@ -264,6 +270,8 @@ Usage of fibr: [crud] Wanted bcrypt duration for calculating effective cost {FIBR_BCRYPT_DURATION} (default "0.25s") -cert string [server] Certificate file {FIBR_CERT} + -chunkUpload + [crud] Use chunk upload in browser {FIBR_CHUNK_UPLOAD} -csp string [owasp] Content-Security-Policy {FIBR_CSP} (default "default-src 'self'; base-uri 'self'; script-src 'httputils-nonce' unpkg.com/leaflet@1.8.0/dist/ unpkg.com/leaflet.markercluster@1.5.1/; style-src 'httputils-nonce' unpkg.com/leaflet@1.8.0/dist/ unpkg.com/leaflet.markercluster@1.5.1/; img-src 'self' data: a.tile.openstreetmap.org b.tile.openstreetmap.org c.tile.openstreetmap.org") -exifAmqpExchange string @@ -360,6 +368,8 @@ Usage of fibr: [storage] Storage Object Secret Access {FIBR_STORAGE_OBJECT_SECRET_ACCESS} 
-storagePartSize uint [storage] PartSize configuration {FIBR_STORAGE_PART_SIZE} (default 5242880) + -temporaryFolder string + [crud] Temporary folder for chunk upload {FIBR_TEMPORARY_FOLDER} (default "/tmp") -thumbnailAmqpExchange string [thumbnail] AMQP Exchange Name {FIBR_THUMBNAIL_AMQP_EXCHANGE} (default "fibr") -thumbnailAmqpStreamRoutingKey string diff --git a/cmd/fibr/templates/upload-form.html b/cmd/fibr/templates/upload-form.html index 26eb503d..fca3ed3e 100644 --- a/cmd/fibr/templates/upload-form.html +++ b/cmd/fibr/templates/upload-form.html @@ -207,7 +207,108 @@ statusContainer.classList.add(style); } - let xhr; + let aborter; + + const chunkSize = 1024 * 1024; + let currentUpload = {}; + + /** + * Upload file by chunks. + * @param {String} method Method for uploading + * @param {File} file File to upload + * @param {Boolean} shared Shared option + * @param {Number} duration Duration of share + * @return {Promise} Promise of upload + */ + async function uploadFileByChunks(method, file, shared, duration) { + const messageId = await fileMessageId(file); + + const container = document.getElementById(messageId); + let progress; + if (container) { + progress = container.querySelector('progress'); + } + + if (file.name !== currentUpload.filename) { + currentUpload.filename = file.name; + currentUpload.chunks = []; + + for (let cur = 0; cur < file.size; cur += chunkSize) { + currentUpload.chunks.push({ + content: file.slice(cur, cur + chunkSize), + done: false, + }); + } + } + + const filenameInput = document.getElementById(`${messageId}-filename`); + let fileName; + if (filenameInput && filenameInput.value) { + fileName = filenameInput.value; + } + + if (typeof AbortController !== 'undefined') { + aborter = new AbortController(); + } + + for (var i = 0; i < currentUpload.chunks.length; i++) { + if (currentUpload.chunks[i].done) { + continue; + } + + const formData = new FormData(); + formData.append('method', method); + formData.append('filename', fileName); + 
formData.append('file', currentUpload.chunks[i].content); + + const response = await fetch('', { + method: 'POST', + credentials: 'same-origin', + signal: aborter.signal, + headers: { + 'X-Chunk-Upload': true, + 'X-Chunk-Number': i + 1, + Accept: 'text/plain', + }, + body: formData, + }); + + if (response.status >= 400) { + const error = await response.text(); + return Promise.reject(error); + } + + currentUpload.chunks[i].done = true; + if (progress) { + progress.value = ((chunkSize * (i + 1)) / file.size) * 100; + } + } + + const formData = new FormData(); + formData.append('method', method); + formData.append('filename', fileName); + formData.append('share', shared); + formData.append('duration', duration); + formData.append('size', file.size); + + const response = await fetch('', { + method: 'POST', + credentials: 'same-origin', + headers: { + 'X-Chunk-Upload': true, + Accept: 'text/plain', + }, + body: formData, + }); + + const error = await response.text(); + if (response.status >= 400) { + return Promise.reject(error); + } else { + currentUpload = {}; + return Promise.resolve(error); + } + } /** * Upload file with updating progress indicator. @@ -217,7 +318,7 @@ * @param {Number} duration Duration of share * @return {Promise} Promise of upload */ - async function uploadFile(method, file, shared, duration) { + async function uploadFileByXHR(method, file, shared, duration) { const messageId = await fileMessageId(file); const container = document.getElementById(messageId); @@ -239,7 +340,8 @@ formData.append('file', file); return new Promise((resolve, reject) => { - xhr = new XMLHttpRequest(); + let xhr = new XMLHttpRequest(); + aborter = xhr; if (progress) { xhr.upload.addEventListener( @@ -278,6 +380,12 @@ }); } + {{- if .ChunkUpload }} + const uploadFile = uploadFileByChunks + {{- else }} + const uploadFile = uploadFileByXHR + {{- end }} + /** * Slice FileList from given index. 
* @param {String} name Name of the input file element @@ -373,9 +481,9 @@ function abort(e) { e.preventDefault(); - if (xhr) { - xhr.abort(); - xhr = undefined; + if (aborter) { + aborter.abort(); + aborter = undefined; } else { window.location.hash = ''; } diff --git a/infra/web.yaml b/infra/web.yaml index ab28ef4d..38b34529 100644 --- a/infra/web.yaml +++ b/infra/web.yaml @@ -34,6 +34,7 @@ spec: - name: data mountPath: /data config: + FIBR_CHUNK_UPLOAD: 'true' FIBR_EXIF_DIRECT_ACCESS: 'true' FIBR_EXIF_URL: http://exas FIBR_IGNORE_PATTERN: '.st(folder|ignore)' diff --git a/pkg/crud/chunk.go b/pkg/crud/chunk.go index 3e4f1a93..39a339fb 100644 --- a/pkg/crud/chunk.go +++ b/pkg/crud/chunk.go @@ -2,10 +2,10 @@ package crud import ( "context" + "errors" "fmt" "io" "io/fs" - "mime/multipart" "net/http" "os" "path/filepath" @@ -17,11 +17,16 @@ import ( ) // UploadChunk save chunk file to a temp file -func (a App) UploadChunk(w http.ResponseWriter, r *http.Request, request provider.Request, values map[string]string, file *multipart.Part) { +func (a App) UploadChunk(w http.ResponseWriter, r *http.Request, request provider.Request, fileName, chunkNumber string, file io.Reader) { + if file == nil { + a.error(w, r, request, model.WrapInvalid(errors.New("no file provided for save"))) + return + } + var err error - tempDestination := filepath.Join(temporaryFolder, sha.New(values["filename"])) - tempFile := filepath.Join(tempDestination, values["chunkNumber"]) + tempDestination := filepath.Join(a.temporaryFolder, sha.New(fileName)) + tempFile := filepath.Join(tempDestination, chunkNumber) if err = os.MkdirAll(tempDestination, 0o700); err != nil { a.error(w, r, request, model.WrapInternal(err)) @@ -60,8 +65,9 @@ func (a App) UploadChunk(w http.ResponseWriter, r *http.Request, request provide func (a App) MergeChunk(w http.ResponseWriter, r *http.Request, request provider.Request, values map[string]string) { var err error - tempFolder := filepath.Join(temporaryFolder, 
sha.New(values["filename"])) - tempFile := filepath.Join(tempFolder, values["filename"]) + fileName := values["filename"] + tempFolder := filepath.Join(a.temporaryFolder, sha.New(fileName)) + tempFile := filepath.Join(tempFolder, fileName) if err := a.mergeChunkFiles(tempFolder, tempFile); err != nil { a.error(w, r, request, model.WrapInternal(err)) @@ -81,7 +87,7 @@ func (a App) MergeChunk(w http.ResponseWriter, r *http.Request, request provider return } - filePath := request.SubPath(values["filename"]) + filePath := request.SubPath(fileName) err = provider.WriteToStorage(r.Context(), a.storageApp, filePath, size, file) if err == nil { @@ -94,7 +100,11 @@ func (a App) MergeChunk(w http.ResponseWriter, r *http.Request, request provider }() } - w.WriteHeader(http.StatusCreated) + if err = os.RemoveAll(tempFolder); err != nil { + logger.Error("unable to delete chunk folder `%s`: %s", tempFolder, err) + } + + a.postUpload(r.Context(), w, r, request, fileName, values) } func (a App) mergeChunkFiles(directory, destination string) error { @@ -117,11 +127,15 @@ func (a App) mergeChunkFiles(directory, destination string) error { } }() - if err = filepath.WalkDir(directory, func(path string, _ fs.DirEntry, err error) error { + if err = filepath.WalkDir(directory, func(path string, info fs.DirEntry, err error) error { if err != nil { return err } + if info.IsDir() || path == destination { + return nil + } + reader, err := os.Open(path) if err != nil { return fmt.Errorf("unable to open chunk `%s`: %s", path, err) diff --git a/pkg/crud/crud.go b/pkg/crud/crud.go index 28b91ef6..57b9db28 100644 --- a/pkg/crud/crud.go +++ b/pkg/crud/crud.go @@ -53,8 +53,10 @@ type App struct { rendererApp renderer.App thumbnailApp thumbnail.App + temporaryFolder string bcryptCost int sanitizeOnStart bool + chunkUpload bool } // Config of package @@ -62,7 +64,9 @@ type Config struct { ignore *string amqpExclusiveRoutingKey *string bcryptDuration *string + temporaryFolder *string sanitizeOnStart 
*bool + chunkUpload *bool } // Flags adds flags for configuring package @@ -72,6 +76,9 @@ func Flags(fs *flag.FlagSet, prefix string) Config { sanitizeOnStart: flags.Bool(fs, prefix, "crud", "SanitizeOnStart", "Sanitize name on start", false, nil), bcryptDuration: flags.String(fs, prefix, "crud", "BcryptDuration", "Wanted bcrypt duration for calculating effective cost", "0.25s", nil), + chunkUpload: flags.Bool(fs, prefix, "crud", "ChunkUpload", "Use chunk upload in browser", false, nil), + temporaryFolder: flags.String(fs, prefix, "crud", "TemporaryFolder", "Temporary folder for chunk upload", "/tmp", nil), + amqpExclusiveRoutingKey: flags.String(fs, prefix, "crud", "AmqpExclusiveRoutingKey", "AMQP Routing Key for exclusive lock on default exchange", "fibr.semaphore.start", nil), } } @@ -81,6 +88,9 @@ func New(config Config, storage absto.Storage, rendererApp renderer.App, shareAp app := App{ sanitizeOnStart: *config.sanitizeOnStart, + chunkUpload: *config.chunkUpload, + temporaryFolder: strings.TrimSpace(*config.temporaryFolder), + tracer: tracerApp.GetTracer("crud"), pushEvent: eventProducer, diff --git a/pkg/crud/list.go b/pkg/crud/list.go index b6cc3e4d..c48056b5 100644 --- a/pkg/crud/list.go +++ b/pkg/crud/list.go @@ -93,6 +93,7 @@ func (a App) List(ctx context.Context, request provider.Request, message rendere "HasMap": hasMap, "HasThumbnail": hasThumbnail, "HasStory": hasStory, + "ChunkUpload": a.chunkUpload, } if request.CanShare { diff --git a/pkg/crud/post.go b/pkg/crud/post.go index 4fecdf96..6db73022 100644 --- a/pkg/crud/post.go +++ b/pkg/crud/post.go @@ -53,39 +53,59 @@ func (a App) Post(w http.ResponseWriter, r *http.Request, request provider.Reque contentType := r.Header.Get("Content-Type") if contentType == "application/x-www-form-urlencoded" { - method := r.FormValue("method") - - switch r.FormValue("type") { - case "share": - a.handlePostShare(w, r, request, method) - case "webhook": - a.handlePostWebhook(w, r, request, method) - case 
"description": - a.handlePostDescription(w, r, request, method) - default: - a.handlePost(w, r, request, method) - } - + a.handleFormURLEncoded(w, r, request) return } if strings.HasPrefix(contentType, "multipart/form-data") { - values, file, err := parseMultipart(r) - if err != nil { - a.error(w, r, request, model.WrapInternal(fmt.Errorf("unable to parse multipart request: %s", err))) - return - } + a.handleMultipart(w, r, request) + return + } - if values["method"] != http.MethodPost { - a.error(w, r, request, model.WrapMethodNotAllowed(fmt.Errorf("unknown method `%s` for multipart", values["method"]))) - return - } + a.error(w, r, request, model.WrapMethodNotAllowed(fmt.Errorf("unknown content-type %s", contentType))) +} - a.Upload(w, r, request, values, file) +func (a App) handleFormURLEncoded(w http.ResponseWriter, r *http.Request, request provider.Request) { + method := r.FormValue("method") + + switch r.FormValue("type") { + case "share": + a.handlePostShare(w, r, request, method) + case "webhook": + a.handlePostWebhook(w, r, request, method) + case "description": + a.handlePostDescription(w, r, request, method) + default: + a.handlePost(w, r, request, method) + } +} + +func (a App) handleMultipart(w http.ResponseWriter, r *http.Request, request provider.Request) { + if !request.CanEdit { + a.error(w, r, request, model.WrapForbidden(ErrNotAuthorized)) return } - a.error(w, r, request, model.WrapMethodNotAllowed(fmt.Errorf("unknown content-type %s", contentType))) + values, file, err := parseMultipart(r) + if err != nil { + a.error(w, r, request, model.WrapInternal(fmt.Errorf("unable to parse multipart request: %s", err))) + return + } + + if values["method"] != http.MethodPost { + a.error(w, r, request, model.WrapMethodNotAllowed(fmt.Errorf("unknown method `%s` for multipart", values["method"]))) + return + } + + if len(r.Header.Get("X-Chunk-Upload")) != 0 { + if chunkNumber := r.Header.Get("X-Chunk-Number"); len(chunkNumber) != 0 { + a.UploadChunk(w, r, 
request, values["filename"], chunkNumber, file) + } else { + a.MergeChunk(w, r, request, values) + } + } else { + a.Upload(w, r, request, values, file) + } } func (a App) handlePostShare(w http.ResponseWriter, r *http.Request, request provider.Request, method string) { diff --git a/pkg/crud/upload.go b/pkg/crud/upload.go index 29e65dff..a17fa9dd 100644 --- a/pkg/crud/upload.go +++ b/pkg/crud/upload.go @@ -15,8 +15,6 @@ import ( "github.com/ViBiOh/httputils/v4/pkg/renderer" ) -const temporaryFolder = "/tmp" - func (a App) saveUploadedFile(ctx context.Context, request provider.Request, inputName, rawSize string, file *multipart.Part) (fileName string, err error) { var filePath string @@ -80,28 +78,11 @@ func getUploadSize(rawSize string) (int64, error) { // Upload saves form files to filesystem func (a App) Upload(w http.ResponseWriter, r *http.Request, request provider.Request, values map[string]string, file *multipart.Part) { - if !request.CanEdit { - a.error(w, r, request, model.WrapForbidden(ErrNotAuthorized)) - return - } - if file == nil { a.error(w, r, request, model.WrapInvalid(errors.New("no file provided for save"))) return } - shared, err := getFormBool(values["share"]) - if err != nil { - a.error(w, r, request, model.WrapInvalid(err)) - return - } - - duration, err := getFormDuration(values["duration"]) - if err != nil { - a.error(w, r, request, model.WrapInvalid(err)) - return - } - ctx := r.Context() filename, err := a.saveUploadedFile(ctx, request, values["filename"], values["size"], file) @@ -110,31 +91,53 @@ func (a App) Upload(w http.ResponseWriter, r *http.Request, request provider.Req return } - var shareID string - if shared { - id, err := a.shareApp.Create(ctx, path.Join(request.Path, filename), false, false, "", false, duration) - if err != nil { - a.error(w, r, request, model.WrapInternal(err)) - return - } + a.postUpload(ctx, w, r, request, filename, values) +} - shareID = id +func (a App) postUpload(ctx context.Context, w 
http.ResponseWriter, r *http.Request, request provider.Request, fileName string, values map[string]string) { + shareID, err := a.handleUploadShare(ctx, request, fileName, values) + if err != nil { + a.error(w, r, request, err) + return } if r.Header.Get("Accept") == "text/plain" { w.WriteHeader(http.StatusCreated) - provider.SafeWrite(w, filename) - if shared { + provider.SafeWrite(w, fileName) + if len(shareID) > 0 { provider.SafeWrite(w, fmt.Sprintf("\n%s", shareID)) } return } - message := fmt.Sprintf("File %s successfully uploaded", filename) - if shared { + message := fmt.Sprintf("File %s successfully uploaded", fileName) + if len(shareID) > 0 { message = fmt.Sprintf("%s. Share ID is %s", message, shareID) } a.rendererApp.Redirect(w, r, fmt.Sprintf("?d=%s", request.Display), renderer.NewSuccessMessage(message)) } + +func (a App) handleUploadShare(ctx context.Context, request provider.Request, fileName string, values map[string]string) (string, error) { + shared, err := getFormBool(values["share"]) + if err != nil { + return "", model.WrapInvalid(err) + } + + if !shared { + return "", nil + } + + duration, err := getFormDuration(values["duration"]) + if err != nil { + return "", model.WrapInvalid(err) + } + + id, err := a.shareApp.Create(ctx, path.Join(request.Path, fileName), false, false, "", false, duration) + if err != nil { + return id, model.WrapInternal(err) + } + + return id, nil +}