Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
Expand Down Expand Up @@ -101,21 +102,24 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private UserQuotaManager quotaManager;
private final FlowCompilationValidationHelper flowCompilationValidationHelper;
private Optional<FlowTriggerHandler> flowTriggerHandler;
private Optional<FlowCatalog> flowCatalog;
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;

private final ClassAliasResolver<SpecCompiler> aliasResolver;

public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerHandler = flowTriggerHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.flowCatalog = flowCatalog;
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
Expand Down Expand Up @@ -161,9 +165,9 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<Logger> log, Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<FlowCatalog> flowCatalog) {
this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, flowTriggerHandler,
sharedFlowMetricsSingleton);
sharedFlowMetricsSingleton, flowCatalog);
}


Expand Down Expand Up @@ -359,6 +363,11 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
try {
// Send the dag to the DagManager
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);

// Delete spec from flow catalog for adhoc executions after persisting it in DagManager
if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work for launch events of adhoc flows when a leader goes down? (i.e. need to re-compile the flow from the flow catalog in the dag action change monitor).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will. If we've gotten to the point where addDag persists the dag to dagStateStore, now its responsibility of dagManager to execute it. The multi-active scheduler component ensures persisting to dagActionStore so the event will not be reprocessed through dagActionChangeMonitor. The dagActionChangeMonitor only commits the offset after submitting a launch event to the Orchestrator for compiling and completing the addDag call. The new leader's DagManager will recover it from DagStateStore if the old leader never completes it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great news! let's consider a code comment to preserve this knowledge

} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
protected final Optional<FlowCatalog> flowCatalog;
protected final Optional<HelixManager> helixManager;
protected final Orchestrator orchestrator;
protected final Boolean warmStandbyEnabled;
protected final Boolean isWarmStandbyEnabled;
protected final Optional<UserQuotaManager> quotaManager;
protected final Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
Expand Down Expand Up @@ -170,7 +170,7 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
@Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
@Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);

Expand All @@ -185,7 +185,7 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
this.skipSchedulingFlowsAfterNumDays = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS, String.valueOf(ConfigurationKeys.DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));
this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
this.warmStandbyEnabled = warmStandbyEnabled;
this.isWarmStandbyEnabled = isWarmStandbyEnabled;
this.quotaManager = quotaManager;
this.flowTriggerHandler = flowTriggerHandler;
// Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
Expand All @@ -209,13 +209,13 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager, SchedulerService schedulerService,
Optional<Logger> log, boolean warmStandbyEnabled, Optional <FlowTriggerHandler> flowTriggerHandler,
Optional<Logger> log, boolean isWarmStandbyEnabled, Optional <FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log, flowTriggerHandler,
sharedFlowMetricsSingleton),
schedulerService, quotaManager, log, warmStandbyEnabled, flowTriggerHandler);
sharedFlowMetricsSingleton, flowCatalog),
schedulerService, quotaManager, log, isWarmStandbyEnabled, flowTriggerHandler);
}

public synchronized void setActive(boolean isActive) {
Expand Down Expand Up @@ -560,7 +560,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
// Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer when we accept the flow
if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
if (!this.isWarmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
// This block should be reachable only for the execution for the adhoc flows
// For flow that has scheduler but run-immediately set to be true, we won't check the quota as we will use a different execution id later
if (quotaManager.isPresent()) {
Expand Down Expand Up @@ -820,7 +820,7 @@ public void run() {
}
}
}
GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, new Properties(), false);
// Note that we only remove the spec from the flow catalog after it is orchestrated
GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void setup() throws Exception {
this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.mockStatusGenerator, Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger),
Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsSingleton(
ConfigUtils.propertiesToConfig(orchestratorProperties)));
ConfigUtils.propertiesToConfig(orchestratorProperties)), Optional.of(mock(FlowCatalog.class)));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, Optional.of(Mockito.mock(
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));

schedulerService.startAsync().awaitRunning();
Expand All @@ -373,7 +374,8 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {

//Mock a GaaS scheduler in warm standby mode, where we don't check quota
GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new GobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, Optional.of(Mockito.mock(
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));

schedulerWithWarmStandbyEnabled.startUp();
Expand All @@ -396,8 +398,8 @@ class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService, boolean isWarmStandbyEnabled) throws Exception {
super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService,
quotaManager, Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(FlowTriggerHandler.class)));
if (schedulerService != null) {
hasScheduler = true;
}
Expand Down