Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@
let(:output_strings) { [] }
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }

let(:queue_settings) do
LogStash::AckedQueue.file_settings_builder(path)
.capacity(page_capacity)
.checkpointMaxAcks(queue_checkpoint_acks)
.checkpointMaxWrites(queue_checkpoint_writes)
.queueMaxBytes(queue_capacity)
.build
end

let(:queue) do
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity)
described_class.new(queue_settings)
end

let(:writer_threads) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,18 @@ def threaded_read_client

context "WrappedAckedQueue" do
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) }

let(:queue_settings) do
LogStash::AckedQueue.file_settings_builder(path)
.capacity(1024)
.maxUnread(10)
.checkpointMaxAcks(1024)
.checkpointMaxWrites(1024)
.queueMaxBytes(4096)
.build
end

let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }

before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
Expand Down
13 changes: 12 additions & 1 deletion logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,18 @@
let(:checkpoint_acks) { 1024 }
let(:checkpoint_writes) { 1024 }
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) }

let(:queue_settings) do
LogStash::AckedQueue.file_settings_builder(path)
.capacity(page_capacity)
.maxUnread(max_events)
.checkpointMaxAcks(checkpoint_acks)
.checkpointMaxWrites(checkpoint_writes)
.queueMaxBytes(max_bytes)
.build
end

let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }

after do
queue.close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.common.SettingKeyDefinitions;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.ext.JrubyWrappedSynchronousQueueExt;

import static org.logstash.common.SettingKeyDefinitions.*;

/**
* Persistent queue factory JRuby extension.
* */
Expand Down Expand Up @@ -69,29 +72,16 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
final IRubyObject settings) throws IOException {
final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString();
if (PERSISTED_TYPE.equals(type)) {
final Path queuePath = Paths.get(
getSetting(context, settings, SettingKeyDefinitions.PATH_QUEUE).asJavaString(),
getSetting(context, settings, SettingKeyDefinitions.PIPELINE_ID).asJavaString()
);
final Settings queueSettings = extractQueueSettings(settings);
final Path queuePath = Paths.get(queueSettings.getDirPath());

// Files.createDirectories raises a FileAlreadyExistsException
// if pipeline queue path is a symlink, so worth checking against Files.exists
if (Files.exists(queuePath) == false) {
Files.createDirectories(queuePath);
}

return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS)
.initialize(
context, new IRubyObject[]{
context.runtime.newString(queuePath.toString()),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_PAGE_CAPACITY),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_EVENTS),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_WRITES),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_ACKS),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_RETRY),
getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_BYTES)
}
);
return JRubyWrappedAckedQueueExt.create(context, queueSettings);
} else if (MEMORY_TYPE.equals(type)) {
return new JrubyWrappedSynchronousQueueExt(
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
Expand All @@ -118,4 +108,21 @@ private static IRubyObject getSetting(final ThreadContext context, final IRubyOb
final String name) {
return settings.callMethod(context, "get_value", context.runtime.newString(name));
}

private static Settings extractQueueSettings(final IRubyObject settings) {
final ThreadContext context = settings.getRuntime().getCurrentContext();
final Path queuePath = Paths.get(
getSetting(context, settings, PATH_QUEUE).asJavaString(),
getSetting(context, settings, PIPELINE_ID).asJavaString()
);
return SettingsImpl.fileSettingsBuilder(queuePath.toString())
.elementClass(Event.class)
.capacity(getSetting(context, settings, QUEUE_PAGE_CAPACITY).toJava(Integer.class))
.maxUnread(getSetting(context, settings, QUEUE_MAX_EVENTS).toJava(Integer.class))
.checkpointMaxWrites(getSetting(context, settings, QUEUE_CHECKPOINT_WRITES).toJava(Integer.class))
.checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class))
.checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue())
.queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class))
.build();
}
}
29 changes: 29 additions & 0 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package org.logstash.ackedqueue;

import java.util.ArrayList;
import java.util.List;

/**
* Persistent queue settings definition.
* */
Expand All @@ -41,6 +44,32 @@ public interface Settings {

boolean getCheckpointRetry();

/**
* Validate and return the settings, or throw descriptive {@link QueueRuntimeException}
* @param settings the settings to validate
* @return the settings that were provided
*/
static Settings ensureValid(final Settings settings) {
final List<String> errors = new ArrayList<>();

if (settings == null) {
errors.add("settings cannot be null");
} else {
if (settings.getDirPath() == null) {
errors.add("dirPath cannot be null");
}
if (settings.getElementClass() == null) {
errors.add("elementClass cannot be null");
}
}

if (!errors.isEmpty()) {
throw new QueueRuntimeException(String.format("Invalid Queue Settings: %s", errors));
}

return settings;
}

/**
* Persistent queue Setting's fluent builder definition
* */
Expand Down
136 changes: 57 additions & 79 deletions logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,38 @@
* under the License.
*/


package org.logstash.ackedqueue;

/**
* Persistent queue settings implementation.
* */
public class SettingsImpl implements Settings {
private String dirForFiles;
private Class<? extends Queueable> elementClass;
private int capacity;
private long queueMaxBytes;
private int maxUnread;
private int checkpointMaxAcks;
private int checkpointMaxWrites;
private boolean checkpointRetry;
private final String dirForFiles;
private final Class<? extends Queueable> elementClass;
private final int capacity;
private final long queueMaxBytes;
private final int maxUnread;
private final int checkpointMaxAcks;
private final int checkpointMaxWrites;
private final boolean checkpointRetry;

public static Builder builder(final Settings settings) {
return new BuilderImpl(settings.getDirPath(), settings.getElementClass(), settings.getCapacity(),
settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxWrites(), settings.getCheckpointRetry()
);
return new BuilderImpl(settings);
}

public static Builder fileSettingsBuilder(final String dirForFiles) {
return new BuilderImpl(dirForFiles);
}

private SettingsImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
final int capacity, final long queueMaxBytes, final int maxUnread,
final int checkpointMaxAcks, final int checkpointMaxWrites, boolean checkpointRetry) {
this.dirForFiles = dirForFiles;
this.elementClass = elementClass;
this.capacity = capacity;
this.queueMaxBytes = queueMaxBytes;
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
this.checkpointRetry = checkpointRetry;
private SettingsImpl(final BuilderImpl builder) {
this.dirForFiles = builder.dirForFiles;
this.elementClass = builder.elementClass;
this.capacity = builder.capacity;
this.queueMaxBytes = builder.queueMaxBytes;
this.maxUnread = builder.maxUnread;
this.checkpointMaxAcks = builder.checkpointMaxAcks;
this.checkpointMaxWrites = builder.checkpointMaxWrites;
this.checkpointRetry = builder.checkpointRetry;
}

@Override
Expand Down Expand Up @@ -132,103 +126,87 @@ private static final class BuilderImpl implements Builder {

private final String dirForFiles;

private final Class<? extends Queueable> elementClass;
private Class<? extends Queueable> elementClass;

private final int capacity;
private int capacity;

private final long queueMaxBytes;
private long queueMaxBytes;

private final int maxUnread;
private int maxUnread;

private final int checkpointMaxAcks;
private int checkpointMaxAcks;

private final int checkpointMaxWrites;
private int checkpointMaxWrites;

private final boolean checkpointRetry;
private boolean checkpointRetry;

private BuilderImpl(final String dirForFiles) {
this(dirForFiles, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES, false
);
this.dirForFiles = dirForFiles;
this.elementClass = null;
this.capacity = DEFAULT_CAPACITY;
this.queueMaxBytes = DEFAULT_MAX_QUEUE_BYTES;
this.maxUnread = DEFAULT_MAX_UNREAD;
this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS;
this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES;
this.checkpointRetry = false;
}

private BuilderImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
final int capacity, final long queueMaxBytes, final int maxUnread,
final int checkpointMaxAcks, final int checkpointMaxWrites, final boolean checkpointRetry) {
this.dirForFiles = dirForFiles;
this.elementClass = elementClass;
this.capacity = capacity;
this.queueMaxBytes = queueMaxBytes;
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
this.checkpointRetry = checkpointRetry;
private BuilderImpl(final Settings settings) {
this.dirForFiles = settings.getDirPath();
this.elementClass = settings.getElementClass();
this.capacity = settings.getCapacity();
this.queueMaxBytes = settings.getQueueMaxBytes();
this.maxUnread = settings.getMaxUnread();
this.checkpointMaxAcks = settings.getCheckpointMaxAcks();
this.checkpointMaxWrites = settings.getCheckpointMaxWrites();
this.checkpointRetry = settings.getCheckpointRetry();
}

@Override
public Builder elementClass(final Class<? extends Queueable> elementClass) {
return new BuilderImpl(
this.dirForFiles, elementClass, this.capacity, this.queueMaxBytes, this.maxUnread,
this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
);
this.elementClass = elementClass;
return this;
}

@Override
public Builder capacity(final int capacity) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, capacity, this.queueMaxBytes, this.maxUnread,
this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
);
this.capacity = capacity;
return this;
}

@Override
public Builder queueMaxBytes(final long size) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, this.capacity, size, this.maxUnread,
this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
);
this.queueMaxBytes = size;
return this;
}

@Override
public Builder maxUnread(final int maxUnread) {
return new BuilderImpl(
this.dirForFiles, this.elementClass,
this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites, this.checkpointRetry
);
this.maxUnread = maxUnread;
return this;
}

@Override
public Builder checkpointMaxAcks(final int checkpointMaxAcks) {
return new BuilderImpl(
this.dirForFiles, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks,
this.checkpointMaxWrites, this.checkpointRetry
);
this.checkpointMaxAcks = checkpointMaxAcks;
return this;
}

@Override
public Builder checkpointMaxWrites(final int checkpointMaxWrites) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, this.checkpointRetry
);
this.checkpointMaxWrites = checkpointMaxWrites;
return this;
}

@Override
public Builder checkpointRetry(final boolean checkpointRetry) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, checkpointRetry
);
this.checkpointRetry = checkpointRetry;
return this;
}

@Override
public Settings build() {
return new SettingsImpl(
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
this.maxUnread, this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
);
return Settings.ensureValid(new SettingsImpl(this));
}
}
}
Loading