diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/errors.js b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/errors.js new file mode 100644 index 00000000000000..66d100640f73a7 --- /dev/null +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/errors.js @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + + +import React from 'react'; + +import { + EuiCallOut, +} from '@elastic/eui'; + +import { IMPORT_STATUS } from './import_progress'; + +export function Errors({ errors, statuses }) { + return ( + + { + errors.map((e, i) => ( +

+ { toString(e) } +

+ )) + } + +
+ ); +} + +function title(statuses) { + switch (IMPORT_STATUS.FAILED) { + case statuses.readStatus: + return 'Error reading file'; + case statuses.indexCreatedStatus: + return 'Error creating index'; + case statuses.ingestPipelineCreatedStatus: + return 'Error creating ingest pipeline'; + case statuses.uploadStatus: + return 'Error uploading data'; + case statuses.indexPatternCreatedStatus: + return 'Error creating index pattern'; + default: + return 'Error'; + } +} + +function toString(error) { + if (typeof error === 'object') { + if (error.msg !== undefined) { + return error.msg; + } else if (error.error !== undefined) { + return error.error; + } else { + return error.toString(); + } + } + return error; +} diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_progress.js b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_progress.js index 90a97628f3d284..14bd8824892dfd 100644 --- a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_progress.js +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_progress.js @@ -34,40 +34,70 @@ export function ImportProgress({ statuses }) { let statusInfo = null; - let processFileTitle = 'Process file'; + let completedStep = 0; + if (reading === true && readStatus === IMPORT_STATUS.INCOMPLETE) { - processFileTitle = 'Processing file'; - statusInfo = (

Converting file for import

); - } else if (reading === false && readStatus === IMPORT_STATUS.COMPLETE) { - processFileTitle = 'File processed'; + completedStep = 0; + } + if ( + readStatus === IMPORT_STATUS.COMPLETE && + indexCreatedStatus === IMPORT_STATUS.INCOMPLETE && + ingestPipelineCreatedStatus === IMPORT_STATUS.INCOMPLETE + ) { + completedStep = 1; } - - let createIndexTitle = 'Create index'; if (indexCreatedStatus === IMPORT_STATUS.COMPLETE) { - createIndexTitle = 'Index created'; + completedStep = 2; } - - let createIngestPipelineTitle = 'Create ingest pipeline'; if (ingestPipelineCreatedStatus === IMPORT_STATUS.COMPLETE) { - createIngestPipelineTitle = 'Ingest pipeline created'; + completedStep = 3; + } + if (uploadStatus === IMPORT_STATUS.COMPLETE) { + completedStep = 4; } + if (indexPatternCreatedStatus === IMPORT_STATUS.COMPLETE) { + completedStep = 5; + } + + let processFileTitle = 'Process file'; + let createIndexTitle = 'Create index'; + let createIngestPipelineTitle = 'Create ingest pipeline'; let uploadingDataTitle = 'Upload data'; - if (uploadProgress > 0 && uploadStatus === IMPORT_STATUS.INCOMPLETE) { - uploadingDataTitle = 'Uploading data'; + let createIndexPatternTitle = 'Create index pattern'; + if (completedStep >= 0) { + processFileTitle = 'Processing file'; + statusInfo = (

Converting file for import

); + } + if (completedStep >= 1) { + processFileTitle = 'File processed'; + createIndexTitle = 'Creating index'; + statusInfo = (

Creating index and ingest pipeline

); + } + if (completedStep >= 2) { + createIndexTitle = 'Index created'; + createIngestPipelineTitle = 'Creating ingest pipeline'; + statusInfo = (

Creating index and ingest pipeline

); + } + if (completedStep >= 3) { + createIngestPipelineTitle = 'Ingest pipeline created'; + uploadingDataTitle = 'Uploading data'; statusInfo = (); - } else if (uploadStatus === IMPORT_STATUS.COMPLETE) { + } + if (completedStep >= 4) { uploadingDataTitle = 'Data uploaded'; + if (createIndexPattern === true) { + createIndexPatternTitle = 'Creating index pattern'; + statusInfo = (

Creating index pattern

); + } } - - let createIndexPatternTitle = 'Create index pattern'; - if (indexPatternCreatedStatus === IMPORT_STATUS.FAILED) { + if (completedStep >= 5) { createIndexPatternTitle = 'Index pattern created'; statusInfo = null; } - const firstSetOfSteps = [ + const steps = [ { title: processFileTitle, isSelected: true, @@ -91,7 +121,7 @@ export function ImportProgress({ statuses }) { }, { title: uploadingDataTitle, - isSelected: (indexCreatedStatus === IMPORT_STATUS.COMPLETE), + isSelected: (indexCreatedStatus === IMPORT_STATUS.COMPLETE && ingestPipelineCreatedStatus === IMPORT_STATUS.COMPLETE), isComplete: (uploadStatus === IMPORT_STATUS.COMPLETE), status: uploadStatus, onClick: () => {}, @@ -99,7 +129,7 @@ export function ImportProgress({ statuses }) { ]; if (createIndexPattern === true) { - firstSetOfSteps.push({ + steps.push({ title: createIndexPatternTitle, isSelected: (uploadStatus === IMPORT_STATUS.COMPLETE), isComplete: (indexPatternCreatedStatus === IMPORT_STATUS.COMPLETE), @@ -111,9 +141,14 @@ export function ImportProgress({ statuses }) { return ( - { statusInfo } + { statusInfo && + + + { statusInfo } + + } ); } diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_view.js b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_view.js index 6ee9df4168dbb6..d40a8b08ac3cb6 100644 --- a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_view.js +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/import_view.js @@ -12,7 +12,6 @@ import React, { import { EuiFieldText, EuiButton, - EuiLoadingSpinner, EuiSpacer, EuiFormRow, EuiCheckbox, @@ -22,29 +21,39 @@ import { import { importerFactory } from './importer'; import { ResultsLinks } from './results_links'; import { ImportProgress, IMPORT_STATUS } from './import_progress'; +import { Errors } from './errors'; const DEFAULT_TIME_FIELD = '@timestamp'; +const DEFAULT_STATE = { + index: '', + importing: false, + imported: false, + initialized: false, + reading: false, + readProgress: 0, + readStatus: IMPORT_STATUS.INCOMPLETE, + indexCreatedStatus: IMPORT_STATUS.INCOMPLETE, + indexPatternCreatedStatus: IMPORT_STATUS.INCOMPLETE, + ingestPipelineCreatedStatus: IMPORT_STATUS.INCOMPLETE, + uploadProgress: 0, + uploadStatus: IMPORT_STATUS.INCOMPLETE, + createIndexPattern: true, + indexPattern: '', + indexPatternId: '', + errors: [], + importFailures: [], +}; + export class ImportView extends Component { constructor(props) { super(props); - this.state = { - index: '', - importing: false, - imported: false, - reading: false, - readProgress: 0, - readStatus: IMPORT_STATUS.INCOMPLETE, - indexCreatedStatus: IMPORT_STATUS.INCOMPLETE, - indexPatternCreatedStatus: IMPORT_STATUS.INCOMPLETE, - ingestPipelineCreatedStatus: IMPORT_STATUS.INCOMPLETE, - uploadProgress: 0, - uploadStatus: IMPORT_STATUS.INCOMPLETE, - createIndexPattern: true, - indexPattern: '', - indexPatternId: '', - }; + this.state = DEFAULT_STATE; + } + + clickReset = () => { + this.setState(DEFAULT_STATE); } clickImport() { @@ -60,45 +69,85 @@ export class ImportView extends Component { createIndexPattern, } = this.state; + const errors = []; + if (index !== '') { this.setState({ importing: true, imported: false, reading: true, + initialized: true, }, () => { setTimeout(async () => { + let success = false; + const importer = importerFactory(format, results); if (importer !== undefined) { console.log('read start'); - let success = await importer.read(fileContents, this.setReadProgress); + const readResp = await importer.read(fileContents, this.setReadProgress); console.log('read end'); + success = readResp.success; this.setState({ - readStatus: IMPORT_STATUS.COMPLETE, + readStatus: success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED, reading: false, }); + if (readResp.success === false) { + console.error(readResp.error); + errors.push(readResp.error); + } + if (success) { - const resp = await importer.import(index, this.setImportProgress); - success = resp.success; - const uploadStatus = success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED; - this.setState({ uploadStatus }); + const initializeImportResp = await importer.initializeImport(index); + + const indexCreated = (initializeImportResp.index !== undefined); + const pipelineCreated = (initializeImportResp.pipelineId !== undefined); + this.setState({ + indexCreatedStatus: indexCreated ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED, + }); + if (indexCreated) { + this.setState({ + ingestPipelineCreatedStatus: pipelineCreated ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED, + }); + } + success = (indexCreated && pipelineCreated); - if (success && createIndexPattern) { - const indexPatternName = (indexPattern === '') ? index : indexPattern; - const indexPatternId = await this.createIndexPattern(indexPatternName); - console.log(indexPatternId); + if (success) { + const importId = initializeImportResp.id; + const pipelineId = initializeImportResp.pipelineId; + const importResp = await importer.import(importId, index, pipelineId, this.setImportProgress); + success = importResp.success; this.setState({ - indexPatternCreatedStatus: IMPORT_STATUS.COMPLETE, - indexPatternId, + uploadStatus: importResp.success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED, + importFailures: importResp.failures, }); + + if (success && createIndexPattern) { + const indexPatternName = (indexPattern === '') ? index : indexPattern; + + const indexPatternResp = await this.createIndexPattern(indexPatternName); + success = indexPatternResp.success; + this.setState({ + indexPatternCreatedStatus: indexPatternResp.success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED, + indexPatternId: indexPatternResp.id + }); + if (indexPatternResp.success === false) { + errors.push(indexPatternResp.error); + } + } else { + errors.push(importResp.error); + } + } else { + errors.push(initializeImportResp.error); } } this.setState({ importing: false, imported: success, + errors, }); } else { console.error('Unsupported file format'); @@ -128,8 +177,6 @@ export class ImportView extends Component { setImportProgress = (progress) => { this.setState({ - indexCreatedStatus: (progress > 0) ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.INCOMPLETE, - ingestPipelineCreatedStatus: (progress > 0) ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.INCOMPLETE, uploadProgress: progress, }); } @@ -141,7 +188,6 @@ export class ImportView extends Component { } async createIndexPattern(indexPatternName, timeFieldName = DEFAULT_TIME_FIELD) { - let createdId; try { const emptyPattern = await this.props.indexPatterns.get(); @@ -151,12 +197,18 @@ export class ImportView extends Component { timeFieldName, }); - createdId = await emptyPattern.create(); - return createdId; + const id = await emptyPattern.create(); + return { + success: true, + id, + }; } catch (error) { console.error(error); + return { + success: false, + error, + }; } - return createdId; } @@ -168,6 +220,7 @@ export class ImportView extends Component { importing, imported, reading, + initialized, readStatus, indexCreatedStatus, ingestPipelineCreatedStatus, @@ -175,6 +228,8 @@ export class ImportView extends Component { uploadProgress, uploadStatus, createIndexPattern, + errors, + // importFailures, } = this.state; const statuses = { @@ -199,7 +254,7 @@ export class ImportView extends Component { @@ -208,7 +263,7 @@ export class ImportView extends Component { id="createIndexPattern" label="Create index pattern" checked={createIndexPattern === true} - disabled={importing === true} + disabled={initialized === true} onChange={this.onCreateIndexPatternChange} /> @@ -216,10 +271,10 @@ export class ImportView extends Component { - this.clickImport()} - > - Import - + { + (initialized === false || importing === true) && + + this.clickImport()} + isLoading={importing} + iconSide="right" + > + Import + + } + { + (initialized === true && importing === false) && - {(importing === true) && - - - + this.clickReset()} + > + Reset + } + - {(importing === true || imported === true) && + {(initialized === true) && @@ -253,6 +318,19 @@ export class ImportView extends Component { + + { + (errors.length > 0) && + + + + + + + } } diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/csv_importer.js b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/csv_importer.js index bc961bd9c430c3..af389e217862ad 100644 --- a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/csv_importer.js +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/csv_importer.js @@ -67,11 +67,15 @@ export class CsvImporter extends Importer { // this.docArray = formatToJson(this.data, this.columnNames); console.timeEnd('read delimited file'); - return true; + return { + success: true, + }; } catch (error) { - console.error(error); console.timeEnd('read delimited file'); - return false; + return { + success: false, + error, + }; } } } diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/importer.js b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/importer.js index e8326d5e75cd8f..77964a1ce5cca8 100644 --- a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/importer.js +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/importer.js @@ -10,6 +10,7 @@ import { chunk } from 'lodash'; import moment from 'moment'; const CHUNK_SIZE = 10000; +const IMPORT_RETRIES = 5; export class Importer { constructor(results) { @@ -18,48 +19,83 @@ export class Importer { this.docArray = []; } - async import(index, setImportProgress) { + async initializeImport(index) { + + const mappings = this.results.mappings; + const pipeline = this.results.ingest_pipeline; + updatePipelineTimezone(pipeline); + const ingestPipeline = { + id: `${index}-pipeline`, + pipeline, + }; + + const createIndexResp = await ml.fileDatavisualizer.import({ + id: undefined, + index, + data: [], + mappings, + ingestPipeline + }); + + return createIndexResp; + } + + async import(id, index, pipelineId, setImportProgress) { console.log('create index and ingest'); console.time('create index and ingest'); - if (!index) { - return; + if (!id || !index) { + return { + success: false, + error: 'no id ot index supplied' + }; } const chunks = chunk(this.docArray, CHUNK_SIZE); - // add an empty chunk to the beginning so the first - // import request only creates the index and pipeline - // and returns quickly - chunks.unshift([]); - const mappings = this.results.mappings; - const ingestPipeline = this.results.ingest_pipeline; - updatePipelineTimezone(ingestPipeline); + const ingestPipeline = { + id: pipelineId, + }; - let id = undefined; let success = true; const failures = []; + let error; for (let i = 0; i < chunks.length; i++) { const aggs = { id, index, data: chunks[i], - mappings, + mappings: {}, ingestPipeline }; - const resp = await ml.fileDatavisualizer.import(aggs); - if (resp.success || (resp.success === false && (resp.failures.length < resp.docCount))) { - // allow some failures. however it must be less than the total number of docs sent - id = resp.id; - setImportProgress((i / chunks.length) * 100); + let retries = IMPORT_RETRIES; + let resp = { + success: false, + failures: [], + docCount: 0, + }; + + while (resp.success === false && retries > 0) { + resp = await ml.fileDatavisualizer.import(aggs); + + if (retries < IMPORT_RETRIES) { + console.log(`Retrying import ${IMPORT_RETRIES - retries}`); + } + + retries--; + } + + if (resp.success) { + setImportProgress(((i + 1) / chunks.length) * 100); } else { console.error(resp); success = false; + error = resp.error; break; } - if (resp.failures.length) { + if (resp.failures && resp.failures.length) { // update the item value to include the chunk count // e.g. item 3 in chunk 2 is actually item 20003 for (let f = 0; f < resp.failures.length; f++) { @@ -71,11 +107,20 @@ export class Importer { } console.log('total failures', failures); + console.timeEnd('create index and ingest'); + + const result = { + success, + failures, + }; + if (success) { setImportProgress(100); + } else { + result.error = error; } - console.timeEnd('create index and ingest'); - return { success, failures }; + + return result; } } diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/json_importer.js b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/json_importer.js index 47b284b95272b0..03c63cabebcbc9 100644 --- a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/json_importer.js +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/json_importer.js @@ -27,11 +27,16 @@ export class JsonImporter extends Importer { this.docArray = ndjson; console.timeEnd('read json file'); - return true; + return { + success: true, + }; } catch (error) { console.error(error); console.timeEnd('read json file'); - return false; + return { + success: false, + error, + }; } } } diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/sst_importer.js b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/sst_importer.js index 307bf30aac1063..0b523f6451c965 100644 --- a/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/sst_importer.js +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/import_view/importer/sst_importer.js @@ -54,10 +54,15 @@ export class SstImporter extends Importer { this.docArray = this.data; console.timeEnd('read sst file'); - return true; + return { + success: true, + }; } catch (error) { console.error(error); - return false; + return { + success: false, + error, + }; } } } diff --git a/x-pack/plugins/ml/public/file_datavisualizer/components/summary/summary.js b/x-pack/plugins/ml/public/file_datavisualizer/components/summary/summary.js index aa0c4352970590..0589a0af02d61b 100644 --- a/x-pack/plugins/ml/public/file_datavisualizer/components/summary/summary.js +++ b/x-pack/plugins/ml/public/file_datavisualizer/components/summary/summary.js @@ -35,10 +35,10 @@ function createDisplayItems(results) { title: 'Number of lines analyzed', description: results.num_lines_analyzed, }, - { - title: 'Charset', - description: results.charset, - } + // { + // title: 'Charset', + // description: results.charset, + // } ]; if (results.format !== undefined) { diff --git a/x-pack/plugins/ml/server/models/file_data_visualizer/import_data.js b/x-pack/plugins/ml/server/models/file_data_visualizer/import_data.js index b52748ebc53e44..15b78df6568a26 100644 --- a/x-pack/plugins/ml/server/models/file_data_visualizer/import_data.js +++ b/x-pack/plugins/ml/server/models/file_data_visualizer/import_data.js @@ -7,54 +7,72 @@ export function importDataProvider(callWithRequest) { async function importData(id, index, mappings, ingestPipeline, data) { - let pipelineId; + let createdIndex; + let createdPipelineId; const docCount = data.length; try { - // first chunk of data, create the index and id to return + + if (ingestPipeline === undefined || ingestPipeline.id === undefined) { + throw 'No ingest pipeline id specified'; + } + + const { + id: pipelineId, + pipeline, + } = ingestPipeline; + if (id === undefined) { + // first chunk of data, create the index and id to return id = generateId(); + await createIndex(index, mappings); + createdIndex = index; - if (ingestPipeline !== undefined) { - const pid = `${index}-pipeline`; - const success = await createPipeline(pid, ingestPipeline); - if (success.acknowledged === true) { - pipelineId = pid; - } else { - console.error(success); - throw success; - } - console.log('creating pipeline', pipelineId); + const success = await createPipeline(pipelineId, pipeline); + if (success.acknowledged !== true) { + console.error(success); + throw success; } + createdPipelineId = pipelineId; + } else { - if (ingestPipeline !== undefined) { - pipelineId = `${index}-pipeline`; - } + createdIndex = index; + createdPipelineId = pipelineId; } + let failures = []; if (data.length && indexExits(index)) { - const resp = await indexData(index, pipelineId, data); + const resp = await indexData(index, createdPipelineId, data); if (resp.success === false) { - throw resp; + if (resp.failures.length === data.length) { + // all docs failed, abort + throw resp; + } else { + // some docs failed. + // still report success but with a list of failures + failures = (resp.failures || []); + } } } return { success: true, id, - pipelineId, + index: createdIndex, + pipelineId: createdPipelineId, docCount, - failures: [], + failures, }; } catch (error) { return { success: false, id, - pipelineId, + index: createdIndex, + pipelineId: createdPipelineId, error, docCount, - failures: (error.failures ? error.failures : []) + failures: (error.failures || []) }; } } @@ -69,9 +87,7 @@ export function importDataProvider(callWithRequest) { } }; - console.log('creating index'); - const gg = await callWithRequest('indices.create', { index, body }); - console.log(gg); + await callWithRequest('indices.create', { index, body }); } else { throw `${index} already exists.`; } @@ -102,11 +118,20 @@ export function importDataProvider(callWithRequest) { }; } } catch (error) { - const failures = getFailures(error.items || []); + let failures = []; + if (error.errors !== undefined && Array.isArray(error.items)) { + // an expected error where some or all of the bulk request + // docs have failed to be ingested. + failures = getFailures(error.items); + } else { + // some other error has happened. assume all the docs have failed to be ingested + failures = data; + } + return { success: false, error, - docs: data.length, + docCount: data.length, failures, }; }