Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,51 @@
* upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes must not change
* without coordination with the maintainers of that project.
*
* @param <T> the actual type of the Elasticsearch API being mirrored
* @param <INTERNAL> the actual type of the Elasticsearch API being mirrored
*/
public interface StableBridgeAPI<T> {
T unwrap();
public interface StableBridgeAPI<INTERNAL> {
INTERNAL toInternal();

static <T> T unwrapNullable(final StableBridgeAPI<T> nullableStableBridgeAPI) {
static <T> T toInternalNullable(final StableBridgeAPI<T> nullableStableBridgeAPI) {
if (Objects.isNull(nullableStableBridgeAPI)) {
return null;
}
return nullableStableBridgeAPI.unwrap();
return nullableStableBridgeAPI.toInternal();
}

static <K, T> Map<K, T> unwrap(final Map<K, ? extends StableBridgeAPI<T>> bridgeMap) {
return bridgeMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().unwrap()));
static <K, T> Map<K, T> toInternal(final Map<K, ? extends StableBridgeAPI<T>> bridgeMap) {
return bridgeMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().toInternal()));
}

static <K, T, B extends StableBridgeAPI<T>> Map<K, B> wrap(final Map<K, T> rawMap, final Function<T, B> wrapFunction) {
return rawMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> wrapFunction.apply(e.getValue())));
static <K, T, B extends StableBridgeAPI<T>> Map<K, B> fromInternal(final Map<K, T> rawMap, final Function<T, B> externalizor) {
return rawMap.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> externalizor.apply(e.getValue())));
}

static <T, B extends StableBridgeAPI<T>> B wrap(final T delegate, final Function<T, B> wrapFunction) {
static <T, B extends StableBridgeAPI<T>> B fromInternal(final T delegate, final Function<T, B> externalizor) {
if (Objects.isNull(delegate)) {
return null;
}
return wrapFunction.apply(delegate);
return externalizor.apply(delegate);
}

abstract class Proxy<T> implements StableBridgeAPI<T> {
protected final T delegate;
/**
* An {@code ProxyInternal<INTERNAL>} is an implementation of {@code StableBridgeAPI<INTERNAL>} that
* proxies calls to a delegate that is an actual {@code INTERNAL}.
*
* @param <INTERNAL>
*/
abstract class ProxyInternal<INTERNAL> implements StableBridgeAPI<INTERNAL> {
protected final INTERNAL internalDelegate;

protected Proxy(final T delegate) {
this.delegate = delegate;
protected ProxyInternal(final INTERNAL internalDelegate) {
this.internalDelegate = internalDelegate;
}

@Override
public T unwrap() {
return delegate;
public INTERNAL toInternal() {
return internalDelegate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,33 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.logstashbridge.StableBridgeAPI;

public class SettingsBridge extends StableBridgeAPI.Proxy<Settings> {
/**
* An external bridge for {@link Settings}
*/
public class SettingsBridge extends StableBridgeAPI.ProxyInternal<Settings> {

public static SettingsBridge wrap(final Settings delegate) {
public static SettingsBridge fromInternal(final Settings delegate) {
return new SettingsBridge(delegate);
}

public static Builder builder() {
return Builder.wrap(Settings.builder());
return Builder.fromInternal(Settings.builder());
}

public SettingsBridge(final Settings delegate) {
super(delegate);
}

@Override
public Settings unwrap() {
return this.delegate;
public Settings toInternal() {
return this.internalDelegate;
}

public static class Builder extends StableBridgeAPI.Proxy<Settings.Builder> {
static Builder wrap(final Settings.Builder delegate) {
/**
* An external bridge for {@link Settings.Builder} that proxies calls to a real {@link Settings.Builder}
*/
public static class Builder extends StableBridgeAPI.ProxyInternal<Settings.Builder> {
static Builder fromInternal(final Settings.Builder delegate) {
return new Builder(delegate);
}

Expand All @@ -40,12 +46,12 @@ private Builder(final Settings.Builder delegate) {
}

public Builder put(final String key, final String value) {
this.delegate.put(key, value);
this.internalDelegate.put(key, value);
return this;
}

public SettingsBridge build() {
return new SettingsBridge(this.delegate.build());
return new SettingsBridge(this.internalDelegate.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

import java.io.Closeable;

/**
* An external bridge for {@link IOUtils}
*/
public class IOUtilsBridge {
public static void closeWhileHandlingException(final Iterable<? extends Closeable> objects) {
IOUtils.closeWhileHandlingException(objects);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@

import java.nio.file.Path;

public class EnvironmentBridge extends StableBridgeAPI.Proxy<Environment> {
public static EnvironmentBridge wrap(final Environment delegate) {
/**
* An external bridge for {@link Environment}
*/
public class EnvironmentBridge extends StableBridgeAPI.ProxyInternal<Environment> {
public static EnvironmentBridge fromInternal(final Environment delegate) {
return new EnvironmentBridge(delegate);
}

public EnvironmentBridge(final SettingsBridge settingsBridge, final Path configPath) {
this(new Environment(settingsBridge.unwrap(), configPath));
this(new Environment(settingsBridge.toInternal(), configPath));
}

private EnvironmentBridge(final Environment delegate) {
super(delegate);
}

@Override
public Environment unwrap() {
return this.delegate;
public Environment toInternal() {
return this.internalDelegate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import java.util.Map;

/**
* An external bridge for {@link ConfigurationUtils}
*/
public class ConfigurationUtilsBridge {
public static TemplateScriptBridge.Factory compileTemplate(
final String processorType,
Expand All @@ -23,7 +26,7 @@ public static TemplateScriptBridge.Factory compileTemplate(
final ScriptServiceBridge scriptServiceBridge
) {
return new TemplateScriptBridge.Factory(
ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.unwrap())
ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.toInternal())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
import java.util.Set;
import java.util.function.BiConsumer;

public class IngestDocumentBridge extends StableBridgeAPI.Proxy<IngestDocument> {
/**
* An external bridge for {@link IngestDocument} that proxies calls through a real {@link IngestDocument}
*/
public class IngestDocumentBridge extends StableBridgeAPI.ProxyInternal<IngestDocument> {

public static final class Constants {
public static final String METADATA_VERSION_FIELD_NAME = IngestDocument.Metadata.VERSION.getFieldName();

private Constants() {}
}

public static IngestDocumentBridge wrap(final IngestDocument ingestDocument) {
public static IngestDocumentBridge fromInternalNullable(final IngestDocument ingestDocument) {
if (ingestDocument == null) {
return null;
}
Expand All @@ -42,54 +45,57 @@ private IngestDocumentBridge(IngestDocument inner) {
}

public MetadataBridge getMetadata() {
return new MetadataBridge(delegate.getMetadata());
return new MetadataBridge(internalDelegate.getMetadata());
}

public Map<String, Object> getSource() {
return delegate.getSource();
return internalDelegate.getSource();
}

public boolean updateIndexHistory(final String index) {
return delegate.updateIndexHistory(index);
return internalDelegate.updateIndexHistory(index);
}

public Set<String> getIndexHistory() {
return Set.copyOf(delegate.getIndexHistory());
return Set.copyOf(internalDelegate.getIndexHistory());
}

public boolean isReroute() {
return LogstashInternalBridge.isReroute(delegate);
return LogstashInternalBridge.isReroute(internalDelegate);
}

public void resetReroute() {
LogstashInternalBridge.resetReroute(delegate);
LogstashInternalBridge.resetReroute(internalDelegate);
}

public Map<String, Object> getIngestMetadata() {
return delegate.getIngestMetadata();
return internalDelegate.getIngestMetadata();
}

public <T> T getFieldValue(final String fieldName, final Class<T> type) {
return delegate.getFieldValue(fieldName, type);
return internalDelegate.getFieldValue(fieldName, type);
}

public <T> T getFieldValue(final String fieldName, final Class<T> type, final boolean ignoreMissing) {
return delegate.getFieldValue(fieldName, type, ignoreMissing);
return internalDelegate.getFieldValue(fieldName, type, ignoreMissing);
}

public String renderTemplate(final TemplateScriptBridge.Factory templateScriptFactory) {
return delegate.renderTemplate(templateScriptFactory.unwrap());
return internalDelegate.renderTemplate(templateScriptFactory.toInternal());
}

public void setFieldValue(final String path, final Object value) {
delegate.setFieldValue(path, value);
internalDelegate.setFieldValue(path, value);
}

public void removeField(final String path) {
delegate.removeField(path);
internalDelegate.removeField(path);
}

public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
this.delegate.executePipeline(pipelineBridge.unwrap(), (unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e));
this.internalDelegate.executePipeline(pipelineBridge.toInternal(),
(ingestDocument, e) -> {
handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
import java.util.Map;
import java.util.function.BiConsumer;

public class PipelineBridge extends StableBridgeAPI.Proxy<Pipeline> {
public static PipelineBridge wrap(final Pipeline pipeline) {
/**
* An external bridge for {@link Pipeline}
*/
public class PipelineBridge extends StableBridgeAPI.ProxyInternal<Pipeline> {
public static PipelineBridge fromInternal(final Pipeline pipeline) {
return new PipelineBridge(pipeline);
}

Expand All @@ -28,12 +31,12 @@ public static PipelineBridge create(
Map<String, ProcessorBridge.Factory> processorFactories,
ScriptServiceBridge scriptServiceBridge
) throws Exception {
return wrap(
return fromInternal(
Pipeline.create(
id,
config,
StableBridgeAPI.unwrap(processorFactories),
StableBridgeAPI.unwrapNullable(scriptServiceBridge),
StableBridgeAPI.toInternal(processorFactories),
StableBridgeAPI.toInternalNullable(scriptServiceBridge),
null
)
);
Expand All @@ -44,13 +47,13 @@ public PipelineBridge(final Pipeline delegate) {
}

public String getId() {
return delegate.getId();
return internalDelegate.getId();
}

public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
this.delegate.execute(
StableBridgeAPI.unwrapNullable(ingestDocumentBridge),
(unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e)
this.internalDelegate.execute(
StableBridgeAPI.toInternalNullable(ingestDocumentBridge),
(ingestDocument, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

import java.util.Map;

public class PipelineConfigurationBridge extends StableBridgeAPI.Proxy<PipelineConfiguration> {
/**
* An external bridge for {@link PipelineConfiguration}
*/
public class PipelineConfigurationBridge extends StableBridgeAPI.ProxyInternal<PipelineConfiguration> {
public PipelineConfigurationBridge(final PipelineConfiguration delegate) {
super(delegate);
}
Expand All @@ -25,33 +28,33 @@ public PipelineConfigurationBridge(final String pipelineId, final String jsonEnc
}

public String getId() {
return delegate.getId();
return internalDelegate.getId();
}

public Map<String, Object> getConfig() {
return delegate.getConfig();
return internalDelegate.getConfig();
}

public Map<String, Object> getConfig(final boolean unmodifiable) {
return delegate.getConfig(unmodifiable);
return internalDelegate.getConfig(unmodifiable);
}

@Override
public int hashCode() {
return delegate.hashCode();
return internalDelegate.hashCode();
}

@Override
public String toString() {
return delegate.toString();
return internalDelegate.toString();
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
} else if (obj instanceof PipelineConfigurationBridge other) {
return delegate.equals(other.delegate);
return internalDelegate.equals(other.internalDelegate);
} else {
return false;
}
Expand Down
Loading