Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@
"strip-ansi": "^3.0.1",
"supertest": "3.0.0",
"supertest-as-promised": "2.0.2",
"tmp": "0.0.31",
"tree-kill": "1.1.0",
"webpack-dev-server": "1.14.1"
},
Expand Down
3 changes: 1 addition & 2 deletions src/core_plugins/elasticsearch/lib/create_kibana_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ export default function (server, mappings) {
body: {
settings: {
number_of_shards: 1,
'index.mapper.dynamic': false,
'index.mapping.single_type': false
'index.mapper.dynamic': false
},
mappings
}
Expand Down
1 change: 1 addition & 0 deletions src/es_archiver/actions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export { saveAction } from './save';
export { loadAction } from './load';
export { unloadAction } from './unload';
export { rebuildAllAction } from './rebuild_all';
export { reindexAction } from './reindex';
6 changes: 4 additions & 2 deletions src/es_archiver/actions/load.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { resolve } from 'path';
import { createReadStream } from 'fs';

import {
createPromiseFromStreams
createPromiseFromStreams,
} from '../../utils';

import {
Expand All @@ -13,9 +13,10 @@ import {
createParseArchiveStreams,
createCreateIndexStream,
createIndexDocRecordsStream,
createConvertToV6Stream,
} from '../lib';

export async function loadAction({ name, skipExisting, client, dataDir, log }) {
export async function loadAction({ name, skipExisting, client, dataDir, log, convertToV6 }) {
const inputDir = resolve(dataDir, name);
const stats = createStats(name, log);

Expand All @@ -26,6 +27,7 @@ export async function loadAction({ name, skipExisting, client, dataDir, log }) {
await createPromiseFromStreams([
createReadStream(resolve(inputDir, filename)),
...createParseArchiveStreams({ gzip: isGzip(filename) }),
...(convertToV6 ? [createConvertToV6Stream()] : []),
createCreateIndexStream({ client, stats, skipExisting }),
createIndexDocRecordsStream(client, stats),
]);
Expand Down
7 changes: 5 additions & 2 deletions src/es_archiver/actions/rebuild_all.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import {
import {
prioritizeMappings,
readDirectory,
findArchiveNames,
isGzip,
createParseArchiveStreams,
createFormatArchiveStreams,
createConvertToV6Stream,
} from '../lib';

export async function rebuildAllAction({ dataDir, log }) {
const archiveNames = await readDirectory(dataDir);
export async function rebuildAllAction({ dataDir, log, convertToV6 }) {
const archiveNames = await findArchiveNames(dataDir);

for (const name of archiveNames) {
const inputDir = resolve(dataDir, name);
Expand All @@ -35,6 +37,7 @@ export async function rebuildAllAction({ dataDir, log }) {
await createPromiseFromStreams([
createReadStream(path),
...createParseArchiveStreams({ gzip }),
...(convertToV6 ? [createConvertToV6Stream()] : []),
...createFormatArchiveStreams({ gzip }),
createWriteStream(tempFile),
]);
Expand Down
34 changes: 34 additions & 0 deletions src/es_archiver/actions/reindex.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import tmp from 'tmp';
import del from 'del';

import { saveAction } from './save';
import { loadAction } from './load';

export async function reindexAction({ indices, client, log, convertToV6 }) {
const tmpDir = tmp.dirSync();
const name = 'reindex';

try {
log.info('Saving indices to %j', tmpDir.name);
await saveAction({
name,
indices,
client,
dataDir: tmpDir.name,
log,
});

log.info('Loading indices from %j', tmpDir.name);
await loadAction({
name,
convertToV6,
skipExisting: false,
client,
dataDir: tmpDir.name,
log,
});
} finally {
log.info('Cleaning up %j', tmpDir.name);
del.sync(tmpDir.name, { force: true });
}
}
9 changes: 7 additions & 2 deletions src/es_archiver/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ cmd
.option('--es-url [url]', 'url for elasticsearch')
.option(`--dir [path]`, 'where archives are stored')
.option('--verbose', 'turn on verbose logging')
.option('--convert-to-v6', 'turn on automatic v6 conversion (true by default in `reindex` and `rebuild-all`)')
.option('--config [path]', 'path to a functional test config file to use for default values', resolveConfigPath, defaultConfigPath)
.on('--help', () => {
console.log(readFileSync(resolve(__dirname, './cli_help.txt'), 'utf8'));
Expand All @@ -36,15 +37,19 @@ cmd.command('save <name> <indices...>')

cmd.command('load <name>')
.description('load the archive in --dir with <name>')
.action(name => execute('load', name));
.action(name => execute('load', name, { convertToV6: cmd.convertToV6 }));

cmd.command('unload <name>')
.description('remove indices created by the archive in --dir with <name>')
.action(name => execute('unload', name));

cmd.command('rebuild-all')
.description('[internal] read and write all archives in --dir to remove any inconsistencies')
.action(() => execute('rebuildAll'));
.action(() => execute('rebuildAll', { convertToV6: cmd.convertToV6 }));

cmd.command('reindex <indices...>')
.description('load the archive in --dir with <name>')
.action(indices => execute('reindex', indices, { convertToV6: cmd.convertToV6 }));

cmd.parse(process.argv);

Expand Down
31 changes: 27 additions & 4 deletions src/es_archiver/es_archiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
loadAction,
unloadAction,
rebuildAllAction,
reindexAction,
} from './actions';

export class EsArchiver {
Expand Down Expand Up @@ -40,11 +41,15 @@ export class EsArchiver {
* @return Promise<Stats>
*/
async load(name, options = {}) {
const { skipExisting } = options;
const {
skipExisting = false,
convertToV6 = false
} = options;

return await loadAction({
name,
skipExisting: !!skipExisting,
convertToV6,
skipExisting,
client: this.client,
dataDir: this.dataDir,
log: this.log,
Expand Down Expand Up @@ -72,11 +77,16 @@ export class EsArchiver {
*
* @return Promise<Stats>
*/
async rebuildAll() {
async rebuildAll(options = {}) {
const {
convertToV6 = true
} = options;

return rebuildAllAction({
convertToV6,
client: this.client,
dataDir: this.dataDir,
log: this.log
log: this.log,
});
}

Expand All @@ -89,4 +99,17 @@ export class EsArchiver {
async loadIfNeeded(name) {
return this.load(name, { skipExisting: true });
}

async reindex(indices, options = {}) {
const {
convertToV6 = true
} = options;

return reindexAction({
indices,
convertToV6,
client: this.client,
log: this.log
});
}
}
1 change: 1 addition & 0 deletions src/es_archiver/lib/convert/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { createConvertToV6Stream } from './v5_to_v6';
79 changes: 79 additions & 0 deletions src/es_archiver/lib/convert/v5_to_v6.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { set, omit } from 'lodash';

import {
createMapStream,
getFlattenedObject,
} from '../../../utils';

export function createConvertToV6Stream() {
return createMapStream(record => {
switch (record.type) {
case 'index': {
const { index, settings, mappings } = record.value;

// only convert kibana indices
if (index !== '.kibana') {
return record;
}

// already v6
if (typeof mappings.doc === 'object') {
return record;
}

// settings can be dot-notated strings or nested objects
settings.index = getFlattenedObject(settings.index || {});
delete settings.index['mapping.single_type'];
set(settings.index, 'mapper.dynamic', false);

return {
type: 'index',
value: {
index,
settings,
mappings: {
doc: {
properties: {
type: {
type: 'keyword'
},
...omit(mappings, '_default_')
}
}
}
}
};
}

case 'doc': {
const { index, type, id, source } = record.value;

// only convert kibana indices
if (index !== '.kibana') {
return record;
}

if (type === 'doc' && source.type) {
// doc already v6
return record;
}

return {
type: 'doc',
value: {
index,
type: 'doc',
id: `${type}:${id}`,
source: {
type,
[type]: source
}
}
};
}

default:
return record;
}
});
}
11 changes: 11 additions & 0 deletions src/es_archiver/lib/directory.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import { readdir } from 'fs';
import { dirname } from 'path';

import glob from 'glob';
import { uniq } from 'lodash';
import { fromNode } from 'bluebird';

export async function readDirectory(path) {
const allNames = await fromNode(cb => readdir(path, cb));
return allNames.filter(name => !name.startsWith('.'));
}

// get the archives from a path. archives don't have to be direct children
// so this looks for any `.json` or `.gz` file and assumes that it's parent
// directory is an "archive"
export async function findArchiveNames(path) {
const dataFiles = await fromNode(cb => glob('**/*.{gz,json}', { cwd: path }, cb));
return uniq(dataFiles.map(dataFile => dirname(dataFile)));
}
7 changes: 6 additions & 1 deletion src/es_archiver/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ export {
} from './archives';

export {
readDirectory
readDirectory,
findArchiveNames,
} from './directory';

export {
createConvertToV6Stream,
} from './convert';
Loading