Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion logstash-core/lib/logstash/config/mixin.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# encoding: utf-8
require "logstash/plugins/registry"
require "logstash/util/password"
require "logstash/util/safe_uri"
require "logstash/version"
Expand Down
1 change: 0 additions & 1 deletion logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
require "logstash/instrument/collector"
require "logstash/filter_delegator"
require "logstash/queue_factory"
require "logstash/plugins/plugin_factory"
require "logstash/compiler"
require "securerandom"

Expand Down
80 changes: 0 additions & 80 deletions logstash-core/lib/logstash/plugins/plugin_factory.rb

This file was deleted.

3 changes: 3 additions & 0 deletions logstash-core/lib/logstash/plugins/registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
require "logstash/plugin"
require "logstash/plugins/hooks_registry"
require "logstash/modules/scaffold"
require "logstash/codecs/base"
require "logstash/filters/base"
require "logstash/outputs/base"

module LogStash module Plugins
class Registry
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/util/modules_setting_array.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# encoding: utf-8
require "forwardable"
require "logstash/util/password"

module LogStash module Util class ModulesSettingArray
Expand Down
10 changes: 9 additions & 1 deletion logstash-core/src/main/java/org/logstash/RubyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public final class RubyUtil {

public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS;

public static final RubyClass PLUGIN_FACTORY_CLASS;

public static final RubyClass LOGGER;

public static final RubyModule LOGGABLE_MODULE;
Expand All @@ -146,6 +148,8 @@ public final class RubyUtil {

public static final RubyModule UTIL_MODULE;

public static final RubyClass CONFIGURATION_ERROR_CLASS;

/**
* Logstash Ruby Module.
*/
Expand Down Expand Up @@ -350,7 +354,7 @@ public final class RubyUtil {
LOGSTASH_MODULE.defineClassUnder(
"EnvironmentError", stdErr, JRubyLogstashErrorsExt.LogstashEnvironmentError::new
);
LOGSTASH_MODULE.defineClassUnder(
CONFIGURATION_ERROR_CLASS = LOGSTASH_MODULE.defineClassUnder(
"ConfigurationError", stdErr, JRubyLogstashErrorsExt.ConfigurationError::new
);
LOGSTASH_MODULE.defineClassUnder(
Expand Down Expand Up @@ -403,6 +407,10 @@ public final class RubyUtil {
RUBY_EVENT_CLASS.setConstant("VERSION_ONE", RUBY.newString(Event.VERSION_ONE));
RUBY_EVENT_CLASS.defineAnnotatedMethods(JrubyEventExtLibrary.RubyEvent.class);
RUBY_EVENT_CLASS.defineAnnotatedConstants(JrubyEventExtLibrary.RubyEvent.class);
PLUGIN_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"PluginFactory", RUBY.getObject(), PluginFactoryExt.Plugins::new
);
PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Plugins.class);
RUBY.getGlobalVariables().set("$LS_JARS_LOADED", RUBY.newString("true"));
RubyJavaIntegration.setupRubyJavaIntegration(RUBY);
}
Expand Down
214 changes: 214 additions & 0 deletions logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,237 @@
package org.logstash.plugins;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.NullMetricExt;

public final class PluginFactoryExt {

@JRubyClass(name = "PluginFactory")
public static final class Plugins extends RubyBasicObject
implements RubyIntegration.PluginFactory {

private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id");

private static final RubySymbol NAME_KEY = RubyUtil.RUBY.newSymbol("name");

private final Collection<String> pluginsById = new HashSet<>();

private PipelineIR lir;

private PluginFactoryExt.ExecutionContext executionContext;

private PluginFactoryExt.Metrics metrics;

private RubyClass filterClass;

@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
public static IRubyObject filterDelegator(final ThreadContext context,
final IRubyObject recv, final IRubyObject[] args) {
final RubyHash arguments = (RubyHash) args[2];
final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments);
final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY);
filterInstance.callMethod(
context, "metric=",
args[3].callMethod(context, "namespace", id.intern19())
);
filterInstance.callMethod(context, "execution_context=", args[4]);
return args[0].callMethod(context, "new", new IRubyObject[]{filterInstance, id});
}

public Plugins(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod(required = 4)
public Plugins initialize(final ThreadContext context, final IRubyObject[] args) {
lir = (PipelineIR) args[0].toJava(PipelineIR.class);
metrics = (PluginFactoryExt.Metrics) args[1];
executionContext = (PluginFactoryExt.ExecutionContext) args[2];
filterClass = (RubyClass) args[3];
return this;
}

@SuppressWarnings("unchecked")
@Override
public IRubyObject buildInput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT,
name.asJavaString(), line.getIntValue(), column.getIntValue(),
(Map<String, IRubyObject>) args
);
}

@JRubyMethod(required = 4)
public IRubyObject buildInput(final ThreadContext context, final IRubyObject[] args) {
return buildInput(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(),
args[3]
);
}

@SuppressWarnings("unchecked")
@Override
public OutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return (OutputDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT,
name.asJavaString(), line.getIntValue(), column.getIntValue(),
(Map<String, IRubyObject>) args
);
}

@JRubyMethod(required = 4)
public OutputDelegatorExt buildOutput(final ThreadContext context,
final IRubyObject[] args) {
return buildOutput(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3]
);
}

@SuppressWarnings("unchecked")
@Override
public FilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return (FilterDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER,
name.asJavaString(), line.getIntValue(), column.getIntValue(),
(Map<String, IRubyObject>) args
);
}

@JRubyMethod(required = 4)
public IRubyObject buildFilter(final ThreadContext context, final IRubyObject[] args) {
return buildFilter(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(),
args[3]
);
}

@SuppressWarnings("unchecked")
@Override
public IRubyObject buildCodec(final RubyString name, final IRubyObject args) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
name.asJavaString(), 0, 0, (Map<String, IRubyObject>) args
);
}

@JRubyMethod(required = 4)
public IRubyObject buildCodec(final ThreadContext context, final IRubyObject[] args) {
return buildCodec((RubyString) args[0], args[1]);
}

@SuppressWarnings("unchecked")
@JRubyMethod(required = 4, optional = 1)
public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) {
return plugin(
context,
PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)),
args[1].asJavaString(),
args[2].convertToInteger().getIntValue(),
args[3].convertToInteger().getIntValue(),
args.length > 4 ? (Map<String, IRubyObject>) args[4] : new HashMap<>()
);
}

private IRubyObject plugin(final ThreadContext context,
final PluginLookup.PluginType type, final String name, final int line, final int column,
final Map<String, IRubyObject> args) {
final String id;
if (type == PluginLookup.PluginType.CODEC) {
id = UUID.randomUUID().toString();
} else {
id = lir.getGraph().vertices().filter(
v -> v.getSourceWithMetadata() != null
&& v.getSourceWithMetadata().getLine() == line
&& v.getSourceWithMetadata().getColumn() == column
).findFirst().map(Vertex::getId).orElse(null);
}
if (id == null) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format(
"Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name
)
);
}
if (pluginsById.contains(id)) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format("Two plugins have the id '%s', please fix this conflict", id)
);
}
pluginsById.add(id);
final AbstractNamespacedMetricExt typeScopedMetric =
metrics.create(context, type.rubyLabel());
final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name);
if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) {
final Map<String, Object> newArgs = new HashMap<>(args);
newArgs.put("id", id);
final RubyClass klass = (RubyClass) pluginClass.klass();
final ExecutionContextExt executionCntx = executionContext.create(
context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name")
);
final RubyHash rubyArgs = RubyHash.newHash(context.runtime);
rubyArgs.putAll(newArgs);
if (type == PluginLookup.PluginType.OUTPUT) {
return new OutputDelegatorExt(context.runtime, RubyUtil.OUTPUT_DELEGATOR_CLASS).init(
context,
new IRubyObject[]{
klass, typeScopedMetric, executionCntx,
OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null),
rubyArgs
}
);
} else if (type == PluginLookup.PluginType.FILTER) {
return filterDelegator(
context, null,
new IRubyObject[]{
filterClass, klass, rubyArgs, typeScopedMetric, executionCntx
}
);
} else {
final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs);
final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id));
scopedMetric.gauge(context, NAME_KEY, pluginInstance.callMethod(context, "config_name"));
pluginInstance.callMethod(context, "metric=", scopedMetric);
pluginInstance.callMethod(context, "execution_context=", executionCntx);
return pluginInstance;
}
} else {
return context.nil;
}
}
}

@JRubyClass(name = "ExecutionContextFactory")
public static final class ExecutionContext extends RubyBasicObject {

Expand Down
Loading