-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
output.rb
1603 lines (1408 loc) · 62.4 KB
/
output.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/env'
require 'fluent/error'
require 'fluent/plugin/base'
require 'fluent/plugin/buffer'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/msgpack_factory'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
require 'fluent/timezone'
require 'fluent/unique_id'
require 'fluent/clock'
require 'fluent/ext_monitor_require'
require 'time'
module Fluent
module Plugin
class Output < Base
# Mixins shared by every output plugin instance.
# NOTE(review): what each mixin provides is defined outside this file; `log` and
# `dump_unique_id_hex`, both used below, presumably come from PluginLoggerMixin and
# UniqueId::Mixin respectively - confirm.
include PluginId
include PluginLoggerMixin
include PluginHelper::Mixin
include UniqueId::Mixin
# Plugin helpers this base class relies on itself (e.g. `thread_create` in #start and
# `metrics_create` in #configure).
helpers_internal :thread, :retry_state, :metrics
# Characters allowed in a plain chunk key given as a <buffer> argument.
# Anchored with \A / \z (whole string) rather than ^ / $ (per line), so a value that
# merely contains one valid line can no longer pass validation.
CHUNK_KEY_PATTERN = /\A[-_.@a-zA-Z0-9]+\z/
# ${key} placeholder; capture group 1 is the key name.
CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{([-_.@$a-zA-Z0-9]+)\}/
# ${tag} or ${tag[N]} placeholder (N may be negative); capture group 1 is `tag` / `tag[N]`.
CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[-?\d+\])?)\}/
# ${chunk_id} placeholder.
CHUNK_ID_PLACEHOLDER_PATTERN = /\$\{chunk_id\}/
# #configure warns once the number of chunk keys (the tag counts as one) reaches this value.
CHUNKING_FIELD_WARN_NUM = 4
# NOTE(review): `time_as_integer` is not read in the visible part of this file;
# presumably it is consumed by the event serialization code further down - confirm.
config_param :time_as_integer, :bool, default: false
desc 'The threshold to show slow flush logs'
config_param :slow_flush_log_threshold, :float, default: 20.0
# `<buffer>` and `<secondary>` sections are available only when '#format' and '#write' are implemented
# <buffer> section: chunking keys, flush scheduling, overflow handling and retry policy.
# `init: true` means @buffer_config exists with defaults even when no <buffer> is written
# (see the note at the top of the buffering branch in #configure).
config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do
# Section argument, e.g. `<buffer tag,time,key>`; 'time' and 'tag' are handled specially in #configure.
config_argument :chunk_keys, :array, value_type: :string, default: []
config_param :@type, :string, default: 'memory', alias: :type
config_param :timekey, :time, default: nil # range size to be used: `time.to_i / @timekey`
config_param :timekey_wait, :time, default: 600
# These are for #extract_placeholders
config_param :timekey_use_utc, :bool, default: false # default is localtime
config_param :timekey_zone, :string, default: Time.now.strftime('%z') # e.g., "-0700" or "Asia/Tokyo"
desc 'If true, plugin will try to flush buffer just before shutdown.'
config_param :flush_at_shutdown, :bool, default: nil # change default by buffer_plugin.persistent?
desc 'How to enqueue chunks to be flushed. "interval" flushes per flush_interval, "immediate" flushes just after event arrival.'
config_param :flush_mode, :enum, list: [:default, :lazy, :interval, :immediate], default: :default
config_param :flush_interval, :time, default: 60, desc: 'The interval between buffer chunk flushes.'
config_param :flush_thread_count, :integer, default: 1, desc: 'The number of threads to flush the buffer.'
config_param :flush_thread_interval, :float, default: 1.0, desc: 'Seconds to sleep between checks for buffer flushes in flush threads.'
config_param :flush_thread_burst_interval, :float, default: 1.0, desc: 'Seconds to sleep between flushes when many buffer chunks are queued.'
config_param :delayed_commit_timeout, :time, default: 60, desc: 'Seconds of timeout for buffer chunks to be committed by plugins later.'
config_param :overflow_action, :enum, list: [:throw_exception, :block, :drop_oldest_chunk], default: :throw_exception, desc: 'The action when the size of buffer exceeds the limit.'
config_param :retry_forever, :bool, default: false, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry flushing forever.'
config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry to flush while failing, until plugin discards buffer chunks.'
# 72hours == 17 times with exponential backoff (not to change default behavior)
config_param :retry_max_times, :integer, default: nil, desc: 'The maximum number of times to retry to flush while failing.'
config_param :retry_secondary_threshold, :float, default: 0.8, desc: 'ratio of retry_timeout to switch to use secondary while failing.'
# exponential backoff sequence will be initialized at the time of this threshold
desc 'How to wait next retry to flush buffer.'
config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
### Periodic -> fixed :retry_wait
### Exponential backoff: k is number of retry times
# c: constant factor, @retry_wait
# b: base factor, @retry_exponential_backoff_base
# k: times
# total retry time: c + c * b^1 + (...) + c*b^k = c * (b^(k+1) - 1) / (b - 1)
# (with the default b = 2 this is c * (2^(k+1) - 1))
config_param :retry_wait, :time, default: 1, desc: 'Seconds to wait before next retry to flush, or constant factor of exponential backoff.'
config_param :retry_exponential_backoff_base, :float, default: 2, desc: 'The base number of exponential backoff for retries.'
config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'
config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.'
end
# <secondary> section: an alternative output configured by #configure via `acts_as_secondary`.
# When @type is omitted, #configure falls back to the primary plugin's own type.
config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do
config_param :@type, :string, default: nil, alias: :type
# The two nested sections below exist only so that #configure can detect them and
# reject `<buffer>` / `<secondary>` written inside `<secondary>` with a ConfigError.
config_section :buffer, required: false, multi: false do
# dummy to detect invalid specification for here
end
config_section :secondary, required: false, multi: false do
# dummy to detect invalid specification for here
end
end
# Hook for non-buffered (synchronous) output; called by #emit_sync with the tag and event stream.
# A subclass defining this method makes `implement?(:synchronous)` true.
# The base implementation always raises NotImplementedError.
def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
# Hook for buffered output. A subclass defining this method makes `implement?(:buffered)` true.
# The base implementation always raises NotImplementedError.
def write(chunk)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
# Hook for buffered output with delayed commit. A subclass defining this method makes
# `implement?(:delayed_commit)` true. The base implementation always raises NotImplementedError.
def try_write(chunk)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
# Optional custom formatter. A subclass defining this method makes
# `implement?(:custom_format)` true. The base implementation always raises NotImplementedError.
def format(tag, time, record)
# standard msgpack_event_stream chunk will be used if this method is not implemented in plugin subclass
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
# Whether a custom #format returns msgpack binary. Returns false by default.
def formatted_to_msgpack_binary?
# To indicate custom format method (#format) returns msgpack binary or not.
# If #format returns msgpack binary, override this method to return true.
false
end
# Compatibility for existing plugins
# (non-predicate spelling; #start calls this name, which delegates to the `?` version).
def formatted_to_msgpack_binary
formatted_to_msgpack_binary?
end
# Consulted by #start only when buffering was left undecided by #configure.
# Returns true (buffered) by default.
def prefer_buffered_processing
# override this method to return false only when all of these are true:
# * plugin has both implementation for buffered and non-buffered methods
# * plugin is expected to work as non-buffered plugin if no `<buffer>` sections specified
true
end
# Consulted by #start only when the plugin implements both `write` and `try_write`;
# the returned value becomes @delayed_commit. Returns true by default.
def prefer_delayed_commit
# override this method to choose between `write` and `try_write` when both are implemented
true
end
# Returns false by default.
# NOTE(review): presumably consulted by the multi-worker setup outside this file - confirm.
def multi_workers_ready?
false
end
# Internal states

# Per-flush-thread bookkeeping: the thread itself, the clock value it waits for, and
# the mutex / condition variable used to wake it up (see #start and #after_shutdown).
FlushThreadState = Struct.new(:thread, :next_clock, :mutex, :cond_var)

# A dequeued chunk together with the time it was recorded and its timeout.
DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do
  # True once more than `timeout` seconds have passed since `time`.
  def expired?
    deadline = time + timeout
    Time.now > deadline
  end
end
# Read by the secondary/primary pair: #acts_as_secondary copies chunk key settings and
# `timekey_zone` from the primary, and the secondary's commit_write forwards `delayed_commit`.
attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone
# for tests
# (chunk_keys, chunk_key_time and chunk_key_tag are also read by #acts_as_secondary)
attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_accessors, :chunk_key_time, :chunk_key_tag
attr_accessor :output_enqueue_thread_waiting, :dequeued_chunks, :dequeued_chunks_mutex
# output_enqueue_thread_waiting: for test of output.rb itself
attr_accessor :retry_for_error_chunk # if true, error flush will be retried even if under_plugin_development is true
# Metric readers. Each returns the current value of the matching metrics object.
# The metrics objects are nil after #initialize and are created in #configure, so calling
# any of these before #configure raises NoMethodError.

# Current value of the "num_errors" metric.
def num_errors
@num_errors_metrics.get
end
# Current value of the "emit_count" metric (incremented once per #emit_sync / #emit_buffered call).
def emit_count
@emit_count_metrics.get
end
# Current value of the "emit_size" metric (only updated when size metrics are enabled).
def emit_size
@emit_size_metrics.get
end
# Current value of the "emit_records" metric.
def emit_records
@emit_records_metrics.get
end
# Current value of the "write_count" metric.
def write_count
@write_count_metrics.get
end
# Current value of the "rollback_count" metric.
def rollback_count
@rollback_count_metrics.get
end
# Puts the instance into its pre-configure state: locks created, metrics unset, no buffer,
# no secondary, and a first guess at the processing mode based on which hooks the
# subclass implements.
def initialize
  super

  # Locks
  @counter_mutex = Mutex.new
  @flush_thread_mutex = Mutex.new

  # Role of this instance
  @delayed_commit = false
  @as_secondary = false
  @primary_instance = nil

  # TODO: well organized counters
  # (the metrics objects themselves are created in #configure)
  @num_errors_metrics = nil
  @emit_count_metrics = nil
  @emit_records_metrics = nil
  @emit_size_metrics = nil
  @write_count_metrics = nil
  @rollback_count_metrics = nil
  @flush_time_count_metrics = nil
  @slow_flush_count_metrics = nil
  @enable_size_metrics = false

  # First decision on how events are processed; #configure and #start refine it.
  #   true  - only buffered hooks are implemented
  #   false - only #process is implemented
  #   nil   - both styles are implemented, decided later in #configure / #start
  @buffering =
    if !implement?(:synchronous)
      true
    elsif implement?(:buffered) || implement?(:delayed_commit)
      nil
    else
      false
    end

  @custom_format = implement?(:custom_format)
  @enable_msgpack_streamer = false # decided later

  # Buffer, secondary and retry state
  @buffer = nil
  @secondary = nil
  @retry = nil

  # Flush/enqueue thread state
  @dequeued_chunks = nil
  @dequeued_chunks_mutex = nil
  @output_enqueue_thread = nil
  @output_flush_threads = nil
  @output_flush_thread_current_position = 0

  # Chunking configuration, filled in by #configure or #acts_as_secondary
  @simple_chunking = nil
  @chunk_keys = nil
  @chunk_key_accessors = nil
  @chunk_key_time = nil
  @chunk_key_tag = nil
  @flush_mode = nil
  @timekey_zone = nil

  @retry_for_error_chunk = false
end
# Turns this instance into the secondary of `primary`.
#
# Chunk key settings (and, when the primary chunks by time, the time zone) are copied
# from the primary, the primary's context router is shared, and commit_write /
# rollback_write are redefined on this instance only so that they delegate to the primary.
def acts_as_secondary(primary)
  @as_secondary = true
  @primary_instance = primary

  @chunk_keys = primary.chunk_keys || []
  @chunk_key_tag = primary.chunk_key_tag || false

  time_chunking = primary.chunk_key_time
  if time_chunking
    @chunk_key_time = time_chunking
    @timekey_zone = primary.timekey_zone
    @output_time_formatter_cache = {}
  end

  self.context_router = primary.context_router

  define_singleton_method(:commit_write) do |chunk_id|
    @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true)
  end
  # NOTE(review): `update_retry` is accepted as a keyword here but forwarded positionally;
  # confirm that the primary's rollback_write (not visible here) takes it that way.
  define_singleton_method(:rollback_write) do |chunk_id, update_retry: true|
    @primary_instance.rollback_write(chunk_id, update_retry)
  end
end
# Configures the plugin from `conf`.
#
# Steps, in order:
#   1. verify the subclass implements at least one processing hook;
#   2. create the metrics objects;
#   3. decide @buffering (true / false / nil = decided later in #start);
#   4. for a buffered (or undecided) primary: validate chunk keys, set up time-key
#      handling, pick the flush mode, and create and configure the buffer plugin;
#   5. create and configure the <secondary> output, if one is given.
#
# Raises Fluent::ConfigError for invalid configuration. Returns self.
def configure(conf)
  unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit)
    raise "BUG: output plugin must implement some methods. see developer documents."
  end

  # Both facts are taken from the raw element: @buffer_config always exists (init: true),
  # and a top-level 'flush_interval' is not a <buffer> parameter.
  has_buffer_section = (conf.elements(name: 'buffer').size > 0)
  has_flush_interval = conf.has_key?('flush_interval')

  super

  @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors")
  @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_count", help_text: "Number of count emits")
  @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records")
  @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events")
  @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events")
  @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
  @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
  @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")

  if has_buffer_section
    unless implement?(:buffered) || implement?(:delayed_commit)
      raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering"
    end
    @buffering = true
  else # no buffer sections
    if implement?(:synchronous)
      if !implement?(:buffered) && !implement?(:delayed_commit)
        if @as_secondary
          raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't."
        end
        @buffering = false
      else
        if @as_secondary
          # secondary plugin always works as buffered plugin without buffer instance
          @buffering = true
        else
          # @buffering.nil? shows that enabling buffering or not will be decided in lazy way in #start
          @buffering = nil
        end
      end
    else # buffered or delayed_commit is supported by `unless` of first line in this method
      @buffering = true
    end
  end

  # Enable to update record size metrics or not
  @enable_size_metrics = !!system_config.enable_size_metrics

  if @as_secondary
    # Explicitly false (not nil = undecided) means the plugin cannot buffer at all.
    if !@buffering && !@buffering.nil?
      raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't"
    end
  end

  if (@buffering || @buffering.nil?) && !@as_secondary
    # When @buffering.nil?, @buffer_config was initialized with default values for all parameters.
    # If so, this configuration MUST succeed.
    @chunk_keys = @buffer_config.chunk_keys.dup
    # 'time' and 'tag' are special chunk keys; take them out of the list of record keys.
    @chunk_key_time = !!@chunk_keys.delete('time')
    @chunk_key_tag = !!@chunk_keys.delete('tag')
    if @chunk_keys.any? { |key|
         begin
           k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key)
           if k.is_a?(String)
             k !~ CHUNK_KEY_PATTERN
           else
             if key.start_with?('$[')
               raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed"
             else
               false
             end
           end
         rescue => e
           raise Fluent::ConfigError, "in chunk_keys: #{e.message}"
         end
       }
      raise Fluent::ConfigError, "chunk_keys specification includes invalid char"
    else
      @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }]
    end

    if @chunk_key_time
      raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
      Fluent::Timezone.validate!(@buffer_config.timekey_zone)
      @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
      @timekey = @buffer_config.timekey
      if @timekey <= 0
        raise Fluent::ConfigError, "timekey should be greater than 0. current timekey: #{@timekey}"
      end
      @timekey_use_utc = @buffer_config.timekey_use_utc
      @offset = Fluent::Timezone.utc_offset(@timekey_zone)
      # utc_offset may return a callable; #calculate_timekey then calls it with each event time.
      @calculate_offset = @offset.respond_to?(:call) ? @offset : nil
      @output_time_formatter_cache = {}
    end

    if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM
      log.warn "many chunk keys specified, and it may cause too many chunks on your system."
    end

    # no chunk keys or only tags (chunking can be done without iterating event stream)
    @simple_chunking = !@chunk_key_time && @chunk_keys.empty?

    @flush_mode = @buffer_config.flush_mode
    if @flush_mode == :default
      if has_flush_interval
        log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour"
        @flush_mode = :interval
      else
        @flush_mode = (@chunk_key_time ? :lazy : :interval)
      end
    end

    buffer_type = @buffer_config[:@type]
    buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
    @buffer = Plugin.new_buffer(buffer_type, parent: self)
    @buffer.configure(buffer_conf)
    keep_buffer_config_compat
    @buffer.enable_update_timekeys if @chunk_key_time

    @flush_at_shutdown = @buffer_config.flush_at_shutdown
    if @flush_at_shutdown.nil?
      @flush_at_shutdown = if @buffer.persistent?
                             false
                           else
                             true # flush_at_shutdown is true in default for on-memory buffer
                           end
    elsif !@flush_at_shutdown && !@buffer.persistent?
      buf_type = Plugin.lookup_type_from_class(@buffer.class)
      log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer."
      log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
    end

    if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval')
      if buffer_conf.has_key?('flush_mode')
        raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'"
      else
        log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'"
      end
    end

    if @buffer.queued_chunks_limit_size.nil?
      @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count
    end
  end

  if @secondary_config
    raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering
    raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer
    raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary
    if @buffer_config.retry_forever
      log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary"
    end

    secondary_type = @secondary_config[:@type]
    unless secondary_type
      secondary_type = conf['@type'] # primary plugin type
    end
    secondary_conf = conf.elements(name: 'secondary').first
    @secondary = Plugin.new_output(secondary_type)
    unless @secondary.respond_to?(:acts_as_secondary)
      raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output"
    end
    # acts_as_secondary must run before configure so the secondary sees @as_secondary.
    @secondary.acts_as_secondary(self)
    @secondary.configure(secondary_conf)
    if (@secondary.class.to_s != "Fluent::Plugin::SecondaryFileOutput") &&
       (self.class != @secondary.class) &&
       (@custom_format || @secondary.implement?(:custom_format))
      log.warn "Use different plugin for secondary. Check the plugin works with primary like secondary_file", primary: self.class.to_s, secondary: @secondary.class.to_s
    end
  else
    @secondary = nil
  end

  self
end
# Mirrors the buffer's `disable_chunk_backup` setting into @buffer_config.
# Some plugins still read `@buffer_config.disable_chunk_backup`, so the value is kept
# available there as before.
def keep_buffer_config_compat
  backup_disabled = @buffer.disable_chunk_backup
  @buffer_config[:disable_chunk_backup] = backup_disabled
end
# Starts the plugin: settles buffered vs. non-buffered mode, binds #emit_events to the
# chosen implementation and, for a buffered primary, starts the buffer, the flush
# threads and (when needed) the enqueue thread. Finally starts the secondary, if any.
def start
super
# Undecided in #configure (plugin supports both modes and no <buffer> was given):
# let the plugin choose now.
if @buffering.nil?
@buffering = prefer_buffered_processing
if !@buffering && @buffer
@buffer.terminate # it's not started, so terminate will be enough
# At here, this plugin works as non-buffered plugin.
# Un-assign @buffer not to show buffering metrics (e.g., in_monitor_agent)
@buffer = nil
end
end
if @buffering
# Rebind #emit_events on this instance only, so later calls skip the @buffering check.
m = method(:emit_buffered)
singleton_class.module_eval do
define_method(:emit_events, m)
end
@custom_format = implement?(:custom_format)
@enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
# With both `write` and `try_write` implemented the plugin chooses;
# otherwise delayed commit is used exactly when `try_write` is implemented.
@delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
prefer_delayed_commit
else
implement?(:delayed_commit)
end
@delayed_commit_timeout = @buffer_config.delayed_commit_timeout
else # !@buffering
m = method(:emit_sync)
singleton_class.module_eval do
define_method(:emit_events, m)
end
end
# A secondary never owns a buffer or threads (see #acts_as_secondary / #configure).
if @buffering && !@as_secondary
@retry = nil
@retry_mutex = Mutex.new
@buffer.start
@output_enqueue_thread = nil
@output_enqueue_thread_running = true
@output_flush_threads = []
@output_flush_threads_mutex = Mutex.new
@output_flush_threads_running = true
# mainly for test: detect enqueue works as code below:
# @output.interrupt_flushes
# # emits
# @output.enqueue_thread_wait
@output_flush_interrupted = false
@output_enqueue_thread_mutex = Mutex.new
@output_enqueue_thread_waiting = false
@dequeued_chunks = []
@dequeued_chunks_mutex = Mutex.new
@output_flush_thread_current_position = 0
# One FlushThreadState per flush thread; the thread is stored in the state right after
# creation and the state is registered under the mutex.
@buffer_config.flush_thread_count.times do |i|
thread_title = "flush_thread_#{i}".to_sym
thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
thread = thread_create(thread_title) do
flush_thread_run(thread_state)
end
thread_state.thread = thread
@output_flush_threads_mutex.synchronize do
@output_flush_threads << thread_state
end
end
# The enqueue thread is only created for interval flushing or time-keyed chunks,
# and never while @under_plugin_development is set.
if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time)
@output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
end
@secondary.start if @secondary
end
# Lifecycle hook: runs the superclass step first, then forwards to the secondary (if any).
def after_start
  super
  return unless @secondary

  @secondary.after_start
end
# Lifecycle hook: stops the secondary, then the buffer (when buffering with a buffer
# instance), then runs the superclass step.
def stop
  if @secondary
    @secondary.stop
  end
  if @buffering && @buffer
    @buffer.stop
  end
  super
end
# Lifecycle hook run before shutdown: forwards to the secondary, optionally force-flushes
# the buffer, tells the buffer shutdown is coming, and stops the enqueue thread.
def before_shutdown
@secondary.before_shutdown if @secondary
if @buffering && @buffer
# @flush_at_shutdown is resolved in #configure (defaults depend on buffer persistence).
if @flush_at_shutdown
force_flush
end
@buffer.before_shutdown
# Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data
@output_enqueue_thread_running = false
# Wake the enqueue thread so it can observe the flag above, then wait for it to finish.
if @output_enqueue_thread && @output_enqueue_thread.alive?
@output_enqueue_thread.wakeup
@output_enqueue_thread.join
end
end
super
end
# Lifecycle hook: shuts down the secondary, then the buffer (when buffering with a
# buffer instance), then runs the superclass step.
def shutdown
  if @secondary
    @secondary.shutdown
  end
  if @buffering && @buffer
    @buffer.shutdown
  end
  super
end
# Lifecycle hook run after shutdown: rolls back outstanding chunks, forwards to the
# secondary and the buffer, then asks every flush thread to stop and joins it.
def after_shutdown
try_rollback_all if @buffering && !@as_secondary # rollback regardless with @delayed_commit, because secondary may do it
@secondary.after_shutdown if @secondary
if @buffering && @buffer
@buffer.after_shutdown
# Flag read by the flush threads; set before waking them so they exit their loop.
@output_flush_threads_running = false
if @output_flush_threads && !@output_flush_threads.empty?
@output_flush_threads.each do |state|
# to wakeup thread and make it to stop by itself
state.mutex.synchronize {
# Thread#status is falsy once the thread has finished, so only live threads are signalled.
if state.thread && state.thread.status
state.next_clock = 0
state.cond_var.signal
end
}
Thread.pass
# NOTE(review): join is called unconditionally; this relies on #start always having
# stored a thread in every registered state.
state.thread.join
end
end
end
super
end
# Lifecycle hook: closes the buffer (when buffering with a buffer instance), then the
# secondary, then runs the superclass step.
def close
  if @buffering && @buffer
    @buffer.close
  end
  if @secondary
    @secondary.close
  end
  super
end
# Lifecycle hook: terminates the buffer (when buffering with a buffer instance), then
# the secondary, then runs the superclass step.
def terminate
  if @buffering && @buffer
    @buffer.terminate
  end
  if @secondary
    @secondary.terminate
  end
  super
end
# Number of flush threads that may run this plugin's write path.
# Returns 0 when not buffering; a secondary reports its primary's configured count,
# any other plugin reports its own.
def actual_flush_thread_count
  if !@buffering
    0
  elsif @as_secondary
    @primary_instance.buffer_config.flush_thread_count
  else
    @buffer_config.flush_thread_count
  end
end
# Runs the given block so that `path` (a file name or file path) is processed by only
# the current thread of the current process at a time.
# Across workers the lock is shared when `path` has the same value; within one process
# the lock is shared by all threads.
# Returns the block's value.
def synchronize_path(path)
  synchronize_path_in_workers(path) do
    synchronize_in_threads { yield }
  end
end
# Runs the block under the worker lock for `path` when more than one worker is
# configured; with a single worker the block runs without that lock.
# Returns the block's value.
def synchronize_path_in_workers(path)
  return yield unless system_config.workers > 1

  acquire_worker_lock(path) { yield }
end
# Runs the block while holding @flush_thread_mutex when more than one flush thread is
# in use; otherwise runs it without locking. Returns the block's value.
def synchronize_in_threads
  return yield if actual_flush_thread_count <= 1

  @flush_thread_mutex.synchronize { yield }
end
# Hook for plugins written in v0.12 style: reports whether `feature` is provided in
# that style. The base class provides none, so every known feature yields false.
# Raises ArgumentError for an unknown feature.
def support_in_v12_style?(feature)
  known_features = [:synchronous, :buffered, :delayed_commit, :custom_format]
  unless known_features.include?(feature)
    raise ArgumentError, "unknown feature: #{feature}"
  end

  false
end
# Reports whether the plugin class provides `feature`.
#
# A feature counts as implemented when the plugin's own class (inherited methods are not
# considered) defines the matching hook:
#   :synchronous -> process, :buffered -> write,
#   :delayed_commit -> try_write, :custom_format -> format
# For every feature except :delayed_commit, #support_in_v12_style? is asked as a fallback.
# Raises ArgumentError for an unknown feature.
def implement?(feature)
  own_methods = self.class.instance_methods(false)
  hook, has_v12_fallback =
    case feature
    when :synchronous    then [:process, true]
    when :buffered       then [:write, true]
    when :delayed_commit then [:try_write, false]
    when :custom_format  then [:format, true]
    else
      raise ArgumentError, "Unknown feature for output plugin: #{feature}"
    end

  return true if own_methods.include?(hook)

  has_v12_fallback ? support_in_v12_style?(feature) : false
end
# Validates the placeholders of parameter `name` (value `str`) against the configured
# chunk keys by running every validator built by #placeholder_validators.
# Whatever a validator raises propagates to the caller.
def placeholder_validate!(name, str)
  placeholder_validators(name, str).each(&:validate!)
end
# Builds the PlaceholderValidator objects needed for parameter `name` with value `str`.
#
# A validator is created per kind (:time, :tag, :keys) whenever either the string
# contains placeholders of that kind or the corresponding chunk key is configured, so
# that a mismatch in either direction can be reported by validate!.
# The chunk key arguments default to this plugin's own configuration.
# Returns an Array of validators, possibly empty.
def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
  sec, title, example = get_placeholders_time(str)
  tag_parts = get_placeholders_tag(str)
  key_names = get_placeholders_keys(str)

  validators = []

  if sec || time_key
    time_arg = {sec: sec, title: title, example: example, timekey: time_key}
    validators << PlaceholderValidator.new(name, str, :time, time_arg)
  end

  if tag_key || !tag_parts.empty?
    validators << PlaceholderValidator.new(name, str, :tag, {parts: tag_parts, tagkey: tag_key})
  end

  chunk_keys_given = chunk_keys && !chunk_keys.empty?
  if chunk_keys_given || !key_names.empty?
    validators << PlaceholderValidator.new(name, str, :keys, {keys: key_names, chunkkeys: chunk_keys})
  end

  validators
end
# Checks that the placeholders used in one configuration parameter agree with the
# chunk keys configured in <buffer>.
#
# name     - parameter name (used in error messages)
# string   - parameter value containing the placeholders
# type     - :time, :tag or :keys
# argument - Hash of type-specific data:
#            :time -> {sec:, title:, example:, timekey:}
#            :tag  -> {parts:, tagkey:}
#            :keys -> {keys:, chunkkeys:}
class PlaceholderValidator
  attr_reader :name, :string, :type, :argument

  # Raises ArgumentError when `type` is not one of :time, :tag, :keys.
  def initialize(name, str, type, arg)
    @name = name
    @string = str
    @type = type
    raise ArgumentError, "invalid type:#{type}" if @type != :time && @type != :tag && @type != :keys
    @argument = arg
  end

  def time?
    @type == :time
  end

  def tag?
    @type == :tag
  end

  def keys?
    @type == :keys
  end

  # Runs the check matching #type. Raises Fluent::ConfigError on a mismatch.
  def validate!
    case @type
    when :time then validate_time!
    when :tag then validate_tag!
    when :keys then validate_keys!
    end
  end

  # Time placeholders and the 'time' chunk key must be used together, and the
  # placeholders must be fine-grained enough for the configured timekey.
  def validate_time!
    sec = @argument[:sec]
    title = @argument[:title]
    example = @argument[:example]
    timekey = @argument[:timekey]
    if !sec && timekey
      raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have timestamp placeholders for timekey #{timekey.to_i}"
    end
    if sec && !timekey
      raise Fluent::ConfigError, "Parameter '#{name}: #{string}' has timestamp placeholders, but chunk key 'time' is not configured"
    end
    if sec && timekey && timekey < sec
      raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have timestamp placeholder for #{title}('#{example}') for timekey #{timekey.to_i}"
    end
  end

  # Tag placeholders and the 'tag' chunk key must be used together.
  def validate_tag!
    parts = @argument[:parts]
    tagkey = @argument[:tagkey]
    if tagkey && parts.empty?
      raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have tag placeholder"
    end
    if !tagkey && !parts.empty?
      raise Fluent::ConfigError, "Parameter '#{name}: #{string}' has tag placeholders, but chunk key 'tag' is not configured"
    end
  end

  # The set of ${key} placeholders must equal the set of configured chunk keys.
  def validate_keys!
    keys = @argument[:keys]
    # chunkkeys is nil when no chunk keys are configured at all (the caller builds this
    # validator as soon as the string contains ${key} placeholders). Treat nil as "none"
    # so the mismatch is reported as a ConfigError instead of failing on `nil - keys`.
    chunk_keys = @argument[:chunkkeys] || []
    if (chunk_keys - keys).size > 0
      not_specified = (chunk_keys - keys).sort
      raise Fluent::ConfigError, "Parameter '#{name}: #{string}' doesn't have enough placeholders for keys #{not_specified.join(',')}"
    end
    if (keys - chunk_keys).size > 0
      not_satisfied = (keys - chunk_keys).sort
      raise Fluent::ConfigError, "Parameter '#{name}: #{string}' has placeholders, but chunk keys doesn't have keys #{not_satisfied.join(',')}"
    end
  end
end
# [seconds, unit name, example strftime directive], from finest to coarsest unit.
# #get_placeholders_time returns the first entry whose step changes the formatted output.
TIME_KEY_PLACEHOLDER_THRESHOLDS = [
  [1,     :second, '%S'],
  [60,    :minute, '%M'],
  [3600,  :hour,   '%H'],
  [86400, :day,    '%d'],
]
# Fixed reference instant (2016-01-01 00:00:00 UTC) used to probe strftime templates.
TIMESTAMP_CHECK_BASE_TIME = Time.utc(2016, 1, 1, 0, 0, 0)
# Finds the finest time unit that the strftime template `str` depends on.
#
# The template is formatted at a fixed base time and again after adding 1 second,
# 1 minute, 1 hour and 1 day; the first step that changes the output identifies the unit.
# Returns the matching [seconds, unit, example directive] triple, or nil when none of
# these steps changes the output. Nothing coarser than one day is probed, so use of a
# timekey larger than 1 day is not validated.
def get_placeholders_time(str)
  reference = TIMESTAMP_CHECK_BASE_TIME.strftime(str)
  TIME_KEY_PLACEHOLDER_THRESHOLDS.find do |seconds, _unit, _directive|
    (TIMESTAMP_CHECK_BASE_TIME + seconds).strftime(str) != reference
  end
end
# Lists the tag placeholders used in `str` as a sorted Array of Integers:
# `${tag[N]}` contributes N, and a bare `${tag}` (the whole tag) contributes -1.
def get_placeholders_tag(str)
  # scan yields one-element arrays such as ["tag"] or ["tag[0]"]
  indexes = str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).each_with_object([]) do |(placeholder), found|
    if placeholder == "tag"
      found << -1
    elsif (indexed = /^tag\[(-?\d+)\]$/.match(placeholder))
      found << indexed[1].to_i
    end
  end
  indexes.sort
end
# Lists the chunk key names used as `${name}` placeholders in `str`, sorted.
# The reserved names `tag` and `chunk_id` are not chunk keys and are left out.
def get_placeholders_keys(str)
  names = str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).flatten
  reserved = ["tag", 'chunk_id']
  (names - reserved).sort
end
# TODO: optimize this code
# Expands the placeholders in `str` using the metadata of `chunk`.
#
# `chunk` is either a Buffer::Chunk or, for older plugins, the chunk's metadata itself;
# `${chunk_id}` can only be expanded in the former case.
# Substitution order: strftime directives (when chunking by time), `${tag}` / `${tag[N]}`
# (when chunking by tag), `${chunk_id}`, then the configured chunk keys.
# Returns the expanded String; placeholders that stay unreplaced are reported with log.warn.
def extract_placeholders(str, chunk)
metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk)
chunk_passed = true
chunk.metadata
else
chunk_passed = false
# For existing plugins. Old plugin passes Chunk.metadata instead of Chunk
chunk
end
if metadata.empty?
# With empty metadata only ${chunk_id} is expanded.
# NOTE(review): when a metadata object was passed, the block's value is whatever
# log.warn returns, and that becomes the replacement text - confirm this is intended.
str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
if chunk_passed
dump_unique_id_hex(chunk.unique_id)
else
log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
end
}
else
rvalue = str.dup
# strftime formatting
if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
# One formatter per template string is cached and reused.
@output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
end
# ${tag}, ${tag[0]}, ${tag[1]}, ... , ${tag[-2]}, ${tag[-1]}
if @chunk_key_tag
if str.include?('${tag}')
rvalue = rvalue.gsub('${tag}', metadata.tag)
end
if CHUNK_TAG_PLACEHOLDER_PATTERN.match?(str)
# Map both the positive and the negative index of every tag part to its value.
hash = {}
tag_parts = metadata.tag.split('.')
tag_parts.each_with_index do |part, i|
hash["${tag[#{i}]}"] = part
hash["${tag[#{i-tag_parts.size}]}"] = part
end
# NOTE(review): gsub with a Hash replaces matches that have no entry (e.g. an
# out-of-range index) with an empty string, so the warning below appears unlikely to
# fire for them - confirm.
rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash)
end
if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN
log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}"
end
end
# First we replace ${chunk_id} with chunk.unique_id (hexlified).
rvalue = rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
if chunk_passed
dump_unique_id_hex(chunk.unique_id)
else
log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
end
}
# Then, replace other ${chunk_key}s.
if !@chunk_keys.empty? && metadata.variables
hash = {'${tag}' => '${tag}'} # not to erase this wrongly
@chunk_keys.each do |key|
hash["${#{key}}"] = metadata.variables[key.to_sym]
end
rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN) do |matched|
# `matched` is the whole "${name}" text; [2..-2] strips "${" and "}" for the message.
hash.fetch(matched) do
log.warn "chunk key placeholder '#{matched[2..-2]}' not replaced. template:#{str}"
''
end
end
end
if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN
log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}"
end
rvalue
end
end
# Entry point for an event stream: dispatches to the buffered or the synchronous path.
# This generic version is only used until #start rebinds `emit_events` on the instance
# directly to #emit_buffered or #emit_sync.
def emit_events(tag, es)
  @buffering ? emit_buffered(tag, es) : emit_sync(tag, es)
end
# Non-buffered path: hands the event stream straight to #process and updates metrics.
# The emit counter is bumped for every call; record (and, when enabled, size) metrics
# only after #process succeeded. Any error is counted and re-raised.
def emit_sync(tag, es)
  @emit_count_metrics.inc
  begin
    process(tag, es)
    @emit_records_metrics.add(es.size)
    if @enable_size_metrics
      @emit_size_metrics.add(es.to_msgpack_stream.bytesize)
    end
  rescue StandardError
    @num_errors_metrics.inc
    raise
  end
end
# Buffered path: writes the event stream into buffer chunks (enqueueing them right away
# in :immediate flush mode) and, unless a retry is in progress, requests a flush when
# the buffer reports queued chunks. Any error is counted and re-raised.
def emit_buffered(tag, es)
  @emit_count_metrics.inc
  begin
    enqueue_now = (@flush_mode == :immediate)
    execute_chunking(tag, es, enqueue: enqueue_now)
    not_retrying = !@retry
    submit_flush_once if not_retrying && @buffer.queued?(nil, optimistic: true)
  rescue StandardError
    # TODO: separate number of errors into emit errors and write/flush errors
    @num_errors_metrics.inc
    raise
  end
end
# TODO: optimize this code
# Builds the chunk metadata for one event according to the configured chunk keys.
#
# Arguments follow the output plugin convention (tag, time, record); note that the
# buffer's metadata takes them as timekey / tag / variables instead.
# Only the configured parts are passed on: the time key (see #calculate_timekey), the
# tag, and the values of the record chunk keys.
# Raises ArgumentError when an argument has an unexpected type (nil is accepted for each).
def metadata(tag, time, record)
  raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String)
  raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer)
  raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash)

  # for tests: no chunking has been configured at all
  chunking_unconfigured = @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil?
  return Struct.new(:timekey, :tag, :variables).new if chunking_unconfigured

  # timekey is an int from the epoch, and `timekey - timekey % 60` is assumed to match
  # second 0 of each minute. That is wrong for a time zone with leap seconds, but that
  # is very rare and can be ignored (especially in production systems).
  if @chunk_keys.empty?
    # Pass only the keywords that are actually configured.
    attributes = {}
    attributes[:timekey] = calculate_timekey(time) if @chunk_key_time
    attributes[:tag] = tag if @chunk_key_tag
    @buffer.metadata(**attributes)
  else
    timekey = @chunk_key_time ? calculate_timekey(time) : nil
    variables = @chunk_key_accessors.each_with_object({}) do |(key_name, accessor), values|
      values[key_name] = accessor.call(record)
    end
    @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: variables)
  end
end
# Rounds `time` down to the start of its @timekey-sized window and returns it as an
# Integer of epoch seconds. With timekey_use_utc the windows are aligned to UTC;
# otherwise they are aligned using the zone offset, which is either the fixed @offset
# or, when @calculate_offset is set, computed per event time.
def calculate_timekey(time)
  epoch = time.to_i
  into_window =
    if @timekey_use_utc
      epoch % @timekey
    else
      zone_shift = @calculate_offset ? @calculate_offset.call(time) : @offset
      (epoch + zone_shift) % @timekey
    end
  (epoch - into_window).to_i
end
# Test helper: returns a new in-memory chunk whose metadata is built by #metadata from
# the given tag, time and record. The memory chunk class is loaded on demand.
def chunk_for_test(tag, time, record)
  require 'fluent/plugin/buffer/memory_chunk'

  Fluent::Plugin::Buffer::MemoryChunk.new(metadata(tag, time, record))
end
# Picks the stream handler for the configured chunking:
# simple chunking first, then the plugin's custom format, otherwise the standard format.
# `enqueue` is passed through to the chosen handler unchanged.
def execute_chunking(tag, es, enqueue: false)
  return handle_stream_simple(tag, es, enqueue: enqueue) if @simple_chunking
  return handle_stream_with_custom_format(tag, es, enqueue: enqueue) if @custom_format

  handle_stream_with_standard_format(tag, es, enqueue: enqueue)
end
def write_guard(&block)
begin
block.call
rescue Fluent::Plugin::Buffer::BufferOverflowError
log.warn "failed to write data into buffer by buffer overflow", action: @buffer_config.overflow_action
case @buffer_config.overflow_action
when :throw_exception
raise
when :block
log.debug "buffer.write is now blocking"
until @buffer.storable?
if self.stopped?
log.error "breaking block behavior to shutdown Fluentd"
# to break infinite loop to exit Fluentd process
raise
end
log.trace "sleeping until buffer can store more data"
sleep 1
end
log.debug "retrying buffer.write after blocked operation"
retry
when :drop_oldest_chunk
begin
oldest = @buffer.dequeue_chunk
if oldest
log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id)