Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
Expand Down Expand Up @@ -101,21 +102,24 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private UserQuotaManager quotaManager;
private final FlowCompilationValidationHelper flowCompilationValidationHelper;
private Optional<FlowTriggerHandler> flowTriggerHandler;
private Optional<FlowCatalog> flowCatalog;
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;

private final ClassAliasResolver<SpecCompiler> aliasResolver;

public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerHandler = flowTriggerHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.flowCatalog = flowCatalog;
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
Expand Down Expand Up @@ -161,9 +165,9 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<Logger> log, Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<FlowCatalog> flowCatalog) {
this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, flowTriggerHandler,
sharedFlowMetricsSingleton);
sharedFlowMetricsSingleton, flowCatalog);
}


Expand Down Expand Up @@ -234,7 +238,8 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil

long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
Config flowConfig = ((FlowSpec) spec).getConfig();
FlowSpec flowSpec = (FlowSpec) spec;
Config flowConfig = (flowSpec).getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);

Expand All @@ -248,7 +253,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
return;
}
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much nicer to get rid of those casts so far separated from the instanceof check!

FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
java.util.Optional<String> flowExecutionId = TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);

Expand Down Expand Up @@ -299,7 +304,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil

// Depending on if DagManager is present, handle execution
if (this.dagManager.isPresent()) {
submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
submitFlowToDagManager(flowSpec, jobExecutionPlanDag);
Comment on lines 306 to +307

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's consider dropping support for running GaaS without the DagManager. What are your thoughts, @Will-Lo?

(not as part of this PR, but as a follow-up in the next month or so)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we shouldn't need to support a scenario where the DagManager is optional, given how important it is for lifecycle management.

} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
Expand Down Expand Up @@ -332,6 +337,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
+ " for flow: " + spec, e);
}
}
deleteSpecFromCatalogIfAdhoc(flowSpec);
}
}
} else {
Expand Down Expand Up @@ -359,6 +365,13 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
try {
// Send the dag to the DagManager
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);

/*
Adhoc flows can be deleted after they are persisted in the DagManager, as the DagManager's failure recovery method
ensures they will be executed in the event of downtime. Note that the responsibility of the multi-active scheduler
mode ends after this method is completed AND the consumption of a launch type event is committed to the consumer.
*/
deleteSpecFromCatalogIfAdhoc(flowSpec);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly, they should already be getting deleted from the flow catalog

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To confirm, did you check the callers of the FlowCatalog remove method? Is it missing some case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the deletion in NonScheduledJobRunner, so that is what is causing the current tests to fail. This is only called in the case where the DagManager is not present.

} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage, ex);
Expand Down Expand Up @@ -394,6 +407,33 @@ public void remove(Spec spec, Properties headers) throws IOException {
}
}

/**
 * Returns the metric context held by this orchestrator.
 *
 * @return the current value of the {@code metricContext} field
 */
@Override
@Nonnull
public MetricContext getMetricContext() {
  return metricContext;
}

/**
 * Reports whether instrumentation is enabled, which for this class means a metric context has been set.
 *
 * @return {@code true} when the {@code metricContext} field is non-null, {@code false} otherwise
 */
@Override
public boolean isInstrumentationEnabled() {
  return this.metricContext != null;
}

/**
 * Produces the tags for the given state; this implementation contributes none.
 *
 * @param state not consulted by this implementation
 * @return an empty list
 */
@Override
public List<Tag<?>> generateTags(State state) {
  return Collections.<Tag<?>>emptyList();
}

/**
 * Not supported: this class does not allow its metric context to be switched by tags.
 *
 * @param tags unused
 * @throws UnsupportedOperationException always
 */
@Override
public void switchMetricContext(List<Tag<?>> tags) {
throw new UnsupportedOperationException();
}

/**
 * Not supported: this class does not allow its metric context to be replaced.
 *
 * @param context unused
 * @throws UnsupportedOperationException always
 */
@Override
public void switchMetricContext(MetricContext context) {
throw new UnsupportedOperationException();
}


private void deleteFromExecutor(Spec spec, Properties headers) {
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);

Expand All @@ -419,29 +459,12 @@ private void deleteFromExecutor(Spec spec, Properties headers) {
}
}

@Nonnull
@Override
public MetricContext getMetricContext() {
return this.metricContext;
}

@Override
public boolean isInstrumentationEnabled() {
return null != this.metricContext;
}

@Override
public List<Tag<?>> generateTags(State state) {
return Collections.emptyList();
}

@Override
public void switchMetricContext(List<Tag<?>> tags) {
throw new UnsupportedOperationException();
}

@Override
public void switchMetricContext(MetricContext context) {
throw new UnsupportedOperationException();
/**
 * Deletes the given spec from the flow catalog when it is an adhoc flow, i.e. its config has no
 * {@link ConfigurationKeys#JOB_SCHEDULE_KEY} entry. Flows that carry a job schedule are left untouched.
 *
 * <p>The flow catalog is an {@code Optional} dependency of this class. When it is absent there is nothing to
 * delete from, so the call is a logged no-op rather than a failure from calling {@code get()} on an absent
 * {@code Optional}. This matters because one caller invokes this method inside a broad {@code catch (Exception)}
 * that would otherwise report the failure as a problem adding the job execution plan.
 *
 * @param flowSpec the flow whose catalog entry should be removed if it is adhoc
 */
private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
  if (flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
    // Scheduled flow: its spec must stay in the catalog.
    return;
  }
  if (!this.flowCatalog.isPresent()) {
    _log.warn("No FlowCatalog present; skipping removal of adhoc flow spec " + flowSpec.getUri());
    return;
  }
  this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
protected final Optional<FlowCatalog> flowCatalog;
protected final Optional<HelixManager> helixManager;
protected final Orchestrator orchestrator;
protected final Boolean warmStandbyEnabled;
protected final Boolean isWarmStandbyEnabled;
protected final Optional<UserQuotaManager> quotaManager;
protected final Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
Expand Down Expand Up @@ -170,7 +170,7 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
@Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
@Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);

Expand All @@ -185,7 +185,7 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
this.skipSchedulingFlowsAfterNumDays = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS, String.valueOf(ConfigurationKeys.DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));
this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
this.warmStandbyEnabled = warmStandbyEnabled;
this.isWarmStandbyEnabled = isWarmStandbyEnabled;
this.quotaManager = quotaManager;
this.flowTriggerHandler = flowTriggerHandler;
// Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
Expand All @@ -209,13 +209,13 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager, SchedulerService schedulerService,
Optional<Logger> log, boolean warmStandbyEnabled, Optional <FlowTriggerHandler> flowTriggerHandler,
Optional<Logger> log, boolean isWarmStandbyEnabled, Optional <FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log, flowTriggerHandler,
sharedFlowMetricsSingleton),
schedulerService, quotaManager, log, warmStandbyEnabled, flowTriggerHandler);
sharedFlowMetricsSingleton, flowCatalog),
schedulerService, quotaManager, log, isWarmStandbyEnabled, flowTriggerHandler);
}

public synchronized void setActive(boolean isActive) {
Expand Down Expand Up @@ -560,7 +560,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
// Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer when we accept the flow
if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
if (!this.isWarmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
// This block should be reachable only for the execution for the adhoc flows
// For flow that has scheduler but run-immediately set to be true, we won't check the quota as we will use a different execution id later
if (quotaManager.isPresent()) {
Expand Down Expand Up @@ -820,7 +820,7 @@ public void run() {
}
}
}
GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, new Properties(), false);
// Note that we only remove the spec from the flow catalog after it is orchestrated
GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void setup() throws Exception {
this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.mockStatusGenerator, Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger),
Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsSingleton(
ConfigUtils.propertiesToConfig(orchestratorProperties)));
ConfigUtils.propertiesToConfig(orchestratorProperties)), Optional.of(mock(FlowCatalog.class)));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
serviceLauncher.addService(flowCatalog);
serviceLauncher.start();

// We need to test adhoc flows since scheduled flows do not have a quota check in the scheduler

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing something here, but even scheduled flows with RUN IMMEDIATELY set to true still follow this behavior?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I tried to test scheduled run-immediately flows, I saw this code block and saw that it's only for adhoc flows:

// For flow that has scheduler but run-immediately set to be true, we won't check the quota as we will use a different execution id later

FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1",
ConfigFactory.empty(), true);
FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1",
Expand All @@ -354,7 +355,8 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, Optional.of(Mockito.mock(
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));

schedulerService.startAsync().awaitRunning();
Expand All @@ -363,17 +365,17 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {

scheduler.onAddSpec(flowSpec0); //Ignore the response for this request
Assert.assertThrows(RuntimeException.class, () -> scheduler.onAddSpec(flowSpec1));
// We don't check scheduledFlowSpecs size here because it results in a flaky timing issue where the spec may be
// deleted for adhoc flows before we assert the size.

Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
// Second flow should not be added to scheduled flows since it was rejected
Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
// set scheduler to be inactive and unschedule flows
scheduler.setActive(false);
Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);

//Mock a GaaS scheduler in warm standby mode, where we don't check quota
GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new GobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, Optional.of(Mockito.mock(
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));

schedulerWithWarmStandbyEnabled.startUp();
Expand All @@ -396,8 +398,8 @@ class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService, boolean isWarmStandbyEnabled) throws Exception {
super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService,
quotaManager, Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(FlowTriggerHandler.class)));
if (schedulerService != null) {
hasScheduler = true;
}
Expand Down