Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException

try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
FlowSpec spec = flowCatalog.getSpecs(flowUri);
return FlowSpec.Utils.toFlowConfig(spec);
} catch (URISyntaxException e) {
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowId.getFlowName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.data.template.StringMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -56,6 +58,7 @@
*
*/
@Alpha
@AllArgsConstructor
@Data
@EqualsAndHashCode(exclude={"compilationErrors"})
@SuppressFBWarnings(value="SE_BAD_FIELD",
Expand All @@ -75,13 +78,23 @@ public class FlowSpec implements Configurable, Spec {
/** Human-readable description of the flow spec */
final String description;

/** Flow config as a typesafe config object */
final Config config;
/* Note that since getConfig() and getConfigAsProperties() are independent accessors, `volatile` doesn't ensure a
* consistent view between them. If one wants to access both, they should briefly synchronize on the FlowSpec object
* while obtaining them:
* FlowSpec fs = ...
* synchronized (fs) {
* fs.getConfig()
* fs.getConfigAsProperties()
* }
*/

/** Flow config as a typesafe config object which can be replaced */
private volatile Config config;

/** Flow config as a properties collection for backwards compatibility */
// Note that this property is not strictly necessary as it can be generated from the typesafe
// config. We use it as a cache until typesafe config is more widely adopted in Gobblin.
final Properties configAsProperties;
private volatile Properties configAsProperties;

/** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
final Optional<Set<URI>> templateURIs;
Expand Down Expand Up @@ -125,6 +138,24 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) {
}
}

/**
 * Adds a property to the typesafe {@link Config} and propagates it to the cached {@link Properties} view.
 * These two fields should only be modified through this method to prevent inconsistency between them.
 * Note that while the property is being added, config and configAsProperties can transiently hold different
 * values, but they are consistent again by the time the method returns; the method is {@code synchronized}
 * so concurrent callers never observe a partial update through this method.
 * @param key property name to set in both the Config and Properties representations
 * @param value property value (wrapped via {@code ConfigValueFactory.fromAnyRef} for the Config side)
 */
public synchronized void addProperty(String key, String value) {
this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
/* Make sure configAsProperties has been initialized (the getter lazily derives it from the config — see the
field's cache note). If it was just initialized here, it already reflects the new value and the setProperty
below is redundant; if it already existed we need to update/add the key-value pair explicitly.
*/
this.getConfigAsProperties();
this.configAsProperties.setProperty(key, value);

}

/** Records a compilation failure for the path {@code src -> dst}, capturing the spec's current config. */
public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) {
CompilationError error = new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops);
this.compilationErrors.add(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,11 @@ public boolean exists(URI uri) {
}

@Override
public Spec getSpecs(URI uri) throws SpecNotFoundException {
public FlowSpec getSpecs(URI uri) throws SpecNotFoundException {
try {
return specStore.getSpec(uri);
return (FlowSpec) specStore.getSpec(uri);
} catch (IOException e) {
throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
throw new RuntimeException("Cannot retrieve FlowSpec from FlowSpec store for URI: " + uri, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime.api;

import com.typesafe.config.Config;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowId;
import org.testng.Assert;
import org.testng.annotations.Test;


public class FlowSpecTest {

  /**
   * Verifies that {@link FlowSpec#addProperty(String, String)} retains all pre-existing entries while adding the
   * new key-value pair, and that the typesafe Config and the Properties views remain consistent with each other.
   * @throws URISyntaxException
   */
  @Test
  public void testAddProperty() throws URISyntaxException {
    String group = "myGroup";
    String name = "myName";
    String executionId = "1234";
    URI specUri = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup(group).setFlowName(name));

    // Seed the spec with an initial set of properties acting as its config
    Properties initialProps = new Properties();
    initialProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, group);
    initialProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, name);
    initialProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true");

    FlowSpec spec = FlowSpec.builder(specUri).withConfigAsProperties(initialProps).build();
    spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, executionId);

    // The Properties view must contain both the original and the newly added entries
    Properties props = spec.getConfigAsProperties();
    Assert.assertEquals(props.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), executionId);
    Assert.assertEquals(props.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), group);
    Assert.assertEquals(props.getProperty(ConfigurationKeys.FLOW_NAME_KEY), name);
    Assert.assertEquals(props.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");

    // The Config view must mirror exactly the same state
    Config config = spec.getConfig();
    Assert.assertEquals(config.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), executionId);
    Assert.assertEquals(config.getString(ConfigurationKeys.FLOW_GROUP_KEY), group);
    Assert.assertEquals(config.getString(ConfigurationKeys.FLOW_NAME_KEY), name);
    Assert.assertEquals(config.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
} else {
TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, spec, flowGroup,
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName);

if (!compiledDagOptional.isPresent()) {
Expand All @@ -264,7 +264,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
}
Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
if (compiledDag.isEmpty()) {
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata);
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsSingleton.CompiledState.FAILED);
Expand All @@ -288,10 +288,9 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}

public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
Optional.of(flowAction.getFlowExecutionId()));
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ private void unscheduleSpec(URI specURI, String specVersion) throws JobException
this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
unscheduleJob(specURI.toString());
try {
FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
FlowSpec spec = this.flowCatalog.get().getSpecs(specURI);
Properties properties = spec.getConfigAsProperties();
_log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled Spec");
} catch (SpecNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
Expand Down Expand Up @@ -65,13 +64,10 @@ public final class FlowCompilationValidationHelper {
* flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
* caller.
* @param flowSpec
* @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass the ID "laundered" via the DB;
* see: {@link org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section titled
* `Database event_timestamp laundering`}
* @return jobExecutionPlan dag if one can be constructed for the given flowSpec
*/
public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec,
Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec)
throws IOException, InterruptedException {
Config flowConfig = flowSpec.getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
Expand All @@ -93,43 +89,43 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
return Optional.absent();
}

addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, jobExecutionPlanDagOptional.get());
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}

/**
* Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
* already running and emits a FLOW FAILED event. Otherwise, this check passes.
* @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow and compile spec, else absent Optional
* @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow and compile flowSpec, else Optional.absent()
* @throws IOException
*/
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, Spec spec,
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
String flowGroup, String flowName) throws IOException {
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled);

Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);

if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
return Optional.fromNullable(jobExecutionPlanDag);
} else {
log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
SharedFlowMetricsSingleton.CompiledState.SKIPPED);
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
if (!isScheduledFlow((FlowSpec) spec)) {
if (!flowSpec.isScheduled()) {
// For ad-hoc flow, we might already increase quota, we need to decrease here
for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
quotaManager.releaseQuota(dagNode);
}
}

// Send FLOW_FAILED event
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
return Optional.absent();
}
Expand All @@ -150,53 +146,34 @@ private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, St

/**
* Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
* @param spec
* @param flowSpec
* @param flowMetadata
*/
public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter, Spec spec,
Map<String, String> flowMetadata) {
public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter,
FlowSpec flowSpec, Map<String, String> flowMetadata) {
// For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
// compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
// In this case, the current time is used as the flow executionId.
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
Long.toString(System.currentTimeMillis()));

String message = "Flow was not compiled successfully.";
if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
if (!flowSpec.getCompilationErrors().isEmpty()) {
message = message + " Compilation errors encountered: " + flowSpec.getCompilationErrors();
}
flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);

new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED).stop(flowMetadata);
}

/**
* If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is
* successful, retrieve flowExecutionId from the JobSpec.
* If it is a scheduled flow run without multi-active scheduler configuration (where the FlowSpec does not have a
* flowExecutionId) and the flow compilation is successful, retrieve flowExecutionId from the JobSpec.
*/
public static void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
Dag<JobExecutionPlan> jobExecutionPlanDag) {
addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), jobExecutionPlanDag);
}

/**
* If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is
* successful, add a flowExecutionId using the optional parameter if it exists otherwise retrieve it from the JobSpec.
*/
public static void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
Optional<String> optionalFlowExecutionId, Dag<JobExecutionPlan> jobExecutionPlanDag) {
if (optionalFlowExecutionId.isPresent()) {
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, optionalFlowExecutionId.get());
}
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
}

/**
 * Determines whether the given spec represents a scheduled (as opposed to ad-hoc) flow.
 * @param spec the flow spec to inspect
 * @return true if the spec's properties define a schedule, false otherwise
 */
public static boolean isScheduledFlow(FlowSpec spec) {
Properties specProps = spec.getConfigAsProperties();
return specProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
Expand Down Expand Up @@ -275,9 +276,13 @@ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
FlowSpec spec = null;
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId
this.orchestrator.submitFlowToDagManager(spec, dagAction);
spec = flowCatalog.getSpecs(flowUri);
/* Update the spec to contain the flowExecutionId from the dagAction for scheduled flows that do not already
contain a flowExecutionId. Adhoc flowSpecs are already consistent with the dagAction so there's no effective
change. It's crucial to adopt the consensus flowExecutionId here to prevent creating a new one during compilation.
*/
spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
launchSubmissionMetricProxy.markFailure();
Expand Down
Loading