Merged
Changes from 5 commits
Commits
20 commits
29f5c2d
[GOBBLIN-1840] Helix Job scheduler should not try to replace running …
Jun 13, 2023
45c87ff
[GOBBLIN-1840] Remove unnecessary files
Jun 13, 2023
df182ff
[GOBBLIN-1840] Add config for throttleTimeoutDuration
Jun 13, 2023
3c552b7
[GOBBLIN-1840] Clean up format and coding standard
Jun 14, 2023
88b9a02
[GOBBLIN-1840] Clean up format layout
Jun 14, 2023
d75e522
[GOBBLIN-1840] Clean up auto format
Peiyingy Jun 14, 2023
4ddd7c5
[GOBBLIN-1840] Clear up empty space
Peiyingy Jun 14, 2023
0e7ba4d
[GOBBLIN-1840] Clarify naming standards and simplify repeated codes
Peiyingy Jun 15, 2023
c449a71
[GOBBLIN-1840] Add Javadoc on GobblinHelixJobSchedulerTest for settin…
Peiyingy Jun 15, 2023
3160d8b
[GOBBLIN-1840] Optimize imports and fix unit test errors
Peiyingy Jun 22, 2023
ae2b58c
[GOBBLIN-1840] Rewrite log info and add Javadoc
Peiyingy Jun 23, 2023
e6ea195
[GOBBLIN-1840] Remove job status check
Peiyingy Jun 23, 2023
6e8358c
[GOBBLIN-1840] Add log info and change config setting
Peiyingy Jun 27, 2023
a15afd8
[GOBBLIN-1840] Add @Slf4j in GobblinThrottlingHelixJobLauncherListener
Peiyingy Jun 27, 2023
bbe4a0b
[GOBBLIN-1840] Fix race condition of handleNewJobConfigArrival
Peiyingy Jun 27, 2023
701881e
[GOBBLIN-1840] Improve mockClock mechanism
Peiyingy Jun 27, 2023
cfdc115
[GOBBLIN-1840] Address comments
Peiyingy Jun 27, 2023
b598455
[GOBBLIN-1840] Only put entry in jobNameToNextSchedulableTime when th…
Peiyingy Jun 29, 2023
32ea7ed
[GOBBLIN-1840] Remove extra schedulable time updates
Peiyingy Jun 29, 2023
2496212
[GOBBLIN-1840] Fix checkstyle problems
Peiyingy Jun 29, 2023
@@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";

public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";

public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;
Comment thread
Peiyingy marked this conversation as resolved.

public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = 3600;

}
@@ -18,6 +18,9 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -110,14 +113,16 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe

private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
private final Duration throttleTimeoutDuration;
private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Contributor

jobUriToStartTimeMap? Better if you can clarify what the string is. Also, let's be consistent between the START/CREATE time you mention in the description. Why do we use Instant rather than Timestamp or Long (milliseconds)? The latter is typically what I see used in our code.

Contributor

+1 to adjusting the map name. Pretty sure the key is the jobName, which refers to the Gobblin configuration job.name.

Contributor

Also, just my opinion, but Instant (or I guess Timestamp) is less error-prone to write code for than millis.

I don't have an opinion on changing the above Duration to a millis long to fit the rest of the class. But Instant vs. long is a big deal because a long is hard to reason about. It often refers to epoch millis, but then you always have to add "epochMillis" to the name of the map.

As for java.time.Instant vs. java.sql.Timestamp, I've seen Instant used elsewhere, and personally haven't seen Timestamp used much. So clearly there are multiple pockets of usage, and either one probably makes sense.

Contributor

I don't have that strong of a preference between Instant and Timestamp/Long. The latter are more common on the GaaS side, so I was initially surprised. It is more important to update the map name.
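To make the trade-off in this thread concrete, here is a minimal sketch comparing the two representations. The class, map, and method names are hypothetical and are not taken from the PR; only java.time and java.util.concurrent calls are used. With Instant and Duration the unit travels with the type, while with long values the unit has to be carried by the name and by the caller.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: all names here are illustrative, not from the PR.
class StartTimeTypesSketch {
  // Instant values: the type itself says "a point on the timeline".
  static final ConcurrentHashMap<String, Instant> jobNameToStartTime = new ConcurrentHashMap<>();
  // long values: the name has to say what the number means (epoch millis).
  static final ConcurrentHashMap<String, Long> jobNameToStartTimeEpochMillis = new ConcurrentHashMap<>();

  // True while less than `timeout` has elapsed since `start`.
  static boolean withinTimeout(Instant start, Instant now, Duration timeout) {
    return Duration.between(start, now).compareTo(timeout) < 0;
  }

  // Same check with raw longs; the seconds-to-millis conversion is manual.
  static boolean withinTimeoutMillis(long startEpochMillis, long nowEpochMillis, long timeoutSeconds) {
    return nowEpochMillis - startEpochMillis < timeoutSeconds * 1000L;
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2023-06-13T00:00:00Z");
    Instant now = start.plusSeconds(1800);
    jobNameToStartTime.put("exampleJob", start);
    jobNameToStartTimeEpochMillis.put("exampleJob", start.toEpochMilli());
    System.out.println(withinTimeout(jobNameToStartTime.get("exampleJob"), now, Duration.ofSeconds(3600)));
    System.out.println(withinTimeoutMillis(jobNameToStartTimeEpochMillis.get("exampleJob"), now.toEpochMilli(), 3600L));
  }
}
```

Both variants give the same answer; the difference is how much of the meaning lives in the types rather than in naming conventions.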


public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager> taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags,
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws Exception {
HelixManager jobHelixManager,
Comment thread
Peiyingy marked this conversation as resolved.
Outdated
Optional<HelixManager> taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags,
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws Exception {

super(ConfigUtils.configToProperties(sysConfig), schedulerService);
this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, COMMON_JOB_PROPS));
@@ -131,28 +136,28 @@ public GobblinHelixJobScheduler(Config sysConfig,
this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass());

int metricsWindowSizeInMin = ConfigUtils.getInt(sysConfig,
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);

this.launcherMetrics = new GobblinHelixJobLauncherMetrics("launcherInScheduler",
this.metricContext,
metricsWindowSizeInMin);
this.metricContext,
metricsWindowSizeInMin);

this.jobSchedulerMetrics = new GobblinHelixJobSchedulerMetrics(this.jobExecutor,
this.metricContext,
metricsWindowSizeInMin);
this.metricContext,
metricsWindowSizeInMin);

this.jobsMapping = new HelixJobsMapping(ConfigUtils.propertiesToConfig(properties),
PathUtils.getRootPath(appWorkDir).toUri(),
appWorkDir.toString());

this.planningJobLauncherMetrics = new GobblinHelixPlanningJobLauncherMetrics("planningLauncherInScheduler",
this.metricContext,
metricsWindowSizeInMin, this.jobsMapping);
this.metricContext,
metricsWindowSizeInMin, this.jobsMapping);

this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobScheduler",
this.metricContext,
metricsWindowSizeInMin);
this.metricContext,
metricsWindowSizeInMin);

this.startServicesCompleted = false;

@@ -162,14 +167,19 @@ public GobblinHelixJobScheduler(Config sysConfig,
this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;

this.throttleTimeoutDuration = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
Comment thread
Peiyingy marked this conversation as resolved.
Outdated
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY), ChronoUnit.SECONDS);

this.jobStartTimeMap = new ConcurrentHashMap<>();

}

@Override
public Collection<StandardMetrics> getStandardMetricsCollection() {
return ImmutableList.of(this.launcherMetrics,
this.jobSchedulerMetrics,
this.planningJobLauncherMetrics,
this.helixMetrics);
this.jobSchedulerMetrics,
this.planningJobLauncherMetrics,
this.helixMetrics);
}

@Override
@@ -188,9 +198,9 @@ public void scheduleJob(Properties jobProps, JobListener jobListener) throws Job
}

scheduleJob(jobProps,
jobListener,
Maps.newHashMap(),
GobblinHelixJob.class);
jobListener,
Maps.newHashMap(),
GobblinHelixJob.class);

} catch (Exception e) {
throw new JobException("Failed to schedule job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
@@ -319,12 +329,14 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics));
new GobblinHelixJobLauncherListener(this.launcherMetrics));
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics)));
new GobblinHelixJobLauncherListener(this.launcherMetrics)));
}

Contributor

nit: Does not need a new line

this.jobStartTimeMap.put(jobUri, Instant.now());
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);

Contributor

Update this log to say that you are resetting the clock

}
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();
boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Contributor

Usually booleans are easily identified with an "is" prefix, like isThrottleEnabled.

Contributor

Does this default to false if the config is not provided? If not, provide a default value.

Contributor Author

The default is provided by GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, which is set to false in the config file. Should I add an additional fallback default here?

Contributor

But you are not using that value here, right? You want to pass DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY here so that the value is actually used.

Contributor Author

The signature of getPropAsBoolean is:

public static boolean getPropAsBoolean(@NotNull Properties properties, String key, String defaultValue)

so String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY) is passed as the default value, and it is used if GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY is not set.
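The default-value behavior discussed in this thread can be sketched with plain java.util.Properties. The getPropAsBoolean method below is a hypothetical stand-in that only mirrors the three-argument signature quoted above; it assumes the real helper falls back to defaultValue when the key is absent, which is what the comment describes. The class and constant names are illustrative.

```java
import java.util.Properties;

// Hypothetical sketch: a local stand-in, not the Gobblin PropertiesUtils class.
class PropDefaultSketch {
  static final String THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
  static final boolean DEFAULT_THROTTLE_ENABLED = false;

  // Assumed behavior: use defaultValue when the key is not set, then parse as boolean.
  static boolean getPropAsBoolean(Properties properties, String key, String defaultValue) {
    return Boolean.parseBoolean(properties.getProperty(key, defaultValue));
  }

  // The boolean default constant is converted to a String because the helper takes a String default.
  static boolean isThrottleEnabled(Properties jobProps) {
    return getPropAsBoolean(jobProps, THROTTLE_ENABLED_KEY, String.valueOf(DEFAULT_THROTTLE_ENABLED));
  }

  public static void main(String[] args) {
    Properties jobProps = new Properties();
    System.out.println(isThrottleEnabled(jobProps)); // key absent, so the default applies
    jobProps.setProperty(THROTTLE_ENABLED_KEY, "true");
    System.out.println(isThrottleEnabled(jobProps)); // key present, so its value applies
  }
}
```

Under that assumption, a job config that never mentions the key leaves throttling disabled, which matches the default of false declared in the configuration keys.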

GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));

if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
Instant jobStartTime = this.jobStartTimeMap.get(jobName);
Duration workflowDuration = Duration.between(jobStartTime, Instant.now());

Contributor

Maybe workflowRunningDuration is a more descriptive name. @ZihanLi58, can you chime in here?

Duration difference = workflowDuration.minus(throttleTimeoutDuration);
Comment thread
Peiyingy marked this conversation as resolved.
Outdated
if (difference.isNegative()) {
Comment thread
Peiyingy marked this conversation as resolved.
Outdated
return;
}
}

try {
handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
@@ -452,7 +478,7 @@ class NonScheduledJobRunner implements Runnable {
private final Long creationTimeInMillis;

public NonScheduledJobRunner(Properties jobProps,
GobblinHelixJobLauncherListener jobListener) {
GobblinHelixJobLauncherListener jobListener) {

this.jobProps = jobProps;
this.jobListener = jobListener;
@@ -470,4 +496,4 @@ public void run() {
}
}
}
}
}
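The throttle check added to handleUpdateJobConfigArrival in the diff above can be exercised in isolation. The following is a rough sketch, not the PR's implementation: the class, the ManualClock helper, and the method names are hypothetical, and injecting a java.time.Clock is an assumption made here so that time can be advanced in a test (the PR's own mock clock mechanism is not shown in this diff). The comparison itself follows the diff: an update is skipped while the elapsed time minus the timeout is negative.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the update throttle; names are illustrative.
class UpdateThrottleSketch {
  // Minimal hand-advanced clock for tests (an assumption, not the PR's mock clock).
  static final class ManualClock extends Clock {
    private Instant now;
    ManualClock(Instant start) { this.now = start; }
    void advance(Duration d) { now = now.plus(d); }
    @Override public ZoneId getZone() { return ZoneOffset.UTC; }
    @Override public Clock withZone(ZoneId zone) { return this; }
    @Override public Instant instant() { return now; }
  }

  private final Clock clock;
  private final Duration throttleTimeout;
  private final ConcurrentHashMap<String, Instant> jobNameToStartTime = new ConcurrentHashMap<>();

  UpdateThrottleSketch(Clock clock, Duration throttleTimeout) {
    this.clock = clock;
    this.throttleTimeout = throttleTimeout;
  }

  // Record when a job was scheduled or started.
  void recordStart(String jobName) {
    jobNameToStartTime.put(jobName, clock.instant());
  }

  // True if an update for jobName should be ignored because the job started too recently.
  boolean shouldSkipUpdate(String jobName, boolean isThrottleEnabled) {
    // A single get() avoids a separate containsKey() lookup on the concurrent map.
    Instant startTime = jobNameToStartTime.get(jobName);
    if (!isThrottleEnabled || startTime == null) {
      return false;
    }
    Duration runningDuration = Duration.between(startTime, clock.instant());
    return runningDuration.minus(throttleTimeout).isNegative();
  }

  public static void main(String[] args) {
    ManualClock clock = new ManualClock(Instant.parse("2023-06-13T00:00:00Z"));
    UpdateThrottleSketch throttle = new UpdateThrottleSketch(clock, Duration.ofSeconds(3600));
    throttle.recordStart("exampleJob");
    clock.advance(Duration.ofMinutes(10));
    System.out.println(throttle.shouldSkipUpdate("exampleJob", true));
    clock.advance(Duration.ofMinutes(50));
    System.out.println(throttle.shouldSkipUpdate("exampleJob", true));
  }
}
```

With the 3600 second default from the configuration keys, an update arriving 10 minutes after the start is skipped, and one arriving at or after the full hour goes through.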
@@ -417,7 +417,11 @@ public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriv
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
for (String helixJob : helixJobs) {
Iterator<TaskConfig> taskConfigIterator = taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
JobConfig jobConfig = taskDriver.getJobConfig(helixJob);
if (jobConfig == null) {
throw new GobblinHelixUnexpectedStateException("Received null jobConfig from Helix. We should not see any null configs when reading all helixJobs. helixJob=%s", helixJob);
}
Iterator<TaskConfig> taskConfigIterator = jobConfig.getTaskConfigMap().values().iterator();
if (taskConfigIterator.hasNext()) {
TaskConfig taskConfig = taskConfigIterator.next();
String jobName = taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
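The last hunk replaces a chained call with an explicit null check, so that a missing job config fails with a descriptive message instead of a NullPointerException. A minimal sketch of that pattern follows; the exception class and the map-based lookup are hypothetical stand-ins (a Map.get that may return null plays the role of the driver lookup), and none of the names are Helix or Gobblin APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the null guard; not Helix or Gobblin code.
class NullConfigGuardSketch {
  // Stand-in for an exception that accepts a format string and arguments.
  static class UnexpectedStateException extends RuntimeException {
    UnexpectedStateException(String format, Object... args) {
      super(String.format(format, args));
    }
  }

  // Returns the number of task entries for a job, failing loudly if the config is missing.
  static int taskCount(Map<String, Map<String, String>> jobConfigs, String helixJob) {
    Map<String, String> jobConfig = jobConfigs.get(helixJob); // may be null
    if (jobConfig == null) {
      throw new UnexpectedStateException("Received null jobConfig. helixJob=%s", helixJob);
    }
    return jobConfig.size();
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> jobConfigs = new HashMap<>();
    Map<String, String> tasks = new HashMap<>();
    tasks.put("task_0", "exampleJob");
    jobConfigs.put("known", tasks);
    System.out.println(taskCount(jobConfigs, "known"));
    try {
      taskCount(jobConfigs, "missing");
    } catch (UnexpectedStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Including the job identifier in the message is the main point of the guard: the failure names the entry that was unexpectedly absent.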