Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
29f5c2d
[GOBBLIN-1840] Helix Job scheduler should not try to replace running …
Jun 13, 2023
45c87ff
[GOBBLIN-1840] Remove unnecessary files
Jun 13, 2023
df182ff
[GOBBLIN-1840] Add config for throttleTimeoutDuration
Jun 13, 2023
3c552b7
[GOBBLIN-1840] Clean up format and coding standard
Jun 14, 2023
88b9a02
[GOBBLIN-1840] Clean up format layout
Jun 14, 2023
d75e522
[GOBBLIN-1840] Clean up auto format
Peiyingy Jun 14, 2023
4ddd7c5
[GOBBLIN-1840] Clear up empty space
Peiyingy Jun 14, 2023
0e7ba4d
[GOBBLIN-1840] Clarify naming standards and simplify repeated codes
Peiyingy Jun 15, 2023
c449a71
[GOBBLIN-1840] Add Javadoc on GobblinHelixJobSchedulerTest for settin…
Peiyingy Jun 15, 2023
3160d8b
[GOBBLIN-1840] Optimize imports and fix unit test errors
Peiyingy Jun 22, 2023
ae2b58c
[GOBBLIN-1840] Rewrite log info and add Javadoc
Peiyingy Jun 23, 2023
e6ea195
[GOBBLIN-1840] Remove job status check
Peiyingy Jun 23, 2023
6e8358c
[GOBBLIN-1840] Add log info and change config setting
Peiyingy Jun 27, 2023
a15afd8
[GOBBLIN-1840] Add @Slf4j in GobblinThrottlingHelixJobLauncherListener
Peiyingy Jun 27, 2023
bbe4a0b
[GOBBLIN-1840] Fix race condition of handleNewJobConfigArrival
Peiyingy Jun 27, 2023
701881e
[GOBBLIN-1840] Improve mockClock mechanism
Peiyingy Jun 27, 2023
cfdc115
[GOBBLIN-1840] Address comments
Peiyingy Jun 27, 2023
b598455
[GOBBLIN-1840] Only put entry in jobNameToNextSchedulableTime when th…
Peiyingy Jun 29, 2023
32ea7ed
[GOBBLIN-1840] Remove extra schedulable time updates
Peiyingy Jun 29, 2023
2496212
[GOBBLIN-1840] Fix checkstyle problems
Peiyingy Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";

public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";

public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;
Comment thread
Peiyingy marked this conversation as resolved.

public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = 3600;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe

private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
private final Duration throttleTimeoutDurationSecs;
private ConcurrentHashMap<String, Instant> jobNameToStartTimeMap;

public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Expand Down Expand Up @@ -162,6 +167,11 @@ public GobblinHelixJobScheduler(Config sysConfig,
this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;

this.throttleTimeoutDurationSecs = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY), ChronoUnit.SECONDS);

this.jobNameToStartTimeMap = new ConcurrentHashMap<>();

}

@Override
Expand Down Expand Up @@ -325,6 +335,8 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
new GobblinHelixJobLauncherListener(this.launcherMetrics)));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does not need a new line

this.jobNameToStartTimeMap.put(jobUri, Instant.now());
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update this log to say that you are resetting the clock

}
Expand All @@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
String jobName = updateJobArrival.getJobName();
boolean isThrottleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));

if (isThrottleEnabled && this.jobNameToStartTimeMap.containsKey(jobName)) {
Instant jobStartTime = this.jobNameToStartTimeMap.get(jobName);
Duration workflowRunningDuration = Duration.between(jobStartTime, Instant.now());
if (workflowRunningDuration.minus(throttleTimeoutDurationSecs).isNegative()) {
LOGGER.info("Replanning is skipped for job {} ", jobName);
Comment thread
Peiyingy marked this conversation as resolved.
Outdated
return;
}
}

try {
handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,11 @@ public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriv
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
for (String helixJob : helixJobs) {
Iterator<TaskConfig> taskConfigIterator = taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
JobConfig jobConfig = taskDriver.getJobConfig(helixJob);
if (jobConfig == null) {
throw new GobblinHelixUnexpectedStateException("Received null jobConfig from Helix. We should not see any null configs when reading all helixJobs. helixJob=%s", helixJob);
}
Iterator<TaskConfig> taskConfigIterator = jobConfig.getTaskConfigMap().values().iterator();
if (taskConfigIterator.hasNext()) {
TaskConfig taskConfig = taskConfigIterator.next();
String jobName = taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
Expand All @@ -32,6 +34,7 @@
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.assertj.core.util.Lists;
import org.mockito.MockedStatic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -52,6 +55,8 @@
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.scheduler.SchedulerService;

import static org.mockito.Mockito.*;


/**
* Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
Expand All @@ -61,7 +66,6 @@
public class GobblinHelixJobSchedulerTest {
public final static Logger LOG = LoggerFactory.getLogger(GobblinHelixJobSchedulerTest.class);

private HelixManager helixManager;
private FileSystem localFs;
private Path appWorkDir;
private final Closer closer = Closer.create();
Expand All @@ -70,9 +74,16 @@ public class GobblinHelixJobSchedulerTest {
private GobblinTaskRunner gobblinTaskRunner;

private Thread thread;

private final String workflowIdSuffix1 = "_1504201348471";
private final String workflowIdSuffix2 = "_1504201348472";
private final String workflowIdSuffix3 = "_1504201348473";

private Instant beginTime = Instant.ofEpochMilli(0);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instant.EPOCH

private Instant withinThrottlePeriod = Instant.ofEpochMilli(1);
private Instant exceedsThrottlePeriod = Instant.ofEpochMilli(3600001);

private String zkConnectingString;
private String helixClusterName;

@BeforeClass
public void setUp()
Expand All @@ -96,17 +107,11 @@ public void setUp()
ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
.withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, ConfigValueFactory.fromAnyRef("true")).resolve();

String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
this.zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
this.helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);

HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);

this.helixManager = HelixManagerFactory
.getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
zkConnectingString);
this.closer.register(() -> helixManager.disconnect());
this.helixManager.connect();

this.localFs = FileSystem.getLocal(new Configuration());

this.closer.register(() -> {
Expand All @@ -129,58 +134,168 @@ public void setUp()
this.thread.start();
}

// Time span exceeds throttle timeout, within same workflow, throttle is enabled
// Job will be updated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments describing the method should be a java doc instead of a regular comment

@Test
public void testNewJobAndUpdate()
throws Exception {
runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2, workflowIdSuffix2,
true, true);
}

// Time span is within throttle timeout, within same workflow, throttle is enabled
// Job will not be updated
@Test
public void testUpdateSameWorkflowShortPeriodThrottle()
throws Exception {
runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2, workflowIdSuffix1,
true, true);
}

// Time span exceeds throttle timeout, within same workflow, throttle is not enabled
// Job will be updated
@Test
public void testUpdateSameWorkflowLongPeriodNoThrottle()
throws Exception {
runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2, workflowIdSuffix2,
false, true);
}

// Time span is within throttle timeout, within same workflow, throttle is not enabled
// Job will be updated
@Test
public void testUpdateSameWorkflowShortPeriodNoThrottle()
Comment thread
Peiyingy marked this conversation as resolved.
throws Exception {
runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2, workflowIdSuffix2,
false, true);
}

// Time span is within throttle timeout, within different workflow, throttle is enabled
// Job will be updated
public void testUpdateDiffWorkflowShortPeriodThrottle()
throws Exception {
runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix3, workflowIdSuffix3,
true, false);
}

// Time span is within throttle timeout, within different workflow, throttle is not enabled
// Job will be updated
@Test
public void testUpdateDiffWorkflowShortPeriodNoThrottle()
throws Exception {
runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix3, workflowIdSuffix3,
false, false);
}

// Time span exceeds throttle timeout, within different workflow, throttle is enabled
// Job will be updated
@Test
public void testUpdateDiffWorkflowLongPeriodThrottle()
throws Exception {
runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix3, workflowIdSuffix3,
true, false);
}

// Time span exceeds throttle timeout, within different workflow, throttle is not enabled
// Job will be updated
@Test
public void testUpdateDiffWorkflowLongPeriodNoThrottle()
throws Exception {
runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix3, workflowIdSuffix3,
false, false);
}

private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager) throws Exception {
java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
GobblinHelixJobScheduler jobScheduler =
new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);

final Properties properties1 =
GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
}

private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties, String suffix) {
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
"job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
Map<String, String> workflowIdMap;
this.helixManager.connect();
new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
"job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + suffix);
return newJobConfigArrivalEvent;
}

private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random question, but why do we use NewJobConfigArrivalEvent instead of just passing a string job name? There were some places in the code where we constructed a brand new NewJobConfigArrivalEvent just to pass it into this method.

helixManager.connect();
String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
Assert.assertNotNull(workFlowId);
Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
}

String workFlowId = null;
private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
// endTime is manually set time period that we allow HelixUtils to fetch workflowIdMap before timeout

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better wording:

Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID

long endTime = System.currentTimeMillis() + 30000;
Comment thread
Peiyingy marked this conversation as resolved.
Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
try{
workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
} catch(GobblinHelixUnexpectedStateException e){
continue;
}
if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
break;
return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
}
Thread.sleep(100);
}
Assert.assertNotNull(workFlowId);
Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
return null;
}

jobScheduler.handleUpdateJobConfigArrival(
new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
this.helixManager.connect();
endTime = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < endTime) {
workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
break;
private void runWorkflowTest(Instant mockedTime, String jobSuffix, String newJobWorkflowIdSuffix,
String updateWorkflowIdSuffix1, String updateWorkflowIdSuffix2,
String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean isSameWorkflow) throws Exception {
try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
mocked.when(Instant::now).thenReturn(beginTime, mockedTime);

// helixManager is set to local variable to avoid the HelixManager (ZkClient) is not connected error across tests
HelixManager helixManager = HelixManagerFactory
.getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
zkConnectingString);
GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
final Properties properties =
GobblinHelixJobLauncherTest.generateJobProperties(
this.baseConfig, jobSuffix, newJobWorkflowIdSuffix);
NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties, updateWorkflowIdSuffix1);
jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
connectAndAssertWorkflowId(newJobWorkflowIdSuffix, newJobConfigArrivalEvent, helixManager);

if (isSameWorkflow) {
properties.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, String.valueOf(isThrottleEnabled));
jobScheduler.handleUpdateJobConfigArrival(
new UpdateJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties));
connectAndAssertWorkflowId(assertUpdateWorkflowIdSuffix, newJobConfigArrivalEvent, helixManager);
}
else {
final Properties properties2 =
GobblinHelixJobLauncherTest.generateJobProperties(
this.baseConfig, jobSuffix + '2', updateWorkflowIdSuffix2);
NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, String.valueOf(isThrottleEnabled));
jobScheduler.handleUpdateJobConfigArrival(
new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
connectAndAssertWorkflowId(assertUpdateWorkflowIdSuffix, newJobConfigArrivalEvent2, helixManager);
}
Thread.sleep(100);
}
Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix2));
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this anymore since we are not mocking any static classes