diff --git a/logstash-core/spec/logstash/java_filter_delegator_spec.rb b/logstash-core/spec/logstash/java_filter_delegator_spec.rb
index 931afa965fc..c167da65002 100644
--- a/logstash-core/spec/logstash/java_filter_delegator_spec.rb
+++ b/logstash-core/spec/logstash/java_filter_delegator_spec.rb
@@ -83,20 +83,19 @@ def filter(event)
context "when the flush return events" do
it "increments the out" do
- ruby_context = RubyUtil::RUBY.getCurrentContext
- subject.to_java.multiFilter(ruby_context, [LogStash::Event.new])
+ subject.to_java.multiFilter([LogStash::Event.new])
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
expect(event_metrics[:out].value).to eq(0)
- subject.to_java.flush(ruby_context, {})
+ subject.to_java.flush({})
expect(event_metrics[:out].value).to eq(1)
end
end
context "when the flush doesn't return anything" do
it "doesnt increment the out" do
- subject.to_java.flush(RubyUtil::RUBY.getCurrentContext, {})
+ subject.to_java.flush({})
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:duration_in_millis].value
@@ -107,7 +106,7 @@ def filter(event)
context "when the filter buffer events" do
it "has incremented :in" do
- subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
+ subject.to_java.multiFilter(events)
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:in].value
@@ -115,7 +114,7 @@ def filter(event)
end
it "has not incremented :out" do
- subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
+ subject.to_java.multiFilter(events)
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:out].value
@@ -140,7 +139,7 @@ def filter(event)
end
it "increments the in/out of the metric" do
- subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
+ subject.to_java.multiFilter(events)
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
@@ -171,7 +170,7 @@ def filter(event)
end
it "increments the in/out of the metric" do
- subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
+ subject.to_java.multiFilter(events)
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
diff --git a/logstash-core/src/main/java/org/logstash/ConvertedMap.java b/logstash-core/src/main/java/org/logstash/ConvertedMap.java
index 6f08f1b6068..2cf235889f9 100644
--- a/logstash-core/src/main/java/org/logstash/ConvertedMap.java
+++ b/logstash-core/src/main/java/org/logstash/ConvertedMap.java
@@ -8,6 +8,7 @@
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.execution.WorkerLoop;
/**
*
This class is an internal API and behaves very different from a standard {@link Map}.
@@ -60,7 +61,7 @@ public static ConvertedMap newFromMap(Map extends Serializable, Object> o) {
}
public static ConvertedMap newFromRubyHash(final RubyHash o) {
- return newFromRubyHash(o.getRuntime().getCurrentContext(), o);
+ return newFromRubyHash(WorkerLoop.THREAD_CONTEXT.get(), o);
}
public static ConvertedMap newFromRubyHash(final ThreadContext context, final RubyHash o) {
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java
index 6af264497a9..6173a8c352f 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java
@@ -1,12 +1,9 @@
package org.logstash.config.ir.compiler;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
-import org.jruby.Ruby;
-import org.jruby.runtime.ThreadContext;
/**
* A syntactic closure.
@@ -18,28 +15,6 @@ final class Closure implements MethodLevelSyntaxElement {
*/
public static final Closure EMPTY = new Closure(Collections.emptyList());
- /**
- * Variable declaration for the Ruby thread-context,
- * renders as {@code final ThreadContext context}.
- */
- private static final VariableDefinition RUBY_THREAD_CONTEXT =
- new VariableDefinition(ThreadContext.class, "context");
-
- /**
- * Variable declaration for the Ruby thread-context,
- * renders as {@code final ThreadContext context = RubyUtil.RUBY.getCurrentContext()}.
- */
- private static final MethodLevelSyntaxElement CACHE_RUBY_THREADCONTEXT =
- SyntaxFactory.definition(
- RUBY_THREAD_CONTEXT, ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT
- );
-
- /**
- * Variable referencing the current Ruby thread context.
- */
- private static final ValueSyntaxElement CACHED_RUBY_THREADCONTEXT =
- RUBY_THREAD_CONTEXT.access();
-
private final List statements;
public static Closure wrap(final MethodLevelSyntaxElement... statements) {
@@ -78,50 +53,10 @@ public boolean empty() {
@Override
public String generateCode() {
- final Collection optimized =
- this.optimizeRubyThreadContexts().statements;
- return optimized.isEmpty() ? "" : SyntaxFactory.join(
- optimized.stream().map(MethodLevelSyntaxElement::generateCode).collect(
+ return statements.isEmpty() ? "" : SyntaxFactory.join(
+ statements.stream().map(MethodLevelSyntaxElement::generateCode).collect(
Collectors.joining(";\n")
), ";"
);
}
-
- /**
- * Removes duplicate calls to {@link Ruby#getCurrentContext()} by caching them to a variable.
- * @return Copy of this Closure without redundant calls to {@link Ruby#getCurrentContext()}
- */
- private Closure optimizeRubyThreadContexts() {
- final ArrayList rubyCalls = new ArrayList<>();
- for (int i = 0; i < statements.size(); ++i) {
- if (statements.get(i).count(ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT) > 0) {
- rubyCalls.add(i);
- }
- }
- final Closure optimized;
- if (rubyCalls.size() > 1) {
- optimized = (Closure) new Closure().add(this).replace(
- ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, CACHED_RUBY_THREADCONTEXT
- );
- optimized.statements.add(rubyCalls.get(0), CACHE_RUBY_THREADCONTEXT);
- } else {
- optimized = this;
- }
- return optimized;
- }
-
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- final Closure result = new Closure();
- for (final MethodLevelSyntaxElement element : this.statements) {
- result.add(element.replace(search, replacement));
- }
- return result;
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return statements.stream().mapToInt(child -> child.count(search)).sum();
- }
}
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
index aafbd93d1cd..c1b4c99015f 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
@@ -187,7 +187,7 @@ public static ComputeStepSyntaxElement outputDataset(final Collection clazz, final ValueSyntaxElement argument) {
return new SyntaxFactory.TypeCastStatement(clazz, argument);
}
public static MethodLevelSyntaxElement and(final ValueSyntaxElement left,
final ValueSyntaxElement right) {
- return new MethodLevelSyntaxElement() {
-
- @Override
- public String generateCode() {
- return join("(", left.generateCode(), "&&", right.generateCode(), ")");
- }
-
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return and(
- (ValueSyntaxElement) left.replace(search, replacement),
- (ValueSyntaxElement) right.replace(search, replacement)
- );
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return left.count(search) + right.count(search);
- }
- };
+ return () -> join("(", left.generateCode(), "&&", right.generateCode(), ")");
}
public static ValueSyntaxElement ternary(final ValueSyntaxElement condition,
@@ -81,50 +56,15 @@ public static ValueSyntaxElement ternary(final ValueSyntaxElement condition,
}
public static MethodLevelSyntaxElement not(final ValueSyntaxElement var) {
- return new MethodLevelSyntaxElement() {
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return not((ValueSyntaxElement) var.replace(search, replacement));
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return var.count(search);
- }
-
- @Override
- public String generateCode() {
- return join("!(", var.generateCode(), ")");
- }
- };
+ return () -> join("!(", var.generateCode(), ")");
}
public static MethodLevelSyntaxElement forLoop(final VariableDefinition element,
final MethodLevelSyntaxElement iterable, final Closure body) {
- return new MethodLevelSyntaxElement() {
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return forLoop(
- element, iterable.replace(search, replacement),
- (Closure) body.replace(search, replacement)
- );
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return iterable.count(search) + iterable.count(search);
- }
-
- @Override
- public String generateCode() {
- return join(
- "for (", element.generateCode(), " : ",
- iterable.generateCode(), ") {\n", body.generateCode(), "\n}"
- );
- }
- };
+ return () -> join(
+ "for (", element.generateCode(), " : ",
+ iterable.generateCode(), ") {\n", body.generateCode(), "\n}"
+ );
}
public static MethodLevelSyntaxElement ifCondition(final MethodLevelSyntaxElement condition,
@@ -134,37 +74,13 @@ public static MethodLevelSyntaxElement ifCondition(final MethodLevelSyntaxElemen
public static MethodLevelSyntaxElement ifCondition(final MethodLevelSyntaxElement condition,
final Closure left, final Closure right) {
- return new MethodLevelSyntaxElement() {
- @Override
- public String generateCode() {
- return join(
- "if(", condition.generateCode(), ") {\n", left.generateCode(),
- "\n}",
- right.empty() ? "" : join(" else {\n", right.generateCode(), "\n}")
- );
- }
-
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return ifCondition(
- condition.replace(search, replacement),
- (Closure) left.replace(search, replacement),
- (Closure) right.replace(search, replacement)
- );
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return condition.count(search) + left.count(search) + right.count(search);
- }
- };
+ return () -> join(
+ "if(", condition.generateCode(), ") {\n", left.generateCode(),
+ "\n}",
+ right.empty() ? "" : join(" else {\n", right.generateCode(), "\n}")
+ );
}
- /**
- * Syntax Element that cannot be replaced via
- * {@link MethodLevelSyntaxElement#replace(MethodLevelSyntaxElement, MethodLevelSyntaxElement)}.
- */
public static final class IdentifierStatement implements ValueSyntaxElement {
private final String value;
@@ -177,17 +93,6 @@ private IdentifierStatement(final String value) {
public String generateCode() {
return value;
}
-
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return this;
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return this == search ? 1 : 0;
- }
}
/**
@@ -208,17 +113,6 @@ private Assignment(final SyntaxElement field, final MethodLevelSyntaxElement val
public String generateCode() {
return join(field.generateCode(), "=", value.generateCode());
}
-
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return new SyntaxFactory.Assignment(field, value.replace(search, replacement));
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return value.count(search);
- }
}
/**
@@ -237,17 +131,6 @@ public String generateCode() {
return value;
}
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return this.equals(search) ? replacement : this;
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return this.equals(search) ? 1 : 0;
- }
-
@Override
public boolean equals(final Object other) {
if (this == other) {
@@ -287,22 +170,6 @@ static final class MethodCallReturnValue implements ValueSyntaxElement {
this.method = method;
}
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return this.equals(search) ? replacement : new SyntaxFactory.MethodCallReturnValue(
- instance.replace(search, replacement), method,
- args.stream().map(var -> var.replace(search, replacement))
- .toArray(ValueSyntaxElement[]::new)
- );
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return this.equals(search) ? 1 :
- instance.count(search) + args.stream().mapToInt(v -> v.count(search)).sum();
- }
-
@Override
public String generateCode() {
return join(
@@ -339,19 +206,6 @@ private TypeCastStatement(final Class> clazz, final ValueSyntaxElement argumen
this.argument = argument;
}
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return new SyntaxFactory.TypeCastStatement(
- clazz, (ValueSyntaxElement) argument.replace(search, replacement)
- );
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return argument.count(search);
- }
-
@Override
public String generateCode() {
return join("((", clazz.getName(), ")", argument.generateCode(), ")");
@@ -370,17 +224,6 @@ private ReturnStatement(final MethodLevelSyntaxElement value) {
public String generateCode() {
return join("return ", value.generateCode());
}
-
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return new SyntaxFactory.ReturnStatement(value.replace(search, replacement));
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return value.count(search);
- }
}
private static final class TernaryStatement implements ValueSyntaxElement {
@@ -405,20 +248,5 @@ public String generateCode() {
right.generateCode(), ")"
);
}
-
- @Override
- public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search,
- final MethodLevelSyntaxElement replacement) {
- return new SyntaxFactory.TernaryStatement(
- (ValueSyntaxElement) condition.replace(search, replacement),
- (ValueSyntaxElement) left.replace(search, replacement),
- (ValueSyntaxElement) right.replace(search, replacement)
- );
- }
-
- @Override
- public int count(final MethodLevelSyntaxElement search) {
- return left.count(search) + right.count(search);
- }
}
}
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java
index b0b281181e9..e5c7e36e7c5 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java
@@ -1,20 +1,10 @@
package org.logstash.config.ir.compiler;
-import org.jruby.Ruby;
-import org.logstash.RubyUtil;
-
/**
* An instance that can methods can be invoked on.
*/
interface ValueSyntaxElement extends MethodLevelSyntaxElement {
- /**
- * Return of the method call to {@link Ruby#getCurrentContext()} that has the current Ruby
- * thread-context as its return value.
- */
- ValueSyntaxElement GET_RUBY_THREAD_CONTEXT =
- SyntaxFactory.constant(RubyUtil.class, "RUBY").call("getCurrentContext");
-
/**
* Call method on instance.
* @param method Method Name
diff --git a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
index c25a7b7f58f..c3825ddf65d 100644
--- a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
+++ b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
@@ -4,11 +4,21 @@
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.jruby.runtime.ThreadContext;
+import org.logstash.RubyUtil;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.config.ir.compiler.Dataset;
public final class WorkerLoop implements Runnable {
+ /**
+ * Hard Reference to the Ruby {@link ThreadContext} for this thread. It is ok to keep
+ * a hard reference instead of Ruby's weak references here since we can expect worker threads
+ * to be runnable most of the time.
+ */
+ public static final ThreadLocal THREAD_CONTEXT =
+ ThreadLocal.withInitial(RubyUtil.RUBY::getCurrentContext);
+
private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
private final Dataset execution;
diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
index 45e4b10ecdb..bb5e74085fe 100644
--- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
+++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
@@ -2,7 +2,6 @@
import java.util.Collections;
import org.jruby.RubyArray;
-import org.jruby.runtime.ThreadContext;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.FieldReference;
@@ -50,28 +49,4 @@ public void compilesSplitDataset() {
assertThat(left.compute(batch, false, false).size(), is(1));
assertThat(right.compute(batch, false, false).size(), is(1));
}
-
- @Test
- public void optimizesRedundantRubyThreadContext() {
- assertThat(
- Closure.wrap(
- SyntaxFactory.definition(
- new VariableDefinition(ThreadContext.class, "context1"),
- ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT
- ),
- SyntaxFactory.definition(
- new VariableDefinition(ThreadContext.class, "context2"),
- ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT
- )
- ).generateCode(),
- is(
- String.join(
- "\n",
- "org.jruby.runtime.ThreadContext context=org.logstash.RubyUtil.RUBY.getCurrentContext();",
- "org.jruby.runtime.ThreadContext context1=context;",
- "org.jruby.runtime.ThreadContext context2=context;"
- )
- )
- );
- }
}
diff --git a/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java b/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java
index b66279525c3..c3d9ac531d6 100644
--- a/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java
+++ b/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java
@@ -6,10 +6,9 @@
import java.util.concurrent.BlockingQueue;
import org.jruby.RubyHash;
import org.jruby.runtime.ThreadContext;
-import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Test;
-import org.logstash.RubyUtil;
import org.logstash.execution.QueueBatch;
+import org.logstash.execution.WorkerLoop;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -25,7 +24,7 @@ public void testInflightBatchesTracking() throws InterruptedException, IOExcepti
new ArrayBlockingQueue<>(10);
final JrubyMemoryReadClientExt client =
JrubyMemoryReadClientExt.create(queue, 5, 50);
- final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
+ final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get();
final QueueBatch batch = client.readBatch();
final RubyHash inflight = (RubyHash) client.rubyGetInflightBatches(context);
assertThat(inflight.size(), is(1));