Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,11 @@ <h1>Samza Configuration Reference</h1>
</tr>
<tr>
<td class="property" id="job.coordinator.factory">job.coordinator.factory</td>
<td class="default"></td>
<td class="default">org.apache.samza.zk.ZkJobCoordinatorFactory</td>
<td class="description">
Class to use for job coordination. Currently available values are:
The fully-qualified name of the Java class which determines the factory class which will build the JobCoordinator.
The user can specify a custom implementation of the JobCoordinatorFactory where a custom logic is implemented for distributed coordination of stream processors. <br>
Samza supports the following coordination modes out of the box.
<dl>
<dt><code>org.apache.samza.standalone.PassthroughJobCoordinatorFactory</code></dt>
<dd>Fixed partition mapping. No Zoookeeper. </dd>
Expand All @@ -461,20 +463,6 @@ <h1>Samza Configuration Reference</h1>
Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a>
</td>
</tr>
<tr>
<td class="property" id="job.coordination.utils.factory">job.coordination.utils.factory</td>
<td class="default">org.apache.samza.zk.ZkCoordinationUtilsFactory</td>
<td class="description">
Class to use to create CoordinationUtils. Currently available values are:
<dl>
<dt><code>org.apache.samza.zk.ZkCoordinationUtilsFactory</code></dt>
<dd>ZooKeeper based coordination utils.</dd>
<dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt>
<dd>Azure based coordination utils.</dd>
These coordination utils are currently used for intermediate stream creation.
</dl>
</td>
</tr>

<tr>
<td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td>
Expand Down Expand Up @@ -539,7 +527,7 @@ <h1>Samza Configuration Reference</h1>
</tr>
<tr>
<td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td>
<td class="default"> 2000 </td>
<td class="default"> 20000 </td>
<td class="description">
How long the Leader processor will wait before recalculating the JobModel on change of registered processors.
</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,44 @@
import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.util.Util;
import org.apache.samza.zk.ZkCoordinationUtilsFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;

public class JobCoordinatorConfig extends MapConfig {
public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
public static final String JOB_COORDINATION_UTILS_FACTORY = "job.coordination.utils.factory";
public final static String DEFAULT_COORDINATION_UTILS_FACTORY = ZkCoordinationUtilsFactory.class.getName();
public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";

public JobCoordinatorConfig(Config config) {
super(config);
}

public String getJobCoordinationUtilsFactoryClassName() {
String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY);
String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_FACTORY);

if (Strings.isNullOrEmpty(className)) {
throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className);
String coordinationUtilsFactory;
if (AZURE_COORDINATOR_FACTORY.equals(coordinatorFactory)) {
coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY;
} else if (PassthroughJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) {
coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName();
} else if (ZkJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) {
coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName();
} else {
throw new SamzaException(String.format("Coordination factory: %s defined by the config: %s is invalid.", coordinatorFactory, JOB_COORDINATOR_FACTORY));
}

try {
Class.forName(className);
Class.forName(coordinationUtilsFactory);
} catch (ClassNotFoundException e) {
throw new SamzaException(
"Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e);
"Failed to validate config value for " + JOB_COORDINATOR_FACTORY + " = " + coordinationUtilsFactory, e);
}

return className;
return coordinationUtilsFactory;
}

public CoordinationUtilsFactory getCoordinationUtilsFactory() {
Expand All @@ -61,10 +72,9 @@ public CoordinationUtilsFactory getCoordinationUtilsFactory() {
public String getJobCoordinatorFactoryClassName() {
String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
throw new ConfigException(
String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY));
return ZkJobCoordinatorFactory.class.getName();
} else {
return jobCoordinatorFactoryClassName;
}

return jobCoordinatorFactoryClassName;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, St
staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.operators.KV;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
Expand All @@ -71,7 +70,6 @@
*
* The following configs are set by default
* <ol>
* <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li>
* <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li>
* <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
* <li>"job.name" = "test-samza"</li>
Expand All @@ -98,7 +96,6 @@ private TestRunner() {
this.inMemoryScope = RandomStringUtils.random(10, true, true);
configs.put(JobConfig.JOB_NAME(), JOB_NAME);
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.test.controlmessages.TestData.PageView;
import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
Expand Down Expand Up @@ -81,7 +80,6 @@ public void testPipeline() throws Exception {

configs.put(JobConfig.JOB_NAME(), "test-eos-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerdeFactory;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
Expand Down Expand Up @@ -133,7 +132,6 @@ public void testWatermark() throws Exception {

configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public void testJob() throws InterruptedException {
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");

runApplication(new TestSchedulingApp(), "SchedulingTest", configs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public void testRepartitionJoinWindowAppWithoutDeletionOnCommit() throws Excepti
String appName = "UserPageAdClickCounter";
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put("systems.kafka.samza.delete.committed.messages", "false");
Expand Down Expand Up @@ -112,7 +111,6 @@ public void testRepartitionJoinWindowAppAndDeleteMessagesOnCommit() throws Excep
final String appName = "UserPageAdClickCounter2";
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put("systems.kafka.samza.delete.committed.messages", "true");
Expand Down Expand Up @@ -160,7 +158,6 @@ public void testBroadcastApp() {
String outputTopicName = "user-ad-click-counts";
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void testRepartitionedSessionWindowCounter() throws Exception {

Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
Expand Down Expand Up @@ -308,7 +307,6 @@ static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnec

configs.put(JobConfig.JOB_NAME(), "test-table-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

Expand Down