2323
2424require "logstash/devutils/rspec/spec_helper"
2525
26- def generate_message ( number )
27- message = { }
28- number . times do |i |
29- message [ "field#{ i } " ] = "value#{ i } "
30- end
31- message . to_json
32- end
33-
3426describe "Test Dead Letter Queue" do
3527
28+ # template with an ip field
29+ let ( :template ) { { "index_patterns" : [ "te*" ] , "mappings" : { "properties" : { "ip" : { "type" : "ip" } } } } }
30+ # a message that is incompatible with the template
31+ let ( :message ) { { "message" : "hello" , "ip" : 1 } . to_json }
32+
3633 before ( :all ) {
3734 @fixture = Fixture . new ( __FILE__ )
3835 es_allow_wildcard_deletes ( @fixture . get_service ( "elasticsearch" ) . get_client )
@@ -45,7 +42,9 @@ def generate_message(number)
4542
4643 before ( :each ) {
4744 IO . write ( config_yaml_file , config_yaml )
48- clean_es ( @fixture . get_service ( "elasticsearch" ) . get_client )
45+ es_client = @fixture . get_service ( "elasticsearch" ) . get_client
46+ clean_es ( es_client )
47+ es_client . perform_request ( "PUT" , "_template/ip-template" , { } , template )
4948 }
5049
5150
@@ -76,15 +75,18 @@ def generate_message(number)
7675 logstash_service . wait_for_logstash
7776 try ( 60 ) do
7877 begin
79- result = es_client . search ( index : 'logstash-* ' , size : 0 , q : '*' )
78+ result = es_client . search ( index : 'test-index ' , size : 0 , q : '*' )
8079 rescue Elasticsearch ::Transport ::Transport ::Errors ::ServiceUnavailable => e
8180 puts "Elasticsearch unavailable #{ e . inspect } "
8281 hits = 0
82+ rescue Elasticsearch ::Transport ::Transport ::Errors ::NotFound => e
83+ puts "Index not found"
84+ hits = 0
8385 end
8486 expect ( result ) . to have_hits ( 1000 )
8587 end
8688
87- result = es_client . search ( index : 'logstash-* ' , size : 1 , q : '*' )
89+ result = es_client . search ( index : 'test-index ' , size : 1 , q : '*' )
8890 s = result [ "hits" ] [ "hits" ] [ 0 ] [ "_source" ]
8991 expect ( s [ "mutated" ] ) . to eq ( "true" )
9092 end
@@ -100,44 +102,41 @@ def generate_message(number)
100102 end
101103
102104 context 'with multiple pipelines' do
103- let ( :message ) { generate_message ( 100 ) }
104105 let ( :pipelines ) { [
105106 {
106107 "pipeline.id" => "test" ,
107108 "pipeline.workers" => 1 ,
108109 "dead_letter_queue.enable" => true ,
109- "pipeline.batch.size" => 1 ,
110- "config.string" => "input { generator { message => '#{ message } ' codec => \" json\" count => 1000 } } filter { mutate { add_field => { \" geoip \" => \" somewhere \" } } } output { elasticsearch { } }"
110+ "pipeline.batch.size" => 100 ,
111+ "config.string" => "input { generator { message => '#{ message } ' codec => \" json\" count => 1000 } } output { elasticsearch { index => \" test-index \" } }"
111112 } ,
112113 {
113114 "pipeline.id" => "test2" ,
114115 "pipeline.workers" => 1 ,
115116 "dead_letter_queue.enable" => false ,
116- "pipeline.batch.size" => 1 ,
117- "config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \" #{ dlq_dir } \" commit_offsets => true } } filter { mutate { remove_field => [\" geoip \" ] add_field => {\" mutated\" => \" true\" } } } output { elasticsearch {} }"
117+ "pipeline.batch.size" => 100 ,
118+ "config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \" #{ dlq_dir } \" commit_offsets => true } } filter { mutate { remove_field => [\" ip \" ] add_field => {\" mutated\" => \" true\" } } } output { elasticsearch { index => \" test-index \" } }"
118119 }
119120 ] }
120121
121122 it_behaves_like 'it can send 1000 documents to and index from the dlq'
122123 end
123124
124125 context 'with a single pipeline' do
125- let ( :message ) { generate_message ( 100 ) }
126126 let ( :pipelines ) { [
127127 {
128128 "pipeline.id" => "main" ,
129129 "pipeline.workers" => 1 ,
130130 "dead_letter_queue.enable" => true ,
131- "pipeline.batch.size" => 1 ,
131+ "pipeline.batch.size" => 100 ,
132132 "config.string" => "
133133 input { generator{ message => '#{ message } ' codec => \" json\" count => 1000 }
134134 dead_letter_queue { path => \" #{ dlq_dir } \" commit_offsets => true }
135135 }
136136 filter {
137- if ([geoip]) { mutate { remove_field => [\" geoip\" ] add_field => { \" mutated\" => \" true\" } } }
138- else{ mutate { add_field => { \" geoip\" => \" somewhere\" } } }
137+ if ([ip]) { mutate { remove_field => [\" ip\" ] add_field => { \" mutated\" => \" true\" } } }
139138 }
140- output { elasticsearch {} }"
139+ output { elasticsearch { index => \" test-index \" } }"
141140 }
142141 ] }
143142
0 commit comments