diff --git a/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/source/JobSource.java b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/source/JobSource.java index 52054289a..bf1cdf824 100644 --- a/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/source/JobSource.java +++ b/mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/source/JobSource.java @@ -41,6 +41,7 @@ import io.vavr.Tuple2; import java.io.IOException; import java.util.*; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -54,6 +55,7 @@ public class JobSource extends AbstractSourceJobSource implements Source targets; private final List jobs = new ArrayList<>(); + private final Map additionalUrlParams = new HashMap<>(); public JobSource(List targets) { this.targets = targets; @@ -68,6 +70,11 @@ public JobSource(String targetInfoStr) { this.targets = parseTargetInfo(targetInfoStr); } + public JobSource withUrlParams(Map params) { + this.additionalUrlParams.putAll(params); + return this; + } + @Override public List> getParameters() { @@ -86,6 +93,9 @@ public List> getParameters() { public Observable> call(Context context, Index index) { if (targets.isEmpty()) { targets = parseInputParameters(context); + for (TargetInfo targetInfo : targets) { + targetInfo.additionalParams.putAll(this.additionalUrlParams); + } } Observable> sourceObs = null; @@ -274,6 +284,7 @@ public static class TargetInfoBuilder { private boolean enableMetaMessages = false; private boolean enableCompressedBinary = false; private Map additionalParams = new HashMap<>(); + private Supplier> additionalParamsSupplier = () -> new HashMap<>(); public TargetInfoBuilder() { } @@ -319,6 +330,11 @@ public TargetInfoBuilder withAdditionalParams(Map additionalPara return this; } + public TargetInfoBuilder withAdditionalParams(Supplier> additionalParamsSupplier) { + this.additionalParamsSupplier = additionalParamsSupplier; + return this; + } + public TargetInfo build() { return new TargetInfo( sourceJobName,