From 10e343755da4c93bc562d2cf88aa515a2b43fb84 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Fri, 28 Sep 2018 14:46:24 -0700 Subject: [PATCH 1/5] Configuration table fixes. * Fix the default value of debounce time configuration. * Remove the coordination utils configuration from the table(Infer that based upon job.coordinator.factory configuration). Remove the defintion of coordination utils factory in configuration from unit-tests. * Default job.coordinator.factory to ZkJobCoordinatorFactory if it is not defined by the user. --- .../versioned/jobs/configuration-table.html | 23 ++------ .../samza/config/JobCoordinatorConfig.java | 34 +++++++---- .../config/TestJobCoordinatorConfig.java | 58 ------------------- .../sql/testutil/SamzaSqlTestConfig.java | 1 - .../samza/test/framework/TestRunner.java | 3 - .../EndOfStreamIntegrationTest.java | 2 - .../WatermarkIntegrationTest.java | 2 - .../samza/test/framework/SchedulingTest.java | 1 - .../TestRepartitionJoinWindowApp.java | 3 - .../operator/TestRepartitionWindowApp.java | 1 - .../samza/test/table/TestLocalTable.java | 2 - 11 files changed, 28 insertions(+), 102 deletions(-) delete mode 100644 samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 26b466199a..852df41861 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -448,9 +448,12 @@

Samza Configuration Reference

job.coordinator.factory - + org.apache.samza.zk.ZkJobCoordinatorFactory - Class to use for job coordination. Currently available values are: + The fully-qualified name of the Java class which determines the factory class which will build the JobCoordinator. + The default configuration value if the property is not present is job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory.
+ The user can specify a custom implementation of the JobCoordinatorFactory where a custom logic is implemented for distributed coordination of stream processors.
+ Samza supports the following coordination modes out of the box.
org.apache.samza.standalone.PassthroughJobCoordinatorFactory
Fixed partition mapping. No Zoookeeper.
@@ -461,20 +464,6 @@

Samza Configuration Reference

Required only for non-cluster-managed applications. Please see the required value for task-name-grouper-factory - - job.coordination.utils.factory - org.apache.samza.zk.ZkCoordinationUtilsFactory - - Class to use to create CoordinationUtils. Currently available values are: -
-
org.apache.samza.zk.ZkCoordinationUtilsFactory
-
ZooKeeper based coordination utils.
-
org.apache.samza.coordinator.AzureCoordinationUtilsFactory
-
Azure based coordination utils.
- These coordination utils are currently used for intermediate stream creation. -
- - job.logged.store.base.dir @@ -539,7 +528,7 @@

Samza Configuration Reference

job.debounce.time.ms - 2000 + 20000 How long the Leader processor will wait before recalculating the JobModel on change of registered processors. diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index 23227276c1..0f6a713b9a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -22,33 +22,44 @@ import com.google.common.base.Strings; import org.apache.samza.SamzaException; import org.apache.samza.coordinator.CoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.util.Util; import org.apache.samza.zk.ZkCoordinationUtilsFactory; +import org.apache.samza.zk.ZkJobCoordinatorFactory; + +import java.util.Objects; public class JobCoordinatorConfig extends MapConfig { public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; - public static final String JOB_COORDINATION_UTILS_FACTORY = "job.coordination.utils.factory"; - public final static String DEFAULT_COORDINATION_UTILS_FACTORY = ZkCoordinationUtilsFactory.class.getName(); + public final static String DEFAULT_COORDINATOR_UTILS_FACTORY = ZkJobCoordinatorFactory.class.getName(); + private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory"; + private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory"; public JobCoordinatorConfig(Config config) { super(config); } public String getJobCoordinationUtilsFactoryClassName() { - String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY); + String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_UTILS_FACTORY); - if (Strings.isNullOrEmpty(className)) { - throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className); + String coordinationUtilsFactory; + if (Objects.equals(coordinatorFactory, AZURE_COORDINATOR_FACTORY)) { + coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY; + } else if (Objects.equals(coordinatorFactory, PassthroughJobCoordinatorFactory.class.getName())) { + coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName(); + } else { + coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName(); } try { - Class.forName(className); + Class.forName(coordinationUtilsFactory); } catch (ClassNotFoundException e) { throw new SamzaException( - "Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e); + "Failed to validate config value for " + JOB_COORDINATOR_FACTORY + " = " + coordinationUtilsFactory, e); } - return className; + return coordinationUtilsFactory; } public CoordinationUtilsFactory getCoordinationUtilsFactory() { @@ -61,10 +72,9 @@ public CoordinationUtilsFactory getCoordinationUtilsFactory() { public String getJobCoordinatorFactoryClassName() { String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY); if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) { - throw new ConfigException( - String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY)); + return ZkJobCoordinatorFactory.class.getName(); + } else { + return jobCoordinatorFactoryClassName; } - - return jobCoordinatorFactoryClassName; } } diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java deleted file mode 100644 index 2ef92b5c14..0000000000 --- a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config; - -import java.util.HashMap; -import java.util.Map; -import junit.framework.Assert; -import org.apache.samza.SamzaException; -import org.apache.samza.zk.ZkCoordinationUtilsFactory; -import org.junit.Test; - - -public class TestJobCoordinatorConfig { - - private final static String NONEXISTING_FACTORY_CLASS = "AnotherFactory"; - private final static String ANOTHER_FACTORY_CLASS = TestJobCoordinatorConfig.class.getName(); // any valid name - - @Test - public void testJobCoordinationUtilsFactoryConfig() { - - Map map = new HashMap<>(); - JobCoordinatorConfig jConfig = new JobCoordinatorConfig(new MapConfig(map)); - - // test default value - Assert.assertEquals(ZkCoordinationUtilsFactory.class.getName(), jConfig.getJobCoordinationUtilsFactoryClassName()); - - map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, ANOTHER_FACTORY_CLASS); - jConfig = new JobCoordinatorConfig(new MapConfig(map)); - Assert.assertEquals(ANOTHER_FACTORY_CLASS, jConfig.getJobCoordinationUtilsFactoryClassName()); - - // failure case - map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, NONEXISTING_FACTORY_CLASS); - jConfig = new JobCoordinatorConfig(new MapConfig(map)); - try { - jConfig.getJobCoordinationUtilsFactoryClassName(); - Assert.fail("Failed to validate loading of fake class: " + NONEXISTING_FACTORY_CLASS); - } catch (SamzaException e) { - // expected - } - } -} diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java index a96fd0893c..97168d8f16 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java @@ -74,7 +74,6 @@ public static Map fetchStaticConfigsWithFactories(Map - *
  • "job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}
  • *
  • "job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}
  • *
  • "task.name.grouper.factory" = {@link SingleContainerGrouperFactory}
  • *
  • "job.name" = "test-samza"
  • @@ -98,7 +96,6 @@ private TestRunner() { this.inMemoryScope = RandomStringUtils.random(10, true, true); configs.put(JobConfig.JOB_NAME(), JOB_NAME); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index d2aab116f5..136507453f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -39,7 +39,6 @@ import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.test.controlmessages.TestData.PageView; import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory; @@ -81,7 +80,6 @@ public void testPipeline() throws Exception { configs.put(JobConfig.JOB_NAME(), "test-eos-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index 05818e96b9..6e60f467b5 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -58,7 +58,6 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerdeFactory; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemAdmin; @@ -133,7 +132,6 @@ public void testWatermark() throws Exception { configs.put(JobConfig.JOB_NAME(), "test-watermark-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java index 658492a7a8..7e89fa930a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java @@ -52,7 +52,6 @@ public void testJob() throws InterruptedException { configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory"); configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); runApplication(new TestSchedulingApp(), "SchedulingTest", configs); diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index 144f1253a2..340f0e75a0 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -82,7 +82,6 @@ public void testRepartitionJoinWindowAppWithoutDeletionOnCommit() throws Excepti String appName = "UserPageAdClickCounter"; Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "false"); @@ -112,7 +111,6 @@ public void testRepartitionJoinWindowAppAndDeleteMessagesOnCommit() throws Excep final String appName = "UserPageAdClickCounter2"; Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "true"); @@ -160,7 +158,6 @@ public void testBroadcastApp() { String outputTopicName = "user-ad-click-counts"; Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1); diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java index 2e1de96394..2f08fede28 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -61,7 +61,6 @@ public void testRepartitionedSessionWindowCounter() throws Exception { Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index 419f6c87ec..46cbdad9bf 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -48,7 +48,6 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; @@ -308,7 +307,6 @@ static Map getBaseJobConfig(String bootstrapUrl, String zkConnec configs.put(JobConfig.JOB_NAME(), "test-table-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); From 7cb7334ed86166c98a9b8c4c7c6d2b6ac947dfad Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Mon, 1 Oct 2018 12:57:30 -0700 Subject: [PATCH 2/5] Review comments. --- .../java/org/apache/samza/config/JobCoordinatorConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index 0f6a713b9a..15e313dd64 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -32,7 +32,7 @@ public class JobCoordinatorConfig extends MapConfig { public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; - public final static String DEFAULT_COORDINATOR_UTILS_FACTORY = ZkJobCoordinatorFactory.class.getName(); + public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName(); private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory"; private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory"; @@ -41,7 +41,7 @@ public JobCoordinatorConfig(Config config) { } public String getJobCoordinationUtilsFactoryClassName() { - String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_UTILS_FACTORY); + String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_FACTORY); String coordinationUtilsFactory; if (Objects.equals(coordinatorFactory, AZURE_COORDINATOR_FACTORY)) { From d5adda61e55e147f06f8336a389a6165ee63f65e Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Mon, 1 Oct 2018 14:27:50 -0700 Subject: [PATCH 3/5] Review comments. --- .../java/org/apache/samza/config/JobCoordinatorConfig.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index 15e313dd64..e7ee4e5d00 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -28,8 +28,6 @@ import org.apache.samza.zk.ZkCoordinationUtilsFactory; import org.apache.samza.zk.ZkJobCoordinatorFactory; -import java.util.Objects; - public class JobCoordinatorConfig extends MapConfig { public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName(); @@ -44,9 +42,9 @@ public String getJobCoordinationUtilsFactoryClassName() { String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_FACTORY); String coordinationUtilsFactory; - if (Objects.equals(coordinatorFactory, AZURE_COORDINATOR_FACTORY)) { + if (AZURE_COORDINATOR_FACTORY.equals(coordinatorFactory)) { coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY; - } else if (Objects.equals(coordinatorFactory, PassthroughJobCoordinatorFactory.class.getName())) { + } else if (PassthroughJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) { coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName(); } else { coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName(); From 73d462d7d7b4603fae5b30460a83dfa53fd02b4d Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Tue, 2 Oct 2018 13:25:14 -0700 Subject: [PATCH 4/5] Review comments. --- .../java/org/apache/samza/config/JobCoordinatorConfig.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index e7ee4e5d00..60c43c34b2 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -46,8 +46,10 @@ public String getJobCoordinationUtilsFactoryClassName() { coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY; } else if (PassthroughJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) { coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName(); - } else { + } else if (ZkJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) { coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName(); + } else { + throw new SamzaException(String.format("Coordination factory: %s defined by the config: %s is invalid.", coordinatorFactory, JOB_COORDINATOR_FACTORY)); } try { From 965e5117840ca17d30a6fefcc3b4e86a9813ad62 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Fri, 5 Oct 2018 13:18:36 -0700 Subject: [PATCH 5/5] Review comments. --- docs/learn/documentation/versioned/jobs/configuration-table.html | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 852df41861..35ddcab50b 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -451,7 +451,6 @@

    Samza Configuration Reference

    org.apache.samza.zk.ZkJobCoordinatorFactory The fully-qualified name of the Java class which determines the factory class which will build the JobCoordinator. - The default configuration value if the property is not present is job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory.
    The user can specify a custom implementation of the JobCoordinatorFactory where a custom logic is implemented for distributed coordination of stream processors.
    Samza supports the following coordination modes out of the box.