Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions logstash-core/spec/logstash/java_filter_delegator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,19 @@ def filter(event)

context "when the flush return events" do
it "increments the out" do
ruby_context = RubyUtil::RUBY.getCurrentContext
subject.to_java.multiFilter(ruby_context, [LogStash::Event.new])
subject.to_java.multiFilter([LogStash::Event.new])
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
expect(event_metrics[:out].value).to eq(0)
subject.to_java.flush(ruby_context, {})
subject.to_java.flush({})
expect(event_metrics[:out].value).to eq(1)
end
end

context "when the flush doesn't return anything" do
it "doesnt increment the out" do
subject.to_java.flush(RubyUtil::RUBY.getCurrentContext, {})
subject.to_java.flush({})
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:duration_in_millis].value
Expand All @@ -107,15 +106,15 @@ def filter(event)
context "when the filter buffer events" do

it "has incremented :in" do
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
subject.to_java.multiFilter(events)
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:in].value
).to eq(events.size)
end

it "has not incremented :out" do
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
subject.to_java.multiFilter(events)
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:out].value
Expand All @@ -140,7 +139,7 @@ def filter(event)
end

it "increments the in/out of the metric" do
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
subject.to_java.multiFilter(events)
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
Expand Down Expand Up @@ -171,7 +170,7 @@ def filter(event)
end

it "increments the in/out of the metric" do
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
subject.to_java.multiFilter(events)
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
Expand Down
3 changes: 2 additions & 1 deletion logstash-core/src/main/java/org/logstash/ConvertedMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.execution.WorkerLoop;

/**
* <p>This class is an internal API and behaves very different from a standard {@link Map}.</p>
Expand Down Expand Up @@ -60,7 +61,7 @@ public static ConvertedMap newFromMap(Map<? extends Serializable, Object> o) {
}

public static ConvertedMap newFromRubyHash(final RubyHash o) {
return newFromRubyHash(o.getRuntime().getCurrentContext(), o);
return newFromRubyHash(WorkerLoop.THREAD_CONTEXT.get(), o);
}

public static ConvertedMap newFromRubyHash(final ThreadContext context, final RubyHash o) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package org.logstash.config.ir.compiler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.jruby.Ruby;
import org.jruby.runtime.ThreadContext;

/**
* A syntactic closure.
Expand All @@ -18,28 +15,6 @@ final class Closure implements MethodLevelSyntaxElement {
*/
public static final Closure EMPTY = new Closure(Collections.emptyList());

/**
* Variable declaration for the Ruby thread-context,
* renders as {@code final ThreadContext context}.
*/
private static final VariableDefinition RUBY_THREAD_CONTEXT =
new VariableDefinition(ThreadContext.class, "context");

/**
* Variable declaration for the Ruby thread-context,
* renders as {@code final ThreadContext context = RubyUtil.RUBY.getCurrentContext()}.
*/
private static final MethodLevelSyntaxElement CACHE_RUBY_THREADCONTEXT =
SyntaxFactory.definition(
RUBY_THREAD_CONTEXT, ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT
);

/**
* Variable referencing the current Ruby thread context.
*/
private static final ValueSyntaxElement CACHED_RUBY_THREADCONTEXT =
RUBY_THREAD_CONTEXT.access();

private final List<MethodLevelSyntaxElement> statements;

public static Closure wrap(final MethodLevelSyntaxElement... statements) {
Expand Down Expand Up @@ -78,50 +53,10 @@ public boolean empty() {

@Override
public String generateCode() {
final Collection<MethodLevelSyntaxElement> optimized =
this.optimizeRubyThreadContexts().statements;
return optimized.isEmpty() ? "" : SyntaxFactory.join(
optimized.stream().map(MethodLevelSyntaxElement::generateCode).collect(
return statements.isEmpty() ? "" : SyntaxFactory.join(
statements.stream().map(MethodLevelSyntaxElement::generateCode).collect(
Collectors.joining(";\n")
), ";"
);
}

/**
* Removes duplicate calls to {@link Ruby#getCurrentContext()} by caching them to a variable.
* @return Copy of this Closure without redundant calls to {@link Ruby#getCurrentContext()}
*/
private Closure optimizeRubyThreadContexts() {
final ArrayList<Integer> rubyCalls = new ArrayList<>();
for (int i = 0; i < statements.size(); ++i) {
if (statements.get(i).count(ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT) > 0) {
rubyCalls.add(i);
}
}
final Closure optimized;
if (rubyCalls.size() > 1) {
optimized = (Closure) new Closure().add(this).replace(
ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, CACHED_RUBY_THREADCONTEXT
);
optimized.statements.add(rubyCalls.get(0), CACHE_RUBY_THREADCONTEXT);
} else {
optimized = this;
}
return optimized;
}

@Override
public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
final MethodLevelSyntaxElement replacement) {
final Closure result = new Closure();
for (final MethodLevelSyntaxElement element : this.statements) {
result.add(element.replace(search, replacement));
}
return result;
}

@Override
public int count(final MethodLevelSyntaxElement search) {
return statements.stream().mapToInt(child -> child.count(search)).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,15 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D

private static ValueSyntaxElement invokeOutput(final ValueSyntaxElement output,
final MethodLevelSyntaxElement events) {
return output.call("multiReceive", ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, events);
return output.call("multiReceive", events);
}

private static Closure filterBody(final ValueSyntaxElement outputBuffer,
final ValueSyntaxElement inputBuffer, final ClassFields fields,
final FilterDelegatorExt plugin) {
final ValueSyntaxElement filterField = fields.add(plugin);
final Closure body = Closure.wrap(
buffer(
outputBuffer,
filterField.call(
"multiFilter", ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, inputBuffer
)
)
buffer(outputBuffer, filterField.call("multiFilter", inputBuffer))
);
if (plugin.hasFlush()) {
body.add(callFilterFlush(fields, outputBuffer, filterField, !plugin.periodicFlush()));
Expand Down Expand Up @@ -317,13 +312,7 @@ private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields
);
}
return SyntaxFactory.ifCondition(
condition,
Closure.wrap(
buffer(
resultBuffer,
filterPlugin.call(FLUSH, ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, flushArgs)
)
)
condition, Closure.wrap(buffer(resultBuffer, filterPlugin.call(FLUSH, flushArgs)))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.logstash.config.ir.expression.binary.RegexEq;
import org.logstash.config.ir.expression.unary.Not;
import org.logstash.config.ir.expression.unary.Truthy;
import org.logstash.execution.WorkerLoop;
import org.logstash.ext.JrubyEventExtLibrary;

/**
Expand Down Expand Up @@ -598,7 +599,7 @@ private FieldMatches(final String field, final String regex) {
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object tomatch = event.getEvent().getUnconvertedField(field);
return tomatch instanceof RubyString &&
!((RubyString) tomatch).match(RubyUtil.RUBY.getCurrentContext(), regex).isNil();
!((RubyString) tomatch).match(WorkerLoop.THREAD_CONTEXT.get(), regex).isNil();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.WorkerLoop;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;
Expand Down Expand Up @@ -127,7 +128,8 @@ public IRubyObject strategy(final ThreadContext context) {
}

@SuppressWarnings("unchecked")
public RubyArray multiFilter(final ThreadContext context, final RubyArray batch) {
public RubyArray multiFilter(final RubyArray batch) {
final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get();
eventMetricIn.increment((long) batch.size());
final long start = System.nanoTime();
final RubyArray result = (RubyArray) filter.callMethod(context, "multi_filter", batch);
Expand All @@ -144,7 +146,8 @@ public RubyArray multiFilter(final ThreadContext context, final RubyArray batch)
return result;
}

public RubyArray flush(final ThreadContext context, final RubyHash options) {
public RubyArray flush(final RubyHash options) {
final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get();
final IRubyObject newEvents = filter.callMethod(context, "flush", options);
final RubyArray result;
if (newEvents.isNil()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,4 @@ interface MethodLevelSyntaxElement extends SyntaxElement {
* Syntax element that generates {@code return null}.
*/
MethodLevelSyntaxElement RETURN_NULL = SyntaxFactory.ret(SyntaxFactory.value("null"));

/**
* Replace any occurrences of {@code search} by {@code replacement} in this element.
* @param search Syntax element to replace
* @param replacement Replacement
* @return A copy of this element with the replacement applied
*/
MethodLevelSyntaxElement replace(MethodLevelSyntaxElement search,
MethodLevelSyntaxElement replacement);

/**
* Count the number of occurrences of {@code search} in this element.
* @param search Element to count
* @return Number of occurrences
*/
int count(MethodLevelSyntaxElement search);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.execution.WorkerLoop;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;

Expand Down Expand Up @@ -134,6 +135,10 @@ public IRubyObject strategy(final ThreadContext context) {
return strategy;
}

public IRubyObject multiReceive(final RubyArray events) {
return multiReceive(WorkerLoop.THREAD_CONTEXT.get(), events);
}

@JRubyMethod(name = "multi_receive")
public IRubyObject multiReceive(final ThreadContext context, final IRubyObject events) {
final RubyArray batch = (RubyArray) events;
Expand Down
Loading