diff --git a/qa/integration/fixtures/dlq_spec.yml b/qa/integration/fixtures/dlq_spec.yml index 8ca9e6d08bd..6bf2953acd5 100644 --- a/qa/integration/fixtures/dlq_spec.yml +++ b/qa/integration/fixtures/dlq_spec.yml @@ -5,7 +5,7 @@ services: config: input { generator{ - message => '{"test":"one"}' + message => '{"ip":1}' codec => "json" count => 1000 } @@ -17,22 +17,16 @@ config: } filter { - if ([geoip]) { + if ([ip]) { mutate { - remove_field => ["geoip"] + remove_field => ["ip"] add_field => { "mutated" => "true" } } - }else{ - mutate { - add_field => { - "geoip" => "somewhere" - } - } } } output { - elasticsearch {} + elasticsearch { index => "test-index" } } teardown_script: diff --git a/qa/integration/services/elasticsearch_setup.sh b/qa/integration/services/elasticsearch_setup.sh index 0af88f0d9f1..c6693d28ebd 100755 --- a/qa/integration/services/elasticsearch_setup.sh +++ b/qa/integration/services/elasticsearch_setup.sh @@ -8,7 +8,7 @@ ES_HOME="$current_dir/../../../build/elasticsearch" start_es() { es_args=$@ - JAVA_HOME= $ES_HOME/bin/elasticsearch -Epath.data=/tmp/ls_integration/es-data -Epath.logs=/tmp/ls_integration/es-logs $es_args -p $ES_HOME/elasticsearch.pid > /tmp/elasticsearch.log 2>/dev/null & + JAVA_HOME= $ES_HOME/bin/elasticsearch -Epath.data=/tmp/ls_integration/es-data -Ediscovery.type=single-node -Epath.logs=/tmp/ls_integration/es-logs $es_args -p $ES_HOME/elasticsearch.pid > /tmp/elasticsearch.log 2>/dev/null & count=120 echo "Waiting for elasticsearch to respond..." while ! curl --silent localhost:9200 && [[ $count -ne 0 ]]; do diff --git a/qa/integration/specs/dlq_spec.rb b/qa/integration/specs/dlq_spec.rb index da2b521566f..74088f5d125 100644 --- a/qa/integration/specs/dlq_spec.rb +++ b/qa/integration/specs/dlq_spec.rb @@ -23,16 +23,13 @@ require "logstash/devutils/rspec/spec_helper" -def generate_message(number) - message = {} - number.times do |i| - message["field#{i}"] = "value#{i}" - end - message.to_json -end - describe "Test Dead Letter Queue" do + # template with an ip field + let(:template) { { "index_patterns": ["te*"], "mappings": { "properties": { "ip": { "type": "ip" }}}} } + # a message that is incompatible with the template + let(:message) { {"message": "hello", "ip": 1}.to_json } + before(:all) { @fixture = Fixture.new(__FILE__) es_allow_wildcard_deletes(@fixture.get_service("elasticsearch").get_client) @@ -45,7 +42,9 @@ def generate_message(number) before(:each) { IO.write(config_yaml_file, config_yaml) - clean_es(@fixture.get_service("elasticsearch").get_client) + es_client = @fixture.get_service("elasticsearch").get_client + clean_es(es_client) + es_client.perform_request("PUT", "_template/ip-template", {}, template) } @@ -76,15 +75,18 @@ def generate_message(number) logstash_service.wait_for_logstash try(60) do begin - result = es_client.search(index: 'logstash-*', size: 0, q: '*') + result = es_client.search(index: 'test-index', size: 0, q: '*') rescue Elasticsearch::Transport::Transport::Errors::ServiceUnavailable => e puts "Elasticsearch unavailable #{e.inspect}" hits = 0 + rescue Elasticsearch::Transport::Transport::Errors::NotFound => e + puts "Index not found" + hits = 0 end expect(result).to have_hits(1000) end - result = es_client.search(index: 'logstash-*', size: 1, q: '*') + result = es_client.search(index: 'test-index', size: 1, q: '*') s = result["hits"]["hits"][0]["_source"] expect(s["mutated"]).to eq("true") end @@ -100,21 +102,20 @@ def generate_message(number) end context 'with multiple pipelines' do - let(:message) { generate_message(100)} let(:pipelines) {[ { "pipeline.id" => "test", "pipeline.workers" => 1, "dead_letter_queue.enable" => true, - "pipeline.batch.size" => 1, - "config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } filter { mutate { add_field => { \"geoip\" => \"somewhere\" } } } output { elasticsearch {} }" + "pipeline.batch.size" => 100, + "config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } output { elasticsearch { index => \"test-index\" } }" }, { "pipeline.id" => "test2", "pipeline.workers" => 1, "dead_letter_queue.enable" => false, - "pipeline.batch.size" => 1, - "config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"geoip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch {} }" + "pipeline.batch.size" => 100, + "config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"ip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch { index => \"test-index\" } }" } ]} @@ -122,22 +123,20 @@ def generate_message(number) end context 'with a single pipeline' do - let(:message) { generate_message(100)} let(:pipelines) {[ { "pipeline.id" => "main", "pipeline.workers" => 1, "dead_letter_queue.enable" => true, - "pipeline.batch.size" => 1, + "pipeline.batch.size" => 100, "config.string" => " input { generator{ message => '#{message}' codec => \"json\" count => 1000 } dead_letter_queue { path => \"#{dlq_dir}\" commit_offsets => true } } filter { - if ([geoip]) { mutate { remove_field => [\"geoip\"] add_field => { \"mutated\" => \"true\" } } } - else{ mutate { add_field => { \"geoip\" => \"somewhere\" } } } + if ([ip]) { mutate { remove_field => [\"ip\"] add_field => { \"mutated\" => \"true\" } } } } - output { elasticsearch {} }" + output { elasticsearch { index => \"test-index\" } }" } ]}