Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions config/logstash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,6 @@
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
Expand Down
4 changes: 0 additions & 4 deletions config/pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@
# # Default is 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
#
# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# # Default is 1000, 0 for no periodic checkpoint.
# queue.checkpoint.interval: 1000
#
# # Enable Dead Letter Queueing for this pipeline.
# dead_letter_queue.enable: false
#
Expand Down
2 changes: 1 addition & 1 deletion docker/data/logstash/env2yaml/env2yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var validSettings = []string{
"queue.max_bytes",
"queue.checkpoint.acks",
"queue.checkpoint.writes",
"queue.checkpoint.interval",
"queue.checkpoint.interval", // remove it for #17155
"queue.drain",
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
Expand Down
3 changes: 0 additions & 3 deletions docs/reference/persistent-queues.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
---

Check notice on line 1 in docs/reference/persistent-queues.md

View workflow job for this annotation

GitHub Actions / docs-preview / build

Irregular whitespace character detected: U+200B (Zero Width Space (ZWSP)). This may impair Markdown rendering.
mapped_pages:
- https://www.elastic.co/guide/en/logstash/current/persistent-queues.html
---
Expand Down Expand Up @@ -82,9 +82,6 @@
To avoid losing data in the persistent queue, you can set `queue.checkpoint.writes: 1` to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to `1` ensures maximum durability, but can severely impact performance. See [Controlling durability](#durability-persistent-queues) to better understand the trade-offs.


`queue.checkpoint.interval`
: Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints.


## Configuration notes [pq-config-notes]

Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ module Environment
Setting::SettingNumeric.new("queue.max_events", 0), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing
Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing # remove it for #17155
Setting::Boolean.new("queue.checkpoint.retry", true),
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Expand Down
3 changes: 3 additions & 0 deletions logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ def execute
configure_pipeline_buffer_type
end

logger.warn "The setting `queue.checkpoint.interval` is no longer supported and should be removed from both " +
"logstash.yml and pipeline.yml." if @settings.set?("queue.checkpoint.interval")

while (msg = LogStash::DeprecationMessage.instance.shift)
deprecation_logger.deprecated msg
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def self.included(base)
"pipeline.ordered",
"pipeline.ecs_compatibility",
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.interval", # remove it for #17155
"queue.checkpoint.writes",
"queue.checkpoint.retry",
"queue.drain",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }

let(:queue) do
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, false, queue_capacity)
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity)
end

let(:writer_threads) do
Expand Down Expand Up @@ -138,7 +138,6 @@ def publisher(items, writer)
let(:queue_capacity_multiplier) { 128 }
let(:queue_checkpoint_acks) { 1024 }
let(:queue_checkpoint_writes) { 1024 }
let(:queue_checkpoint_interval) { 1000 }
let(:batch_size) { 500 }
let(:batch_wait) { 1000 }
let(:expected_count) { 60000 }
Expand Down Expand Up @@ -180,12 +179,12 @@ def publisher(items, writer)
end

context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }

it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }

it_behaves_like "a well behaved queue"
end

Expand Down Expand Up @@ -256,12 +255,12 @@ def publisher(items, writer)
end

context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }

it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }

it_behaves_like "a well behaved queue"
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def threaded_read_client

context "WrappedAckedQueue" do
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, 1024, false, 4096) }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) }

before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
Expand Down
1 change: 0 additions & 1 deletion logstash-core/spec/logstash/queue_factory_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
LogStash::Setting::SettingNumeric.new("queue.max_events", 0),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.interval", 1000),
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
LogStash::Setting::SettingString.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
Expand Down
3 changes: 1 addition & 2 deletions logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@
let(:max_bytes) { 0 }
let(:checkpoint_acks) { 1024 }
let(:checkpoint_writes) { 1024 }
let(:checkpoint_interval) { 0 }
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, checkpoint_interval, false, max_bytes) }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) }

after do
queue.close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_EVENTS),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_WRITES),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_ACKS),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_INTERVAL),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_RETRY),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_BYTES)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {

@JRubyMethod(optional = 8)
public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
args = Arity.scanArgs(context.runtime, args, 8, 0);
args = Arity.scanArgs(context.runtime, args, 7, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxEvents = RubyFixnum.num2int(args[2]);
int checkpointMaxWrites = RubyFixnum.num2int(args[3]);
int checkpointMaxAcks = RubyFixnum.num2int(args[4]);
boolean checkpointRetry = !((RubyBoolean) args[6]).isFalse();
long queueMaxBytes = RubyFixnum.num2long(args[7]);
boolean checkpointRetry = !((RubyBoolean) args[5]).isFalse();
long queueMaxBytes = RubyFixnum.num2long(args[6]);

this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents,
checkpointMaxWrites, checkpointMaxAcks, checkpointRetry, queueMaxBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class SettingKeyDefinitions {

public static final String QUEUE_CHECKPOINT_ACKS = "queue.checkpoint.acks";

public static final String QUEUE_CHECKPOINT_INTERVAL = "queue.checkpoint.interval";

public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry";

public static final String QUEUE_MAX_BYTES = "queue.max_bytes";
Expand Down
Loading