Commit
[import] Refactored to allow importing from tarballs
rexxars authored and bjoerge committed Feb 16, 2018
1 parent b14a433 commit 36bf52a
Showing 11 changed files with 248 additions and 127 deletions.
17 changes: 8 additions & 9 deletions packages/@sanity/import/package.json
@@ -12,25 +12,24 @@
"clean": "rimraf lib",
"test": "jest"
},
"keywords": [
"sanity",
"cms",
"headless",
"realtime",
"content",
"import",
"ndjson"
],
"keywords": ["sanity", "cms", "headless", "realtime", "content", "import", "ndjson"],
"dependencies": {
"@rexxars/get-uri": "^2.0.2",
"@sanity/mutator": "^0.125.8",
"@sanity/uuid": "^0.125.8",
"debug": "^3.1.0",
"fs-extra": "^5.0.0",
"globby": "^8.0.0",
"gunzip-maybe": "^1.4.1",
"is-tar": "^1.0.0",
"lodash": "^4.17.4",
"mississippi": "^2.0.0",
"p-map": "^1.2.0",
"peek-stream": "^1.1.2",
"simple-concat": "^1.0.0",
"split2": "^2.1.1",
"stream-collect": "^1.3.1",
"tar-fs": "^1.16.0",
"through2": "^2.0.3"
},
"devDependencies": {
21 changes: 20 additions & 1 deletion packages/@sanity/import/src/assetRefs.js
@@ -1,4 +1,4 @@
const {get, unset} = require('lodash')
const {get, set, unset} = require('lodash')
const {extractWithPath} = require('@sanity/mutator')
const serializePath = require('./serializePath')

@@ -14,6 +14,24 @@ function unsetAssetRefs(doc) {
return doc
}

// Note: mutates in-place
function absolutifyPaths(doc, absPath) {
if (!absPath) {
return doc
}

const modifier = value =>
value
.replace(/file:\/\/\.\//i, `file://${absPath}/`)
.replace(/(https?):\/\/\.\//, `$1://${absPath}/`)

findAssetRefs(doc).forEach(path => {
set(doc, path, modifier(get(doc, path)))
})

return doc
}

function getAssetRefs(doc) {
return findAssetRefs(doc).map(path => ({
documentId: doc._id,
@@ -33,3 +51,4 @@ function findAssetRefs(doc) {

exports.getAssetRefs = getAssetRefs
exports.unsetAssetRefs = unsetAssetRefs
exports.absolutifyPaths = absolutifyPaths
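
A minimal sketch of what the new absolutifyPaths helper does. The document shape and the _sanityAsset field name below are assumptions about what findAssetRefs() matches (the commit does not show that function in full); the URL rewriting itself follows the modifier above.

const {absolutifyPaths} = require('./assetRefs')

// Hypothetical document with a relative asset reference
const doc = {
  _id: 'movie_123',
  _type: 'movie',
  poster: {_sanityAsset: 'image@file://./images/poster.png'}
}

absolutifyPaths(doc, '/tmp/sanity-export')
// doc.poster._sanityAsset is now
// 'image@file:///tmp/sanity-export/images/poster.png'
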
84 changes: 13 additions & 71 deletions packages/@sanity/import/src/import.js
@@ -1,76 +1,18 @@
const debug = require('debug')('sanity:import')
const flatten = require('lodash/flatten')
const fromStream = require('./importFromStream')
const fromFolder = require('./importFromFolder')
const fromArray = require('./importFromArray')
const validateOptions = require('./validateOptions')
const streamToArray = require('./streamToArray')
const {getAssetRefs, unsetAssetRefs} = require('./assetRefs')
const assignArrayKeys = require('./assignArrayKeys')
const assignDocumentId = require('./assignDocumentId')
const uploadAssets = require('./uploadAssets')
const documentHasErrors = require('./documentHasErrors')
const batchDocuments = require('./batchDocuments')
const importBatches = require('./importBatches')
const {
getStrongRefs,
weakenStrongRefs,
setTypeOnReferences,
strengthenReferences
} = require('./references')

async function importDocuments(input, opts) {
const options = validateOptions(input, opts)

options.onProgress({step: 'Reading/validating data file'})
const isStream = typeof input.pipe === 'function'
let documents = input
if (isStream) {
debug('Streaming input source to array of documents')
documents = await streamToArray(input)
} else {
documents.some(documentHasErrors.validate)
}

// Assign document IDs for documents that do not have one. This is necessary
// for us to strengthen references and import assets properly.
const ided = documents.map(doc => assignDocumentId(doc))

// User might not have applied `_key` on array elements which are objects;
// if this is the case, generate random keys to help realtime engine
const keyed = ided.map(doc => assignArrayKeys(doc))

// Sanity prefers to have a `_type` on every object. Make sure references
// have `_type` set to `reference`.
const docs = keyed.map(doc => setTypeOnReferences(doc))

// Find references that will need strengthening when import is done
const strongRefs = docs.map(getStrongRefs).filter(Boolean)

// Extract asset references from the documents
const assetRefs = flatten(docs.map(getAssetRefs).filter(ref => ref.length))

// Remove asset references from the documents
const assetless = docs.map(unsetAssetRefs)

// Make strong references weak so they can be imported in any order
const weakened = assetless.map(weakenStrongRefs)

// Create batches of documents to import. Try to keep batches below a certain
// byte-size (since documents may vary greatly in size depending on type, etc.)
const batches = batchDocuments(weakened)

// Trigger actual import process
debug('Starting import of documents')
const docsImported = await importBatches(batches, options)

// Documents are imported, now proceed with post-import operations
debug('Uploading assets')
await uploadAssets(assetRefs, options)
const importers = {
fromStream,
fromFolder,
fromArray
}

// Strengthen references
debug('Strengthening references')
await strengthenReferences(strongRefs, options)
module.exports = (input, opts) => {
const options = validateOptions(input, opts)

// Return number of documents imported
return docsImported
return Array.isArray(input)
? fromArray(input, options, importers)
: fromStream(input, options, importers)
}

module.exports = importDocuments
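
With this refactor the package's entry point just dispatches on the input type. A rough usage sketch; the client option and the @sanity/client setup are assumptions based on the wider package, not something this commit shows:

const fse = require('fs-extra')
const sanityClient = require('@sanity/client')
const sanityImport = require('@sanity/import')

const client = sanityClient({projectId: 'myproject', dataset: 'production', token: 'mytoken', useCdn: false})
const options = {client, onProgress: event => console.log(event.step)}

// Arrays of documents take the fromArray path, as before...
sanityImport([{_id: 'movie_123', _type: 'movie', title: 'Arrival'}], options)

// ...while streams now also accept (gzipped) tarballs containing an
// .ndjson file plus optional images/ and files/ folders:
sanityImport(fse.createReadStream('export.tar.gz'), options)
  .then(count => console.log('Imported %d documents', count))
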
6 changes: 1 addition & 5 deletions packages/@sanity/import/src/importBatches.js
@@ -10,11 +10,7 @@ async function importBatches(batches, options) {
})

const mapOptions = {concurrency: DOCUMENT_IMPORT_CONCURRENCY}
const batchSizes = await pMap(
batches,
importBatch.bind(null, options, progress),
mapOptions
)
const batchSizes = await pMap(batches, importBatch.bind(null, options, progress), mapOptions)

return batchSizes.reduce((prev, add) => prev + add, 0)
}
69 changes: 69 additions & 0 deletions packages/@sanity/import/src/importFromArray.js
@@ -0,0 +1,69 @@
const debug = require('debug')('sanity:import:array')
const flatten = require('lodash/flatten')
const {getAssetRefs, unsetAssetRefs, absolutifyPaths} = require('./assetRefs')
const assignArrayKeys = require('./assignArrayKeys')
const assignDocumentId = require('./assignDocumentId')
const uploadAssets = require('./uploadAssets')
const documentHasErrors = require('./documentHasErrors')
const batchDocuments = require('./batchDocuments')
const importBatches = require('./importBatches')
const {
getStrongRefs,
weakenStrongRefs,
setTypeOnReferences,
strengthenReferences
} = require('./references')

async function importDocuments(documents, options, importers) {
options.onProgress({step: 'Reading/validating data file'})
documents.some(documentHasErrors.validate)

// Replace relative asset paths if an assets base path is defined
// (file://./images/foo-bar.png -> file:///abs/olute/images/foo-bar.png)
const absPathed = documents.map(doc => absolutifyPaths(doc, options.assetsBase))

// Assign document IDs for documents that do not have one. This is necessary
// for us to strengthen references and import assets properly.
const ided = absPathed.map(doc => assignDocumentId(doc))

// User might not have applied `_key` on array elements which are objects;
// if this is the case, generate random keys to help realtime engine
const keyed = ided.map(doc => assignArrayKeys(doc))

// Sanity prefers to have a `_type` on every object. Make sure references
// have `_type` set to `reference`.
const docs = keyed.map(doc => setTypeOnReferences(doc))

// Find references that will need strengthening when import is done
const strongRefs = docs.map(getStrongRefs).filter(Boolean)

// Extract asset references from the documents
const assetRefs = flatten(docs.map(getAssetRefs).filter(ref => ref.length))

// Remove asset references from the documents
const assetless = docs.map(unsetAssetRefs)

// Make strong references weak so they can be imported in any order
const weakened = assetless.map(weakenStrongRefs)

// Create batches of documents to import. Try to keep batches below a certain
// byte-size (since documents may vary greatly in size depending on type, etc.)
const batches = batchDocuments(weakened)

// Trigger actual import process
debug('Starting import of documents')
const docsImported = await importBatches(batches, options)

// Documents are imported, now proceed with post-import operations
debug('Uploading assets')
await uploadAssets(assetRefs, options)

// Strengthen references
debug('Strengthening references')
await strengthenReferences(strongRefs, options)

// Return number of documents imported
return docsImported
}

module.exports = importDocuments
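
The weaken/strengthen steps above lean on Sanity's weak-reference flag so documents can be imported in any order. A sketch of the round trip using Sanity's documented reference shape; the actual helpers live in references.js, which this commit does not touch:

// A strong reference in the source document:
const strong = {_type: 'reference', _ref: 'person_1'}

// weakenStrongRefs() marks it weak before import, so a batch does not
// fail when person_1 has not been imported yet:
const weak = {...strong, _weak: true}

// After all batches are in, strengthenReferences() removes the _weak flag
// again, restoring the referential integrity guarantee.
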
36 changes: 36 additions & 0 deletions packages/@sanity/import/src/importFromFolder.js
@@ -0,0 +1,36 @@
const fse = require('fs-extra')
const globby = require('globby')
const debug = require('debug')('sanity:import:folder')

module.exports = async function importFromFolder(fromDir, options, importers) {
debug('Importing from folder %s', fromDir)
const dataFiles = await globby('*.ndjson', {cwd: fromDir, absolute: true})
if (dataFiles.length === 0) {
throw new Error(`No .ndjson file found in ${fromDir}`)
}

if (dataFiles.length > 1) {
throw new Error(`More than one .ndjson file found in ${fromDir} - only one is supported`)
}

const dataFile = dataFiles[0]
debug('Importing from file %s', dataFile)

const stream = fse.createReadStream(dataFile)
const images = await globby('images/*', {cwd: fromDir, absolute: true})
const files = await globby('files/*', {cwd: fromDir, absolute: true})
const unreferencedAssets = []
.concat(images.map(path => `image#${path}`))
.concat(files.map(path => `file#${path}`))

debug('Queueing %d assets', unreferencedAssets.length)

const streamOptions = {...options, unreferencedAssets, assetsBase: fromDir}
const result = await importers.fromStream(stream, streamOptions, importers)

if (options.deleteOnComplete) {
await fse.remove(fromDir)
}

return result
}
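
The folder importer expects a layout along these lines, inferred from the glob patterns above (directory and file names other than images/ and files/ are placeholders):

my-export/
  data.ndjson      exactly one .ndjson file containing the documents
  images/          image assets referenced from the documents
    foo-bar.png
  files/           file assets referenced from the documents
    manual.pdf
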
62 changes: 62 additions & 0 deletions packages/@sanity/import/src/importFromStream.js
@@ -0,0 +1,62 @@
const os = require('os')
const path = require('path')
const miss = require('mississippi')
const gunzipMaybe = require('gunzip-maybe')
const peek = require('peek-stream')
const isTar = require('is-tar')
const tar = require('tar-fs')
const globby = require('globby')
const debug = require('debug')('sanity:import:stream')
const getJsonStreamer = require('./util/getJsonStreamer')

module.exports = (stream, options, importers) =>
new Promise((resolve, reject) => {
const outputPath = path.join(os.tmpdir(), 'sanity-import')
debug('Importing from stream')

let isTarStream = false

miss.pipe(stream, gunzipMaybe(), untarMaybe(), err => {
if (err) {
reject(err)
return
}

if (!isTarStream) {
return // Will be resolved by concatenation
}

findAndImport()
})

function untarMaybe() {
return peek({newline: false, maxBuffer: 300}, (data, swap) => {
if (isTar(data)) {
debug('Stream is a tarball, extracting to %s', outputPath)
isTarStream = true
return swap(null, tar.extract(outputPath))
}

debug('Stream is an ndjson file, streaming JSON')
return swap(null, miss.pipeline(getJsonStreamer(), miss.concat(resolveNdjsonStream)))
})
}

function resolveNdjsonStream(documents) {
debug('Finished reading ndjson stream')
resolve(importers.fromArray(documents, options))
}

async function findAndImport() {
debug('Tarball extracted, looking for ndjson')

const files = await globby('**/*.ndjson', {cwd: outputPath, deep: 2, absolute: true})
if (!files.length) {
reject(new Error('ndjson-file not found in tarball'))
return
}

const importBaseDir = path.dirname(files[0])
resolve(importers.fromFolder(importBaseDir, {...options, deleteOnComplete: true}, importers))
}
})
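
The tarball detection above boils down to: gunzip if needed, peek at the first 300 bytes, then swap in either a tar extractor or the ndjson pipeline. A stand-alone sketch of that pattern using the same dependencies ('export.tar.gz' and the destination directory are placeholders):

const os = require('os')
const fs = require('fs')
const path = require('path')
const miss = require('mississippi')
const peek = require('peek-stream')
const gunzipMaybe = require('gunzip-maybe')
const isTar = require('is-tar')
const tar = require('tar-fs')

const destination = path.join(os.tmpdir(), 'peek-demo')

// peek buffers the head of the stream, hands it to the callback, and
// swap() splices the chosen destination into the pipeline.
const parser = peek({newline: false, maxBuffer: 300}, (data, swap) =>
  swap(null, isTar(data) ? tar.extract(destination) : process.stdout)
)

miss.pipe(fs.createReadStream('export.tar.gz'), gunzipMaybe(), parser, err => {
  if (err) throw err
})
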
40 changes: 0 additions & 40 deletions packages/@sanity/import/src/streamToArray.js

This file was deleted.
