Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2173] Avoid Adhoc flow spec addition for non leasable entity #4076

Merged
merged 8 commits into from
Nov 19, 2024

Conversation

vsinghal85
Copy link
Contributor

@vsinghal85 vsinghal85 commented Nov 14, 2024

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

Before launching the flows we store adhoc flows temporarily in our flowspec DB. In case of concurrent execution of the flows within the epsilon time defined the first execution is launched successfully, but for the second execution flow spec entry is created but launching is not successful since lease cannot be acquired, for this scenario we should prevent creation of flowspec entry in the spec store, this PR contains code change to address this.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

  • Tests added to ensure onAddSpec in Orchestrator is successful only if lease can be acquired on entity(call to function leasableEntryExists), if it does not then throw LeaseNotAvailable Exception

  • Tests in MysqlMultiActiveLeaseArbiter to ensure existsLeasableEntry returns false, if tried to call this function within epsilon time of acquiring the lease else return true.

  • existing tests pass

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@vsinghal85 vsinghal85 marked this pull request as ready for review November 14, 2024 17:41
@@ -17,13 +17,16 @@

package org.apache.gobblin.service.modules.orchestration;

import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.RestLiServiceException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix import ordering, see from : https://gobblin.apache.org/docs/developer-guide/CodingStyle/

this.specCompiler.onAddSpec(flowSpec);
}

private void processFlowSpecForAdhocFlows(FlowSpec flowSpec) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this function is validating whether or not the flow should be created based on whether it's a duplicate adhoc flow within a short span, can this be named differently to convey it's purpose like validateConcurrentAdhocFlowCreation ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the name appropriately.

String flowGroup = "testGroup";
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
leaseArbiter.existsLeasableEntity(leaseParams);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No assertions? Also in this unit test we are mocking leaseArbiter, so I don't think it adds any value on testing the functionality of leaseArbiter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out missed adding the Assertion for this test, purpose of this test is to verify that MySQLStatemanagementStore returns the value returned by leaseArbiter, also updated this test.

@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an FYI, this also gets called during updating a flow. But since we have a condition of checking the flow is scheduled or not and we don't expect users to update an adhoc flow, we should be fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But since this can still be called for adhoc flows, it would be good to test what the behaviour is. No need to handle it specially, but to know what the behaviour is would be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the callout, will add it for the test suite

@vsinghal85 vsinghal85 changed the title Avoid Adhoc flow spec addition for non leasable entity [GOBBLIN-2173]Avoid Adhoc flow spec addition for non leasable entity Nov 15, 2024
@vsinghal85 vsinghal85 changed the title [GOBBLIN-2173]Avoid Adhoc flow spec addition for non leasable entity [GOBBLIN-2173] Avoid Adhoc flow spec addition for non leasable entity Nov 15, 2024
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
* was triggered, and if the dag action event we're checking on is a reminder event
*/
boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider renaming this to isLeaseAcquirable for conciseness and to be consistent with other method names

@@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) {
}
}

@Override
public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadoc.. something like:
Determines if a lease can be acquired for the given flow. A lease is acquirable if ...

DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
try {
if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) {
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an info log here with flowGroup, flowName.. it would be useful in debugging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added lease params which contains details of flow name and flow group

@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}

private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

}

@Test
public void testOnAddSpec_withFlowSpec_Available() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use camelcase for naming methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +341 to +348
ConfigBuilder configBuilder = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
.addPrimitive("gobblin.flow.sourceIdentifier", "source")
.addPrimitive("gobblin.flow.destinationIdentifier", "destination");
Config config = configBuilder.build();
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use this.flowSpec here since it already has the schedule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need different configs in flowspec depending on the case we are testing and hence would prefer to define new ones.

and throw LeaseUnavailableException
*/
@Test(expectedExceptions = LeaseUnavailableException.class)
public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also test the scenario when canAcquireLeaseOnEntity returns true for adhoc flow since the other test is for scheduled flow

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good impl and nice test coverage!

my suggestions are around naming and the importance of encapsulating impl details strictly within their own (lower) levels - don't let them leak upwards through each successive calling layer!

* An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
*/
public class LeaseUnavailableException extends RuntimeException {
public LeaseUnavailableException(String message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

beyond clearly naming for callers, impl-wise, this definitely relates to a flow, so that should be a ctor param. consider whether to allow a catcher to reach in to access the details as instance member(s) or merely to use internally in the ctor, to contextualize the message passed along to super.

@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
throws IOException;

/**
* This method checks if lease can be acquired on provided flow in lease params
* returns true if entry for the same flow does not exists within epsilon time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very reasonable method-level javadoc... but it turns out epsilon is not mentioned anywhere in class-level javadoc, so this method description lacks context.

so, please add the class-level info. mentioning the name 'epsilon' is fine, but definitely also give it a more specific name, like "Lease Consolidation Period".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
completeLeaseHelper(launchLeaseParams3);
Thread.sleep(LESS_THAN_EPSILON);
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));
Copy link
Contributor

@phet phet Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the whole idea is that a "similar" (but NOT same) lease isn't itself already within epsilon. hence, be sure to test LeaseParams that were NOT just given to tryAcquireLease

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created new launch param launchLeaseParams3_similar

@@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
responseMap = this.flowCatalog.put(flowSpec, true);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch (Throwable e) {
} catch(LeaseUnavailableException e){
throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually exception messages are designed for logging, more than for end-user consumption, so probably not appropriate to blindly return that. (it's sometimes done for a 5xx error, as above... but even that can be inadvisable.)

anyway, the 409 above might offer a better template:

return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
    "FlowSpec with URI " + flowSpec.getUri() + " was launched less than N secs ago, no action will be taken"));

(to provide N we may wish to tunnel the value of epsilon... or at least how many secs remain before a subsequent launch would be possible)

also: when do we want to return (as that 409 above does), vs. throw?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated as discussed offline

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very close!

Comment on lines 107 to 111
* Returns true if lease can be acquired on entity provided in leaseParams.
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
* and if the dag action event we're checking on is a reminder event
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc seems out-of-date, esp. mentioning LeaseParams and DagAction

also, out-of-date:

Returns true if lease can be acquired on entity provided in leaseParams.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated javadoc

@@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole

/**
* This method checks if lease can be acquired on provided flow in lease params
* returns true if entry for the same flow does not exists within epsilon time
* returns true if entry for the same flow does not exists within Lease Consolidation Period
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sense is reversed here...

maybe:

Check whether the same flowGroup+flowName is within the Lease Consolidation Period (aka. epsilon) from other, unrelated leasing activity

this is also out-of-date:

@return true if lease can be acquired on the flow passed in the lease params, false otherwise

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated javadoc

if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. we have an .info line above announcing the check, so let's follow w/ a .warn line here when the check fails. suggest: "another recent adhoc flow exec found for...."

  2. exception msg could benefit from minor improvements, yet--however it's phrased--it belongs encapsulated in the TooSoonToRerun... ctor, which should take solely a FlowSpec param

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, Thanks for the suggestion

Comment on lines 61 to 62
private static final long flowExecutionId = 12345677L;
private static final long flowExecutionId1 = 12345996L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never noticed before that flowExecutionId, which is customarily millis-since-epoch only has 7 digits when it should have 10. let's fix that and also define this as:

private static final long flowExecutionIdAlt = flowExecutionId + ...; // whatever you consider a reasonable (later) offset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the same variables to store millis

@codecov-commenter
Copy link

codecov-commenter commented Nov 19, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 42.03%. Comparing base (edaf474) to head (01cac22).
Report is 3 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (edaf474) and HEAD (01cac22). Click for more details.

HEAD has 1 upload less than BASE
Flag BASE (edaf474) HEAD (01cac22)
2 1
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4076      +/-   ##
============================================
- Coverage     51.47%   42.03%   -9.45%     
+ Complexity     7546     2334    -5212     
============================================
  Files          1386      496     -890     
  Lines         52105    20961   -31144     
  Branches       5727     2433    -3294     
============================================
- Hits          26821     8810   -18011     
+ Misses        22990    11238   -11752     
+ Partials       2294      913    -1381     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.


🚨 Try these New Features:

Comment on lines 397 to 398
if (error instanceof TooSoonToRerunSameFlowException) {
throw (TooSoonToRerunSameFlowException) error;
if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) {
throw (TooSoonToRerunSameFlowException) error.getCause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cast isn't necessary (the reason I suggested wrapping TooSoonToR... was to enable uniform, type-agnostic code here.)

for a method with the signature throws Throwable, aren't these two equivalent?

throw error.getCause();
throw (T) error.getCause();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense updated

Comment on lines 330 to 329
@Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
@Test(expectedExceptions = RuntimeException.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after signing off, I realized my literal advice would compromise clarity, foul up tests, etc. (in this very way)

sorry for that half-baked advice... try this instead:

public static class TooSoonToRerunSameFlowException extends RuntimeException {
  @Getter private final FlowSpec flowSpec;

  /**
   * Account for unwrapping within @{link FlowCatalog#updateOrAddSpecHelper}`s `CallbackResult` error handling for `SpecCatalogListener`s
   * @return `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException
   */
  public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
    return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
  }

  public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
    this(flowSpec, null);
  }

  /** restricted-access ctor: use {@link #wrappedOnce(String)} instead */
  private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
    super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
    this.flowSpec = flowSpec;
  }
}

then replace:

throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));

with

throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure updated as per suggestion

Comment on lines 365 to 368
/*
Determines if a lease can be acquired for the given flow. A lease is acquirable if
no existing lease record exists in arbiter table or the record is older then epsilon time
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably no need for this comment here in the impl, but if you want one, bring it into line w/ the orig from the interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, since javadoc already there for the interface, removed comment from here

@@ -24,6 +24,7 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this import belongs a few lines down w/ other apache gobblin pkgs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

@@ -133,6 +137,29 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}

/*
enforces that a similar adhoc flow is not launching,
else throw TooSoonToRerunSameFlowException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: {@link TooSoonToRerunSameFlowException}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure updated

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent improvement with great impl and tests - very nice work!

lease consolidation time, then we do not execute this flow, hence do not process and store the spec
and throw RuntimeException
*/
@Test(expectedExceptions = RuntimeException.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot to change this back to TooSoonToRerun...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, updated



/**
* An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated javadoc

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent!

@phet phet merged commit e5d897e into apache:master Nov 19, 2024
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants