Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions config/logstash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,6 @@
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
Expand Down
4 changes: 0 additions & 4 deletions config/pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@
# # Default is 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
#
# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# # Default is 1000, 0 for no periodic checkpoint.
# queue.checkpoint.interval: 1000
#
# # Enable Dead Letter Queueing for this pipeline.
# dead_letter_queue.enable: false
#
Expand Down
2 changes: 1 addition & 1 deletion docker/data/logstash/env2yaml/env2yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var validSettings = []string{
"queue.max_bytes",
"queue.checkpoint.acks",
"queue.checkpoint.writes",
"queue.checkpoint.interval",
"queue.checkpoint.interval", // remove it for #17155
"queue.drain",
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
Expand Down
3 changes: 0 additions & 3 deletions docs/reference/persistent-queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref
To avoid losing data in the persistent queue, you can set `queue.checkpoint.writes: 1` to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to `1` ensures maximum durability, but can severely impact performance. See [Controlling durability](#durability-persistent-queues) to better understand the trade-offs.


`queue.checkpoint.interval`
: Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints.


## Configuration notes [pq-config-notes]

Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ module Environment
Setting::SettingNumeric.new("queue.max_events", 0), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing
Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # remove it for #17155
Setting::Boolean.new("queue.checkpoint.retry", true),
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Expand Down
3 changes: 3 additions & 0 deletions logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ def execute
configure_pipeline_buffer_type
end

deprecation_logger.deprecated "The setting `queue.checkpoint.interval` has no effect and will be removed from both " +
"logstash.yml and pipeline.yml in a future release." if @settings.set?("queue.checkpoint.interval")

while (msg = LogStash::DeprecationMessage.instance.shift)
deprecation_logger.deprecated msg
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def self.included(base)
"pipeline.ordered",
"pipeline.ecs_compatibility",
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.interval", # remove it for #17155
"queue.checkpoint.writes",
"queue.checkpoint.retry",
"queue.drain",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }

let(:queue) do
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, false, queue_capacity)
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity)
end

let(:writer_threads) do
Expand Down Expand Up @@ -138,7 +138,6 @@ def publisher(items, writer)
let(:queue_capacity_multiplier) { 128 }
let(:queue_checkpoint_acks) { 1024 }
let(:queue_checkpoint_writes) { 1024 }
let(:queue_checkpoint_interval) { 1000 }
let(:batch_size) { 500 }
let(:batch_wait) { 1000 }
let(:expected_count) { 60000 }
Expand Down Expand Up @@ -180,12 +179,12 @@ def publisher(items, writer)
end

context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }

it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }

it_behaves_like "a well behaved queue"
end

Expand Down Expand Up @@ -256,12 +255,12 @@ def publisher(items, writer)
end

context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }

it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }

it_behaves_like "a well behaved queue"
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def threaded_read_client

context "WrappedAckedQueue" do
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, 1024, false, 4096) }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) }

before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
Expand Down
1 change: 0 additions & 1 deletion logstash-core/spec/logstash/queue_factory_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
LogStash::Setting::SettingNumeric.new("queue.max_events", 0),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.interval", 1000),
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
LogStash::Setting::SettingString.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
Expand Down
3 changes: 1 addition & 2 deletions logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@
let(:max_bytes) { 0 }
let(:checkpoint_acks) { 1024 }
let(:checkpoint_writes) { 1024 }
let(:checkpoint_interval) { 0 }
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, checkpoint_interval, false, max_bytes) }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) }

after do
queue.close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_EVENTS),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_WRITES),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_ACKS),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_INTERVAL),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_RETRY),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_BYTES)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {

@JRubyMethod(optional = 8)
public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
args = Arity.scanArgs(context.runtime, args, 8, 0);
args = Arity.scanArgs(context.runtime, args, 7, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxEvents = RubyFixnum.num2int(args[2]);
int checkpointMaxWrites = RubyFixnum.num2int(args[3]);
int checkpointMaxAcks = RubyFixnum.num2int(args[4]);
boolean checkpointRetry = !((RubyBoolean) args[6]).isFalse();
long queueMaxBytes = RubyFixnum.num2long(args[7]);
boolean checkpointRetry = !((RubyBoolean) args[5]).isFalse();
long queueMaxBytes = RubyFixnum.num2long(args[6]);

this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents,
checkpointMaxWrites, checkpointMaxAcks, checkpointRetry, queueMaxBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class SettingKeyDefinitions {

public static final String QUEUE_CHECKPOINT_ACKS = "queue.checkpoint.acks";

public static final String QUEUE_CHECKPOINT_INTERVAL = "queue.checkpoint.interval";

public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry";

public static final String QUEUE_MAX_BYTES = "queue.max_bytes";
Expand Down
Loading