Skip to content

[GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time#3704

Merged
ZihanLi58 merged 20 commits intoapache:masterfrom
Peiyingy:py-helix-scheduler-throttle-GOBBLIN-1840
Jun 30, 2023
Merged

[GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time#3704
ZihanLi58 merged 20 commits intoapache:masterfrom
Peiyingy:py-helix-scheduler-throttle-GOBBLIN-1840

Conversation

@Peiyingy
Copy link
Copy Markdown
Contributor

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):

Problem Statement

Currently, there is a problem with the Helix replanner, that Azkaban jobs can be triggered at the same time, causing replanning to happen in a short time span twice or more. It is expensive to create a replanner, consuming a lot of resources and a long time for both the Zookeeper and the Application Master.

Solution

We implemented a concurrent hashmap to store the create time for each job so that we can check the hashmap record to make sure that we only reschedule the workflow when the last replanning is earlier than the throttle timeout threshold, which has a default time of an hour and totally configurable. We also have a throttling feature that is able to turn off, stopping this early return mechanism.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
Screenshot 2023-06-14 at 11 00 50 AM

The unit tests are permutations regards to three variables: same or different workflow, time span, and whether throttling is enabled. The original testNewJobAndUpdate is for the same workflow, long time period, and throttle enabled. The rest of the tests have descriptive names.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@Peiyingy Peiyingy changed the title Py helix scheduler throttle gobblin 1840 [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time Jun 14, 2023
Copy link
Copy Markdown
Contributor

@umustafi umustafi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work adding unit tests! few suggestions :)

private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
private final Duration throttleTimeoutDuration;
private ConcurrentHashMap<String, Instant> jobStartTimeMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jobUriToStartTimeMap? better if u can clarify what the string is. Also lets be consistent between START/CREATE time you mention in description. Why do we use Instant rather than Timestamp or Long (milliseconds)? The latter is typically what I see used in our code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to adjusting the map name. Pretty sure the key is the jobName is the key, which refers to the Gobblin configuration job.name

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also just my opinion, but Instant (or I guess Timestamp) are less error prone to write code for than millis.

I don't have an opinion on changing the above duration to Millis long to fit the rest of the class. But Instant vs long is a big deal because long is hard to reason about. It often refers to epoch millis but you always have to add that epochmillis to the name of the map.

As for java.time.Instant vs java.sql.Timestamp, I've seen Instant used elsewhere. And personally haven't seen Timestamp used much. So clearly there are multiple pockets of usage. and either one makes sense probably.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have that strong of a preference with Instant vs. Timestamp/Long, the latter are more common on GaaS side so I was initially surprised. More important to update the map name.

public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();
boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually booleans are easily identified with is.... like isThrottleEnabled

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this default to false if config is not provided? if not provide a default value

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default is provided by GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY in the config file, which is set to false. Should I add an additional default proof in this part?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but you are not using that value here right? You want to provide DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY here so value is used

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the function of getPropAsBoolean, that is:

public static boolean getPropAsBoolean(   @NotNull  Properties properties,
  String key,
  String defaultValue )

so it will call String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY) as the default value if GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY is not assigned

throws Exception {
try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
HelixManager helixManager = HelixManagerFactory
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u reuse helixManager across tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original approach was to use that across tests, but when the test number increases, it would cause the error HelixManager (ZkClient) is not connected, so I changed that to local variable to avoid this problem.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see, let's add a comment to explain that in javadoc for this testing class and make a method to create HelixManager that you can reuse across test. You can also put comment there to explain why you changed to local variable so the knowledge is preserved for those updating tests in future.

jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);

properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there's no properties2, then just name this properties

HelixManager helixManager = HelixManagerFactory
.getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
zkConnectingString);
GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here u can reuse perhaps, lots of repeated code. Let's try to DRY (do not repeat yourself)

Copy link
Copy Markdown
Contributor

@homatthew homatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments and replies to existing questions

private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
private final Duration throttleTimeoutDuration;
private ConcurrentHashMap<String, Instant> jobStartTimeMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to adjusting the map name. Pretty sure the key is the jobName is the key, which refers to the Gobblin configuration job.name


if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
Instant jobStartTime = this.jobStartTimeMap.get(jobName);
Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe workflowRunningDuration is a more descriptive name. @ZihanLi58 can you help chime in here

private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
private final Duration throttleTimeoutDuration;
private ConcurrentHashMap<String, Instant> jobStartTimeMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also just my opinion, but Instant (or I guess Timestamp) are less error prone to write code for than millis.

I don't have an opinion on changing the above duration to Millis long to fit the rest of the class. But Instant vs long is a big deal because long is hard to reason about. It often refers to epoch millis but you always have to add that epochmillis to the name of the map.

As for java.time.Instant vs java.sql.Timestamp, I've seen Instant used elsewhere. And personally haven't seen Timestamp used much. So clearly there are multiple pockets of usage. and either one makes sense probably.

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jun 15, 2023

Codecov Report

Merging #3704 (2496212) into master (51a852d) will decrease coverage by 1.22%.
The diff coverage is 67.18%.

@@             Coverage Diff              @@
##             master    #3704      +/-   ##
============================================
- Coverage     46.97%   45.76%   -1.22%     
+ Complexity    10794     9402    -1392     
============================================
  Files          2138     1863     -275     
  Lines         84132    74423    -9709     
  Branches       9356     8305    -1051     
============================================
- Hits          39518    34057    -5461     
+ Misses        41015    37266    -3749     
+ Partials       3599     3100     -499     
Impacted Files Coverage Δ
...in/java/org/apache/gobblin/cluster/HelixUtils.java 48.91% <50.00%> (+1.39%) ⬆️
...ache/gobblin/cluster/GobblinHelixJobScheduler.java 57.42% <55.81%> (+3.18%) ⬆️
...bblin/cluster/GobblinClusterConfigurationKeys.java 50.00% <100.00%> (+50.00%) ⬆️
...ter/GobblinThrottlingHelixJobLauncherListener.java 100.00% <100.00%> (ø)

... and 312 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Copy Markdown
Contributor

@ZihanLi58 ZihanLi58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have a race condition here if we receive one update request while the last job has not been submitted successfully.
Can we consider refractor the code a little bit and add lock there to make sure that no two messages for one same workflow will be handled at the same time?

Copy link
Copy Markdown
Contributor

@umustafi umustafi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks pretty good, one small comment

@Peiyingy Peiyingy force-pushed the py-helix-scheduler-throttle-GOBBLIN-1840 branch 2 times, most recently from 4fa777f to 3552fa3 Compare June 22, 2023 22:17
@Peiyingy Peiyingy force-pushed the py-helix-scheduler-throttle-GOBBLIN-1840 branch 2 times, most recently from ebd8e67 to 258de1a Compare June 22, 2023 23:02
@Peiyingy Peiyingy force-pushed the py-helix-scheduler-throttle-GOBBLIN-1840 branch from 258de1a to ae2b58c Compare June 23, 2023 01:18
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics));
listener);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need to be on a new line

} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
LOGGER.info("No job schedule"
+ " found, so running job " + jobUri);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need to be on a new line

+ " found, so running job " + jobUri);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics)));
listener));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need to be on a new line

new GobblinHelixJobLauncherListener(this.launcherMetrics)));
listener));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need a new line

String jobName = updateJobArrival.getJobName();

if (this.isThrottleEnabled &&
this.jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.ofEpochMilli(0)).isAfter(clock.instant())) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This line is a bit dense. And to indicate beginning of time, the documentation for Instant has Instant.MIN or Instant.EPOCH which should be more readable.

Also, intuitively it feels a little weird to read as "nextSchedulableTime is after current time". I feel it's more intuitive for it to be

"current time is before nextSchedulableTime" i.e.

clock.instant().isBefore(jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.ofEpochMilli(0)))

or IMO even more readable

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
...

}

// Time span exceeds throttle timeout, within same workflow, throttle is enabled
// Job will be updated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments describing the method should be a java doc instead of a regular comment

properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
GobblinHelixJobScheduler gobblinHelixJobScheduler;
if (isThrottleEnabled) {
gobblinHelixJobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can inject the clock regardless of if throttling is enabled. We'd never want to use UTC clock in a unit test IMO

gobblinHelixJobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
}
gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wouldn't we want to set this via config?

String workFlowId = null;
private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager)
throws Exception {
// endTime is manually set time period that we allow HelixUtils to fetch workflowIdMap before timeout
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better wording:

Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID

@@ -0,0 +1 @@
mock-maker-inline No newline at end of file
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this anymore since we are not mocking any static classes

Copy link
Copy Markdown
Contributor

@homatthew homatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more suggestions wrt loggers

@Peiyingy Peiyingy force-pushed the py-helix-scheduler-throttle-GOBBLIN-1840 branch from b7589ff to a15afd8 Compare June 27, 2023 02:09
Copy link
Copy Markdown
Contributor

@homatthew homatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a race condition. The rest are pretty much nits. Great work!

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
LOGGER.info("Replanning is skipped for job {}. Current time is "
+ clock.instant() + " and the next schedulable time would be "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clock.instant() should be using the {} syntax. Same for the nextSchedulable time. And instead of getting the value from the map, use the nextSchedulableTime variable

private Clock clock;

public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the it should be ConcurrentHashMap<String, Instant> instead of just ConcurrentHashMap?

LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random question, but is there a reason we use Instant.min as the default value here and Instant.EPOCH as the placeholder elsewhere?

final Properties properties1 =
GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
Config helixJobSchedulerConfig = ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY instead of the raw string value

zkConnectingString);
GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager, isThrottleEnabled, mockClock);
final Properties properties =
GobblinHelixJobLauncherTest.generateJobProperties(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: was this meant to be on a new line? Seems like it would fit fine on the line above

return newJobConfigArrivalEvent;
}

private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random question, but why do we use NewJobConfigArrivalEvent instead of just passing a string job name? There were some places in the code where we constructed a brand new NewJobConfigArrivalEvent just to pass it into this method.


@Subscribe
public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super minor nit. Not sure if it's even worth implementing:

Would we want to reset the Instant in the map to Instant.EPOCH if we delete a workflow? My understanding is that internally we don't use this delete job config method and only rely on update, so this wouldn't really affect our own use case.

I am not sure which behavior is more intuitive:

  1. If I explicitly delete, I should be able to reschedule it and bypass the throttle time
  2. Regardless of if I deleted the old flow, the throttle time should prevent resubmission

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current behavior is (2). And to make the behavior (1), we would:

  1. Store the current time in the map,
  2. Set the value in the map to Instant.EPOCH
  3. If there is a job exception we reset the value back to the original value that was in the map

The delete operations are synchronous and the method is synchronized, so this approach would be thread safe

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In handleUpdateJobConfigArrival, we call handleDeleteJobConfigArrival directly. So if you want to specifically reset it, remember to distinguish the two calls here (one is called by handleUpdateJobConfigArrival) and another is called directly.
  2. Unless you change all the cancel APIs in our code to send a delete job message to trigger this method, then, in that case, resetting the timer can enable us to start a new job immediately, otherwise it does not make sense to achieve 1, as we don't explicitly delete anyway...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Since handleDeleteJobConfigArrival is a completely synchronous method, the synchronized method handleUpdateJobConfigArrival would just hold the lock while deleting and then proceed to handleNewJobConfigArrival. There would be no need to distinguish between the two since @Peiyingy needs to address the race condition described in https://github.com/apache/gobblin/pull/3704/files#r1243111712 by updating the map immediately in the newJobConfigArrival method.

  2. Yeah we don't call explicitly call delete so it's just semantics about which is more intuitive behavior if we ever use this in the future. Since this is purely hypothetical I don't want to waste effort changing the behavior to (1). I think we should just add a comment describing that deleting a workflow with throttling enabled means that the next schedulable time for the workflow will remain unchanged and you have to wait out the throttle timeout before being able to reschedule


@Subscribe
public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In handleUpdateJobConfigArrival, we call handleDeleteJobConfigArrival directly. So if you want to specifically reset it, remember to distinguish the two calls here (one is called by handleUpdateJobConfigArrival) and another is called directly.
  2. Unless you change all the cancel APIs in our code to send a delete job message to trigger this method, then, in that case, resetting the timer can enable us to start a new job immediately, otherwise it does not make sense to achieve 1, as we don't explicitly delete anyway...


@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@homatthew are we sure this change won't affect performance when those message-handling methods will be called frequently? (That's why initially I suggested having job level lock)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of offline discussion:

What kind of throughput are expecting with this job launcher? I.e. for fliptop I know the traffic is bursty but how is it bursty? What sort of magnitude are we talking about here?

we have 100k jobs submitted throughout the day, so around 1~2 per second? And cancel job can be triggered randomly but should be much in-frequent

Since the only blocking operation in the critical section is the delete operation, and there are infrequent deletes (usually this takes seconds to complete), we can go ahead with the change and add fine-grained locking in the future if necessary

public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();
String jobUri = updateJobArrival.getJobName();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not mix up the usage of job uri and job name. If you are gonna use job name (e.g. jobNameToNextSchedulableTime), then use the term job name everywhere. And if you are gonna use job uri, then change it for all of them

@@ -357,19 +370,22 @@ public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJ
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update this log to say that you are resetting the clock

}

private void runWorkflowTest(Instant mockedTime, String jobSuffix,
private void runWorkflowTest(Duration mockedTime, String jobSuffix,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does mockedTime still make sense? Seems like it now represents a step duration for incrementing the clock forward in time

AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
when(mockClock.instant()).thenAnswer(invocation -> {
Instant currentInstant = nextInstant.get();
nextInstant.set(currentInstant.plus(mockedTime));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that you're using AtomicReference. But you are not doing an atomic get and set, which basically defeats the point of what you're doing.

Did you mean to do something like getAndAccumulate?

* Deleting a workflow with throttling enabled means that the next
* schedulable time for the workflow will remain unchanged.
* Note: In such case, it is required to wait until the throttle
* timeout period elapses before the workflow can be rescheduled.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice comment!

GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
} catch (JobException je) {
LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
LOGGER.error("Failed to schedule or run job to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo / wording. schedule or run job to run job

String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean isSameWorkflow) throws Exception {
Clock mockClock = Mockito.mock(Clock.class);
AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
when(mockClock.instant()).thenAnswer(invocation -> nextInstant.getAndAccumulate(nextInstant.get(), (currentInstant, x) -> currentInstant.plus(mockedPeriod)));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextInstant.get() is not used and is just a placeholder right? Since it seems like just a placeholder value you can use something like null

break;
}
Thread.sleep(100);
private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A java doc that describes what these variables are so that future people can use this method would be helpful.

Also, mockedPeriod is a bit of a weird name. Since you're now using it to represent the step amount each time clock.instant() is called.

Is this really necessary? In your original implementation it was just returning a final Instant defined at the beginning which was a bit easier to reason about. But now we sort of rely on how many times clock.instant() is called to know what the current time is

@Peiyingy Peiyingy force-pushed the py-helix-scheduler-throttle-GOBBLIN-1840 branch from 5e1a77d to cfdc115 Compare June 28, 2023 20:22
Copy link
Copy Markdown
Contributor

@homatthew homatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work

);
return;
}
nextSchedulableTime = clock.instant().plus(jobSchedulingThrottleTimeout);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only add entry to jobNameToNextSchedulableTime when the throttle is enabled? it is a hash map, where we can easily see memory leak when we not delete the entry properly

}

@Override
public void onJobPrepare(JobContext jobContext)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why for the same job, why we try to update the schedulable time three times? once when we handle the message, once when we prepare the job, once when job start. This will be confusing reading the log.
If we have concerns about race conditions when we handle messages, can we only update it when handling a message

Copy link
Copy Markdown
Contributor

@ZihanLi58 ZihanLi58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, great work!

@ZihanLi58 ZihanLi58 merged commit 1ecce5b into apache:master Jun 30, 2023
phet added a commit to phet/gobblin that referenced this pull request Aug 15, 2023
* upstream/master:
  Fix bug with total count watermark whitelist (apache#3724)
  [GOBBLIN-1858] Fix logs relating to multi-active lease arbiter (apache#3720)
  [GOBBLIN-1838] Introduce total count based completion watermark (apache#3701)
  Correct num of failures (apache#3722)
  [GOBBLIN- 1856] Add flow trigger handler leasing metrics (apache#3717)
  [GOBBLIN-1857] Add override flag to force generate a job execution id based on gobbl… (apache#3719)
  [GOBBLIN-1855] Metadata writer tests do not work in isolation after upgrading to Iceberg 1.2.0 (apache#3718)
  Remove unused ORC writer code (apache#3710)
  [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (apache#3716)
  [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant (apache#3715)
  [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (apache#3712)
  [GOBBLIN-1849] Add Flow Group & Name to Job Config for Job Scheduler (apache#3713)
  [GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup (apache#3708)
  [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time (apache#3704)
  [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the existing workflow if it is launched (apache#3711)
  [GOBBLIN-1842] Add timers to GobblinMCEWriter (apache#3703)
  [GOBBLIN-1844] Ignore workflows marked for deletion when calculating container count (apache#3709)
  [GOBBLIN-1846] Validate Multi-active Scheduler with Logs (apache#3707)
  [GOBBLIN-1845] Changes parallelstream to stream in DatasetsFinderFilteringDecorator  to avoid classloader issues in spark (apache#3706)
  [GOBBLIN-1843] Utility for detecting non optional unions should convert dataset urn to hive compatible format (apache#3705)
  [GOBBLIN-1837] Implement multi-active, non blocking for leader host (apache#3700)
  [GOBBLIN-1835]Upgrade Iceberg Version from 0.11.1 to 1.2.0 (apache#3697)
  Update CHANGELOG to reflect changes in 0.17.0
  Reserving 0.18.0 version for next release
  [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion (apache#3699)
  [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (apache#3698)
  [GOBBLIN-1825]Hive retention job should fail if deleting underlying files fail (apache#3687)
  [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (apache#3692)
  [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (apache#3693)
  [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (apache#3696)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants