diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb index 56e54dabb88..8ce1f29525a 100644 --- a/logstash-core/spec/logstash/queue_factory_spec.rb +++ b/logstash-core/spec/logstash/queue_factory_spec.rb @@ -50,10 +50,16 @@ subject { described_class } context "when `queue.type` is `persisted`" do + let(:queue_path) { ::File.join(settings.get("path.queue"), pipeline_id) } + before do settings.set("queue.type", "persisted") end + after(:each) do + FileUtils.rm_rf(queue_path) + end + it "returns a `WrappedAckedQueue`" do queue = subject.create(settings) expect(queue).to be_kind_of(LogStash::WrappedAckedQueue) @@ -61,11 +67,6 @@ end describe "per pipeline id subdirectory creation" do - let(:queue_path) { ::File.join(settings.get("path.queue"), pipeline_id) } - - after :each do - FileUtils.rm_rf(queue_path) - end it "creates a queue directory based on the pipeline id" do expect(Dir.exist?(queue_path)).to be_falsey @@ -74,6 +75,19 @@ queue.close end end + + context "when queue.max_bytes is larger than Java int" do + let(:large_queue_max_bytes) { 2**31 } # 2^31 bytes, bigger than 2^31-1 int limit + before(:each) do + settings.set("queue.max_bytes", large_queue_max_bytes) + end + it "does not raise error" do + queue = nil + expect { queue = subject.create(settings) }.to_not raise_error + expect(queue.queue.max_size_in_bytes).to eq(large_queue_max_bytes) + queue.close + end + end end context "when `queue.type` is `memory`" do diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java index de692ee8561..b711e2c6317 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -160,7 +160,7 @@ private static Settings extractQueueSettings(final IRubyObject settings) { .checkpointMaxWrites(getSetting(context, settings, QUEUE_CHECKPOINT_WRITES).toJava(Integer.class)) .checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class)) .checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue()) - .queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class)) + .queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Long.class)) .compressionCodecFactory(extractConfiguredCodec(settings)) .build(); }