-
Notifications
You must be signed in to change notification settings - Fork 749
[GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time #3704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
29f5c2d
45c87ff
df182ff
3c552b7
88b9a02
d75e522
4ddd7c5
0e7ba4d
c449a71
3160d8b
ae2b58c
e6ea195
6e8358c
a15afd8
bbe4a0b
701881e
cfdc115
b598455
32ea7ed
2496212
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,10 @@ | |
| package org.apache.gobblin.cluster; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.Clock; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.time.temporal.ChronoUnit; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
@@ -111,14 +115,28 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe | |
| private boolean startServicesCompleted; | ||
| private final long helixJobStopTimeoutMillis; | ||
|
|
||
| /** | ||
| * The throttling timeout prevents helix workflows with the same job name / URI from being submitted | ||
| * more than once within the timeout period. This timeout is not reset by deletes / cancels, meaning that | ||
| * if you delete a workflow within the timeout period, you cannot reschedule until the timeout period is complete. | ||
| * However, if there is an error when launching the job, you can immediately reschedule the flow. <br><br> | ||
| * | ||
| * NOTE: This throttle timeout period starts when the job launcher thread picks up the runnable. Meaning that the | ||
| * time it takes to submit to Helix and start running the flow is also included as part of the timeout period | ||
| */ | ||
| private final Duration jobSchedulingThrottleTimeout; | ||
| private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime; | ||
| private boolean isThrottleEnabled; | ||
| private Clock clock; | ||
|
|
||
| public GobblinHelixJobScheduler(Config sysConfig, | ||
| HelixManager jobHelixManager, | ||
| Optional<HelixManager> taskDriverHelixManager, | ||
| EventBus eventBus, | ||
| Path appWorkDir, List<? extends Tag<?>> metadataTags, | ||
| SchedulerService schedulerService, | ||
| MutableJobCatalog jobCatalog) throws Exception { | ||
|
|
||
| MutableJobCatalog jobCatalog, | ||
| Clock clock) throws Exception { | ||
| super(ConfigUtils.configToProperties(sysConfig), schedulerService); | ||
| this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, COMMON_JOB_PROPS)); | ||
| this.jobHelixManager = jobHelixManager; | ||
|
|
@@ -162,6 +180,27 @@ public GobblinHelixJobScheduler(Config sysConfig, | |
| this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS, | ||
| GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000; | ||
|
|
||
| this.jobSchedulingThrottleTimeout = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY, | ||
| GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY), ChronoUnit.SECONDS); | ||
|
|
||
| this.jobNameToNextSchedulableTime = new ConcurrentHashMap<>(); | ||
|
|
||
| this.isThrottleEnabled = ConfigUtils.getBoolean(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, | ||
| GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY); | ||
|
|
||
| this.clock = clock; | ||
| } | ||
|
|
||
| public GobblinHelixJobScheduler(Config sysConfig, | ||
| HelixManager jobHelixManager, | ||
| Optional<HelixManager> taskDriverHelixManager, | ||
| EventBus eventBus, | ||
| Path appWorkDir, List<? extends Tag<?>> metadataTags, | ||
| SchedulerService schedulerService, | ||
| MutableJobCatalog jobCatalog) throws Exception { | ||
|
|
||
| this(sysConfig, jobHelixManager, taskDriverHelixManager, eventBus, appWorkDir, metadataTags, | ||
| schedulerService, jobCatalog, Clock.systemUTC()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -206,9 +245,9 @@ protected void startServices() throws Exception { | |
|
|
||
| if (cleanAllDistJobs) { | ||
| for (org.apache.gobblin.configuration.State state : this.jobsMapping.getAllStates()) { | ||
| String jobUri = state.getId(); | ||
| LOGGER.info("Delete mapping for job " + jobUri); | ||
| this.jobsMapping.deleteMapping(jobUri); | ||
| String jobName = state.getId(); | ||
| LOGGER.info("Delete mapping for job " + jobName); | ||
| this.jobsMapping.deleteMapping(jobName); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -303,36 +342,66 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec | |
| } | ||
|
|
||
| @Subscribe | ||
| public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { | ||
| String jobUri = newJobArrival.getJobName(); | ||
| LOGGER.info("Received new job configuration of job " + jobUri); | ||
| public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { | ||
|
Peiyingy marked this conversation as resolved.
|
||
| String jobName = newJobArrival.getJobName(); | ||
| LOGGER.info("Received new job configuration of job " + jobName); | ||
|
|
||
| Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH); | ||
| if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) { | ||
| LOGGER.info("Adding new job is skipped for job {}. Current time is {} and the next schedulable time would be {}", | ||
| jobName, | ||
| clock.instant(), | ||
| nextSchedulableTime | ||
| ); | ||
| return; | ||
| } | ||
| nextSchedulableTime = clock.instant().plus(jobSchedulingThrottleTimeout); | ||
|
Contributor
Should we only add an entry to `jobNameToNextSchedulableTime` when the throttle is enabled? It is a hash map, so we can easily leak memory if we do not delete entries properly. |
||
| jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime); | ||
|
|
||
| try { | ||
| Properties jobProps = new Properties(); | ||
| jobProps.putAll(this.commonJobProperties); | ||
| jobProps.putAll(newJobArrival.getJobConfig()); | ||
|
|
||
| // set uri so that we can delete this job later | ||
| jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri); | ||
| jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobName); | ||
|
|
||
| this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps); | ||
|
|
||
| GobblinHelixJobLauncherListener listener = isThrottleEnabled ? | ||
| new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime, | ||
| jobSchedulingThrottleTimeout, clock) | ||
| : new GobblinHelixJobLauncherListener(this.launcherMetrics); | ||
| if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { | ||
| LOGGER.info("Scheduling job " + jobUri); | ||
| scheduleJob(jobProps, | ||
| new GobblinHelixJobLauncherListener(this.launcherMetrics)); | ||
| LOGGER.info("Scheduling job " + jobName); | ||
| scheduleJob(jobProps, listener); | ||
| } else { | ||
| LOGGER.info("No job schedule found, so running job " + jobUri); | ||
| this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, | ||
| new GobblinHelixJobLauncherListener(this.launcherMetrics))); | ||
| LOGGER.info("No job schedule found, so running job " + jobName); | ||
| this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener)); | ||
| } | ||
| } catch (JobException je) { | ||
| LOGGER.error("Failed to schedule or run job " + jobUri, je); | ||
| LOGGER.error("Failed to schedule or run job {} . Reset the next scheduable time to {}", | ||
| jobName, | ||
| Instant.EPOCH, | ||
| je); | ||
| jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH); | ||
| } | ||
| } | ||
|
|
||
| @Subscribe | ||
| public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) { | ||
| public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) { | ||
|
Contributor
@homatthew are we sure this change won't affect performance when these message-handling methods are called frequently? (That's why I initially suggested a job-level lock.)
Contributor
Summary of offline discussion:
Since the only blocking operation in the critical section is the delete operation, and deletes are infrequent (and usually take only seconds to complete), we can go ahead with the change and add fine-grained locking in the future if necessary. |
||
| LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName()); | ||
| String jobName = updateJobArrival.getJobName(); | ||
|
|
||
| Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH); | ||
| if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) { | ||
| LOGGER.info("Replanning is skipped for job {}. Current time is {} and the next schedulable time would be {}", | ||
| jobName, | ||
| clock.instant(), | ||
| nextSchedulableTime | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(), | ||
| updateJobArrival.getJobConfig())); | ||
|
|
@@ -359,8 +428,17 @@ private void waitForJobCompletion(String jobName) { | |
| } | ||
| } | ||
|
|
||
| /*** | ||
| * Deleting a workflow with throttling enabled means that the next | ||
| * schedulable time for the workflow will remain unchanged. | ||
| * Note: In such case, it is required to wait until the throttle | ||
| * timeout period elapses before the workflow can be rescheduled. | ||
|
Contributor
Nice comment! |
||
| * | ||
| * @param deleteJobArrival | ||
| * @throws InterruptedException | ||
| */ | ||
| @Subscribe | ||
| public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException { | ||
| public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException { | ||
|
Contributor
Super minor nit, not sure if it's even worth implementing: would we want to reset the next schedulable time when a job is deleted? I am not sure which behavior is more intuitive: (1) a delete resets the throttle so the job can be rescheduled immediately, or (2) a delete leaves the throttle in place so the job must wait out the timeout period.
Contributor
The current behavior is (2). To get behavior (1), we would reset the job's entry in `jobNameToNextSchedulableTime` when handling the delete.
The delete operations are synchronous and the method is `synchronized`.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| LOGGER.info("Received delete for job configuration of job " + deleteJobArrival.getJobName()); | ||
| try { | ||
| unscheduleJob(deleteJobArrival.getJobName()); | ||
|
|
@@ -373,8 +451,8 @@ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobAr | |
| @Subscribe | ||
| public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival) | ||
| throws InterruptedException { | ||
| String jobUri = cancelJobArrival.getJoburi(); | ||
| LOGGER.info("Received cancel for job configuration of job " + jobUri); | ||
| String jobName = cancelJobArrival.getJoburi(); | ||
| LOGGER.info("Received cancel for job configuration of job " + jobName); | ||
| Optional<String> distributedJobMode; | ||
| Optional<String> planningJob = Optional.empty(); | ||
| Optional<String> actualJob = Optional.empty(); | ||
|
|
@@ -384,14 +462,14 @@ public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobAr | |
| this.jobSchedulerMetrics.numCancellationStart.incrementAndGet(); | ||
|
|
||
| try { | ||
| distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri); | ||
| distributedJobMode = this.jobsMapping.getDistributedJobMode(jobName); | ||
| if (distributedJobMode.isPresent() && Boolean.parseBoolean(distributedJobMode.get())) { | ||
| planningJob = this.jobsMapping.getPlanningJobId(jobUri); | ||
| planningJob = this.jobsMapping.getPlanningJobId(jobName); | ||
| } else { | ||
| actualJob = this.jobsMapping.getActualJobId(jobUri); | ||
| actualJob = this.jobsMapping.getActualJobId(jobName); | ||
| } | ||
| } catch (IOException e) { | ||
| LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri); | ||
| LOGGER.warn("jobsMapping could not be retrieved for job {}", jobName); | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -466,7 +544,7 @@ public void run() { | |
| GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis()); | ||
| GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener); | ||
| } catch (JobException je) { | ||
| LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); | ||
| LOGGER.error("Failed to schedule or run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| package org.apache.gobblin.cluster; | ||
|
|
||
| import java.time.Clock; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| import org.apache.gobblin.runtime.JobContext; | ||
| import org.apache.gobblin.runtime.JobState; | ||
|
|
||
|
|
||
| /** | ||
| * A job listener used when {@link GobblinHelixJobLauncher} launches a job. | ||
| * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this | ||
| * listener would record jobName to next schedulable time to decide whether | ||
| * the replanning should be executed or skipped. | ||
| */ | ||
| @Slf4j | ||
| public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener { | ||
|
|
||
| public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class); | ||
|
Peiyingy marked this conversation as resolved.
|
||
| private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime; | ||
| private Duration helixJobSchedulingThrottleTimeout; | ||
| private Clock clock; | ||
|
|
||
| public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics, | ||
| ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) { | ||
| super(jobLauncherMetrics); | ||
| this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime; | ||
| this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout; | ||
| this.clock = clock; | ||
| } | ||
|
|
||
| @Override | ||
| public void onJobPrepare(JobContext jobContext) | ||
|
Contributor
For the same job, why do we update the schedulable time three times: once when we handle the message, once when we prepare the job, and once when the job starts? This will be confusing when reading the log. |
||
| throws Exception { | ||
| super.onJobPrepare(jobContext); | ||
| Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout); | ||
| jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime); | ||
| LOG.info("{} finished preparing. The next schedulable time is {}", jobContext.getJobName(), nextSchedulableTime); | ||
| } | ||
|
|
||
| @Override | ||
| public void onJobStart(JobContext jobContext) | ||
| throws Exception { | ||
| super.onJobStart(jobContext); | ||
| Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout); | ||
| jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime); | ||
| LOG.info("{} has started. The next schedulable time is {}", jobContext.getJobName(), nextSchedulableTime); | ||
| } | ||
|
|
||
| @Override | ||
| public void onJobCompletion(JobContext jobContext) | ||
| throws Exception { | ||
| super.onJobCompletion(jobContext); | ||
| if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) { | ||
| jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH); | ||
| LOG.info("{} failed. The next schedulable time is {} so that any future schedule attempts will be allowed.", | ||
|
Peiyingy marked this conversation as resolved.
|
||
| jobContext.getJobName(), | ||
| Instant.EPOCH); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onJobCancellation(JobContext jobContext) | ||
| throws Exception { | ||
| super.onJobCancellation(jobContext); | ||
| jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH); | ||
| LOG.info("{} is cancelled. The next schedulable time is {} so that any future schedule attempts will be allowed.", | ||
| jobContext.getJobName(), | ||
| Instant.EPOCH); | ||
|
|
||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.