From 73cf30a26ad59f79f54ac82bfccc8f46ae6f3f5c Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Wed, 8 Nov 2023 16:30:22 -0800 Subject: [PATCH 1/7] Fix FlowSpec Updating Function * makes Config object with FlowSpec mutable * adds unit test to ensure flow compiles after updating FlowSpec * ensure DagManager resilient to exceptions on leadership change --- .../apache/gobblin/runtime/api/FlowSpec.java | 28 +++-- .../gobblin/runtime/api/FlowSpecTest.java | 10 +- .../modules/orchestration/DagManager.java | 7 +- .../FlowCompilationValidationHelper.java | 4 +- .../DagActionStoreChangeMonitor.java | 4 +- .../IdentityFlowToJobSpecCompilerTest.java | 6 +- .../FlowCompilationValidationHelperTest.java | 105 ++++++++++++++++++ 7 files changed, 139 insertions(+), 25 deletions(-) create mode 100644 gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java index 2df547596f6..bb6032d7dd6 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -57,6 +58,7 @@ * */ @Alpha +@AllArgsConstructor @Data @EqualsAndHashCode(exclude={"compilationErrors"}) @SuppressFBWarnings(value="SE_BAD_FIELD", @@ -76,8 +78,8 @@ public class FlowSpec implements Configurable, Spec { /** Human-readable description of the flow spec */ final String description; - /** Flow config as a typesafe config object*/ - final Config config; + /** Flow config as a typesafe config object, mutable to allow flowExecutionId to be stored */ + Config config; /** Flow config as a properties collection for backwards compatibility */ // Note that this property is not strictly necessary as it can be generated from the typesafe @@ -125,6 +127,18 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) { throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e); } } + + /** + * Add new property at the specified path to the Config and configAsProperties objects. + * @param path + * @param value + */ + public void updateConfigAndPropertiesWithProperty(String path, String value) { + Config updatedConfig = this.config.withValue(path, ConfigValueFactory.fromAnyRef(value)); + this.configAsProperties.setProperty(path, value); + setConfig(updatedConfig); + } + public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) { this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops)); } @@ -519,14 +533,4 @@ public static int maxFlowSpecUriLength() { + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH; } } - - /** - * Create a new FlowSpec object with the added property defined by path and value parameters - * @param path key for new property - * @param value - */ - public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) { - Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value)); - return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build(); - } } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java index 793abd222b6..22fee976e3f 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java @@ -40,7 +40,7 @@ public class FlowSpecTest { public void testAddProperty() throws URISyntaxException { String flowGroup = "myGroup"; String flowName = "myName"; - String flowExecutionId = "myId"; + String flowExecutionId = "1234"; FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); @@ -50,16 +50,16 @@ public void testAddProperty() throws URISyntaxException { properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName); properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true"); - FlowSpec originalFlowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build(); - FlowSpec updatedFlowSpec = createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + FlowSpec flowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build(); + flowSpec.updateConfigAndPropertiesWithProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); - Properties updatedProperties = updatedFlowSpec.getConfigAsProperties(); + Properties updatedProperties = flowSpec.getConfigAsProperties(); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true"); - Config updatedConfig = updatedFlowSpec.getConfig(); + Config updatedConfig = flowSpec.getConfig(); Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup); Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), flowName); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index c1553da39ca..4dcfd578246 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -484,7 +484,9 @@ public synchronized void setActive(boolean active) { log.error("Exception encountered when shutting down DagManager threads.", e); } } - } catch (IOException e) { + } catch (Exception e) { + // All exceptions should fail leader transition obviously to avoid case where transition to active fails to + // complete but is not apparent log.error("Exception encountered when activating the new DagManager", e); throw new RuntimeException(e); } @@ -507,6 +509,9 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec); if (optionalJobExecutionPlanDag.isPresent()) { addDag(optionalJobExecutionPlanDag.get(), true, true); + } else { + log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId); + this.dagManagerMetrics.incrementFailedLaunchCount(); } // Upon handling the action, delete it so on leadership change this is not duplicated this.dagActionStore.get().deleteDagAction(launchAction); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 747ebc2bc3b..ca1a5877690 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -52,7 +52,7 @@ */ @Slf4j @Data -public final class FlowCompilationValidationHelper { +public class FlowCompilationValidationHelper { private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton; private final SpecCompiler specCompiler; private final UserQuotaManager quotaManager; @@ -144,7 +144,7 @@ public Optional> validateAndHandleConcurrentExecution(Conf * @param allowConcurrentExecution * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING. */ - private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, + protected boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, boolean allowConcurrentExecution) { return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index c8d9c62b714..81153293d03 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -200,8 +200,8 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, S URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); spec = (FlowSpec) flowCatalog.getSpecs(flowUri); // Adds flowExecutionId to config to ensure they are consistent across hosts - FlowSpec updatedSpec = createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); - this.orchestrator.submitFlowToDagManager(updatedSpec); + spec.updateConfigAndPropertiesWithProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + this.orchestrator.submitFlowToDagManager(spec); } catch (URISyntaxException e) { log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); this.failedFlowLaunchSubmissions.mark(); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java index d4f2bc2baa6..e3fca532187 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java @@ -109,7 +109,7 @@ private void cleanUpDir(String dir) throws Exception { } } - private TopologySpec initTopologySpec() { + public static TopologySpec initTopologySpec() { Properties properties = new Properties(); properties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR); properties.put("specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME); @@ -125,11 +125,11 @@ private TopologySpec initTopologySpec() { return topologySpecBuilder.build(); } - private FlowSpec initFlowSpec() { + public static FlowSpec initFlowSpec() { return initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, TEST_SOURCE_NAME, TEST_SINK_NAME); } - private FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) { + private static FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) { Properties properties = new Properties(); properties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *"); properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java new file mode 100644 index 00000000000..e1f9c7d4d69 --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.utils; + +import com.google.common.base.Optional; +import java.io.IOException; +import java.util.Properties; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.service.modules.core.IdentityFlowToJobSpecCompilerTest; +import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; +import org.apache.gobblin.service.modules.flow.SpecCompiler; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.monitoring.FlowStatusGenerator; +import org.apache.gobblin.util.ConfigUtils; +import org.junit.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +/** + * Test functionality provided by the helper class re-used between the DagManager and Orchestrator for flow compilation. + */ +public class FlowCompilationValidationHelperTest { + + class MockFlowCompilationValidationHelper extends FlowCompilationValidationHelper { + + public MockFlowCompilationValidationHelper(SharedFlowMetricsSingleton sharedFlowMetricsSingleton, + SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional eventSubmitter, + FlowStatusGenerator flowStatusGenerator, boolean isFlowConcurrencyEnabled) { + super(sharedFlowMetricsSingleton, specCompiler, quotaManager, eventSubmitter, flowStatusGenerator, + isFlowConcurrencyEnabled); + } + + /* + In overriden function simply return if concurrent execution is allowed or not because flowStatusGenerator will be + mocked + */ + @Override + protected boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, + boolean allowConcurrentExecution) { + return allowConcurrentExecution; + } + + } + + /* + Creates a mock {@link FlowCompilationValidationHelper} which has a valid {@link SpecCompiler} but mocks other + components. + */ + MockFlowCompilationValidationHelper createMockFlowCompilationValidationHelper(boolean isFlowConcurrencyEnabled) { + SpecCompiler specCompiler = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties())); + specCompiler.onAddSpec(IdentityFlowToJobSpecCompilerTest.initTopologySpec()); + return new MockFlowCompilationValidationHelper(mock(SharedFlowMetricsSingleton.class), specCompiler, + mock(UserQuotaManager.class), Optional.of(mock(EventSubmitter.class)), mock(FlowStatusGenerator.class), + isFlowConcurrencyEnabled); + } + + /** + * Verifies that a flow spec that compiles to create a valid job execution plan dag will also generate one after + * having a flow execution id key-value pair added to its config + * @throws IOException + * @throws InterruptedException + */ + @Test + public void compileFlowSpec() throws IOException, InterruptedException { + MockFlowCompilationValidationHelper mockFlowCompilationValidationHelper = createMockFlowCompilationValidationHelper(true); + FlowSpec flowSpec = IdentityFlowToJobSpecCompilerTest.initFlowSpec(); + Optional> dagOptional = mockFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); + // Assert FlowSpec compilation results in non-null or empty dag + Assert.assertTrue(dagOptional.isPresent()); + Assert.assertNotNull(dagOptional.get()); + Assert.assertTrue(dagOptional.get().getNodes().size() == 1); + Assert.assertEquals(dagOptional.get().getStartNodes().size(), 1); + + // Update flow spec and check if still compiles and passes checks + flowSpec.updateConfigAndPropertiesWithProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); + dagOptional = mockFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); + // Assert FlowSpec compilation results in non-null or empty dag + Assert.assertTrue(dagOptional.isPresent()); + Assert.assertNotNull(dagOptional.get()); + Assert.assertTrue(dagOptional.get().getNodes().size() == 1); + Assert.assertEquals(dagOptional.get().getStartNodes().size(), 1); + } + +} From c88da7aeccd1e35a4ffcf54db8f85147136192f7 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Wed, 8 Nov 2023 17:26:31 -0800 Subject: [PATCH 2/7] Only update Properties obj not Config to avoid GC overhead --- .../apache/gobblin/runtime/api/FlowSpec.java | 18 ++++++++---------- .../gobblin/runtime/api/FlowSpecTest.java | 16 ++++------------ .../DagActionStoreChangeMonitor.java | 2 +- .../FlowCompilationValidationHelperTest.java | 2 +- 4 files changed, 14 insertions(+), 24 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java index bb6032d7dd6..9a79670aba3 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java @@ -26,7 +26,6 @@ import com.linkedin.data.template.StringMap; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.net.URI; import java.net.URISyntaxException; @@ -36,7 +35,6 @@ import java.util.List; import java.util.Properties; import java.util.Set; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -58,7 +56,6 @@ * */ @Alpha -@AllArgsConstructor @Data @EqualsAndHashCode(exclude={"compilationErrors"}) @SuppressFBWarnings(value="SE_BAD_FIELD", @@ -78,10 +75,11 @@ public class FlowSpec implements Configurable, Spec { /** Human-readable description of the flow spec */ final String description; - /** Flow config as a typesafe config object, mutable to allow flowExecutionId to be stored */ - Config config; + /** Flow config as a typesafe config object */ + final Config config; - /** Flow config as a properties collection for backwards compatibility */ + /** Flow config as a properties collection for backwards compatibility + * It can be updated to store properties in addition to ones in the immutable Config object */ // Note that this property is not strictly necessary as it can be generated from the typesafe // config. We use it as a cache until typesafe config is more widely adopted in Gobblin. final Properties configAsProperties; @@ -129,14 +127,14 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) { } /** - * Add new property at the specified path to the Config and configAsProperties objects. + * Add new property at the specified path to the configAsProperties objects. + * Note: this does NOT update the Config so any property added through this function must be retrieved through the + * ConfigAsProperties field * @param path * @param value */ - public void updateConfigAndPropertiesWithProperty(String path, String value) { - Config updatedConfig = this.config.withValue(path, ConfigValueFactory.fromAnyRef(value)); + public void addPropertyToConfigAsProperties(String path, String value) { this.configAsProperties.setProperty(path, value); - setConfig(updatedConfig); } public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) { diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java index 22fee976e3f..31e1f1e0e94 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java @@ -17,7 +17,6 @@ package org.apache.gobblin.runtime.api; -import com.typesafe.config.Config; import java.net.URI; import java.net.URISyntaxException; import java.util.Properties; @@ -26,18 +25,17 @@ import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.gobblin.runtime.api.FlowSpec.*; - public class FlowSpecTest { /** - * Tests that the addProperty() function to ensure the new flowSpec returned has the original properties and updated + * Tests that the addPropertyToConfigAsProperties() function results in a modified configAsProperties object in the + * FlowSpec containing both the original properties and the added flowExecutionId property * ones * @throws URISyntaxException */ @Test - public void testAddProperty() throws URISyntaxException { + public void testAddPropertyToConfigAsProperties() throws URISyntaxException { String flowGroup = "myGroup"; String flowName = "myName"; String flowExecutionId = "1234"; @@ -51,18 +49,12 @@ public void testAddProperty() throws URISyntaxException { properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true"); FlowSpec flowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build(); - flowSpec.updateConfigAndPropertiesWithProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + flowSpec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); Properties updatedProperties = flowSpec.getConfigAsProperties(); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName); Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true"); - - Config updatedConfig = flowSpec.getConfig(); - Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); - Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup); - Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), flowName); - Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true"); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 81153293d03..0b16b916784 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -200,7 +200,7 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, S URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); spec = (FlowSpec) flowCatalog.getSpecs(flowUri); // Adds flowExecutionId to config to ensure they are consistent across hosts - spec.updateConfigAndPropertiesWithProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + spec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); this.orchestrator.submitFlowToDagManager(spec); } catch (URISyntaxException e) { log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java index e1f9c7d4d69..6d1e2692b45 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java @@ -93,7 +93,7 @@ public void compileFlowSpec() throws IOException, InterruptedException { Assert.assertEquals(dagOptional.get().getStartNodes().size(), 1); // Update flow spec and check if still compiles and passes checks - flowSpec.updateConfigAndPropertiesWithProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); + flowSpec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); dagOptional = mockFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); // Assert FlowSpec compilation results in non-null or empty dag Assert.assertTrue(dagOptional.isPresent()); From 3001cd60645be334a830f903ec0de3b2816db0c6 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Wed, 8 Nov 2023 17:31:46 -0800 Subject: [PATCH 3/7] Address findbugs error --- .../gobblin/service/modules/orchestration/DagManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 4dcfd578246..d11b2312868 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -484,7 +484,7 @@ public synchronized void setActive(boolean active) { log.error("Exception encountered when shutting down DagManager threads.", e); } } - } catch (Exception e) { + } catch (Throwable e) { // All exceptions should fail leader transition obviously to avoid case where transition to active fails to // complete but is not apparent log.error("Exception encountered when activating the new DagManager", e); From 906a65d91c3f7adab197656559b484f3fc44533f Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Mon, 13 Nov 2023 16:29:34 -0800 Subject: [PATCH 4/7] Avoid updating or creating new FlowSpec objects by passing flowExecutionId directly to metadata --- .../apache/gobblin/runtime/api/FlowSpec.java | 14 +-- .../gobblin/runtime/api/FlowSpecTest.java | 60 ------------- .../modules/orchestration/DagManager.java | 4 +- .../modules/orchestration/Orchestrator.java | 10 ++- .../FlowCompilationValidationHelper.java | 17 ++-- .../DagActionStoreChangeMonitor.java | 7 +- .../FlowCompilationValidationHelperTest.java | 90 ++++++------------- 7 files changed, 50 insertions(+), 152 deletions(-) delete mode 100644 gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java index 9a79670aba3..4b77294e67e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java @@ -78,8 +78,7 @@ public class FlowSpec implements Configurable, Spec { /** Flow config as a typesafe config object */ final Config config; - /** Flow config as a properties collection for backwards compatibility - * It can be updated to store properties in addition to ones in the immutable Config object */ + /** Flow config as a properties collection for backwards compatibility */ // Note that this property is not strictly necessary as it can be generated from the typesafe // config. We use it as a cache until typesafe config is more widely adopted in Gobblin. final Properties configAsProperties; @@ -126,17 +125,6 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) { } } - /** - * Add new property at the specified path to the configAsProperties objects. - * Note: this does NOT update the Config so any property added through this function must be retrieved through the - * ConfigAsProperties field - * @param path - * @param value - */ - public void addPropertyToConfigAsProperties(String path, String value) { - this.configAsProperties.setProperty(path, value); - } - public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) { this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops)); } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java deleted file mode 100644 index 31e1f1e0e94..00000000000 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.runtime.api; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Properties; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.service.FlowId; -import org.testng.Assert; -import org.testng.annotations.Test; - - -public class FlowSpecTest { - - /** - * Tests that the addPropertyToConfigAsProperties() function results in a modified configAsProperties object in the - * FlowSpec containing both the original properties and the added flowExecutionId property - * ones - * @throws URISyntaxException - */ - @Test - public void testAddPropertyToConfigAsProperties() throws URISyntaxException { - String flowGroup = "myGroup"; - String flowName = "myName"; - String flowExecutionId = "1234"; - FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); - URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); - - // Create properties to be used as config - Properties properties = new Properties(); - properties.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup); - properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName); - properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true"); - - FlowSpec flowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build(); - flowSpec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); - - Properties updatedProperties = flowSpec.getConfigAsProperties(); - Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); - Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup); - Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName); - Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true"); - } -} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index d11b2312868..9aec448ddef 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -484,7 +484,7 @@ public synchronized void setActive(boolean active) { log.error("Exception encountered when shutting down DagManager threads.", e); } } - } catch (Throwable e) { + } catch (RuntimeException | IOException e) { // All exceptions should fail leader transition obviously to avoid case where transition to active fails to // complete but is not apparent log.error("Exception encountered when activating the new DagManager", e); @@ -506,7 +506,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri); Optional> optionalJobExecutionPlanDag = - this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec); + this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec, Optional.absent()); if (optionalJobExecutionPlanDag.isPresent()) { addDag(optionalJobExecutionPlanDag.get(), true, true); } else { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index bcb1743200a..ab50f6d1623 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -249,7 +249,8 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil return; } Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); - FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get()); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), + jobExecutionPlanDagOptional.get()); // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly // Skip flow compilation as well, since we recompile after receiving event from DagActionStoreChangeMonitor later @@ -285,7 +286,8 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL); - FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), + jobExecutionPlanDag); if (flowCompilationTimer.isPresent()) { flowCompilationTimer.get().stop(flowMetadata); } @@ -335,9 +337,9 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } - public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, InterruptedException { + public void submitFlowToDagManager(FlowSpec flowSpec, Optional optionalFlowExecutionId) throws IOException, InterruptedException { Optional> optionalJobExecutionPlanDag = - this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); + this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId); if (optionalJobExecutionPlanDag.isPresent()) { submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get()); } else { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index ca1a5877690..84f107085da 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -64,10 +64,12 @@ public class FlowCompilationValidationHelper { * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the * caller. + * @param flowSpec + * @param optionalFlowExecutionId provided for executions of scheduled events which should use a consistent id * @return jobExecutionPlan dag if one can be constructed for the given flowSpec */ - public Optional> createExecutionPlanIfValid(FlowSpec flowSpec) - throws IOException, InterruptedException { + public Optional> createExecutionPlanIfValid(FlowSpec flowSpec, + Optional optionalFlowExecutionId) throws IOException, InterruptedException { Config flowConfig = flowSpec.getConfig(); String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); @@ -90,7 +92,7 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS return Optional.absent(); } - addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get()); + addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, jobExecutionPlanDagOptional.get()); if (flowCompilationTimer.isPresent()) { flowCompilationTimer.get().stop(flowMetadata); } @@ -177,11 +179,14 @@ public static void populateFlowCompilationFailedEventMessage(Optional flowMetadata, - Dag jobExecutionPlanDag) { + Optional optionalFlowExecutionId, Dag jobExecutionPlanDag) { + if (optionalFlowExecutionId.isPresent()) { + flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, optionalFlowExecutionId.get()); + } flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty( ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index e1b439a6d22..e9497c52650 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.monitoring; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -30,7 +31,6 @@ import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; @@ -202,9 +202,8 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, S try { URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); spec = (FlowSpec) flowCatalog.getSpecs(flowUri); - // Adds flowExecutionId to config to ensure they are consistent across hosts - spec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); - this.orchestrator.submitFlowToDagManager(spec); + // Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId + this.orchestrator.submitFlowToDagManager(spec, Optional.of(flowExecutionId)); } catch (URISyntaxException e) { log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); this.failedFlowLaunchSubmissions.mark(); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java index 6d1e2692b45..5f778f3f8f0 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java @@ -18,88 +18,52 @@ package org.apache.gobblin.service.modules.utils; import com.google.common.base.Optional; -import java.io.IOException; -import java.util.Properties; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.metrics.event.EventSubmitter; -import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.service.modules.core.IdentityFlowToJobSpecCompilerTest; -import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; -import org.apache.gobblin.service.modules.flow.SpecCompiler; +import java.net.URISyntaxException; +import java.util.HashMap; +import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; +import org.apache.gobblin.service.modules.orchestration.DagTestUtils; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.monitoring.FlowStatusGenerator; -import org.apache.gobblin.util.ConfigUtils; import org.junit.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.Mockito.*; - /** * Test functionality provided by the helper class re-used between the DagManager and Orchestrator for flow compilation. */ public class FlowCompilationValidationHelperTest { + private String dagId = "testDag"; + private Long jobSpecFlowExecutionId = 1234L; + private String newFlowExecutionId = "5678"; + private String existingFlowExecutionId = "9999"; + private Dag jobExecutionPlanDag; - class MockFlowCompilationValidationHelper extends FlowCompilationValidationHelper { - - public MockFlowCompilationValidationHelper(SharedFlowMetricsSingleton sharedFlowMetricsSingleton, - SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional eventSubmitter, - FlowStatusGenerator flowStatusGenerator, boolean isFlowConcurrencyEnabled) { - super(sharedFlowMetricsSingleton, specCompiler, quotaManager, eventSubmitter, flowStatusGenerator, - isFlowConcurrencyEnabled); - } - - /* - In overriden function simply return if concurrent execution is allowed or not because flowStatusGenerator will be - mocked - */ - @Override - protected boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, - boolean allowConcurrentExecution) { - return allowConcurrentExecution; - } + @BeforeClass + public void setup() throws URISyntaxException { + jobExecutionPlanDag = DagTestUtils.buildDag(dagId, jobSpecFlowExecutionId); } /* - Creates a mock {@link FlowCompilationValidationHelper} which has a valid {@link SpecCompiler} but mocks other - components. + Tests that addFlowExecutionIdIfAbsent adds flowExecutionId to a flowMetadata object when it is absent, prioritizing + the optional flowExecutionId over the one from the job spec */ - MockFlowCompilationValidationHelper createMockFlowCompilationValidationHelper(boolean isFlowConcurrencyEnabled) { - SpecCompiler specCompiler = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties())); - specCompiler.onAddSpec(IdentityFlowToJobSpecCompilerTest.initTopologySpec()); - return new MockFlowCompilationValidationHelper(mock(SharedFlowMetricsSingleton.class), specCompiler, - mock(UserQuotaManager.class), Optional.of(mock(EventSubmitter.class)), mock(FlowStatusGenerator.class), - isFlowConcurrencyEnabled); + @Test + public void testAddFlowExecutionIdWhenAbsent() { + HashMap flowMetadata = new HashMap<>(); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.of(newFlowExecutionId), jobExecutionPlanDag); + Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), newFlowExecutionId); } - /** - * Verifies that a flow spec that compiles to create a valid job execution plan dag will also generate one after - * having a flow execution id key-value pair added to its config - * @throws IOException - * @throws InterruptedException + /* + Tests that addFlowExecutionIdIfAbsent does not update an existing flowExecutionId in a flowMetadata object */ @Test - public void compileFlowSpec() throws IOException, InterruptedException { - MockFlowCompilationValidationHelper mockFlowCompilationValidationHelper = createMockFlowCompilationValidationHelper(true); - FlowSpec flowSpec = IdentityFlowToJobSpecCompilerTest.initFlowSpec(); - Optional> dagOptional = mockFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); - // Assert FlowSpec compilation results in non-null or empty dag - Assert.assertTrue(dagOptional.isPresent()); - Assert.assertNotNull(dagOptional.get()); - Assert.assertTrue(dagOptional.get().getNodes().size() == 1); - Assert.assertEquals(dagOptional.get().getStartNodes().size(), 1); - - // Update flow spec and check if still compiles and passes checks - flowSpec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); - dagOptional = mockFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec); - // Assert FlowSpec compilation results in non-null or empty dag - Assert.assertTrue(dagOptional.isPresent()); - Assert.assertNotNull(dagOptional.get()); - Assert.assertTrue(dagOptional.get().getNodes().size() == 1); - Assert.assertEquals(dagOptional.get().getStartNodes().size(), 1); + public void testSkipAddingFlowExecutionIdWhenPresent() { + HashMap flowMetadata = new HashMap<>(); + flowMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, existingFlowExecutionId); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.of(newFlowExecutionId), jobExecutionPlanDag); + Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), existingFlowExecutionId); } - } From 5a3e73557b3d03fc79d593e1ff2030af9bdeb47f Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Mon, 13 Nov 2023 16:39:36 -0800 Subject: [PATCH 5/7] Remove changes that are not needed anymore --- .../gobblin/service/modules/orchestration/DagManager.java | 3 +-- .../modules/utils/FlowCompilationValidationHelper.java | 4 ++-- .../modules/core/IdentityFlowToJobSpecCompilerTest.java | 6 +++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 9aec448ddef..44f543d20aa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -485,8 +485,7 @@ public synchronized void setActive(boolean active) { } } } catch (RuntimeException | IOException e) { - // All exceptions should fail leader transition obviously to avoid case where transition to active fails to - // complete but is not apparent + // Catch and throw to avoid case where transition to active fails to complete but is not apparent log.error("Exception encountered when activating the new DagManager", e); throw new RuntimeException(e); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 84f107085da..d31797dddbe 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -52,7 +52,7 @@ */ @Slf4j @Data -public class FlowCompilationValidationHelper { +public final class FlowCompilationValidationHelper { private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton; private final SpecCompiler specCompiler; private final UserQuotaManager quotaManager; @@ -146,7 +146,7 @@ public Optional> validateAndHandleConcurrentExecution(Conf * @param allowConcurrentExecution * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING. */ - protected boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, + private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, boolean allowConcurrentExecution) { return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java index e3fca532187..d4f2bc2baa6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java @@ -109,7 +109,7 @@ private void cleanUpDir(String dir) throws Exception { } } - public static TopologySpec initTopologySpec() { + private TopologySpec initTopologySpec() { Properties properties = new Properties(); properties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR); properties.put("specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME); @@ -125,11 +125,11 @@ public static TopologySpec initTopologySpec() { return topologySpecBuilder.build(); } - public static FlowSpec initFlowSpec() { + private FlowSpec initFlowSpec() { return initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, TEST_SOURCE_NAME, TEST_SINK_NAME); } - private static FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) { + private FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) { Properties properties = new Properties(); properties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *"); properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup); From 3d80b3eccbb54a9cc8f96ecc7a2b031b38e6c1bc Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 14 Nov 2023 09:42:39 -0800 Subject: [PATCH 6/7] Add TODO to handle failed DagManager leadership change --- .../gobblin/service/modules/core/GobblinServiceManager.java | 2 ++ .../gobblin/service/modules/orchestration/DagManager.java | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 23ae03406e2..9212eae91be 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -315,6 +315,8 @@ private void handleLeadershipChange(NotificationContext changeContext) { this.gitConfigMonitor.setActive(true); } + // TODO: surround by try/catch to disconnect from Helix and fail the leader transition if DagManager is not + // transitioned properly if (configuration.isDagManagerEnabled()) { //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation. if (this.topologyCatalog.getInitComplete().getCount() == 0) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 44f543d20aa..c90841b50a1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -484,8 +484,7 @@ public synchronized void setActive(boolean active) { log.error("Exception encountered when shutting down DagManager threads.", e); } } - } catch (RuntimeException | IOException e) { - // Catch and throw to avoid case where transition to active fails to complete but is not apparent + } catch (IOException e) { log.error("Exception encountered when activating the new DagManager", e); throw new RuntimeException(e); } From 36162db3a4582065f577d588851c2677602cdc67 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 14 Nov 2023 15:01:27 -0800 Subject: [PATCH 7/7] Overload function and add more documentation --- .../runtime/api/MysqlMultiActiveLeaseArbiter.java | 9 +++++---- .../service/modules/orchestration/Orchestrator.java | 6 ++---- .../utils/FlowCompilationValidationHelper.java | 13 ++++++++++++- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 05449767cf1..925490cd944 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -57,12 +57,13 @@ * than epsilon and encapsulate executor communication latency including retry attempts * * The `event_timestamp` is the time of the flow_action event request. - * --- Note --- + * --- Database event_timestamp laundering --- * We only use the participant's local event_timestamp internally to identify the particular flow_action event, but * after interacting with the database utilize the CURRENT_TIMESTAMP of the database to insert or keep - * track of our event. This is to avoid any discrepancies due to clock drift between participants as well as - * variation in local time and database time for future comparisons. - * ---Event consolidation--- + * track of our event, "laundering" or replacing the local timestamp with the database one. This is to avoid any + * discrepancies due to clock drift between participants as well as variation in local time and database time for + * future comparisons. + * --- Event consolidation --- * Note that for the sake of simplification, we only allow one event associated with a particular flow's flow_action * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH, KILL, & RESUME for flow FOO at once) during * the time it takes to execute the flow action. In most cases, the execution time should be so negligible that this diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index ab50f6d1623..0461bb11f30 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -249,8 +249,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil return; } Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); - FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), - jobExecutionPlanDagOptional.get()); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get()); // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly // Skip flow compilation as well, since we recompile after receiving event from DagActionStoreChangeMonitor later @@ -286,8 +285,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL); - FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), - jobExecutionPlanDag); + FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag); if (flowCompilationTimer.isPresent()) { flowCompilationTimer.get().stop(flowMetadata); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index d31797dddbe..64c093007e0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -65,7 +65,9 @@ public final class FlowCompilationValidationHelper { * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the * caller. * @param flowSpec - * @param optionalFlowExecutionId provided for executions of scheduled events which should use a consistent id + * @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass the ID "laundered" via the DB; + * see: {@link MysqlMultiActiveLeaseArbiter javadoc section titled + * `Database event_timestamp laundering`} * @return jobExecutionPlan dag if one can be constructed for the given flowSpec */ public Optional> createExecutionPlanIfValid(FlowSpec flowSpec, @@ -178,6 +180,15 @@ public static void populateFlowCompilationFailedEventMessage(Optional flowMetadata, + Dag jobExecutionPlanDag) { + addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), jobExecutionPlanDag); + } + /** * If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is * successful, add a flowExecutionId using the optional parameter if it exists otherwise retrieve it from the JobSpec.