Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class PipelineEditorUi extends React.Component {
super(props);

const {
pipeline: { id, description, pipeline, settings },
pipeline: { id, description, pipeline, settings, metadata },
} = this.props;

const pipelineWorkersSet = typeof settings['pipeline.workers'] === 'number';
Expand All @@ -59,6 +59,7 @@ class PipelineEditorUi extends React.Component {
'queue.max_bytes': settings['queue.max_bytes.number'] + settings['queue.max_bytes.units'],
'queue.type': settings['queue.type'],
},
metadata,
},
pipelineIdErrors: [],
pipelineIdPattern: /^[A-Za-z\_][A-Za-z0-9\-\_]*$/,
Expand Down Expand Up @@ -124,8 +125,14 @@ class PipelineEditorUi extends React.Component {
};

onPipelineSave = () => {
const { pipelineService, toastNotifications, intl } = this.props;
const { pipelineService, toastNotifications, intl, pipeline, isNewPipeline } = this.props;
const { id, ...pipelineToStore } = this.state.pipeline;

// when edit a pipeline and the pipeline settings (exclude description) updated, version should increase
if (!isNewPipeline && !pipeline.isPipelineSettingsEqualTo(pipelineToStore)) {
pipelineToStore.metadata.version = String(Number(pipelineToStore.metadata.version) + 1);
}

return pipelineService
.savePipeline({
id,
Expand Down Expand Up @@ -516,6 +523,10 @@ PipelineEditorUi.propTypes = {
'queue.max_bytes': PropTypes.number,
'queue.type': PropTypes.string.isRequired,
}),
metadata: PropTypes.shape({
type: PropTypes.string.isRequired,
version: PropTypes.string.isRequired,
}),
}).isRequired,
pipelineService: PropTypes.shape({
deletePipeline: PropTypes.func.isRequired,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import React from 'react';
import { shallowWithIntl } from 'test_utils/enzyme_helpers';
import 'brace';
import { PipelineEditor } from './pipeline_editor';
import { Pipeline } from '../../../models/pipeline';

describe('PipelineEditor component', () => {
let props;
Expand Down Expand Up @@ -41,10 +42,16 @@ describe('PipelineEditor component', () => {
'queue.max_bytes': 1024,
'queue.type': 'MB',
},
metadata: {
version: '1',
type: 'logstash_pipeline',
},
};
pipelineService = {
deletePipeline: jest.fn(),
savePipeline: jest.fn(),
savePipeline: jest.fn(() => {
return Promise.resolve(jest.fn());
}),
};
toastNotifications = {
addWarning: jest.fn(),
Expand Down Expand Up @@ -137,12 +144,42 @@ describe('PipelineEditor component', () => {
expect(wrapper.instance().state.pipeline.settings['queue.checkpoint.writes']).toBe(14);
});

it('updates pipeline metadata', () => {
const wrapper = shallowWithIntl(<PipelineEditor.WrappedComponent {...props} />);
wrapper
.find(`[data-test-subj="inputBatchSize"]`)
.simulate('change', { target: { value: '11' } });

expect(wrapper.instance().state.pipeline.settings['pipeline.batch.size']).toBe(11);
expect(wrapper.instance().state.pipeline.settings['pipeline.batch.size']).toBe(11);
});

it('calls the pipelineService delete function on delete', () => {
const wrapper = shallowWithIntl(<PipelineEditor.WrappedComponent {...props} />);
wrapper.find(`[data-test-subj="btnDeletePipeline"]`).simulate('click');
expect(wrapper.instance().state.showConfirmDeleteModal).toBe(true);
});

it('update metadata version on save when ', () => {
props.pipeline = new Pipeline(props.pipeline);
const wrapper = shallowWithIntl(<PipelineEditor.WrappedComponent {...props} />);
wrapper
.find(`[data-test-subj="inputBatchSize"]`)
.simulate('change', { target: { value: '11' } });
wrapper.find(`[data-test-subj="btnSavePipeline"]`).simulate('click');
expect(wrapper.instance().state.pipeline.metadata.version).toBe('2');
});

it('keep metadata version on save when description update', () => {
props.pipeline = new Pipeline(props.pipeline);
const wrapper = shallowWithIntl(<PipelineEditor.WrappedComponent {...props} />);
wrapper
.find(`[data-test-subj="inputDescription"]`)
.simulate('change', { target: { value: 'the new description' } });
wrapper.find(`[data-test-subj="btnSavePipeline"]`).simulate('click');
expect(wrapper.instance().state.pipeline.metadata.version).toBe('1');
});

it('only matches pipeline names that fit the acceptable parameters', () => {
const wrapper = shallowWithIntl(<PipelineEditor.WrappedComponent {...props} />);
const pattern = wrapper.instance().state.pipelineIdPattern;
Expand Down
27 changes: 26 additions & 1 deletion x-pack/plugins/logstash/public/models/pipeline/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ const settingsDefaults = {
'queue.checkpoint.writes': 1024,
};

const metadataDefaults = {
type: 'logstash_pipeline',
version: '1',
};

export class Pipeline {
/**
* Represents the pipeline for the client side editing/creating workflow
Expand All @@ -35,11 +40,12 @@ export class Pipeline {
this.pipeline = get(props, 'pipeline', emptyPipeline);
this.username = get(props, 'username');
this.settings = defaultsDeep(get(props, 'settings', {}), settingsDefaults);
this.metadata = defaultsDeep(get(props, 'metadata', {}), metadataDefaults);
}

get clone() {
return new Pipeline({
...omit(this, ['id', 'username']),
...omit(this, ['id', 'username', 'metadata']),
});
}

Expand All @@ -61,6 +67,7 @@ export class Pipeline {
pipeline: this.pipeline,
username: this.username,
settings: upstreamSettings,
metadata: this.metadata,
};
}

Expand All @@ -86,6 +93,7 @@ export class Pipeline {
pipeline: pipeline.pipeline,
username: pipeline.username,
settings,
metadata: pipeline.metadata,
});
}

Expand All @@ -101,4 +109,21 @@ export class Pipeline {

return isEqual(cleanPipeline, cleanOtherPipeline);
};

// compare config string and settings. ignore metadata and description
isPipelineSettingsEqualTo = (otherPojo) => {
const thisPojo = { ...this };

return (
thisPojo.pipeline === otherPojo.pipeline &&
thisPojo.settings['pipeline.workers'] === otherPojo.settings['pipeline.workers'] &&
thisPojo.settings['pipeline.batch.size'] === otherPojo.settings['pipeline.batch.size'] &&
thisPojo.settings['pipeline.batch.delay'] === otherPojo.settings['pipeline.batch.delay'] &&
thisPojo.settings['queue.type'] === otherPojo.settings['queue.type'] &&
thisPojo.settings['queue.checkpoint.writes'] ===
otherPojo.settings['queue.checkpoint.writes'] &&
`${thisPojo.settings['queue.max_bytes.number']}${thisPojo.settings['queue.max_bytes.units']}` ===
otherPojo.settings['queue.max_bytes']
);
};
}

This file was deleted.

This file was deleted.

This file was deleted.

27 changes: 14 additions & 13 deletions x-pack/plugins/logstash/server/models/pipeline/pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ describe('pipeline', () => {
describe('Pipeline', () => {
describe('fromUpstreamJSON factory method', () => {
const upstreamJSON = {
_id: 'apache',
_source: {
apache: {
description: 'this is an apache pipeline',
pipeline_metadata: {
version: 1,
Expand All @@ -21,25 +20,23 @@ describe('pipeline', () => {
pipeline: 'input {} filter { grok {} }\n output {}',
},
};
const upstreamId = 'apache';

it('returns correct Pipeline instance', () => {
const pipeline = Pipeline.fromUpstreamJSON(upstreamJSON);
expect(pipeline.id).toBe(upstreamJSON._id);
expect(pipeline.description).toBe(upstreamJSON._source.description);
expect(pipeline.username).toBe(upstreamJSON._source.username);
expect(pipeline.pipeline).toBe(upstreamJSON._source.pipeline);
expect(pipeline.id).toBe(upstreamId);
expect(pipeline.description).toBe(upstreamJSON.apache.description);
expect(pipeline.username).toBe(upstreamJSON.apache.username);
expect(pipeline.pipeline).toBe(upstreamJSON.apache.pipeline);
});

it('throws if pipeline argument does not contain an id property', () => {
const badJSON = {
// no _id
_source: upstreamJSON._source,
};
it('throws if pipeline argument does not contain id as a key', () => {
const badJSON = {};
const testFromUpstreamJsonError = () => {
return Pipeline.fromUpstreamJSON(badJSON);
};
expect(testFromUpstreamJsonError).toThrowError(
/upstreamPipeline argument must contain an id property/i
/upstreamPipeline argument must contain pipeline id as a key/i
);
});
});
Expand All @@ -51,13 +48,17 @@ describe('pipeline', () => {
description: 'this is an apache pipeline',
username: 'elastic',
pipeline: 'input {} filter { grok {} }\n output {}',
metadata: {
version: '1',
type: 'logstash_pipeline',
},
};
const pipeline = new Pipeline(downstreamJSON);
const expectedUpstreamJSON = {
description: 'this is an apache pipeline',
pipeline_metadata: {
type: 'logstash_pipeline',
version: 1,
version: '1',
},
username: 'elastic',
pipeline: 'input {} filter { grok {} }\n output {}',
Expand Down
Loading