-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-1863] Multi-Active Launch Job Related Issues #3727
Conversation
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
Outdated
Show resolved
Hide resolved
public void submitFlowToDagManager(FlowSpec flowSpec) | ||
throws IOException { | ||
submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec)); | ||
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec); | ||
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) { | ||
emitFlowCompilationFailedEvent(flowSpec, TimingEventUtils.getFlowMetadata(flowSpec)); | ||
return; | ||
} | ||
submitFlowToDagManager(flowSpec, jobExecutionPlanDag); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I'm wondering why we don't directly call the orchestrate()
function from the SpecMonitor instead of submitFlowToDagManager
, because there are a few more checks done in that function, namely:
- Checking for another execution
- Ensuring that the specCompiler is ready before compilation (happens less these days with a file based compiler but still possible that there will be a race condition introduced where you try to compile a flow when the service starts up but the flowgraph is not finished loading into memory).
If this is not possible, we need to abstract these checks to another function and ensure that they're done before submitting to the dagmanager in any configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't call orchestrate function directly from the DagActionStoreMonitor
because the chain of events is Scheduler -> Orchestrator -> FlowTriggerHandler (Multi-Active algorithm) -> MySQL -> Brooklin change stream -> DagActionStoreMonitor -> DagManager
so we don't want the MA algorithm triggered again. However, I did end up abstracting all the checks done to compile the flow and ensure an execution is allowed to re-use when submitting the flow to the DagManager
from the DagActionStoreMonitor
public void handleLaunchFlowEvent(DagActionStore.DagAction action) { | ||
if (this.specCompiler.isPresent()) { | ||
FlowId flowId = new FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment here as the one above, wanted to double check that all the checks are being done before submitting these flows which the orchestrator is doing, otherwise there will be some bugs introduced that are guarded against. Is it possible to use a helper or a static function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-used helper function created in Orchestrator
...lin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
Outdated
Show resolved
Hide resolved
Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter -> | ||
new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED)); | ||
|
||
if (flowCompileFailedTimer.isPresent()) { | ||
flowCompileFailedTimer.get().stop(flowMetadata); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this works (w/o being challenged by checked exceptions), it would be more idiomatic:
eventSubmitter.transform(...).ifPresent(timer -> timer.stop(fmd));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See previous response about Java Optional vs. Google Optional
return; | ||
} | ||
submitFlowToDagManager(flowSpec, jobExecutionPlanDag); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find else
MUCH preferable to the early return here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With lots of nesting it becomes hard to tell at which points we may have exited the code block, I'd prefer early return since I end up adding a few more checks that must pass before we submit to the dagManager.
* @param spec | ||
* @param flowMetadata | ||
*/ | ||
public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> flowMetadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no biggie, but naming-wise, I agree the timing is emitted... but what may be most noteworthy is that it inserts the METADATA_MESSAGE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed to populateFlowCompilationFailedEventMessage
, what do you think?
* the process. | ||
*/ | ||
public void handleLaunchFlowEvent(DagActionStore.DagAction action) { | ||
if (this.specCompiler.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.specCompiler.ifPresent(theSpecCompiler -> {
...
});
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is using google library's Optional which does not have this method. I've generally seen us using this Optional version so will keep as is.
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
Outdated
Show resolved
Hide resolved
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
Outdated
Show resolved
Hide resolved
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
Outdated
Show resolved
Hide resolved
...lin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
Show resolved
Hide resolved
try { | ||
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); | ||
spec = (FlowSpec) flowCatalog.getSpecs(flowUri); | ||
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = orchestrator.handleChecksBeforeExecution(spec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking since we need to pass the orchestrator to the dagmanager here which seems like an antipattern, it would be easier to have the orchestrator handle the dagmanager launch event on startup. What would the LOE be in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving this function to the Orchestrator
, does not actually change the code pattern here since there's only a bit of wrapper code (mainly to get the spec corresponding to the dagAction
) around the Orchestrator
helper funcs that it relies on. It's more clear to keep it here since the use case is DagManager
specific
Codecov Report
@@ Coverage Diff @@
## master #3727 +/- ##
============================================
- Coverage 47.45% 47.08% -0.38%
- Complexity 8687 10856 +2169
============================================
Files 1731 2144 +413
Lines 66736 84746 +18010
Branches 7219 9409 +2190
============================================
+ Hits 31671 39902 +8231
- Misses 32306 41222 +8916
- Partials 2759 3622 +863
... and 426 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
* DagManager checks leader status before adding dag to avoid NPE * handle launch events after leader change in DagManager * Refactor Orchestrator and DagManager * remove unecessary specCompiler changes * add docstring --------- Co-authored-by: Urmi Mustafi <[email protected]>
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Bugs
DagManager
check leader status beforeaddDag
bc calling this method from non-leader hosts throws aNPE
which may caused failed dag event to be emitted. Then those job executions are skipped since they are marked as failed.LAUNCH
type events upon leader change and setting a new participantDagManager
to be active. Failing to handle these events may be causing missed flow launches on any leader change or restart.Refactoring Done
DagManager
andOrchestrator
. It takes in theeventSubmitter
as an argument to emit the event.Orchestrator.orchestrate
done to compile the flow and ensure an execution is allowed to be re-used by the DagActionStoreChangeMonitor when submitting the flow to theDagManager
by callingsubmitFlowToDagManager
.DagManager
.Tests
NA
Commits