diff --git a/config/logstash.yml b/config/logstash.yml index aeeabaf8743..5a9948e5e6a 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -223,10 +223,6 @@ # # queue.checkpoint.writes: 1024 # -# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page -# Default is 1000, 0 for no periodic checkpoint. -# -# queue.checkpoint.interval: 1000 # # ------------ Dead-Letter Queue Settings -------------- # Flag to turn on dead-letter queue. diff --git a/config/pipelines.yml b/config/pipelines.yml index ed52b38ed5b..97ce63c2958 100644 --- a/config/pipelines.yml +++ b/config/pipelines.yml @@ -68,10 +68,6 @@ # # Default is 1024, 0 for unlimited # queue.checkpoint.writes: 1024 # -# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page -# # Default is 1000, 0 for no periodic checkpoint. -# queue.checkpoint.interval: 1000 -# # # Enable Dead Letter Queueing for this pipeline. # dead_letter_queue.enable: false # diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go index dc2641edf0b..95fc569b236 100644 --- a/docker/data/logstash/env2yaml/env2yaml.go +++ b/docker/data/logstash/env2yaml/env2yaml.go @@ -57,7 +57,7 @@ var validSettings = []string{ "queue.max_bytes", "queue.checkpoint.acks", "queue.checkpoint.writes", - "queue.checkpoint.interval", + "queue.checkpoint.interval", // remove it for #17155 "queue.drain", "dead_letter_queue.enable", "dead_letter_queue.max_bytes", diff --git a/docs/reference/persistent-queues.md b/docs/reference/persistent-queues.md index 028422088f7..5a3bc71b0d6 100644 --- a/docs/reference/persistent-queues.md +++ b/docs/reference/persistent-queues.md @@ -82,9 +82,6 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref To avoid losing data in the persistent queue, you can set `queue.checkpoint.writes: 1` to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to `1` ensures maximum durability, but can severely impact performance. See [Controlling durability](#durability-persistent-queues) to better understand the trade-offs. -`queue.checkpoint.interval` -: Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints. - ## Configuration notes [pq-config-notes] diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index c3e01fc558e..b79bd58dcda 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -89,7 +89,7 @@ module Environment Setting::SettingNumeric.new("queue.max_events", 0), # 0 is unlimited Setting::SettingNumeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited Setting::SettingNumeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited - Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing + Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # remove it for #17155 Setting::Boolean.new("queue.checkpoint.retry", true), Setting::Boolean.new("dead_letter_queue.enable", false), Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"), diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 1df665712ac..1f70555ca9d 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -270,6 +270,9 @@ def execute configure_pipeline_buffer_type end + deprecation_logger.deprecated "The setting `queue.checkpoint.interval` has no effect and will be removed from both " + + "logstash.yml and pipeline.yml in a future release." if @settings.set?("queue.checkpoint.interval") + while (msg = LogStash::DeprecationMessage.instance.shift) deprecation_logger.deprecated msg end diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index e879613e7d0..d7feb04b117 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -66,7 +66,7 @@ def self.included(base) "pipeline.ordered", "pipeline.ecs_compatibility", "queue.checkpoint.acks", - "queue.checkpoint.interval", + "queue.checkpoint.interval", # remove it for #17155 "queue.checkpoint.writes", "queue.checkpoint.retry", "queue.drain", diff --git a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb index f5335f974dc..0b551494698 100644 --- a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb +++ b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb @@ -29,7 +29,7 @@ let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] } let(:queue) do - described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, false, queue_capacity) + described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity) end let(:writer_threads) do @@ -138,7 +138,6 @@ def publisher(items, writer) let(:queue_capacity_multiplier) { 128 } let(:queue_checkpoint_acks) { 1024 } let(:queue_checkpoint_writes) { 1024 } - let(:queue_checkpoint_interval) { 1000 } let(:batch_size) { 500 } let(:batch_wait) { 1000 } let(:expected_count) { 60000 } @@ -180,12 +179,12 @@ def publisher(items, writer) end context "> larger checkpoint interval <" do - let(:queue_checkpoint_interval) { 3000 } + it_behaves_like "a well behaved queue" end context "> smaller checkpoint interval <" do - let(:queue_checkpoint_interval) { 500 } + it_behaves_like "a well behaved queue" end @@ -256,12 +255,12 @@ def publisher(items, writer) end context "> larger checkpoint interval <" do - let(:queue_checkpoint_interval) { 3000 } + it_behaves_like "a well behaved queue" end context "> smaller checkpoint interval <" do - let(:queue_checkpoint_interval) { 500 } + it_behaves_like "a well behaved queue" end diff --git a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb index 762943f350e..171525a30a3 100644 --- a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb +++ b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb @@ -125,7 +125,7 @@ def threaded_read_client context "WrappedAckedQueue" do let(:path) { Stud::Temporary.directory } - let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, 1024, false, 4096) } + let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) } before do read_client.set_events_metric(metric.namespace([:stats, :events])) diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb index cae6b1fe7dd..bbc1972d342 100644 --- a/logstash-core/spec/logstash/queue_factory_spec.rb +++ b/logstash-core/spec/logstash/queue_factory_spec.rb @@ -29,7 +29,6 @@ LogStash::Setting::SettingNumeric.new("queue.max_events", 0), LogStash::Setting::SettingNumeric.new("queue.checkpoint.acks", 1024), LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024), - LogStash::Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), LogStash::Setting::Boolean.new("queue.checkpoint.retry", false), LogStash::Setting::SettingString.new("pipeline.id", pipeline_id), LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125), diff --git a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb index 2a8e9b6767a..5c729196c4e 100644 --- a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb +++ b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb @@ -52,9 +52,8 @@ let(:max_bytes) { 0 } let(:checkpoint_acks) { 1024 } let(:checkpoint_writes) { 1024 } - let(:checkpoint_interval) { 0 } let(:path) { Stud::Temporary.directory } - let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, checkpoint_interval, false, max_bytes) } + let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) } after do queue.close diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java index b5d1288b89f..bcc9b139e58 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -88,7 +88,6 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_EVENTS), getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_WRITES), getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_ACKS), - getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_INTERVAL), getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_RETRY), getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_BYTES) } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index b132afe54bc..257ce824214 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -51,13 +51,13 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { @JRubyMethod(optional = 8) public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException { - args = Arity.scanArgs(context.runtime, args, 8, 0); + args = Arity.scanArgs(context.runtime, args, 7, 0); int capacity = RubyFixnum.num2int(args[1]); int maxEvents = RubyFixnum.num2int(args[2]); int checkpointMaxWrites = RubyFixnum.num2int(args[3]); int checkpointMaxAcks = RubyFixnum.num2int(args[4]); - boolean checkpointRetry = !((RubyBoolean) args[6]).isFalse(); - long queueMaxBytes = RubyFixnum.num2long(args[7]); + boolean checkpointRetry = !((RubyBoolean) args[5]).isFalse(); + long queueMaxBytes = RubyFixnum.num2long(args[6]); this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents, checkpointMaxWrites, checkpointMaxAcks, checkpointRetry, queueMaxBytes); diff --git a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java index 97f00d358e7..7c5c47e3162 100644 --- a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java +++ b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java @@ -40,8 +40,6 @@ public class SettingKeyDefinitions { public static final String QUEUE_CHECKPOINT_ACKS = "queue.checkpoint.acks"; - public static final String QUEUE_CHECKPOINT_INTERVAL = "queue.checkpoint.interval"; - public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry"; public static final String QUEUE_MAX_BYTES = "queue.max_bytes";