[GOBBLIN-1837] Implement multi-active, non blocking for leader host#3700

Merged
Will-Lo merged 11 commits into apache:master from umustafi:multiActiveScheduler
Jun 15, 2023

Conversation

@umustafi
Contributor

@umustafi umustafi commented May 26, 2023

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    This task implements a non-blocking, multi-active scheduler for each host. The algorithm is described in much greater detail in the Javadoc written for the classes below; please read it for clarification. This PR will NOT include metric emission or unit tests for validation; those will be done in a separate follow-up ticket. The work in this ticket includes:

New Classes

  • defines a general interface, MultiActiveLeaseArbiter, for the non-blocking, multi-participant approach; it will be used for the Scheduler but can be extended in the future to DagManager and other modules of the system that we want to make multi-active
  • implements a MySQL-based implementation of MultiActiveLeaseArbiter called MysqlMultiActiveLeaseArbiter, which uses a MySQL store to resolve ownership of a flow event among multiple competing participants
  • defines a table of constants in the database to keep participants consistent when applying the non-blocking algorithm above
  • defines a handler, FlowTriggerHandler, to coordinate between hosts with enabled schedulers when responding to flow action events; for now it only handles triggers to LAUNCH an event
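
To make the lease idea in the list above concrete, here is a minimal in-memory sketch. It is not the PR's code: the names LeaseOutcome, LeaseArbiterSketch and InMemoryLeaseArbiter, the method signatures, and the use of a map in place of the MySQL store are all assumptions made for illustration. It only shows the general shape of a non-blocking arbiter, where every participant gets an immediate answer instead of waiting for a lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical outcomes of a lease attempt; the PR's actual status types may differ. */
enum LeaseOutcome { OBTAINED, LEASE_HELD, NO_LONGER_LEASING }

/** Hypothetical, simplified stand-in for the arbiter interface described above. */
interface LeaseArbiterSketch {
  LeaseOutcome tryAcquireLease(String flowEventKey, String participantId, long nowMillis);
  void recordLeaseSuccess(String flowEventKey, String participantId);
}

/** In-memory arbiter: each map entry plays the role of one row in a shared store. */
class InMemoryLeaseArbiter implements LeaseArbiterSketch {
  private static final class Lease {
    final String owner;
    final long acquiredAtMillis;
    final boolean completed;

    Lease(String owner, long acquiredAtMillis, boolean completed) {
      this.owner = owner;
      this.acquiredAtMillis = acquiredAtMillis;
      this.completed = completed;
    }
  }

  private final Map<String, Lease> leases = new ConcurrentHashMap<>();
  private final long lingerMillis;

  InMemoryLeaseArbiter(long lingerMillis) {
    this.lingerMillis = lingerMillis;
  }

  @Override
  public LeaseOutcome tryAcquireLease(String flowEventKey, String participantId, long nowMillis) {
    LeaseOutcome[] outcome = new LeaseOutcome[1];
    // ConcurrentHashMap.compute applies the update atomically for this key.
    leases.compute(flowEventKey, (key, current) -> {
      boolean expired = current != null && !current.completed
          && nowMillis >= current.acquiredAtMillis + lingerMillis;
      if (current == null || expired) {
        outcome[0] = LeaseOutcome.OBTAINED;
        return new Lease(participantId, nowMillis, false);
      }
      // Either the event was already handled, or an unexpired lease is still held.
      outcome[0] = current.completed ? LeaseOutcome.NO_LONGER_LEASING : LeaseOutcome.LEASE_HELD;
      return current;
    });
    return outcome[0];
  }

  @Override
  public void recordLeaseSuccess(String flowEventKey, String participantId) {
    // Only the current owner may mark the event as completed.
    leases.computeIfPresent(flowEventKey, (key, current) ->
        current.owner.equals(participantId)
            ? new Lease(current.owner, current.acquiredAtMillis, true)
            : current);
  }
}
```

In this sketch a participant that receives LEASE_HELD would come back after the linger period and try again, which is the role the reminder trigger plays in the discussion further down.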

Modifications to Existing Classes

  • update DagActionStore schema and DagActionStoreMonitor to act upon new LAUNCH type events in addition to KILL/RESUME
  • update JobScheduler to store the timestamp of the trigger event within its job properties
  • update Orchestrator logic to trigger the event using the algorithm above if multi-active scheduler is enabled, otherwise submit events directly to the DagManager after receiving a scheduler trigger

NOTE: Because I'm updating the DagActionStore schema, this change will require manually altering the primary key of the table before deploying these changes. MySQL only creates/updates the table if the same table name does not exist.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Limiting the scope of this PR to the implementation and will focus on metrics, logging for validation, and unit tests in a separate PR.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Contributor

@phet phet left a comment

overall, very nice work urmi!

(I have a bit more to go, since this is a BIG PR, but here are my first thoughts)

*/
DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException, SpecNotFoundException,
SQLException;
DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue)
Contributor

one thing I don't understand: a DagAction has exactly the four fields that are params to this method. if so, does this method just duplicate exists?

merely wondering: is there a use case for getting any and all actions related to a particular flow execution?

relatedly w/ deleteDagAction (above), couldn't that take just a single param of type DagAction?

Contributor Author

You're right, actually: using getDagAction doesn't make sense anymore since the action is part of the primary key. Instead, it may be useful to have getDagActions(flow identifiers) to get all pending actions associated with a flow. We don't have any explicit use case at the moment, so I will remove this method.

Any method on the store now needs all columns that comprise the primary key, so we could pass a DagAction to any of these functions. Looking at how the functions are used, though, we would end up creating a new DagAction object, passing it to the function, and then unpacking those values anyway. So I am not certain that changing the signature is that beneficial, unless we care more about encapsulating the idea that the PK is needed for all of these actions and that DagAction is the PK.

Contributor

@phet phet May 31, 2023

for the delete case especially, I wondered whether we'd already have a DagAction on hand.

overall, and IMO unfortunately, we use quite little abstraction throughout gobblin service. most emblematic is the regular use of (String, String, String) or (String, String, long) to specify a flow execution ID. an alternative impl, by contrast might combine a FlowExecutionId w/ a DagActionValue to form a DagAction. that would be not only more succinct and self-documenting, but also more typesafe. it's from this general perspective that I prefer the signature:

deleteDagAction(DagAction)

to

deleteDagAction(String, String, String, DagActionValue)
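
A small sketch of the alternative described in this comment, combining a flow execution ID with an action value into one typed key. The class names carry a Sketch suffix because they are hypothetical stand-ins, not the project's actual DagActionStore types, and the in-memory set is only an assumption to keep the example runnable.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical action values; the real enum in the PR may list different constants. */
enum ActionValueSketch { KILL, RESUME, LAUNCH }

/** Replaces the loose (String, String, String) triple with one self-documenting type. */
final class FlowExecutionIdSketch {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;

  FlowExecutionIdSketch(String flowGroup, String flowName, String flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FlowExecutionIdSketch)) {
      return false;
    }
    FlowExecutionIdSketch other = (FlowExecutionIdSketch) o;
    return flowGroup.equals(other.flowGroup) && flowName.equals(other.flowName)
        && flowExecutionId.equals(other.flowExecutionId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId);
  }
}

/** The full key: a flow execution plus the action requested on it. */
final class DagActionSketch {
  final FlowExecutionIdSketch flowExecution;
  final ActionValueSketch value;

  DagActionSketch(FlowExecutionIdSketch flowExecution, ActionValueSketch value) {
    this.flowExecution = flowExecution;
    this.value = value;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof DagActionSketch)) {
      return false;
    }
    DagActionSketch other = (DagActionSketch) o;
    return flowExecution.equals(other.flowExecution) && value == other.value;
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowExecution, value);
  }
}

/** In-memory stand-in for a store whose methods all take the typed key. */
class InMemoryDagActionStore {
  private final Set<DagActionSketch> actions = new HashSet<>();

  boolean addDagAction(DagActionSketch action) {
    return actions.add(action);
  }

  boolean exists(DagActionSketch action) {
    return actions.contains(action);
  }

  boolean deleteDagAction(DagActionSketch action) {
    return actions.remove(action);
  }
}
```

With a single key type, a caller cannot swap flowGroup and flowName without the mistake being visible at the construction site, which is the type-safety point made above.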

Contributor Author

Changing to utilize DagAction in signature but leaving the other methods as is for now.

Comment on lines +46 to +48
RETRY,
CANCEL,
NEXT_HOP
Contributor

nit: I mentioned ADVANCE elsewhere, but NEXT_HOP is fine too. as for RETRY, I believe RESUME is the terminology we've adopted pretty widely--or do you find precedent for RETRY?

Contributor Author

In terms of action, RETRY and RESUME work similarly, but we use them to describe different starting points. RETRY is invoked by DagManager automatically if a flow fails and is configured to allow retries. RESUME is manually invoked by the user. It may be worth noting the distinction for logging purposes while treating these cases the same when it comes to acting on them.

Contributor

@phet phet left a comment

whew! nice work overall, with a couple design points to discuss

Contributor

@Will-Lo Will-Lo left a comment

Looking good so far, few comments

@umustafi umustafi force-pushed the multiActiveScheduler branch from 3667771 to 82708a1 Compare June 7, 2023 05:46
@codecov-commenter

codecov-commenter commented Jun 7, 2023

Codecov Report

Merging #3700 (b6b78a5) into master (a00b57c) will decrease coverage by 0.07%.
The diff coverage is 12.11%.

@@             Coverage Diff              @@
##             master    #3700      +/-   ##
============================================
- Coverage     46.89%   46.82%   -0.07%     
- Complexity    10772    10801      +29     
============================================
  Files          2138     2141       +3     
  Lines         84139    84405     +266     
  Branches       9357     9383      +26     
============================================
+ Hits          39456    39525      +69     
- Misses        41078    41277     +199     
+ Partials       3605     3603       -2     
Impacted Files Coverage Δ
...pache/gobblin/configuration/ConfigurationKeys.java 0.00% <ø> (ø)
.../org/apache/gobblin/service/ServiceConfigKeys.java 0.00% <ø> (ø)
...e/gobblin/runtime/api/MultiActiveLeaseArbiter.java 0.00% <0.00%> (ø)
...blin/runtime/api/MysqlMultiActiveLeaseArbiter.java 0.00% <0.00%> (ø)
...apache/gobblin/runtime/metrics/RuntimeMetrics.java 0.00% <ø> (ø)
...rg/apache/gobblin/runtime/util/InjectionNames.java 0.00% <ø> (ø)
...ava/org/apache/gobblin/scheduler/JobScheduler.java 0.00% <0.00%> (ø)
...ervice/modules/orchestration/TimingEventUtils.java 96.42% <0.00%> (ø)
...ceFlowExecutionResourceHandlerWithWarmStandby.java 0.00% <0.00%> (ø)
...ervice/monitoring/DagActionStoreChangeMonitor.java 0.00% <0.00%> (ø)
... and 10 more

... and 25 files with indirect coverage changes

Contributor

@phet phet left a comment

I'm about 90% done w/ this PR... here are most of the comments to consider. I'll need to finish off a little later.

this.dagActionStore.get().deleteDagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId);
this.dagActionStore.get().deleteDagAction(
new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId,
DagActionStore.FlowActionType.KILL));
Contributor

how do we know for sure that this would be a FlowActionType.KILL? (may be worth a code comment)

Contributor Author

You're right, this depends on the caller; I did not notice that. I modified the method to take the FlowActionType as a parameter. As of now it should only have the values RESUME or KILL.

Contributor

@phet phet left a comment

ok, done for now

Comment on lines +96 to +98
+ "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then 1"
+ "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then 2"
+ "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
Contributor

@phet phet Jun 8, 2023

what about:

(lease_acquisition_timestamp + linger) < CURRENT_TIMESTAMP as isLeaseExpired

then you either have:
1 (TRUE) - expired
0 (FALSE) - not expired
NULL - no lease

(IMO easier to follow than SQL CASE)

Contributor Author

I tried to use a boolean value for it, but if the column is NULL then the boolean returned is false, so it becomes hard to distinguish between leaseValid and noLease. I end up having to define the no-lease case specially.
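
For context on the NULL problem in this exchange: JDBC's ResultSet.getBoolean returns false when the column value is SQL NULL, and ResultSet.wasNull() reports whether the last value read was NULL. The sketch below shows one way the three states could be recovered from a single nullable flag. It is an illustration only; the column name isLeaseExpired comes from the suggestion above, while the LeaseValidity enum and the reader class are hypothetical and do not appear in the PR.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical three-state result; NO_LEASE corresponds to a SQL NULL flag. */
enum LeaseValidity { VALID, EXPIRED, NO_LEASE }

final class LeaseValidityReader {
  private LeaseValidityReader() {
  }

  /** Maps a nullable flag to three states; null stands for SQL NULL. */
  static LeaseValidity fromNullableExpiredFlag(Boolean isLeaseExpired) {
    if (isLeaseExpired == null) {
      return LeaseValidity.NO_LEASE;
    }
    return isLeaseExpired ? LeaseValidity.EXPIRED : LeaseValidity.VALID;
  }

  /**
   * Reads the assumed column "isLeaseExpired". getBoolean alone cannot tell
   * FALSE from NULL, so wasNull() is checked right after the read.
   */
  static LeaseValidity read(ResultSet resultSet) throws SQLException {
    boolean expired = resultSet.getBoolean("isLeaseExpired");
    Boolean flag = resultSet.wasNull() ? null : Boolean.valueOf(expired);
    return fromNullableExpiredFlag(flag);
  }
}
```

Whether this reads better than the explicit CASE expression is a judgment call; the CASE version keeps the three states visible in the query itself, while this version moves the distinction into Java.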

jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
Contributor

@phet phet Jun 8, 2023

sorry, I guess I'm unfamiliar: what's the meaning/nature of this Trigger we get from one scheduler and give to another?

(I'd love to avoid bringing in the dependency on JobScheduler if we can avoid it, and instead have this class depend only on the SchedulerService.)

Contributor Author

I added some comments to clarify. There is an existing function in JobScheduler named getTrigger (I will rename it to createTrigger) that we use to create a new Trigger for the job, set to fire after the lease should expire; we then pass it to the SchedulerService to schedule it.
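
The reminder idea in this reply can be sketched without the scheduler classes under discussion. The example below uses java.util.concurrent.ScheduledExecutorService purely as a stand-in for the Trigger and SchedulerService; the class and method names are hypothetical, and the assumption that the reminder should fire at lease acquisition time plus linger is inferred from the SQL excerpt earlier in the review.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class LeaseReminderSketch {
  private LeaseReminderSketch() {
  }

  /** Milliseconds to wait until the lease should have expired, never negative. */
  static long reminderDelayMillis(long leaseAcquiredAtMillis, long lingerMillis, long nowMillis) {
    return Math.max(0L, leaseAcquiredAtMillis + lingerMillis - nowMillis);
  }

  /** Schedules a one-shot retry of the lease attempt for when the lease should expire. */
  static ScheduledFuture<?> scheduleReminder(ScheduledExecutorService scheduler, Runnable retryLeaseAttempt,
      long leaseAcquiredAtMillis, long lingerMillis) {
    long delay = reminderDelayMillis(leaseAcquiredAtMillis, lingerMillis, System.currentTimeMillis());
    return scheduler.schedule(retryLeaseAttempt, delay, TimeUnit.MILLISECONDS);
  }
}
```

A caller would typically create the scheduler with Executors.newSingleThreadScheduledExecutor() and pass a Runnable that repeats the lease attempt for the same flow action.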

Contributor

@phet phet left a comment

this is taking shape quite nicely! mostly small comments... we're nearly there!

@@ -199,23 +197,27 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
int leaseValidityStatus = resultSet.getInt(4);
Contributor

hopefully doesn't feel like overkill, but I'd abstract this by defining a static inner @Data class with an overloaded constructor (or static factory method) taking a ResultSet

Contributor Author

I don't see a large benefit from this, since the static class will have to encode these column retrievals anyway, but I made it a bit clearer by using the column name instead of the index to retrieve the values, so it's more readable.

Comment on lines +322 to +323
if (this.eventSubmitter.isPresent()) {
new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
Contributor

tip: ifPresent()

(note: this works easily and naturally... unless checked exceptions)
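
A short comparison of the two styles mentioned in this tip, assuming java.util.Optional (Guava's Optional has no ifPresent method, so the tip would not apply there). The list-based sink is a hypothetical stand-in for the event submitter so that the example runs on its own.

```java
import java.util.List;
import java.util.Optional;

final class IfPresentSketch {
  private IfPresentSketch() {
  }

  /** The style used in the excerpt: check, then unwrap. */
  static void submitWithIsPresent(Optional<List<String>> sink, String event) {
    if (sink.isPresent()) {
      sink.get().add(event);
    }
  }

  /** The suggested style: the lambda only runs when a value is present. */
  static void submitWithIfPresent(Optional<List<String>> sink, String event) {
    sink.ifPresent(target -> target.add(event));
  }
}
```

The caveat about checked exceptions follows from the lambda's type: ifPresent takes a Consumer, whose accept method declares no checked exceptions, so a body that throws one (IOException, for example) has to catch or wrap it inside the lambda.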

Contributor Author

This format is used in many other places in the Orchestrator, so I will leave it as is.

@umustafi umustafi force-pushed the multiActiveScheduler branch from 4799679 to 859490a Compare June 14, 2023 17:43
@umustafi umustafi force-pushed the multiActiveScheduler branch from 859490a to 8e434cb Compare June 14, 2023 19:40
Contributor

@phet phet left a comment

absolutely excellent work--very nice job here!

@Will-Lo Will-Lo merged commit faa3a4f into apache:master Jun 15, 2023
phet added a commit to phet/gobblin that referenced this pull request Aug 15, 2023
* upstream/master:
  Fix bug with total count watermark whitelist (apache#3724)
  [GOBBLIN-1858] Fix logs relating to multi-active lease arbiter (apache#3720)
  [GOBBLIN-1838] Introduce total count based completion watermark (apache#3701)
  Correct num of failures (apache#3722)
  [GOBBLIN- 1856] Add flow trigger handler leasing metrics (apache#3717)
  [GOBBLIN-1857] Add override flag to force generate a job execution id based on gobbl… (apache#3719)
  [GOBBLIN-1855] Metadata writer tests do not work in isolation after upgrading to Iceberg 1.2.0 (apache#3718)
  Remove unused ORC writer code (apache#3710)
  [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (apache#3716)
  [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant (apache#3715)
  [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (apache#3712)
  [GOBBLIN-1849] Add Flow Group & Name to Job Config for Job Scheduler (apache#3713)
  [GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup (apache#3708)
  [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time (apache#3704)
  [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the existing workflow if it is launched (apache#3711)
  [GOBBLIN-1842] Add timers to GobblinMCEWriter (apache#3703)
  [GOBBLIN-1844] Ignore workflows marked for deletion when calculating container count (apache#3709)
  [GOBBLIN-1846] Validate Multi-active Scheduler with Logs (apache#3707)
  [GOBBLIN-1845] Changes parallelstream to stream in DatasetsFinderFilteringDecorator  to avoid classloader issues in spark (apache#3706)
  [GOBBLIN-1843] Utility for detecting non optional unions should convert dataset urn to hive compatible format (apache#3705)
  [GOBBLIN-1837] Implement multi-active, non blocking for leader host (apache#3700)
  [GOBBLIN-1835]Upgrade Iceberg Version from 0.11.1 to 1.2.0 (apache#3697)
  Update CHANGELOG to reflect changes in 0.17.0
  Reserving 0.18.0 version for next release
  [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion (apache#3699)
  [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (apache#3698)
  [GOBBLIN-1825]Hive retention job should fail if deleting underlying files fail (apache#3687)
  [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (apache#3692)
  [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (apache#3693)
  [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (apache#3696)
4 participants