
generateUniqueIdentifier can't return a thenable/Promise, which is awful #310

Open
yanivkalfa opened this issue Sep 7, 2020 · 5 comments

Comments

@yanivkalfa

If I want to start an Amazon S3 multipart upload, I first have to create the multipart upload, get its identifier, return it to the client, and then use that identifier to upload the remaining chunks.

As it stands, Flow doesn't allow that, since generateUniqueIdentifier can't return a Promise.

@AidasK
Member

AidasK commented Sep 7, 2020

This will be possible once #304 is merged.

@command-tab

I'm currently doing (unsigned) Amazon S3 multipart uploads successfully with Flow.js 2.14.0. I create the multipart upload in filesSubmitted, grab the UploadId from the response, and store it on the FlowFile:

// Runs for each FlowFile in the filesSubmitted handler;
// baseUrl, bucketName and filename are defined elsewhere (not shown)
const config = {
  responseType: 'document', // XML Document https://developer.mozilla.org/en-US/docs/Web/API/Document
  headers: {
    'Content-Type': flowFile.file.type // What the completed file's Content-Type header should be
  }
}
// CreateMultipartUpload: POST to the object key with the `?uploads` subresource
const url = `${baseUrl}/${bucketName}/${flowFile.uniqueIdentifier}/${filename}?uploads`
const response = await this.$axios.post(url, null, config)
flowFile.uploadId = response.request.responseXML.getElementsByTagName('UploadId')[0].textContent

Then I return an appropriate URL for each part in target:

target: (flowFile, flowChunk, isTest) => {
  return `${baseUrl}/${bucketName}/${flowFile.uniqueIdentifier}/${filename}?partNumber=${flowChunk.getParams().flowChunkNumber}&uploadId=${flowFile.uploadId}`
}
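The `target` callback above interpolates `filename` directly into the URL. If object keys can contain spaces, `#`, `?` or non-ASCII characters, each key segment needs percent-encoding. Below is a minimal sketch, assuming the same path-style `baseUrl/bucketName/key` layout; `buildPartUrl` and its parameters are hypothetical, not Flow.js API. S3's UploadPart is a `PUT` of the raw part bytes with part numbers from 1 to 10,000, so Flow presumably also needs options such as `method: 'octet'` and `uploadMethod: 'PUT'` (the comment above does not show its Flow options).

```javascript
// Hypothetical helper: builds the S3 UploadPart URL for one Flow chunk.
// Assumes a path-style layout `${baseUrl}/${bucketName}/${key}`.
function buildPartUrl (baseUrl, bucketName, keySegments, partNumber, uploadId) {
  // S3 part numbers are 1-based and limited to 1..10000
  if (!Number.isInteger(partNumber) || partNumber < 1 || partNumber > 10000) {
    throw new RangeError(`invalid S3 part number: ${partNumber}`)
  }
  // Encode each path segment separately so the "/" separators survive
  const key = keySegments.map(encodeURIComponent).join('/')
  const query = new URLSearchParams({ partNumber: String(partNumber), uploadId })
  return `${baseUrl}/${bucketName}/${key}?${query}`
}

// Usage inside the Flow options (flowChunkNumber is already 1-based):
// target: (flowFile, flowChunk) => buildPartUrl(baseUrl, bucketName,
//   [flowFile.uniqueIdentifier, filename],
//   flowChunk.getParams().flowChunkNumber, flowFile.uploadId)
```

Encoding segment by segment, rather than the whole key, keeps the `/` separators so the object still lands under the same prefix.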

And complete the upload by POSTing a manifest in fileSuccess:

// Sort `parts` by chunkNumber, as parts may not be ordered in the array
// if Flow's `simultaneousUploads` is > 1 (parts may finish out of order)
const compareParts = (a, b) => a.chunkNumber - b.chunkNumber

// Axios Complete Upload URL
const url = `${baseUrl}/${bucketName}/${flowFile.uniqueIdentifier}/${filename}?uploadId=${flowFile.uploadId}`

// Build out the "Complete Upload" ordered XML manifest
const xmlDocument = new Document()

// Prefer createElementNS over createElement: on an HTML document,
// createElement (no 'NS') lowercases the tag name, while XML element
// names are case-sensitive (S3 expects e.g. CompleteMultipartUpload)
const ns = 'http://s3.amazonaws.com/doc/2006-03-01/'
const rootElement = xmlDocument.createElementNS(ns, 'CompleteMultipartUpload')
xmlDocument.appendChild(rootElement)

// Append sorted Part elements; flowFile.manifestParts holds one
// { chunkNumber, etag } entry per uploaded part (collected elsewhere, not shown)
const sortedParts = flowFile.manifestParts.sort(compareParts)
sortedParts.forEach((part) => {
  const partNumberElement = xmlDocument.createElementNS(ns, 'PartNumber')
  partNumberElement.textContent = part.chunkNumber

  const etagElement = xmlDocument.createElementNS(ns, 'ETag')
  etagElement.textContent = part.etag

  const partElement = xmlDocument.createElementNS(ns, 'Part')
  partElement.appendChild(partNumberElement)
  partElement.appendChild(etagElement)

  rootElement.appendChild(partElement)
})
const serializer = new XMLSerializer()
const payload = serializer.serializeToString(xmlDocument)

const config = {
  headers: {
    'Content-Type': 'application/xml' // the payload is an XML document
  }
}
await this.$axios.post(url, payload, config)
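The same manifest can also be built without the DOM, for example in a Node.js backend that completes the upload on the client's behalf, or to unit-test the XML. Below is a minimal sketch, assuming parts are `{ chunkNumber, etag }` objects as in `flowFile.manifestParts` above; `buildCompleteManifest` and `escapeXml` are hypothetical names. It sorts numerically on a copy instead of mutating the input and escapes the characters XML text content cannot contain verbatim.

```javascript
// Hypothetical helper: serializes the CompleteMultipartUpload body as a string.
const S3_NS = 'http://s3.amazonaws.com/doc/2006-03-01/'

// Escape the characters that are not allowed verbatim in XML text content
function escapeXml (text) {
  return String(text)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
}

function buildCompleteManifest (parts) {
  // Copy before sorting so the caller's array keeps its order;
  // parts may finish out of order when simultaneousUploads > 1
  const sorted = [...parts].sort((a, b) => a.chunkNumber - b.chunkNumber)
  const body = sorted
    .map((part) =>
      `<Part><PartNumber>${part.chunkNumber}</PartNumber>` +
      `<ETag>${escapeXml(part.etag)}</ETag></Part>`)
    .join('')
  return `<CompleteMultipartUpload xmlns="${S3_NS}">${body}</CompleteMultipartUpload>`
}
```

ETags returned by S3 include their surrounding double quotes; those are legal in XML text content and are passed through unchanged.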

I've used the above for both single and multiple simultaneous uploads.

You can also import the SparkMD5 library to compute hashes of each FlowFile chunk and set the Content-MD5 header in the preprocess and headers callbacks:

// Calculate MD5 of each chunk so we can set the S3 `Content-MD5` header on each chunk upload:
// https://github.com/flowjs/flow.js/issues/9#issuecomment-288750191
// S3 wants the Content-MD5 to be the base64-encoded 128-bit binary MD5 digest of the part data:
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
preprocess (chunk) {
  if (chunk.readState === 0) {
    // readState=0: Read of file chunk not started. Triggering it now.
    chunk.preprocessState = 0
    const read = chunk.flowObj.opts.readFileFn
    read(chunk.fileObj, chunk.startByte, chunk.endByte, chunk.fileObj.file.type, chunk)
  } else if (chunk.readState === 1) {
    // Waiting... readState=1: Read of chunk is in progress.
    chunk.preprocessState = -1
  } else if (chunk.readState === 2) {
    // readState=2: Read is finished. We can now trigger MD5 compute.
    const reader = new FileReader()
    reader.onloadend = function () {
      // 'true' causes hash() to return a binary hash instead of hex hash
      const binaryHash = SparkMD5.ArrayBuffer.hash(reader.result, true)
      chunk.contentMD5 = btoa(binaryHash) // S3 expects base64(binaryhash)
      chunk.preprocessFinished()
    }
    reader.readAsArrayBuffer(chunk.bytes)
  }
},
headers (flowFile, flowChunk, isTest) {
  return { 'Content-MD5': flowChunk.contentMD5 }
}
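On a server, or in a Node.js test that checks what the browser code should send, the same `Content-MD5` value can be computed without SparkMD5, because Node's built-in `crypto` module can emit the digest as base64 directly. This is a sketch for verification, not a replacement for the browser code: the Web Crypto API does not offer MD5, which is why SparkMD5 is used above. `contentMD5` and `hexDigestToBase64` are hypothetical helper names.

```javascript
const crypto = require('crypto')

// Hypothetical helper: base64 of the raw 128-bit MD5 digest, the format S3
// expects in the Content-MD5 header (not the 32-character hex string)
function contentMD5 (data) {
  return crypto.createHash('md5').update(data).digest('base64')
}

// Converts a hex digest (e.g. SparkMD5's default output) to the same format
function hexDigestToBase64 (hex) {
  return Buffer.from(hex, 'hex').toString('base64')
}
```

A common mistake is base64-encoding the hex string itself; S3 rejects that value because it does not decode to a 16-byte digest.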

Perhaps some of this will be a little simpler with async readFile?

@yanivkalfa
Author

I didn't say it's not possible; I'm currently doing it as well, but a bit differently, going through my own server.

Right now, in fileAdded I grab a new multipart upload ID, overwrite flowFile.uniqueIdentifier with that ID, and then start the upload.

// uploadChunkedVideoStart() calls my server, which creates the S3 multipart upload
const { data: { UploadId } } = await uploadChunkedVideoStart({ fileName: file.name });
flowFile.uniqueIdentifier = UploadId;
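The workaround above can be generalized to several files. Below is a minimal sketch, assuming the upload is only started by calling `flow.upload()` after this resolves, and that `startUpload` is an async function like the author's `uploadChunkedVideoStart` that resolves to an S3 `UploadId`; `assignUploadIds` is hypothetical. All IDs are requested concurrently, and the caller waits for every one before any chunk is sent.

```javascript
// Hypothetical helper: ask the server for one multipart UploadId per FlowFile
// and overwrite uniqueIdentifier with it before any chunk is sent.
async function assignUploadIds (flowFiles, startUpload) {
  await Promise.all(flowFiles.map(async (flowFile) => {
    const uploadId = await startUpload(flowFile.name)
    flowFile.uniqueIdentifier = uploadId
  }))
  return flowFiles
}

// Usage sketch (flow and uploadChunkedVideoStart come from the author's app):
// flow.on('filesSubmitted', async (files) => {
//   await assignUploadIds(files, async (fileName) => {
//     const { data: { UploadId } } = await uploadChunkedVideoStart({ fileName })
//     return UploadId
//   })
//   flow.upload()
// })
```

If any `startUpload` call rejects, `Promise.all` rejects too and `flow.upload()` is never reached, which avoids sending chunks with a stale identifier.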

But there should be another way...

@drzraf
Collaborator

drzraf commented Sep 12, 2020

#304 does not make this possible out of the box, but it (together with #296) may serve as inspiration for anyone willing to open a PR adding support for an async generateUniqueIdentifier (or, more generally, async tasks within addFiles).
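At its core, supporting an async `generateUniqueIdentifier` means the library would `await` its result instead of using it directly. Below is a minimal sketch of that idea with hypothetical `resolveUniqueIdentifier` and `addFilesAsync` functions; it is not how Flow.js is implemented today. Because `await` adopts thenables and passes plain values through, existing synchronous generators would keep working.

```javascript
// Hypothetical: resolve an identifier from a generator that may return
// either a plain value or a Promise/thenable of one.
async function resolveUniqueIdentifier (generateUniqueIdentifier, file) {
  // `await` unwraps Promises and other thenables; plain values pass through
  const id = await generateUniqueIdentifier(file)
  return String(id)
}

// Hypothetical addFiles-like loop that waits for each identifier in turn
async function addFilesAsync (files, generateUniqueIdentifier) {
  const added = []
  for (const file of files) {
    const uniqueIdentifier = await resolveUniqueIdentifier(generateUniqueIdentifier, file)
    added.push({ file, uniqueIdentifier })
  }
  return added
}
```

Resolving identifiers sequentially keeps the order of `added` equal to the input order; a real implementation might resolve them concurrently instead.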

@drzraf
Collaborator

drzraf commented Jan 13, 2021

@yanivkalfa: you could have a look at #329 and see whether

await asyncAddFile(yourfile, null, async (flowFile, event) => {
 // whatever you want before initialization
});

would fit. The third parameter is an async equivalent of initFileFn.
Note:

  • flowFile isn't fully bootstrapped yet at that moment (but _bootstrap() isn't rocket science)
  • event may indicate whether it's a retried attempt.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants