14 changes: 4 additions & 10 deletions qa/integration/fixtures/dlq_spec.yml
@@ -5,7 +5,7 @@ services:
 config:
   input {
     generator{
-      message => '{"test":"one"}'
+      message => '{"ip":1}'
       codec => "json"
       count => 1000
     }
@@ -17,22 +17,16 @@ config:
   }
 
   filter {
-    if ([geoip]) {
+    if ([ip]) {
       mutate {
-        remove_field => ["geoip"]
+        remove_field => ["ip"]
         add_field => {
          "mutated" => "true"
         }
       }
-    }else{
-      mutate {
-        add_field => {
-          "geoip" => "somewhere"
-        }
-      }
     }
   }
   output {
-    elasticsearch {}
+    elasticsearch { index => "test-index" }
   }
 teardown_script:
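
Note on the fixture change: the generator now emits a document whose ip field is an integer, which conflicts with the ip-typed mapping that dlq_spec.rb installs for te* indices, so Elasticsearch rejects the document and Logstash routes it to the dead letter queue. A minimal sketch of that conflict against a local node (template name and index taken from the spec; these commands are for illustration and are not part of this change):

  # install the same ip-typed template the spec uses
  curl -s -X PUT 'localhost:9200/_template/ip-template' -H 'Content-Type: application/json' \
    -d '{"index_patterns":["te*"],"mappings":{"properties":{"ip":{"type":"ip"}}}}'
  # try to index the fixture's document; 1 is not a valid IP address,
  # so this request should fail with a mapping error, which is what feeds the DLQ
  curl -s -X POST 'localhost:9200/test-index/_doc' -H 'Content-Type: application/json' -d '{"ip":1}'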
2 changes: 1 addition & 1 deletion qa/integration/services/elasticsearch_setup.sh
@@ -8,7 +8,7 @@ ES_HOME="$current_dir/../../../build/elasticsearch"
 
 start_es() {
     es_args=$@
-    JAVA_HOME= $ES_HOME/bin/elasticsearch -Epath.data=/tmp/ls_integration/es-data -Epath.logs=/tmp/ls_integration/es-logs $es_args -p $ES_HOME/elasticsearch.pid > /tmp/elasticsearch.log 2>/dev/null &
+    JAVA_HOME= $ES_HOME/bin/elasticsearch -Epath.data=/tmp/ls_integration/es-data -Ediscovery.type=single-node -Epath.logs=/tmp/ls_integration/es-logs $es_args -p $ES_HOME/elasticsearch.pid > /tmp/elasticsearch.log 2>/dev/null &
     count=120
     echo "Waiting for elasticsearch to respond..."
     while ! curl --silent localhost:9200 && [[ $count -ne 0 ]]; do
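
The added -Ediscovery.type=single-node flag makes the node form a single-node cluster and elect itself master rather than waiting for other nodes to join, so the test cluster cannot hang on discovery. A quick sanity check once the script reports Elasticsearch as responding (a sketch, assuming the default port used above):

  curl --silent localhost:9200/_cluster/health?pretty
  # expect "number_of_nodes" : 1; status may be yellow rather than green,
  # since a single node cannot allocate replica shards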
41 changes: 20 additions & 21 deletions qa/integration/specs/dlq_spec.rb
@@ -23,16 +23,13 @@
 
 require "logstash/devutils/rspec/spec_helper"
 
-def generate_message(number)
-  message = {}
-  number.times do |i|
-    message["field#{i}"] = "value#{i}"
-  end
-  message.to_json
-end
-
 describe "Test Dead Letter Queue" do
+
+  # template with an ip field
+  let(:template) { { "index_patterns": ["te*"], "mappings": { "properties": { "ip": { "type": "ip" }}}} }
+  # a message that is incompatible with the template
+  let(:message) { {"message": "hello", "ip": 1}.to_json }
 
   before(:all) {
     @fixture = Fixture.new(__FILE__)
     es_allow_wildcard_deletes(@fixture.get_service("elasticsearch").get_client)
@@ -45,7 +42,9 @@ def generate_message(number)
 
   before(:each) {
     IO.write(config_yaml_file, config_yaml)
-    clean_es(@fixture.get_service("elasticsearch").get_client)
+    es_client = @fixture.get_service("elasticsearch").get_client
+    clean_es(es_client)
+    es_client.perform_request("PUT", "_template/ip-template", {}, template)
   }


@@ -76,15 +75,18 @@ def generate_message(number)
       logstash_service.wait_for_logstash
       try(60) do
         begin
-          result = es_client.search(index: 'logstash-*', size: 0, q: '*')
+          result = es_client.search(index: 'test-index', size: 0, q: '*')
         rescue Elasticsearch::Transport::Transport::Errors::ServiceUnavailable => e
           puts "Elasticsearch unavailable #{e.inspect}"
           hits = 0
+        rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
+          puts "Index not found"
+          hits = 0
         end
         expect(result).to have_hits(1000)
       end
 
-      result = es_client.search(index: 'logstash-*', size: 1, q: '*')
+      result = es_client.search(index: 'test-index', size: 1, q: '*')
       s = result["hits"]["hits"][0]["_source"]
       expect(s["mutated"]).to eq("true")
     end
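
For reference, the count that the spec waits for can be checked by hand against the same index (a sketch; index name as used in the spec above):

  curl -s 'localhost:9200/test-index/_count'
  # the spec keeps retrying until the hit count reaches 1000,
  # i.e. every dead-lettered event has been reprocessed and indexed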
@@ -100,44 +102,41 @@ def generate_message(number)
     end
 
     context 'with multiple pipelines' do
-      let(:message) { generate_message(100)}
       let(:pipelines) {[
         {
           "pipeline.id" => "test",
           "pipeline.workers" => 1,
           "dead_letter_queue.enable" => true,
-          "pipeline.batch.size" => 1,
-          "config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } filter { mutate { add_field => { \"geoip\" => \"somewhere\" } } } output { elasticsearch {} }"
+          "pipeline.batch.size" => 100,
[review thread on the "pipeline.batch.size" => 100 change]
Contributor: Is the downsizing of the batch size needed by the test itself, or does it follow from Elasticsearch now running as a single-node cluster?
Contributor: Don't mind my comment; I thought it went from 100 down to 1, but it's the opposite, and it is reasonable to use a batch size greater than one.
Member (author): Yeah, having size 1 was making these tests unnecessarily slow.
"config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } output { elasticsearch { index => \"test-index\" } }"
},
{
"pipeline.id" => "test2",
"pipeline.workers" => 1,
"dead_letter_queue.enable" => false,
"pipeline.batch.size" => 1,
"config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"geoip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch {} }"
"pipeline.batch.size" => 100,
"config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"ip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch { index => \"test-index\" } }"
}
]}

it_behaves_like 'it can send 1000 documents to and index from the dlq'
end

context 'with a single pipeline' do
let(:message) { generate_message(100)}
let(:pipelines) {[
{
"pipeline.id" => "main",
"pipeline.workers" => 1,
"dead_letter_queue.enable" => true,
"pipeline.batch.size" => 1,
"pipeline.batch.size" => 100,
"config.string" => "
input { generator{ message => '#{message}' codec => \"json\" count => 1000 }
dead_letter_queue { path => \"#{dlq_dir}\" commit_offsets => true }
}
filter {
if ([geoip]) { mutate { remove_field => [\"geoip\"] add_field => { \"mutated\" => \"true\" } } }
else{ mutate { add_field => { \"geoip\" => \"somewhere\" } } }
if ([ip]) { mutate { remove_field => [\"ip\"] add_field => { \"mutated\" => \"true\" } } }
}
output { elasticsearch {} }"
output { elasticsearch { index => \"test-index\" } }"
}
]}

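
These specs live under qa/integration and are ordinary RSpec files, so running just this one locally might look like the following sketch (assuming dependencies are installed and the Elasticsearch build referenced by elasticsearch_setup.sh is in place; the exact entry point may differ per branch):

  cd qa/integration
  bundle exec rspec specs/dlq_spec.rb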