-
Notifications
You must be signed in to change notification settings - Fork 8.6k
[esArchiver] combine elasticDump and ScenarioManager #10359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
70fc947
28c34f2
3b6676c
923636b
86af5a4
8c77b43
9cd6cff
30bdefc
edfe4b3
01c0dcb
5e602e8
6a58180
b292123
b0c4824
3f1070c
740a466
d50983f
f039afd
f0e0130
d8f85f5
1ec2a05
b926d96
eefb8cc
9e2ee74
9449535
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| # kibana dev scripts | ||
|
|
||
| This directory contains scripts useful for interacting with Kibana tools in development. Use the node executable and `--help` flag to learn about how they work: | ||
|
|
||
| ```sh | ||
| node scripts/{{script name}} --help | ||
| ``` | ||
|
|
||
| ## for developers | ||
|
|
||
| This directory is excluded from the build and tools within it should help users discover their capabilities. Each script in this directory must: | ||
|
|
||
| - include the `../src/optimize/babel/register` module to bootstrap babel | ||
| - call out to source code that is in the `src` directory | ||
| - react to the `--help` flag | ||
| - run everywhere OR check and fail fast when a required OS or toolchain is not available |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| require('../src/optimize/babel/register'); | ||
| require('../src/es_archiver/cli'); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| require('../src/optimize/babel/register'); | ||
| require('../src/es_archiver/load_dump_data'); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| export { saveAction } from './save'; | ||
| export { loadAction } from './load'; | ||
| export { unloadAction } from './unload'; | ||
| export { rebuildAllAction } from './rebuild_all'; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| import { resolve } from 'path'; | ||
| import { createReadStream } from 'fs'; | ||
|
|
||
| import { | ||
| createPromiseFromStreams | ||
| } from '../../utils'; | ||
|
|
||
| import { | ||
| isGzip, | ||
| createStats, | ||
| prioritizeMappings, | ||
| getArchiveFiles, | ||
| createParseArchiveStreams, | ||
| createCreateIndexStream, | ||
| createIndexDocRecordsStream, | ||
| } from '../lib'; | ||
|
|
||
| export async function loadAction({ name, skipExisting, client, dataDir, log }) { | ||
| const inputDir = resolve(dataDir, name); | ||
| const stats = createStats(name, log); | ||
|
|
||
| const files = prioritizeMappings(await getArchiveFiles(inputDir)); | ||
| for (const filename of files) { | ||
| log.info('[%s] Loading %j', name, filename); | ||
|
|
||
| await createPromiseFromStreams([ | ||
| createReadStream(resolve(inputDir, filename)), | ||
| ...createParseArchiveStreams({ gzip: isGzip(filename) }), | ||
| createCreateIndexStream({ client, stats, skipExisting }), | ||
| createIndexDocRecordsStream(client, stats), | ||
| ]); | ||
| } | ||
|
|
||
| stats.forEachIndex((index, { docs }) => { | ||
| log.info('[%s] Indexed %d docs into %j', name, docs.indexed, index); | ||
| }); | ||
|
|
||
| return stats.toJSON(); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| import { resolve } from 'path'; | ||
| import { | ||
| rename, | ||
| readdir, | ||
| createReadStream, | ||
| createWriteStream | ||
| } from 'fs'; | ||
|
|
||
| import { fromNode } from 'bluebird'; | ||
|
|
||
| import { | ||
| createPromiseFromStreams | ||
| } from '../../utils'; | ||
|
|
||
| import { | ||
| prioritizeMappings, | ||
| getArchiveFiles, | ||
| isGzip, | ||
| createParseArchiveStreams, | ||
| createFormatArchiveStreams, | ||
| } from '../lib'; | ||
|
|
||
| export async function rebuildAllAction({ dataDir, log }) { | ||
| const archiveNames = await fromNode(cb => readdir(dataDir, cb)); | ||
|
|
||
| for (const name of archiveNames) { | ||
| const inputDir = resolve(dataDir, name); | ||
| const files = prioritizeMappings(await getArchiveFiles(inputDir)); | ||
| for (const filename of files) { | ||
| log.info('[%s] Rebuilding %j', name, filename); | ||
|
|
||
| const path = resolve(inputDir, filename); | ||
| const gzip = isGzip(path); | ||
| const tempFile = path + (gzip ? '.rebuilding.gz' : '.rebuilding'); | ||
|
|
||
| await createPromiseFromStreams([ | ||
| createReadStream(path), | ||
| ...createParseArchiveStreams({ gzip }), | ||
| ...createFormatArchiveStreams({ gzip }), | ||
| createWriteStream(tempFile), | ||
| ]); | ||
|
|
||
| await fromNode(cb => rename(tempFile, path, cb)); | ||
| log.info('[%s] Rebuilt %j', name, filename); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| import { resolve } from 'path'; | ||
| import { createWriteStream } from 'fs'; | ||
|
|
||
| import { fromNode } from 'bluebird'; | ||
| import mkdirp from 'mkdirp'; | ||
|
|
||
| import { | ||
| createListStream, | ||
| createPromiseFromStreams, | ||
| } from '../../utils'; | ||
|
|
||
| import { | ||
| createStats, | ||
| createGenerateIndexRecordsStream, | ||
| createFormatArchiveStreams, | ||
| createGenerateDocRecordsStream, | ||
| } from '../lib'; | ||
|
|
||
| export async function saveAction({ name, indices, client, dataDir, log }) { | ||
| const outputDir = resolve(dataDir, name); | ||
| const stats = createStats(name, log); | ||
|
|
||
| log.info('[%s] Creating archive of %j', name, indices); | ||
|
|
||
| await fromNode(cb => mkdirp(outputDir, cb)); | ||
| const resolvedIndexes = Object.keys(await client.indices.get({ | ||
| index: indices, | ||
| feature: ['_settings'], | ||
| filterPath: ['*.settings.index.uuid'] | ||
| })); | ||
|
|
||
| await Promise.all([ | ||
| // export and save the matching indices to mappings.json | ||
| createPromiseFromStreams([ | ||
| createListStream(resolvedIndexes), | ||
| createGenerateIndexRecordsStream(client, stats), | ||
| ...createFormatArchiveStreams(), | ||
| createWriteStream(resolve(outputDir, 'mappings.json')), | ||
| ]), | ||
|
|
||
| // export all documents from matching indexes into data.json.gz | ||
| createPromiseFromStreams([ | ||
| createListStream(resolvedIndexes), | ||
| createGenerateDocRecordsStream(client, stats), | ||
| ...createFormatArchiveStreams({ gzip: true }), | ||
| createWriteStream(resolve(outputDir, 'data.json.gz')) | ||
| ]) | ||
| ]); | ||
|
|
||
| stats.forEachIndex((index, { docs }) => { | ||
| log.info('[%s] Archived %d docs from %j', name, docs.archived, index); | ||
| }); | ||
|
|
||
| return stats.toJSON(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| import { resolve } from 'path'; | ||
| import { createReadStream } from 'fs'; | ||
|
|
||
| import { | ||
| createPromiseFromStreams | ||
| } from '../../utils'; | ||
|
|
||
| import { | ||
| isGzip, | ||
| createStats, | ||
| prioritizeMappings, | ||
| getArchiveFiles, | ||
| createParseArchiveStreams, | ||
| createFilterRecordsStream, | ||
| createDeleteIndexStream | ||
| } from '../lib'; | ||
|
|
||
| export async function unloadAction({ name, client, dataDir, log }) { | ||
| const inputDir = resolve(dataDir, name); | ||
| const stats = createStats(name, log); | ||
|
|
||
| const files = prioritizeMappings(await getArchiveFiles(inputDir)); | ||
| for (const filename of files) { | ||
| log.info('[%s] Unloading indices from %j', name, filename); | ||
|
|
||
| await createPromiseFromStreams([ | ||
| createReadStream(resolve(inputDir, filename)), | ||
| ...createParseArchiveStreams({ gzip: isGzip(filename) }), | ||
| createFilterRecordsStream('index'), | ||
| createDeleteIndexStream({ client, stats }) | ||
| ]); | ||
| } | ||
|
|
||
| return stats.toJSON(); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| /************************************************************* | ||
| * | ||
| * Run `node scripts/es_archiver -- --help` for usage information | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| * | ||
| *************************************************************/ | ||
|
|
||
| import { resolve } from 'path'; | ||
| import { readFileSync } from 'fs'; | ||
| import { format as formatUrl } from 'url'; | ||
|
|
||
| import { Command } from 'commander'; | ||
| import elasticsearch from 'elasticsearch'; | ||
|
|
||
| import { EsArchiver } from './es_archiver'; | ||
| import { createLog } from './lib'; | ||
|
|
||
| const cmd = new Command('node scripts/es_archiver'); | ||
|
|
||
| cmd | ||
| .description(`CLI to manage archiving/restoring data in elasticsearch`) | ||
| .option('--es-url [url]', 'url for elasticsearch') | ||
| .option(`--dir [path]`, 'where archives are stored') | ||
| .option('--verbose', 'turn on verbose logging') | ||
| .option('--config [path]', 'path to a functional test config file to use for default values') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might just be me as I don't have any experience with Kibana functional tests, but maybe link to an example? The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not supposed to be there yet :) |
||
| .on('--help', () => { | ||
| console.log(readFileSync(resolve(__dirname, './cli_help.txt'), 'utf8')); | ||
| }); | ||
|
|
||
| cmd.command('save <name> <indices...>') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| .action((name, indices) => execute('save', { name, indices })); | ||
|
|
||
| cmd.command('load <name>') | ||
| .action(name => execute('load', { name })); | ||
|
|
||
| cmd.command('unload <name>') | ||
| .action(name => execute('unload', { name })); | ||
|
|
||
| cmd.command('rebuild-all') | ||
| .action(name => execute('rebuildAll')); | ||
|
|
||
| cmd.parse(process.argv); | ||
|
|
||
| const missingCommand = cmd.args.every(a => !(a instanceof Command)); | ||
| if (missingCommand) { | ||
| execute(); | ||
| } | ||
|
|
||
| async function execute(operation, options) { | ||
| try { | ||
| const log = createLog(cmd.verbose ? 3 : 2); | ||
| log.pipe(process.stdout); | ||
|
|
||
| // log and count all validation errors | ||
| let errorCount = 0; | ||
| const error = (msg) => { | ||
| errorCount++; | ||
| log.error(msg); | ||
| }; | ||
|
|
||
| if (!operation) error('Missing or invalid command'); | ||
| if (!cmd.esUrl) { | ||
| error('You must specify either --es-url or --config flags'); | ||
| } | ||
| if (!cmd.dir) { | ||
| error('You must specify either --dir or --config flags'); | ||
| } | ||
|
|
||
| // if there was a validation error display the help | ||
| if (errorCount) { | ||
| cmd.help(); | ||
| return; | ||
| } | ||
|
|
||
| // run! | ||
|
|
||
| const client = new elasticsearch.Client({ | ||
| host: cmd.esUrl, | ||
| log: cmd.verbose ? 'trace' : [] | ||
| }); | ||
|
|
||
| try { | ||
| const esArchiver = new EsArchiver({ | ||
| log, | ||
| client, | ||
| dataDir: resolve(cmd.dir) | ||
| }); | ||
| await esArchiver[operation](options); | ||
| } finally { | ||
| await client.close(); | ||
| } | ||
| } catch (err) { | ||
| console.log('FATAL ERROR', err.stack); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| Examples: | ||
| Dump an index to disk: | ||
| Save all `logstash-*` indices from http://localhost:9200 to `snapshots/my_test_data` directory | ||
|
|
||
| WARNING: If the `my_test_data` snapshot exists it will be deleted! | ||
|
|
||
| $ node scripts/es_archiver save my_test_data logstash-* --dir snapshots | ||
|
|
||
| Load an index from disk | ||
| Load the `my_test_data` snapshot from the archive directory and elasticsearch instance defined | ||
| in the `test/functional/config.js` config file | ||
|
|
||
| WARNING: If the indices exist already they will be deleted! | ||
|
|
||
| $ node scripts/es_archiver load my_test_data --config test/functional/config.js |

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs error handling for non-OK HTTP statuses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? the es client will throw on non-OK HTTP status
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, great