Skip to content

Commit b4ea520

Browse files
JAVAFICATION: Move PluginFactory to Java
Fixes #9610
1 parent 06eaa84 commit b4ea520

File tree

9 files changed

+292
-85
lines changed

9 files changed

+292
-85
lines changed

logstash-core/lib/logstash/config/mixin.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
# encoding: utf-8
2-
require "logstash/plugins/registry"
32
require "logstash/util/password"
43
require "logstash/util/safe_uri"
54
require "logstash/version"

logstash-core/lib/logstash/pipeline.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
require "logstash/instrument/collector"
1414
require "logstash/filter_delegator"
1515
require "logstash/queue_factory"
16-
require "logstash/plugins/plugin_factory"
1716
require "logstash/compiler"
1817
require "securerandom"
1918

logstash-core/lib/logstash/plugins/plugin_factory.rb

Lines changed: 0 additions & 80 deletions
This file was deleted.

logstash-core/lib/logstash/plugins/registry.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
require "logstash/plugin"
44
require "logstash/plugins/hooks_registry"
55
require "logstash/modules/scaffold"
6+
require "logstash/codecs/base"
7+
require "logstash/filters/base"
8+
require "logstash/outputs/base"
69

710
module LogStash module Plugins
811
class Registry

logstash-core/lib/logstash/util/modules_setting_array.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# encoding: utf-8
2+
require "forwardable"
23
require "logstash/util/password"
34

45
module LogStash module Util class ModulesSettingArray

logstash-core/src/main/java/org/logstash/RubyUtil.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ public final class RubyUtil {
138138

139139
public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS;
140140

141+
public static final RubyClass PLUGIN_FACTORY_CLASS;
142+
141143
public static final RubyClass LOGGER;
142144

143145
public static final RubyModule LOGGABLE_MODULE;
@@ -146,6 +148,8 @@ public final class RubyUtil {
146148

147149
public static final RubyModule UTIL_MODULE;
148150

151+
public static final RubyClass CONFIGURATION_ERROR_CLASS;
152+
149153
/**
150154
* Logstash Ruby Module.
151155
*/
@@ -350,7 +354,7 @@ public final class RubyUtil {
350354
LOGSTASH_MODULE.defineClassUnder(
351355
"EnvironmentError", stdErr, JRubyLogstashErrorsExt.LogstashEnvironmentError::new
352356
);
353-
LOGSTASH_MODULE.defineClassUnder(
357+
CONFIGURATION_ERROR_CLASS = LOGSTASH_MODULE.defineClassUnder(
354358
"ConfigurationError", stdErr, JRubyLogstashErrorsExt.ConfigurationError::new
355359
);
356360
LOGSTASH_MODULE.defineClassUnder(
@@ -403,6 +407,10 @@ public final class RubyUtil {
403407
RUBY_EVENT_CLASS.setConstant("VERSION_ONE", RUBY.newString(Event.VERSION_ONE));
404408
RUBY_EVENT_CLASS.defineAnnotatedMethods(JrubyEventExtLibrary.RubyEvent.class);
405409
RUBY_EVENT_CLASS.defineAnnotatedConstants(JrubyEventExtLibrary.RubyEvent.class);
410+
PLUGIN_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
411+
"PluginFactory", RUBY.getObject(), PluginFactoryExt.Plugins::new
412+
);
413+
PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Plugins.class);
406414
RUBY.getGlobalVariables().set("$LS_JARS_LOADED", RUBY.newString("true"));
407415
RubyJavaIntegration.setupRubyJavaIntegration(RUBY);
408416
}

logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,237 @@
11
package org.logstash.plugins;
22

33
import java.util.Arrays;
4+
import java.util.Collection;
5+
import java.util.HashMap;
6+
import java.util.HashSet;
7+
import java.util.Locale;
8+
import java.util.Map;
9+
import java.util.UUID;
410
import org.jruby.Ruby;
511
import org.jruby.RubyArray;
612
import org.jruby.RubyBasicObject;
713
import org.jruby.RubyClass;
14+
import org.jruby.RubyHash;
15+
import org.jruby.RubyInteger;
16+
import org.jruby.RubyString;
817
import org.jruby.RubySymbol;
918
import org.jruby.anno.JRubyClass;
1019
import org.jruby.anno.JRubyMethod;
1120
import org.jruby.runtime.ThreadContext;
1221
import org.jruby.runtime.builtin.IRubyObject;
1322
import org.logstash.RubyUtil;
23+
import org.logstash.config.ir.PipelineIR;
24+
import org.logstash.config.ir.compiler.FilterDelegatorExt;
25+
import org.logstash.config.ir.compiler.OutputDelegatorExt;
26+
import org.logstash.config.ir.compiler.OutputStrategyExt;
27+
import org.logstash.config.ir.compiler.RubyIntegration;
28+
import org.logstash.config.ir.graph.Vertex;
1429
import org.logstash.execution.ExecutionContextExt;
1530
import org.logstash.instrument.metrics.AbstractMetricExt;
1631
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
1732
import org.logstash.instrument.metrics.NullMetricExt;
1833

1934
public final class PluginFactoryExt {
2035

36+
@JRubyClass(name = "PluginFactory")
37+
public static final class Plugins extends RubyBasicObject
38+
implements RubyIntegration.PluginFactory {
39+
40+
private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id");
41+
42+
private static final RubySymbol NAME_KEY = RubyUtil.RUBY.newSymbol("name");
43+
44+
private final Collection<String> pluginsById = new HashSet<>();
45+
46+
private PipelineIR lir;
47+
48+
private PluginFactoryExt.ExecutionContext executionContext;
49+
50+
private PluginFactoryExt.Metrics metrics;
51+
52+
private RubyClass filterClass;
53+
54+
@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
55+
public static IRubyObject filterDelegator(final ThreadContext context,
56+
final IRubyObject recv, final IRubyObject[] args) {
57+
final RubyHash arguments = (RubyHash) args[2];
58+
final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments);
59+
final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY);
60+
filterInstance.callMethod(
61+
context, "metric=",
62+
args[3].callMethod(context, "namespace", id.intern19())
63+
);
64+
filterInstance.callMethod(context, "execution_context=", args[4]);
65+
return args[0].callMethod(context, "new", new IRubyObject[]{filterInstance, id});
66+
}
67+
68+
public Plugins(final Ruby runtime, final RubyClass metaClass) {
69+
super(runtime, metaClass);
70+
}
71+
72+
@JRubyMethod(required = 4)
73+
public Plugins initialize(final ThreadContext context, final IRubyObject[] args) {
74+
lir = (PipelineIR) args[0].toJava(PipelineIR.class);
75+
metrics = (PluginFactoryExt.Metrics) args[1];
76+
executionContext = (PluginFactoryExt.ExecutionContext) args[2];
77+
filterClass = (RubyClass) args[3];
78+
return this;
79+
}
80+
81+
@SuppressWarnings("unchecked")
82+
@Override
83+
public IRubyObject buildInput(final RubyString name, final RubyInteger line,
84+
final RubyInteger column, final IRubyObject args) {
85+
return plugin(
86+
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT,
87+
name.asJavaString(), line.getIntValue(), column.getIntValue(),
88+
(Map<String, IRubyObject>) args
89+
);
90+
}
91+
92+
@JRubyMethod(required = 4)
93+
public IRubyObject buildInput(final ThreadContext context, final IRubyObject[] args) {
94+
return buildInput(
95+
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(),
96+
args[3]
97+
);
98+
}
99+
100+
@SuppressWarnings("unchecked")
101+
@Override
102+
public OutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
103+
final RubyInteger column, final IRubyObject args) {
104+
return (OutputDelegatorExt) plugin(
105+
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT,
106+
name.asJavaString(), line.getIntValue(), column.getIntValue(),
107+
(Map<String, IRubyObject>) args
108+
);
109+
}
110+
111+
@JRubyMethod(required = 4)
112+
public OutputDelegatorExt buildOutput(final ThreadContext context,
113+
final IRubyObject[] args) {
114+
return buildOutput(
115+
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3]
116+
);
117+
}
118+
119+
@SuppressWarnings("unchecked")
120+
@Override
121+
public FilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line,
122+
final RubyInteger column, final IRubyObject args) {
123+
return (FilterDelegatorExt) plugin(
124+
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER,
125+
name.asJavaString(), line.getIntValue(), column.getIntValue(),
126+
(Map<String, IRubyObject>) args
127+
);
128+
}
129+
130+
@JRubyMethod(required = 4)
131+
public IRubyObject buildFilter(final ThreadContext context, final IRubyObject[] args) {
132+
return buildFilter(
133+
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(),
134+
args[3]
135+
);
136+
}
137+
138+
@SuppressWarnings("unchecked")
139+
@Override
140+
public IRubyObject buildCodec(final RubyString name, final IRubyObject args) {
141+
return plugin(
142+
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
143+
name.asJavaString(), 0, 0, (Map<String, IRubyObject>) args
144+
);
145+
}
146+
147+
@JRubyMethod(required = 4)
148+
public IRubyObject buildCodec(final ThreadContext context, final IRubyObject[] args) {
149+
return buildCodec((RubyString) args[0], args[1]);
150+
}
151+
152+
@SuppressWarnings("unchecked")
153+
@JRubyMethod(required = 4, optional = 1)
154+
public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) {
155+
return plugin(
156+
context,
157+
PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)),
158+
args[1].asJavaString(),
159+
args[2].convertToInteger().getIntValue(),
160+
args[3].convertToInteger().getIntValue(),
161+
args.length > 4 ? (Map<String, IRubyObject>) args[4] : new HashMap<>()
162+
);
163+
}
164+
165+
private IRubyObject plugin(final ThreadContext context,
166+
final PluginLookup.PluginType type, final String name, final int line, final int column,
167+
final Map<String, IRubyObject> args) {
168+
final String id;
169+
if (type == PluginLookup.PluginType.CODEC) {
170+
id = UUID.randomUUID().toString();
171+
} else {
172+
id = lir.getGraph().vertices().filter(
173+
v -> v.getSourceWithMetadata() != null
174+
&& v.getSourceWithMetadata().getLine() == line
175+
&& v.getSourceWithMetadata().getColumn() == column
176+
).findFirst().map(Vertex::getId).orElse(null);
177+
}
178+
if (id == null) {
179+
throw context.runtime.newRaiseException(
180+
RubyUtil.CONFIGURATION_ERROR_CLASS,
181+
String.format(
182+
"Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name
183+
)
184+
);
185+
}
186+
if (pluginsById.contains(id)) {
187+
throw context.runtime.newRaiseException(
188+
RubyUtil.CONFIGURATION_ERROR_CLASS,
189+
String.format("Two plugins have the id '%s', please fix this conflict", id)
190+
);
191+
}
192+
pluginsById.add(id);
193+
final AbstractNamespacedMetricExt typeScopedMetric =
194+
metrics.create(context, type.rubyLabel());
195+
final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name);
196+
if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) {
197+
final Map<String, Object> newArgs = new HashMap<>(args);
198+
newArgs.put("id", id);
199+
final RubyClass klass = (RubyClass) pluginClass.klass();
200+
final ExecutionContextExt executionCntx = executionContext.create(
201+
context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name")
202+
);
203+
final RubyHash rubyArgs = RubyHash.newHash(context.runtime);
204+
rubyArgs.putAll(newArgs);
205+
if (type == PluginLookup.PluginType.OUTPUT) {
206+
return new OutputDelegatorExt(context.runtime, RubyUtil.OUTPUT_DELEGATOR_CLASS).init(
207+
context,
208+
new IRubyObject[]{
209+
klass, typeScopedMetric, executionCntx,
210+
OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null),
211+
rubyArgs
212+
}
213+
);
214+
} else if (type == PluginLookup.PluginType.FILTER) {
215+
return filterDelegator(
216+
context, null,
217+
new IRubyObject[]{
218+
filterClass, klass, rubyArgs, typeScopedMetric, executionCntx
219+
}
220+
);
221+
} else {
222+
final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs);
223+
final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id));
224+
scopedMetric.gauge(context, NAME_KEY, pluginInstance.callMethod(context, "config_name"));
225+
pluginInstance.callMethod(context, "metric=", scopedMetric);
226+
pluginInstance.callMethod(context, "execution_context=", executionCntx);
227+
return pluginInstance;
228+
}
229+
} else {
230+
return context.nil;
231+
}
232+
}
233+
}
234+
21235
@JRubyClass(name = "ExecutionContextFactory")
22236
public static final class ExecutionContext extends RubyBasicObject {
23237

0 commit comments

Comments
 (0)