Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
29f5c2d
[GOBBLIN-1840] Helix Job scheduler should not try to replace running …
Jun 13, 2023
45c87ff
[GOBBLIN-1840] Remove unnecessary files
Jun 13, 2023
df182ff
[GOBBLIN-1840] Add config for throttleTimeoutDuration
Jun 13, 2023
3c552b7
[GOBBLIN-1840] Clean up format and coding standard
Jun 14, 2023
88b9a02
[GOBBLIN-1840] Clean up format layout
Jun 14, 2023
d75e522
[GOBBLIN-1840] Clean up auto format
Peiyingy Jun 14, 2023
4ddd7c5
[GOBBLIN-1840] Clear up empty space
Peiyingy Jun 14, 2023
0e7ba4d
[GOBBLIN-1840] Clarify naming standards and simplify repeated codes
Peiyingy Jun 15, 2023
c449a71
[GOBBLIN-1840] Add Javadoc on GobblinHelixJobSchedulerTest for settin…
Peiyingy Jun 15, 2023
3160d8b
[GOBBLIN-1840] Optimize imports and fix unit test errors
Peiyingy Jun 22, 2023
ae2b58c
[GOBBLIN-1840] Rewrite log info and add Javadoc
Peiyingy Jun 23, 2023
e6ea195
[GOBBLIN-1840] Remove job status check
Peiyingy Jun 23, 2023
6e8358c
[GOBBLIN-1840] Add log info and change config setting
Peiyingy Jun 27, 2023
a15afd8
[GOBBLIN-1840] Add @Slf4j in GobblinThrottlingHelixJobLauncherListener
Peiyingy Jun 27, 2023
bbe4a0b
[GOBBLIN-1840] Fix race condition of handleNewJobConfigArrival
Peiyingy Jun 27, 2023
701881e
[GOBBLIN-1840] Improve mockClock mechanism
Peiyingy Jun 27, 2023
cfdc115
[GOBBLIN-1840] Address comments
Peiyingy Jun 27, 2023
b598455
[GOBBLIN-1840] Only put entry in jobNameToNextSchedulableTime when th…
Peiyingy Jun 29, 2023
32ea7ed
[GOBBLIN-1840] Remove extra schedulable time updates
Peiyingy Jun 29, 2023
2496212
[GOBBLIN-1840] Fix checkstyle problems
Peiyingy Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.cluster;

import java.time.Duration;

import org.apache.gobblin.annotation.Alpha;


Expand Down Expand Up @@ -222,4 +224,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";

public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";

/** Configuration key for the flag that enables throttling of Helix job scheduling. */
public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
/** Default value for {@link #HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY}: throttling is disabled. */
public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;
Comment thread
Peiyingy marked this conversation as resolved.

/** Configuration key for the job scheduling throttle timeout, expressed in seconds. */
public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
/** Default value for {@link #HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY}: 40 minutes, in seconds. */
public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = Duration.ofMinutes(40).getSeconds();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -110,15 +114,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe

private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
private final Duration jobSchedulingThrottleTimeout;
private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
private boolean isThrottleEnabled;
private Clock clock;

public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager> taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags,
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws Exception {

MutableJobCatalog jobCatalog,
Clock clock) throws Exception {
super(ConfigUtils.configToProperties(sysConfig), schedulerService);
this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, COMMON_JOB_PROPS));
this.jobHelixManager = jobHelixManager;
Expand Down Expand Up @@ -162,6 +170,27 @@ public GobblinHelixJobScheduler(Config sysConfig,
this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;

this.jobSchedulingThrottleTimeout = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY), ChronoUnit.SECONDS);

this.jobNameToNextSchedulableTime = new ConcurrentHashMap<>();

this.isThrottleEnabled = ConfigUtils.getBoolean(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY);

this.clock = clock;
}

/**
 * Convenience constructor that builds a scheduler backed by the system UTC clock.
 *
 * <p>Every argument is forwarded unchanged to the {@link Clock}-accepting constructor,
 * with {@link Clock#systemUTC()} supplied as the clock.</p>
 */
public GobblinHelixJobScheduler(
    Config sysConfig,
    HelixManager jobHelixManager,
    Optional<HelixManager> taskDriverHelixManager,
    EventBus eventBus,
    Path appWorkDir,
    List<? extends Tag<?>> metadataTags,
    SchedulerService schedulerService,
    MutableJobCatalog jobCatalog) throws Exception {
  this(
      sysConfig,
      jobHelixManager,
      taskDriverHelixManager,
      eventBus,
      appWorkDir,
      metadataTags,
      schedulerService,
      jobCatalog,
      Clock.systemUTC());
}

@Override
Expand Down Expand Up @@ -303,7 +332,7 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
}

@Subscribe
public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
Comment thread
Peiyingy marked this conversation as resolved.
String jobUri = newJobArrival.getJobName();
LOGGER.info("Received new job configuration of job " + jobUri);
try {
Expand All @@ -315,24 +344,39 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);

this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);

GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
jobSchedulingThrottleTimeout, clock)
: new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics));
listener);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need to be on a new line

} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
LOGGER.info("No job schedule"
+ " found, so running job " + jobUri);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need to be on a new line

this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics)));
listener));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need to be on a new line

}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need a new line

} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update this log to say that you are resetting the clock

}
}

@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@homatthew are we sure this change won't affect performance when those message-handling methods are called frequently? (That's why I initially suggested having a job-level lock.)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of offline discussion:

What kind of throughput are we expecting with this job launcher? I.e. for fliptop I know the traffic is bursty, but how bursty is it? What sort of magnitude are we talking about here?

We have 100k jobs submitted throughout the day, so around 1~2 per second? Job cancellation can be triggered at random times, but should be much less frequent.

Since the only blocking operation in the critical section is the delete operation, and there are infrequent deletes (usually this takes seconds to complete), we can go ahead with the change and add fine-grained locking in the future if necessary

LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();

if (this.isThrottleEnabled &&
this.jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.ofEpochMilli(0)).isAfter(clock.instant())) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This line is a bit dense. And to indicate beginning of time, the documentation for Instant has Instant.MIN or Instant.EPOCH which should be more readable.

Also, intuitively it feels a little weird to read as "nextSchedulableTime is after current time". I feel it's more intuitive for it to be

"current time is before nextSchedulableTime" i.e.

clock.instant().isBefore(jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.ofEpochMilli(0)))

or IMO even more readable

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
...

LOGGER.info("Replanning is skipped for job {}. Current time is "
+ clock.instant() + " and the next schedulable time would be "

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clock.instant() should be using the {} syntax. Same for the next schedulable time. And instead of getting the value from the map, use the nextSchedulableTime variable

+ this.jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.ofEpochMilli(0)), jobName);
return;
}

try {
handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
Expand Down Expand Up @@ -360,7 +404,7 @@ private void waitForJobCompletion(String jobName) {
}

@Subscribe
public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super minor nit. Not sure if it's even worth implementing:

Would we want to reset the Instant in the map to Instant.EPOCH if we delete a workflow? My understanding is that internally we don't use this delete job config method and only rely on update, so this wouldn't really affect our own use case.

I am not sure which behavior is more intuitive:

  1. If I explicitly delete, I should be able to reschedule it and bypass the throttle time
  2. Regardless of if I deleted the old flow, the throttle time should prevent resubmission

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current behavior is (2). And to make the behavior (1), we would:

  1. Store the current time in the map,
  2. Set the value in the map to Instant.EPOCH
  3. If there is a job exception we reset the value back to the original value that was in the map

The delete operations are synchronous and the method is synchronized, so this approach would be thread safe

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In handleUpdateJobConfigArrival, we call handleDeleteJobConfigArrival directly. So if you want to specifically reset it, remember to distinguish the two calls here (one is called by handleUpdateJobConfigArrival) and another is called directly.
  2. Unless you change all the cancel APIs in our code to send a delete job message to trigger this method, then, in that case, resetting the timer can enable us to start a new job immediately, otherwise it does not make sense to achieve 1, as we don't explicitly delete anyway...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Since handleDeleteJobConfigArrival is a completely synchronous method, the synchronized method handleUpdateJobConfigArrival would just hold the lock while deleting and then proceed to handleNewJobConfigArrival. There would be no need to distinguish between the two since @Peiyingy needs to address the race condition described in https://github.com/apache/gobblin/pull/3704/files#r1243111712 by updating the map immediately in the newJobConfigArrival method.

  2. Yeah, we don't explicitly call delete, so it's just semantics about which is the more intuitive behavior if we ever use this in the future. Since this is purely hypothetical I don't want to waste effort changing the behavior to (1). I think we should just add a comment describing that deleting a workflow with throttling enabled means that the next schedulable time for the workflow will remain unchanged and you have to wait out the throttle timeout before being able to reschedule

LOGGER.info("Received delete for job configuration of job " + deleteJobArrival.getJobName());
try {
unscheduleJob(deleteJobArrival.getJobName());
Expand Down Expand Up @@ -443,6 +487,10 @@ private void cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
}
}

public void setThrottleEnabled(boolean throttleEnabled) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use lombok @Setter instead of declaring one. Also, why do we need this? Isn't this value settable via the config even for testing?

isThrottleEnabled = throttleEnabled;
}

/**
* This class is responsible for running non-scheduled jobs.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.apache.gobblin.cluster;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;


/**
* A job listener used when {@link GobblinHelixJobLauncher} launches a job.
* In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
* listener would record jobName to next schedulable time to decide whether
* the replanning should be executed or skipped.
*/
public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {

public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
Comment thread
Peiyingy marked this conversation as resolved.
private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
private Duration helixJobSchedulingThrottleTimeout;
private Clock clock;

public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be ConcurrentHashMap<String, Instant> instead of just ConcurrentHashMap?

super(jobLauncherMetrics);
this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
this.clock = clock;
}

@Override
public void onJobPrepare(JobContext jobContext)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the same job, why do we try to update the schedulable time three times: once when we handle the message, once when we prepare the job, and once when the job starts? This will be confusing when reading the log.
If we have concerns about race conditions when we handle messages, can we only update it when handling a message?

throws Exception {
super.onJobPrepare(jobContext);
Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is " + nextSchedulableTime );

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: grammar "finished preparing"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

White space after the nextSchedulableTime

Comment thread
Peiyingy marked this conversation as resolved.
Outdated
}

@Override
public void onJobCompletion(JobContext jobContext)
throws Exception {
super.onJobCompletion(jobContext);
if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.ofEpochMilli(0));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log if the job failed. I see there is an existing log for the entire job context, but having a log specifically from the throttling scheduler would be important here for those not familiar with the code when they are debugging

public void onJobCompletion(JobContext jobContext) throws Exception {
if (_log.isPresent()) {
_log.get().info("jobCompletion: " + jobContext);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, instead of ofEpochMilli(0), let's use Instant.EPOCH

} else {
Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
LOG.info(jobContext.getJobName() + " finished completion. The next schedulable time is " + nextSchedulableTime );

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "is completed" instead of "finished completion"?

}
}

@Override
public void onJobCancellation(JobContext jobContext)
throws Exception {
super.onJobCancellation(jobContext);
jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.ofEpochMilli(0));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as for job failed. We'd want something similar.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,11 @@ public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriv
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
for (String helixJob : helixJobs) {
Iterator<TaskConfig> taskConfigIterator = taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
JobConfig jobConfig = taskDriver.getJobConfig(helixJob);
if (jobConfig == null) {
throw new GobblinHelixUnexpectedStateException("Received null jobConfig from Helix. We should not see any null configs when reading all helixJobs. helixJob=%s", helixJob);
}
Iterator<TaskConfig> taskConfigIterator = jobConfig.getTaskConfigMap().values().iterator();
if (taskConfigIterator.hasNext()) {
TaskConfig taskConfig = taskConfigIterator.next();
String jobName = taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
Expand Down
Loading