Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(uploader): Make sure every request is added to the job queue #1326

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions cypress/components/UploadPicker/UploadPicker.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// eslint-disable-next-line import/no-unresolved,n/no-missing-import
import { Folder, Permission, addNewFileMenuEntry, type Entry } from '@nextcloud/files'
import { generateRemoteUrl } from '@nextcloud/router'
import { UploadPicker, getUploader } from '../../../lib/index.ts'
import { UploadPicker, UploadStatus, getUploader } from '../../../lib/index.ts'

let state: string | undefined
before(() => {
Expand Down Expand Up @@ -347,7 +347,7 @@ describe('Destination management', () => {

cy.wait('@upload').then((upload) => {
expect(upload.request.url).to.have.string(
'/remote.php/dav/files/user/image.jpg'
'/remote.php/dav/files/user/image.jpg',
)
})

Expand All @@ -374,7 +374,7 @@ describe('Destination management', () => {

cy.wait('@upload').then((upload) => {
expect(upload.request.url).to.have.string(
'/remote.php/dav/files/user/Photos/image.jpg'
'/remote.php/dav/files/user/Photos/image.jpg',
)
})
})
Expand Down Expand Up @@ -517,9 +517,17 @@ describe('UploadPicker notify testing', () => {
cy.get('@progress')
.children('progress')
.should('not.have.value', '0')
expect(notify).to.be.calledOnce
expect(listeners.uploaded).to.be.calledOnce
expect(notify).to.be.calledTwice
// The image upload
expect(notify.getCall(0).args[0].file.name).to.eq('image.jpg')
expect(notify.getCall(0).args[0].status).to.eq(UploadStatus.FINISHED)
// The meta upload
expect(notify.getCall(1).args[0].status).to.eq(UploadStatus.FINISHED)
expect(notify.getCall(1).args[0].file.name).to.eq('')
expect(notify.getCall(1).args[0].file.type).to.eq('httpd/unix-directory')
// the listeners
expect(listeners.failed).to.not.be.called
expect(listeners.uploaded).to.be.calledTwice
})
})

Expand Down Expand Up @@ -549,9 +557,17 @@ describe('UploadPicker notify testing', () => {
cy.get('@progress')
.children('progress')
.should('not.have.value', '0')
expect(notify).to.be.calledOnce
expect(notify).to.be.calledTwice
// The image upload
expect(notify.getCall(0).args[0].file.name).to.eq('image.jpg')
expect(notify.getCall(0).args[0].status).to.eq(UploadStatus.FAILED)
// The meta upload
expect(notify.getCall(1).args[0].status).to.eq(UploadStatus.FAILED)
expect(notify.getCall(1).args[0].file.name).to.eq('')
expect(notify.getCall(1).args[0].file.type).to.eq('httpd/unix-directory')
// the listeners
expect(listeners.uploaded).to.not.be.called
expect(listeners.failed).to.be.calledOnce
expect(listeners.failed).to.be.calledTwice
})
})
})
132 changes: 80 additions & 52 deletions lib/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,40 +234,88 @@ export class Uploader {
batchUpload(
destination: string,
files: (File|FileSystemEntry)[],
callback?: (nodes: Array<File|IDirectory>,
currentPath: string) => Promise<Array<File|IDirectory>|false>,
callback?: (nodes: Array<File|IDirectory>, currentPath: string) => Promise<Array<File|IDirectory>|false>,
): PCancelable<Upload[]> {
const rootFolder = new Directory('', files)
if (!callback) {
callback = async (files: Array<File|Directory>) => files
}

try {
// Increase concurrency to 4 to keep 3 parallel uploads as one if blocked by the directory meta-upload
this._jobQueue.concurrency += 1
return new PCancelable(async (resolve, reject, onCancel) => {
// create a meta upload to ensure all ongoing child requests are listed
const upload = new Upload(`${this.root.replace(/\/$/, '')}/${destination.replace(/^\//, '')}`, false, 0, rootFolder)
upload.status = UploadStatus.UPLOADING
this._uploadQueue.push(upload)
try {
// Create the promise for the virtual root directory
const promise = this.uploadDirectory(destination, rootFolder, callback, davGetClient(this.root))
// Make sure to cancel it when requested
onCancel(() => promise.cancel())
// await the uploads and resolve with "finished" status
const uploads = await promise
upload.status = UploadStatus.FINISHED
resolve(uploads)
} catch (error) {
logger.error('Error in batch upload', { error })
upload.status = UploadStatus.FAILED
reject(t('Upload has been cancelled'))
} finally {
this._notifyAll(upload)
this.updateStats()
}
})
}

/**
* Helper to create a directory wrapped inside an Upload class
* @param destination Destination where to create the directory
* @param directory The directory to create
* @param client The cached WebDAV client
*/
private createDirectory(destination: string, directory: Directory, client: WebDAVClient): PCancelable<Upload> {
const folderPath = normalize(`${destination}/${directory.name}`).replace(/\/$/, '')
const rootPath = `${this.root.replace(/\/$/, '')}/${folderPath.replace(/^\//, '')}`

if (!directory.name) {
throw new Error('Can not create empty directory')
}

// Add a new upload to the upload queue
const currentUpload: Upload = new Upload(rootPath, false, 0, directory)
this._uploadQueue.push(currentUpload)

// Return the cancelable promise
return new PCancelable(async (resolve, reject, onCancel) => {
const abort = new AbortController()
onCancel(() => abort.abort())
currentUpload.signal.addEventListener('abort', () => reject(t('Upload has been cancelled')))

return new PCancelable(async (resolve, reject, onCancel) => {
// Add the request to the job queue -> wait for finish to resolve the promise
await this._jobQueue.add(async () => {
currentUpload.status = UploadStatus.UPLOADING
try {
const value = await this._jobQueue.add(() => {
const promise = this.uploadDirectory(destination, rootFolder, callback, davGetClient(this.root))
onCancel(() => promise.cancel())
return promise
})
if (value) {
resolve(value)
}
await client.createDirectory(folderPath, { signal: abort.signal })
resolve(currentUpload)
} catch (error) {
logger.error('Error in batch upload', { error })
if (error && typeof error === 'object' && 'status' in error && error.status === 405) {
// Directory already exists, so just write into it and ignore the error
currentUpload.status = UploadStatus.FINISHED
logger.debug('Directory already exists, writing into it', { directory: directory.name })
} else {
// Another error happened, so abort uploading the directory
currentUpload.status = UploadStatus.FAILED
reject(error)
}
} finally {
// Update statistics
this._notifyAll(currentUpload)
this.updateStats()
}
reject(t('Upload has been cancelled'))
})
} finally {
// Reset concurrency
this._jobQueue.concurrency -= 1
}
})
}

// Helper for uploading directories (recursivly)
// Helper for uploading directories (recursively)
private uploadDirectory(
destination: string,
directory: Directory,
Expand All @@ -276,7 +324,6 @@ export class Uploader {
client: WebDAVClient,
): PCancelable<Upload[]> {
const folderPath = normalize(`${destination}/${directory.name}`).replace(/\/$/, '')
const rootPath = `${this.root.replace(/\/$/, '')}/${folderPath.replace(/^\//, '')}`

return new PCancelable(async (resolve, reject, onCancel) => {
const abort = new AbortController()
Expand All @@ -294,27 +341,19 @@ export class Uploader {

const directories: PCancelable<Upload[]>[] = []
const uploads: PCancelable<Upload>[] = []
const currentUpload: Upload = new Upload(rootPath, false, 0, directory)
currentUpload.signal.addEventListener('abort', () => reject(t('Upload has been cancelled')))
currentUpload.status = UploadStatus.UPLOADING
// Setup abort controller to cancel all child requests
abort.signal.addEventListener('abort', () => {
directories.forEach((upload) => upload.cancel())
uploads.forEach((upload) => upload.cancel())
})

try {
// Wait for own directory to be created (if not the virtual root)
if (directory.name) {
try {
await client.createDirectory(folderPath, { signal: abort.signal })
// We add the "upload" to get some information of changed nodes
uploads.push(new PCancelable((resolve) => resolve(currentUpload!)))
this._uploadQueue.push(currentUpload)
} catch (error) {
if (error && typeof error === 'object' && 'status' in error && error.status === 405) {
// Directory already exists, so just write into it and ignore the error
logger.debug('Directory already exists, writing into it', { directory: directory.name })
} else {
// Another error happend, so abort uploading the directory
throw error
}
}
// If not the virtual root we need to create the directory first before uploading
// Make sure the promise is listed in the final result
uploads.push(this.createDirectory(destination, directory, client) as PCancelable<Upload>)
// Ensure the directory is created before uploading / creating children
await uploads.at(-1)
}

for (const node of selectedForUpload) {
Expand All @@ -325,24 +364,13 @@ export class Uploader {
}
}

abort.signal.addEventListener('abort', () => {
uploads.forEach((upload) => upload.cancel())
directories.forEach((upload) => upload.cancel())
})

const resolvedUploads = await Promise.all(uploads)
const resolvedDirectoryUploads = await Promise.all(directories)
currentUpload.status = UploadStatus.FINISHED
resolve([resolvedUploads, ...resolvedDirectoryUploads].flat())
} catch (e) {
// Ensure a failure cancels all other requests
abort.abort(e)
currentUpload.status = UploadStatus.FAILED
reject(e)
} finally {
if (directory.name) {
this._notifyAll(currentUpload)
this.updateStats()
}
}
})
}
Expand Down
Loading