Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
29f5c2d
[GOBBLIN-1840] Helix Job scheduler should not try to replace running …
Jun 13, 2023
45c87ff
[GOBBLIN-1840] Remove unnecessary files
Jun 13, 2023
df182ff
[GOBBLIN-1840] Add config for throttleTimeoutDuration
Jun 13, 2023
3c552b7
[GOBBLIN-1840] Clean up format and coding standard
Jun 14, 2023
88b9a02
[GOBBLIN-1840] Clean up format layout
Jun 14, 2023
d75e522
[GOBBLIN-1840] Clean up auto format
Peiyingy Jun 14, 2023
4ddd7c5
[GOBBLIN-1840] Clear up empty space
Peiyingy Jun 14, 2023
0e7ba4d
[GOBBLIN-1840] Clarify naming standards and simplify repeated codes
Peiyingy Jun 15, 2023
c449a71
[GOBBLIN-1840] Add Javadoc on GobblinHelixJobSchedulerTest for settin…
Peiyingy Jun 15, 2023
3160d8b
[GOBBLIN-1840] Optimize imports and fix unit test errors
Peiyingy Jun 22, 2023
ae2b58c
[GOBBLIN-1840] Rewrite log info and add Javadoc
Peiyingy Jun 23, 2023
e6ea195
[GOBBLIN-1840] Remove job status check
Peiyingy Jun 23, 2023
6e8358c
[GOBBLIN-1840] Add log info and change config setting
Peiyingy Jun 27, 2023
a15afd8
[GOBBLIN-1840] Add @Slf4j in GobblinThrottlingHelixJobLauncherListener
Peiyingy Jun 27, 2023
bbe4a0b
[GOBBLIN-1840] Fix race condition of handleNewJobConfigArrival
Peiyingy Jun 27, 2023
701881e
[GOBBLIN-1840] Improve mockClock mechanism
Peiyingy Jun 27, 2023
cfdc115
[GOBBLIN-1840] Address comments
Peiyingy Jun 27, 2023
b598455
[GOBBLIN-1840] Only put entry in jobNameToNextSchedulableTime when th…
Peiyingy Jun 29, 2023
32ea7ed
[GOBBLIN-1840] Remove extra schedulable time updates
Peiyingy Jun 29, 2023
2496212
[GOBBLIN-1840] Fix checkstyle problems
Peiyingy Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.cluster;

import java.time.Duration;

import org.apache.gobblin.annotation.Alpha;


Expand Down Expand Up @@ -222,4 +224,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";

public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";

// When true, a Helix job with the same name cannot be (re)submitted until its
// throttle timeout period has elapsed.
public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;

// Throttle timeout period in seconds; defaults to 40 minutes.
public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = Duration.ofMinutes(40).getSeconds();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -111,14 +115,28 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;

/**
* The throttling timeout prevents helix workflows with the same job name / URI from being submitted
* more than once within the timeout period. This timeout is not reset by deletes / cancels, meaning that
* if you delete a workflow within the timeout period, you cannot reschedule until the timeout period is complete.
* However, if there is an error when launching the job, you can immediately reschedule the flow. <br><br>
*
* NOTE: This throttle timeout period starts when the job launcher thread picks up the runnable. Meaning that the
* time it takes to submit to Helix and start running the flow is also included as part of the timeout period
*/
private final Duration jobSchedulingThrottleTimeout;
private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
private boolean isThrottleEnabled;
private Clock clock;

public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager> taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags,
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws Exception {

MutableJobCatalog jobCatalog,
Clock clock) throws Exception {
super(ConfigUtils.configToProperties(sysConfig), schedulerService);
this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, COMMON_JOB_PROPS));
this.jobHelixManager = jobHelixManager;
Expand Down Expand Up @@ -162,6 +180,27 @@ public GobblinHelixJobScheduler(Config sysConfig,
this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;

this.jobSchedulingThrottleTimeout = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY), ChronoUnit.SECONDS);

this.jobNameToNextSchedulableTime = new ConcurrentHashMap<>();

this.isThrottleEnabled = ConfigUtils.getBoolean(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY);

this.clock = clock;
}

public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager> taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags,
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws Exception {

this(sysConfig, jobHelixManager, taskDriverHelixManager, eventBus, appWorkDir, metadataTags,
schedulerService, jobCatalog, Clock.systemUTC());
}

@Override
Expand Down Expand Up @@ -206,9 +245,9 @@ protected void startServices() throws Exception {

if (cleanAllDistJobs) {
for (org.apache.gobblin.configuration.State state : this.jobsMapping.getAllStates()) {
String jobUri = state.getId();
LOGGER.info("Delete mapping for job " + jobUri);
this.jobsMapping.deleteMapping(jobUri);
String jobName = state.getId();
LOGGER.info("Delete mapping for job " + jobName);
this.jobsMapping.deleteMapping(jobName);
}
}
}
Expand Down Expand Up @@ -303,36 +342,70 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
}

@Subscribe
public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
String jobUri = newJobArrival.getJobName();
LOGGER.info("Received new job configuration of job " + jobUri);
public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
String jobName = newJobArrival.getJobName();
LOGGER.info("Received new job configuration of job " + jobName);

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
LOGGER.info("Adding new job is skipped for job {}. Current time is {} and the next schedulable time would be {}",
jobName,
clock.instant(),
nextSchedulableTime
);
return;
}

if (isThrottleEnabled) {
nextSchedulableTime = clock.instant().plus(jobSchedulingThrottleTimeout);
jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime);
}

try {
Properties jobProps = new Properties();
jobProps.putAll(this.commonJobProperties);
jobProps.putAll(newJobArrival.getJobConfig());

// set uri so that we can delete this job later
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobName);

this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);

GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime)
: new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics));
LOGGER.info("Scheduling job " + jobName);
scheduleJob(jobProps, listener);
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics)));
LOGGER.info("No job schedule found, so running job " + jobName);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
LOGGER.error("Failed to schedule or run job {} . Reset the next scheduable time to {}",
jobName,
Instant.EPOCH,
je);
if (isThrottleEnabled) {
jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
}
}
}

@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@homatthew are we sure this change won't affect performance when those message-handling methods will be called frequently? (That's why initially I suggested having job level lock)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of offline discussion:

What kind of throughput are expecting with this job launcher? I.e. for fliptop I know the traffic is bursty but how is it bursty? What sort of magnitude are we talking about here?

we have 100k jobs submitted throughout the day, so around 1~2 per second? And cancel jobs can be triggered randomly but should be much less frequent

Since the only blocking operation in the critical section is the delete operation, and there are infrequent deletes (usually this takes seconds to complete), we can go ahead with the change and add fine-grained locking in the future if necessary

LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();

Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
LOGGER.info("Replanning is skipped for job {}. Current time is {} and the next schedulable time would be {}",
jobName,
clock.instant(),
nextSchedulableTime
);
return;
}

try {
handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
Expand All @@ -359,8 +432,17 @@ private void waitForJobCompletion(String jobName) {
}
}

/***
* Deleting a workflow with throttling enabled means that the next
* schedulable time for the workflow will remain unchanged.
* Note: In such a case, it is required to wait until the throttle
* timeout period elapses before the workflow can be rescheduled.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice comment!

*
* @param deleteJobArrival
* @throws InterruptedException
*/
@Subscribe
public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super minor nit. Not sure if it's even worth implementing:

Would we want to reset the Instant in the map to Instant.EPOCH if we delete a workflow? My understanding is that internally we don't use this delete job config method and only rely on update, so this wouldn't really affect our own use case.

I am not sure which behavior is more intuitive:

  1. If I explicitly delete, I should be able to reschedule it and bypass the throttle time
  2. Regardless of if I deleted the old flow, the throttle time should prevent resubmission

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current behavior is (2). And to make the behavior (1), we would:

  1. Store the current time in the map,
  2. Set the value in the map to Instant.EPOCH
  3. If there is a job exception we reset the value back to the original value that was in the map

The delete operations are synchronous and the method is synchronized, so this approach would be thread safe

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In handleUpdateJobConfigArrival, we call handleDeleteJobConfigArrival directly. So if you want to specifically reset it, remember to distinguish the two calls here (one is called by handleUpdateJobConfigArrival) and another is called directly.
  2. Unless you change all the cancel APIs in our code to send a delete job message to trigger this method, then, in that case, resetting the timer can enable us to start a new job immediately, otherwise it does not make sense to achieve 1, as we don't explicitly delete anyway...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Since handleDeleteJobConfigArrival is a completely synchronous method, the synchronized method handleUpdateJobConfigArrival would just hold the lock while deleting and then proceed to handleNewJobConfigArrival. There would be no need to distinguish between the two since @Peiyingy needs to address the race condition described in https://github.com/apache/gobblin/pull/3704/files#r1243111712 by updating the map immediately in the newJobConfigArrival method.

  2. Yeah we don't call explicitly call delete so it's just semantics about which is more intuitive behavior if we ever use this in the future. Since this is purely hypothetical I don't want to waste effort changing the behavior to (1). I think we should just add a comment describing that deleting a workflow with throttling enabled means that the next schedulable time for the workflow will remain unchanged and you have to wait out the throttle timeout before being able to reschedule

LOGGER.info("Received delete for job configuration of job " + deleteJobArrival.getJobName());
try {
unscheduleJob(deleteJobArrival.getJobName());
Expand All @@ -373,8 +455,8 @@ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobAr
@Subscribe
public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival)
throws InterruptedException {
String jobUri = cancelJobArrival.getJoburi();
LOGGER.info("Received cancel for job configuration of job " + jobUri);
String jobName = cancelJobArrival.getJoburi();
LOGGER.info("Received cancel for job configuration of job " + jobName);
Optional<String> distributedJobMode;
Optional<String> planningJob = Optional.empty();
Optional<String> actualJob = Optional.empty();
Expand All @@ -384,14 +466,14 @@ public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobAr
this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();

try {
distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
distributedJobMode = this.jobsMapping.getDistributedJobMode(jobName);
if (distributedJobMode.isPresent() && Boolean.parseBoolean(distributedJobMode.get())) {
planningJob = this.jobsMapping.getPlanningJobId(jobUri);
planningJob = this.jobsMapping.getPlanningJobId(jobName);
} else {
actualJob = this.jobsMapping.getActualJobId(jobUri);
actualJob = this.jobsMapping.getActualJobId(jobName);
}
} catch (IOException e) {
LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri);
LOGGER.warn("jobsMapping could not be retrieved for job {}", jobName);
return;
}

Expand Down Expand Up @@ -466,7 +548,7 @@ public void run() {
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
} catch (JobException je) {
LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
LOGGER.error("Failed to schedule or run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;


/**
 * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
 * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
 * listener records each job's next schedulable time, which the scheduler
 * consults to decide whether a replanning should be executed or skipped.
 *
 * <p>When a job fails or is cancelled, its next schedulable time is reset to
 * {@link Instant#EPOCH} so that any future schedule attempt is allowed
 * immediately, without waiting out the throttle timeout.</p>
 */
public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {

  public static final Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);

  // Shared with GobblinHelixJobScheduler: maps job name -> earliest instant at
  // which the job may be scheduled again.
  private final ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;

  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
      ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime) {
    super(jobLauncherMetrics);
    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
  }

  /**
   * If the job completed in the {@link JobState.RunningState#FAILED} state,
   * resets its next schedulable time to {@link Instant#EPOCH} so it can be
   * rescheduled immediately.
   */
  @Override
  public void onJobCompletion(JobContext jobContext)
      throws Exception {
    super.onJobCompletion(jobContext);
    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
      jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH);
      LOG.info("{} failed. The next schedulable time is {} so that any future schedule attempts will be allowed.",
          jobContext.getJobName(), Instant.EPOCH);
    }
  }

  /**
   * On cancellation, unconditionally resets the job's next schedulable time to
   * {@link Instant#EPOCH} so it can be rescheduled immediately.
   */
  @Override
  public void onJobCancellation(JobContext jobContext)
      throws Exception {
    super.onJobCancellation(jobContext);
    jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH);
    LOG.info("{} is cancelled. The next schedulable time is {} so that any future schedule attempts will be allowed.",
        jobContext.getJobName(), Instant.EPOCH);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,11 @@ public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriv
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
for (String helixJob : helixJobs) {
Iterator<TaskConfig> taskConfigIterator = taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
JobConfig jobConfig = taskDriver.getJobConfig(helixJob);
if (jobConfig == null) {
throw new GobblinHelixUnexpectedStateException("Received null jobConfig from Helix. We should not see any null configs when reading all helixJobs. helixJob=%s", helixJob);
}
Iterator<TaskConfig> taskConfigIterator = jobConfig.getTaskConfigMap().values().iterator();
if (taskConfigIterator.hasNext()) {
TaskConfig taskConfig = taskConfigIterator.next();
String jobName = taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
Expand Down
Loading