From c1a5486e0c559edce3baac6f9027957750b551bc Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 18 Apr 2018 13:39:00 +0200 Subject: [PATCH] CLEANUP+JAVAFICATION+PERFORMANCE: Keep hard reference to Ruby Thread Context, simplify code accordingly --- .../logstash/java_filter_delegator_spec.rb | 15 +- .../main/java/org/logstash/ConvertedMap.java | 3 +- .../logstash/config/ir/compiler/Closure.java | 69 +------ .../config/ir/compiler/DatasetCompiler.java | 17 +- .../config/ir/compiler/EventCondition.java | 3 +- .../ir/compiler/FilterDelegatorExt.java | 7 +- .../ir/compiler/MethodLevelSyntaxElement.java | 16 -- .../ir/compiler/OutputDelegatorExt.java | 5 + .../config/ir/compiler/SyntaxFactory.java | 194 +----------------- .../ir/compiler/ValueSyntaxElement.java | 10 - .../org/logstash/execution/WorkerLoop.java | 10 + .../ir/compiler/DatasetCompilerTest.java | 25 --- .../ext/JrubyMemoryReadClientExtTest.java | 5 +- 13 files changed, 49 insertions(+), 330 deletions(-) diff --git a/logstash-core/spec/logstash/java_filter_delegator_spec.rb b/logstash-core/spec/logstash/java_filter_delegator_spec.rb index 931afa965fc..c167da65002 100644 --- a/logstash-core/spec/logstash/java_filter_delegator_spec.rb +++ b/logstash-core/spec/logstash/java_filter_delegator_spec.rb @@ -83,20 +83,19 @@ def filter(event) context "when the flush return events" do it "increments the out" do - ruby_context = RubyUtil::RUBY.getCurrentContext - subject.to_java.multiFilter(ruby_context, [LogStash::Event.new]) + subject.to_java.multiFilter([LogStash::Event.new]) event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path( "filter/my_filter" )[:filter][:my_filter][:events] expect(event_metrics[:out].value).to eq(0) - subject.to_java.flush(ruby_context, {}) + subject.to_java.flush({}) expect(event_metrics[:out].value).to eq(1) end end context "when the flush doesn't return anything" do it "doesnt increment the out" do - subject.to_java.flush(RubyUtil::RUBY.getCurrentContext, {}) + subject.to_java.flush({}) expect( metric.collector.snapshot_metric.metric_store. get_with_path("filter/my_filter")[:filter][:my_filter][:events][:duration_in_millis].value @@ -107,7 +106,7 @@ def filter(event) context "when the filter buffer events" do it "has incremented :in" do - subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + subject.to_java.multiFilter(events) expect( metric.collector.snapshot_metric.metric_store. get_with_path("filter/my_filter")[:filter][:my_filter][:events][:in].value @@ -115,7 +114,7 @@ def filter(event) end it "has not incremented :out" do - subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + subject.to_java.multiFilter(events) expect( metric.collector.snapshot_metric.metric_store. get_with_path("filter/my_filter")[:filter][:my_filter][:events][:out].value @@ -140,7 +139,7 @@ def filter(event) end it "increments the in/out of the metric" do - subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + subject.to_java.multiFilter(events) event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path( "filter/my_filter" )[:filter][:my_filter][:events] @@ -171,7 +170,7 @@ def filter(event) end it "increments the in/out of the metric" do - subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + subject.to_java.multiFilter(events) event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path( "filter/my_filter" )[:filter][:my_filter][:events] diff --git a/logstash-core/src/main/java/org/logstash/ConvertedMap.java b/logstash-core/src/main/java/org/logstash/ConvertedMap.java index 6f08f1b6068..2cf235889f9 100644 --- a/logstash-core/src/main/java/org/logstash/ConvertedMap.java +++ b/logstash-core/src/main/java/org/logstash/ConvertedMap.java @@ -8,6 +8,7 @@ import org.jruby.RubyString; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.execution.WorkerLoop; /** *

This class is an internal API and behaves very different from a standard {@link Map}.

@@ -60,7 +61,7 @@ public static ConvertedMap newFromMap(Map o) { } public static ConvertedMap newFromRubyHash(final RubyHash o) { - return newFromRubyHash(o.getRuntime().getCurrentContext(), o); + return newFromRubyHash(WorkerLoop.THREAD_CONTEXT.get(), o); } public static ConvertedMap newFromRubyHash(final ThreadContext context, final RubyHash o) { diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java index 6af264497a9..6173a8c352f 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java @@ -1,12 +1,9 @@ package org.logstash.config.ir.compiler; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import org.jruby.Ruby; -import org.jruby.runtime.ThreadContext; /** * A syntactic closure. @@ -18,28 +15,6 @@ final class Closure implements MethodLevelSyntaxElement { */ public static final Closure EMPTY = new Closure(Collections.emptyList()); - /** - * Variable declaration for the Ruby thread-context, - * renders as {@code final ThreadContext context}. - */ - private static final VariableDefinition RUBY_THREAD_CONTEXT = - new VariableDefinition(ThreadContext.class, "context"); - - /** - * Variable declaration for the Ruby thread-context, - * renders as {@code final ThreadContext context = RubyUtil.RUBY.getCurrentContext()}. - */ - private static final MethodLevelSyntaxElement CACHE_RUBY_THREADCONTEXT = - SyntaxFactory.definition( - RUBY_THREAD_CONTEXT, ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT - ); - - /** - * Variable referencing the current Ruby thread context. - */ - private static final ValueSyntaxElement CACHED_RUBY_THREADCONTEXT = - RUBY_THREAD_CONTEXT.access(); - private final List statements; public static Closure wrap(final MethodLevelSyntaxElement... statements) { @@ -78,50 +53,10 @@ public boolean empty() { @Override public String generateCode() { - final Collection optimized = - this.optimizeRubyThreadContexts().statements; - return optimized.isEmpty() ? "" : SyntaxFactory.join( - optimized.stream().map(MethodLevelSyntaxElement::generateCode).collect( + return statements.isEmpty() ? "" : SyntaxFactory.join( + statements.stream().map(MethodLevelSyntaxElement::generateCode).collect( Collectors.joining(";\n") ), ";" ); } - - /** - * Removes duplicate calls to {@link Ruby#getCurrentContext()} by caching them to a variable. - * @return Copy of this Closure without redundant calls to {@link Ruby#getCurrentContext()} - */ - private Closure optimizeRubyThreadContexts() { - final ArrayList rubyCalls = new ArrayList<>(); - for (int i = 0; i < statements.size(); ++i) { - if (statements.get(i).count(ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT) > 0) { - rubyCalls.add(i); - } - } - final Closure optimized; - if (rubyCalls.size() > 1) { - optimized = (Closure) new Closure().add(this).replace( - ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, CACHED_RUBY_THREADCONTEXT - ); - optimized.statements.add(rubyCalls.get(0), CACHE_RUBY_THREADCONTEXT); - } else { - optimized = this; - } - return optimized; - } - - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - final Closure result = new Closure(); - for (final MethodLevelSyntaxElement element : this.statements) { - result.add(element.replace(search, replacement)); - } - return result; - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return statements.stream().mapToInt(child -> child.count(search)).sum(); - } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java index aafbd93d1cd..c1b4c99015f 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java @@ -187,7 +187,7 @@ public static ComputeStepSyntaxElement outputDataset(final Collection clazz, final ValueSyntaxElement argument) { return new SyntaxFactory.TypeCastStatement(clazz, argument); } public static MethodLevelSyntaxElement and(final ValueSyntaxElement left, final ValueSyntaxElement right) { - return new MethodLevelSyntaxElement() { - - @Override - public String generateCode() { - return join("(", left.generateCode(), "&&", right.generateCode(), ")"); - } - - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return and( - (ValueSyntaxElement) left.replace(search, replacement), - (ValueSyntaxElement) right.replace(search, replacement) - ); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return left.count(search) + right.count(search); - } - }; + return () -> join("(", left.generateCode(), "&&", right.generateCode(), ")"); } public static ValueSyntaxElement ternary(final ValueSyntaxElement condition, @@ -81,50 +56,15 @@ public static ValueSyntaxElement ternary(final ValueSyntaxElement condition, } public static MethodLevelSyntaxElement not(final ValueSyntaxElement var) { - return new MethodLevelSyntaxElement() { - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return not((ValueSyntaxElement) var.replace(search, replacement)); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return var.count(search); - } - - @Override - public String generateCode() { - return join("!(", var.generateCode(), ")"); - } - }; + return () -> join("!(", var.generateCode(), ")"); } public static MethodLevelSyntaxElement forLoop(final VariableDefinition element, final MethodLevelSyntaxElement iterable, final Closure body) { - return new MethodLevelSyntaxElement() { - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return forLoop( - element, iterable.replace(search, replacement), - (Closure) body.replace(search, replacement) - ); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return iterable.count(search) + iterable.count(search); - } - - @Override - public String generateCode() { - return join( - "for (", element.generateCode(), " : ", - iterable.generateCode(), ") {\n", body.generateCode(), "\n}" - ); - } - }; + return () -> join( + "for (", element.generateCode(), " : ", + iterable.generateCode(), ") {\n", body.generateCode(), "\n}" + ); } public static MethodLevelSyntaxElement ifCondition(final MethodLevelSyntaxElement condition, @@ -134,37 +74,13 @@ public static MethodLevelSyntaxElement ifCondition(final MethodLevelSyntaxElemen public static MethodLevelSyntaxElement ifCondition(final MethodLevelSyntaxElement condition, final Closure left, final Closure right) { - return new MethodLevelSyntaxElement() { - @Override - public String generateCode() { - return join( - "if(", condition.generateCode(), ") {\n", left.generateCode(), - "\n}", - right.empty() ? "" : join(" else {\n", right.generateCode(), "\n}") - ); - } - - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return ifCondition( - condition.replace(search, replacement), - (Closure) left.replace(search, replacement), - (Closure) right.replace(search, replacement) - ); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return condition.count(search) + left.count(search) + right.count(search); - } - }; + return () -> join( + "if(", condition.generateCode(), ") {\n", left.generateCode(), + "\n}", + right.empty() ? "" : join(" else {\n", right.generateCode(), "\n}") + ); } - /** - * Syntax Element that cannot be replaced via - * {@link MethodLevelSyntaxElement#replace(MethodLevelSyntaxElement, MethodLevelSyntaxElement)}. - */ public static final class IdentifierStatement implements ValueSyntaxElement { private final String value; @@ -177,17 +93,6 @@ private IdentifierStatement(final String value) { public String generateCode() { return value; } - - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return this; - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return this == search ? 1 : 0; - } } /** @@ -208,17 +113,6 @@ private Assignment(final SyntaxElement field, final MethodLevelSyntaxElement val public String generateCode() { return join(field.generateCode(), "=", value.generateCode()); } - - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return new SyntaxFactory.Assignment(field, value.replace(search, replacement)); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return value.count(search); - } } /** @@ -237,17 +131,6 @@ public String generateCode() { return value; } - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return this.equals(search) ? replacement : this; - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return this.equals(search) ? 1 : 0; - } - @Override public boolean equals(final Object other) { if (this == other) { @@ -287,22 +170,6 @@ static final class MethodCallReturnValue implements ValueSyntaxElement { this.method = method; } - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return this.equals(search) ? replacement : new SyntaxFactory.MethodCallReturnValue( - instance.replace(search, replacement), method, - args.stream().map(var -> var.replace(search, replacement)) - .toArray(ValueSyntaxElement[]::new) - ); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return this.equals(search) ? 1 : - instance.count(search) + args.stream().mapToInt(v -> v.count(search)).sum(); - } - @Override public String generateCode() { return join( @@ -339,19 +206,6 @@ private TypeCastStatement(final Class clazz, final ValueSyntaxElement argumen this.argument = argument; } - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return new SyntaxFactory.TypeCastStatement( - clazz, (ValueSyntaxElement) argument.replace(search, replacement) - ); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return argument.count(search); - } - @Override public String generateCode() { return join("((", clazz.getName(), ")", argument.generateCode(), ")"); @@ -370,17 +224,6 @@ private ReturnStatement(final MethodLevelSyntaxElement value) { public String generateCode() { return join("return ", value.generateCode()); } - - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return new SyntaxFactory.ReturnStatement(value.replace(search, replacement)); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return value.count(search); - } } private static final class TernaryStatement implements ValueSyntaxElement { @@ -405,20 +248,5 @@ public String generateCode() { right.generateCode(), ")" ); } - - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return new SyntaxFactory.TernaryStatement( - (ValueSyntaxElement) condition.replace(search, replacement), - (ValueSyntaxElement) left.replace(search, replacement), - (ValueSyntaxElement) right.replace(search, replacement) - ); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return left.count(search) + right.count(search); - } } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java index b0b281181e9..e5c7e36e7c5 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ValueSyntaxElement.java @@ -1,20 +1,10 @@ package org.logstash.config.ir.compiler; -import org.jruby.Ruby; -import org.logstash.RubyUtil; - /** * An instance that can methods can be invoked on. */ interface ValueSyntaxElement extends MethodLevelSyntaxElement { - /** - * Return of the method call to {@link Ruby#getCurrentContext()} that has the current Ruby - * thread-context as its return value. - */ - ValueSyntaxElement GET_RUBY_THREAD_CONTEXT = - SyntaxFactory.constant(RubyUtil.class, "RUBY").call("getCurrentContext"); - /** * Call method on instance. * @param method Method Name diff --git a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java index c25a7b7f58f..c3825ddf65d 100644 --- a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java +++ b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java @@ -4,11 +4,21 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jruby.runtime.ThreadContext; +import org.logstash.RubyUtil; import org.logstash.config.ir.CompiledPipeline; import org.logstash.config.ir.compiler.Dataset; public final class WorkerLoop implements Runnable { + /** + * Hard Reference to the Ruby {@link ThreadContext} for this thread. It is ok to keep + * a hard reference instead of Ruby's weak references here since we can expect worker threads + * to be runnable most of the time. + */ + public static final ThreadLocal THREAD_CONTEXT = + ThreadLocal.withInitial(RubyUtil.RUBY::getCurrentContext); + private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class); private final Dataset execution; diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java index 45e4b10ecdb..bb5e74085fe 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java @@ -2,7 +2,6 @@ import java.util.Collections; import org.jruby.RubyArray; -import org.jruby.runtime.ThreadContext; import org.junit.Test; import org.logstash.Event; import org.logstash.FieldReference; @@ -50,28 +49,4 @@ public void compilesSplitDataset() { assertThat(left.compute(batch, false, false).size(), is(1)); assertThat(right.compute(batch, false, false).size(), is(1)); } - - @Test - public void optimizesRedundantRubyThreadContext() { - assertThat( - Closure.wrap( - SyntaxFactory.definition( - new VariableDefinition(ThreadContext.class, "context1"), - ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT - ), - SyntaxFactory.definition( - new VariableDefinition(ThreadContext.class, "context2"), - ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT - ) - ).generateCode(), - is( - String.join( - "\n", - "org.jruby.runtime.ThreadContext context=org.logstash.RubyUtil.RUBY.getCurrentContext();", - "org.jruby.runtime.ThreadContext context1=context;", - "org.jruby.runtime.ThreadContext context2=context;" - ) - ) - ); - } } diff --git a/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java b/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java index b66279525c3..c3d9ac531d6 100644 --- a/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java +++ b/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java @@ -6,10 +6,9 @@ import java.util.concurrent.BlockingQueue; import org.jruby.RubyHash; import org.jruby.runtime.ThreadContext; -import org.jruby.runtime.builtin.IRubyObject; import org.junit.Test; -import org.logstash.RubyUtil; import org.logstash.execution.QueueBatch; +import org.logstash.execution.WorkerLoop; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -25,7 +24,7 @@ public void testInflightBatchesTracking() throws InterruptedException, IOExcepti new ArrayBlockingQueue<>(10); final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50); - final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); + final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get(); final QueueBatch batch = client.readBatch(); final RubyHash inflight = (RubyHash) client.rubyGetInflightBatches(context); assertThat(inflight.size(), is(1));