Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.linkedin.data.template.StringMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -76,10 +75,11 @@ public class FlowSpec implements Configurable, Spec {
/** Human-readable description of the flow spec */
final String description;

/** Flow config as a typesafe config object*/
/** Flow config as a typesafe config object */
final Config config;

/** Flow config as a properties collection for backwards compatibility */
/** Flow config as a properties collection for backwards compatibility
* It can be updated to store properties in addition to ones in the immutable Config object */
// Note that this property is not strictly necessary as it can be generated from the typesafe
// config. We use it as a cache until typesafe config is more widely adopted in Gobblin.
final Properties configAsProperties;
Expand Down Expand Up @@ -125,6 +125,18 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) {
throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e);
}
}

/**

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer that we name this function something more semantic, such as addMutableProperty, with a reader for it named getMutableProperty, so that users will not attempt to read the value from this.config.

* Add new property at the specified path to the configAsProperties objects.
* Note: this does NOT update the Config so any property added through this function must be retrieved through the
* ConfigAsProperties field

@phet phet Nov 10, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be error prone... why not instead create a new Config and update the reference to point to that one? given immutability, k-v pairs in common (the vast majority) should be readily shared, for minimal performance penalty. I believe you'd create the successor using Config::withFallback (but I'm not 100% sure)

do be sure to synchronize an update of configAsProperties at that same time.

* @param path
* @param value
*/
public void addPropertyToConfigAsProperties(String path, String value) {
  // Only the mutable Properties collection is written here; the Config field is not modified by this
  // method, so callers must read the added value back through the properties collection.
  final Properties mutableProperties = this.configAsProperties;
  mutableProperties.setProperty(path, value);
}

public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) {
  // Build the error record against the current flow config, then append it to the spec's error collection.
  CompilationError compilationError = new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops);
  this.compilationErrors.add(compilationError);
}
Expand Down Expand Up @@ -518,15 +530,5 @@ public static int maxFlowSpecUriLength() {
return URI_SCHEME.length() + ":".length() // URI separator
+ URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
}

/**
* Create a new FlowSpec object with the added property defined by path and value parameters
* @param path key for new property
* @param value
*/
public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) {
Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value));
return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my hunch is that this .build() is what generated all the garbage. most likely the conversion of config to props

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.gobblin.runtime.api;

import com.typesafe.config.Config;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;
Expand All @@ -30,15 +29,16 @@
public class FlowSpecTest {

/**
* Tests that the addProperty() function to ensure the new flowSpec returned has the original properties and updated
* Tests that the addPropertyToConfigAsProperties() function results in a modified configAsProperties object in the
* FlowSpec containing both the original properties and the added flowExecutionId property
* ones
* @throws URISyntaxException
*/
@Test
public void testAddProperty() throws URISyntaxException {
public void testAddPropertyToConfigAsProperties() throws URISyntaxException {
String flowGroup = "myGroup";
String flowName = "myName";
String flowExecutionId = "myId";
String flowExecutionId = "1234";
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);

Expand All @@ -48,19 +48,13 @@ public void testAddProperty() throws URISyntaxException {
properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true");

FlowSpec originalFlowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
FlowSpec updatedFlowSpec = FlowSpec.Utils.createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
FlowSpec flowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
flowSpec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);

Properties updatedProperties = updatedFlowSpec.getConfigAsProperties();
Properties updatedProperties = flowSpec.getConfigAsProperties();
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");

Config updatedConfig = updatedFlowSpec.getConfig();
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), flowName);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,9 @@ public synchronized void setActive(boolean active) {
log.error("Exception encountered when shutting down DagManager threads.", e);
}
}
} catch (IOException e) {
} catch (Throwable e) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Even with the current code (i.e. the left side), leader transition should fail. I did not get what we are trying to achieve here.
  2. Is GobblinServiceManager (the caller of this method) not printing stacktrace?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not catching NullPointerException that occurred here

we're only catching IOException which results in leader transition silently not completing if NPE or something occurs. It is printing the stack trace but we already set this.active = true beforehand so it remains as leader.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree w/ the spirit here, but Throwable is pretty severe to catch... it would even grab OOM and other Errors... do we want that, or should you merely catch Exception?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. why do we set this.active=true when it is not ready yet to act as a leader?
  2. should we just reverse the leadership change in the catch block then?
  3. Basically, the only difference this change makes is printing a log line. Let's try to handle the exception (including runtime exceptions) gracefully.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am updating it to catch RuntimeException then. How about that for the error catching?
I was thinking about setting this.active after we finish all the work, but the reason we may set it before is that loading all dags from state store and initializing may take time but in meantime we still want to accept new requests. I want to limit scope of significant change like that which may increase downtime.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching and rethrowing the exception will fail the service health check, so the service deploy/startup will fail if we encounter an NPE. The issue is that previously initialization would fail silently.

// Every exception must visibly fail the leader transition, to avoid the case where the transition to active
// fails to complete but the failure is not apparent
log.error("Exception encountered when activating the new DagManager", e);
throw new RuntimeException(e);
}
Expand All @@ -507,6 +509,9 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
if (optionalJobExecutionPlanDag.isPresent()) {
addDag(optionalJobExecutionPlanDag.get(), true, true);
} else {
log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId);
this.dagManagerMetrics.incrementFailedLaunchCount();
Comment on lines +511 to +512

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't actually say what/how it failed flow compilation... is that because such a message would have already been logged?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, we don't have that information available here yet. We emit a flow compilation failed event that we can check

populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);

}
// Upon handling the action, delete it so on leadership change this is not duplicated
this.dagActionStore.get().deleteDagAction(launchAction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
*/
@Slf4j
@Data
public final class FlowCompilationValidationHelper {
public class FlowCompilationValidationHelper {
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
private final SpecCompiler specCompiler;
private final UserQuotaManager quotaManager;
Expand Down Expand Up @@ -144,7 +144,7 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
*/
private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup,
protected boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup,
boolean allowConcurrentExecution) {
return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, S
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Adds flowExecutionId to config to ensure they are consistent across hosts

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this brief comment may not draw enough attention to how critical this is!

FlowSpec updatedSpec = FlowSpec.Utils.createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
this.orchestrator.submitFlowToDagManager(updatedSpec);
spec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you point me to the code where we query this flow execution id?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we query here from ConfigAsProperties

jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(

@arjun4084346 arjun4084346 Nov 10, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we add flow.execution.id.
In li-gobblin AzkabanHeadlessProducer we get the configs
Map<String, String> flowParameters = new HashMap<>(Maps.fromProperties(ConfigUtils.configToProperties(jobConfig)));
and there we use configs from the Config object, so you have to make a change there also if you want this approach to work. Let's think more thoroughly about which approach is better and more intuitive.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be careful here for adhoc flows that already save a flow execution ID in the FlowSpec configs. For those, do we know whether the flow execution ID in their config is consistent if they don't have a trigger time? Or should we check whether the property already exists in the properties so that we don't override it?

this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
this.failedFlowLaunchSubmissions.mark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void cleanUpDir(String dir) throws Exception {
}
}

private TopologySpec initTopologySpec() {
public static TopologySpec initTopologySpec() {
Properties properties = new Properties();
properties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR);
properties.put("specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
Expand All @@ -125,11 +125,11 @@ private TopologySpec initTopologySpec() {
return topologySpecBuilder.build();
}

private FlowSpec initFlowSpec() {
public static FlowSpec initFlowSpec() {
return initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, TEST_SOURCE_NAME, TEST_SINK_NAME);
}

private FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) {
private static FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) {
Properties properties = new Properties();
properties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.utils;

import com.google.common.base.Optional;
import java.io.IOException;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.service.modules.core.IdentityFlowToJobSpecCompilerTest;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.junit.Assert;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;


/**
* Test functionality provided by the helper class re-used between the DagManager and Orchestrator for flow compilation.
*/
public class FlowCompilationValidationHelperTest {

class MockFlowCompilationValidationHelper extends FlowCompilationValidationHelper {

public MockFlowCompilationValidationHelper(SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
FlowStatusGenerator flowStatusGenerator, boolean isFlowConcurrencyEnabled) {
super(sharedFlowMetricsSingleton, specCompiler, quotaManager, eventSubmitter, flowStatusGenerator,
isFlowConcurrencyEnabled);
}

/*
In overriden function simply return if concurrent execution is allowed or not because flowStatusGenerator will be
mocked
*/
@Override
protected boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup,
boolean allowConcurrentExecution) {
return allowConcurrentExecution;
}

}

/**
 * Builds a {@link MockFlowCompilationValidationHelper} that is backed by a real {@link SpecCompiler}; every other
 * collaborator is a Mockito mock.
 */
MockFlowCompilationValidationHelper createMockFlowCompilationValidationHelper(boolean isFlowConcurrencyEnabled) {
  // Real compiler built from an empty config, with the shared test topology registered on it.
  Properties emptyProperties = new Properties();
  SpecCompiler compiler = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(emptyProperties));
  compiler.onAddSpec(IdentityFlowToJobSpecCompilerTest.initTopologySpec());

  // Remaining collaborators are mocked.
  SharedFlowMetricsSingleton metricsMock = mock(SharedFlowMetricsSingleton.class);
  UserQuotaManager quotaManagerMock = mock(UserQuotaManager.class);
  Optional<EventSubmitter> eventSubmitterMock = Optional.of(mock(EventSubmitter.class));
  FlowStatusGenerator statusGeneratorMock = mock(FlowStatusGenerator.class);

  return new MockFlowCompilationValidationHelper(metricsMock, compiler, quotaManagerMock, eventSubmitterMock,
      statusGeneratorMock, isFlowConcurrencyEnabled);
}

/**
* Verifies that a flow spec that compiles to create a valid job execution plan dag will also generate one after
* having a flow execution id key-value pair added to its config
* @throws IOException
* @throws InterruptedException
*/
@Test
public void compileFlowSpec() throws IOException, InterruptedException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to extend this test to also cover serialization/deserialization of the JobExecutionPlan that is stored in the dag, since that was the area that caused the fatal issues?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to extend this test for serialization/deserialization using the convertDagIntoState / convertStateObjIntoDag methods from MysqlDagStateStore; however, right after compiling, the FlowSpec is not ready for serialization/deserialization, as there are more properties that are initialized and added to the JobExecutionPlan dag, i.e. Future and various URIs. I will actually remove the FlowSpec update completely to avoid these sorts of issues.

MockFlowCompilationValidationHelper mockFlowCompilationValidationHelper = createMockFlowCompilationValidationHelper(true);
FlowSpec flowSpec = IdentityFlowToJobSpecCompilerTest.initFlowSpec();
Optional<Dag<JobExecutionPlan>> dagOptional = mockFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
// Assert FlowSpec compilation results in a non-null, non-empty dag
Assert.assertTrue(dagOptional.isPresent());
Assert.assertNotNull(dagOptional.get());
Assert.assertTrue(dagOptional.get().getNodes().size() == 1);
Assert.assertEquals(dagOptional.get().getStartNodes().size(), 1);

// Update flow spec and check if still compiles and passes checks
flowSpec.addPropertyToConfigAsProperties(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321");
dagOptional = mockFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
// Assert FlowSpec compilation still results in a non-null, non-empty dag after the update
Assert.assertTrue(dagOptional.isPresent());
Assert.assertNotNull(dagOptional.get());
Assert.assertTrue(dagOptional.get().getNodes().size() == 1);
Assert.assertEquals(dagOptional.get().getStartNodes().size(), 1);
}

}