Skip to content

Commit

Permalink
[GOBBLIN-1884] Delete Dag Action After Loading from Store Upon Startup (
Browse files Browse the repository at this point in the history
#3746)

* delete dag action after loading from store upon startup

* Address review comments from PR#3743 and this one

* Add unit tests

* Ensure reminder event uses new JobKey

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Aug 22, 2023
1 parent 4e6fa49 commit e2af88a
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 27 deletions.
74 changes: 74 additions & 0 deletions FlowTriggerHandlerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.junit.Assert;
import org.quartz.JobDataMap;
import org.testng.annotations.Test;


public class FlowTriggerHandlerTest {
String newCronExpression = "0 0 0 ? * * 2024";
long newEventToRevisit = 123L;
long newEventToTrigger = 456L;

/**
* Provides an input with all three values (cronExpression, reminderTimestamp, originalEventTime) set in the map
* Properties and checks that they are updated properly
*/
@Test
public void testUpdatePropsInJobDataMap() {
JobDataMap oldJobDataMap = new JobDataMap();
Properties originalProperties = new Properties();
originalProperties.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 0 0 ? * * 2050");
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY, "0");
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY, "1");
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);

JobDataMap newJobDataMap = FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, newCronExpression,
newEventToRevisit, newEventToTrigger);
Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertEquals(newCronExpression, newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
Assert.assertEquals(String.valueOf(newEventToRevisit),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY));
Assert.assertEquals(String.valueOf(newEventToTrigger),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY));
}

/**
* Provides input with an empty Properties object and checks that the three values in question are set.
*/
@Test
public void testSetPropsInJobDataMap() {
JobDataMap oldJobDataMap = new JobDataMap();
Properties originalProperties = new Properties();
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);

JobDataMap newJobDataMap = FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, newCronExpression,
newEventToRevisit, newEventToTrigger);
Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertEquals(newCronExpression, newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
Assert.assertEquals(String.valueOf(newEventToRevisit),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY));
Assert.assertEquals(String.valueOf(newEventToTrigger),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
Expand Down Expand Up @@ -493,24 +494,27 @@ public synchronized void setActive(boolean active) {
* dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
* the process.
*/
public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
FlowId flowId = action.getFlowId();
FlowSpec spec;
public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
Preconditions.checkArgument(launchAction.getFlowActionType() == DagActionStore.FlowActionType.LAUNCH);
log.info("Handle launch flow event for action {}", launchAction);
FlowId flowId = launchAction.getFlowId();
try {
log.info("Handle launch flow event for action: {}", action);
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
if (optionalJobExecutionPlanDag.isPresent()) {
addDag(optionalJobExecutionPlanDag.get(), true, true);
}
// Upon handling the action, delete it so on leadership change this is not duplicated
this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} due to exception {}", flowId, e.getMessage());
log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore due to "
+ "exception {}", flowId, e.getMessage());
} catch (InterruptedException e) {
log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
Expand Down Expand Up @@ -176,29 +175,24 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
+ random.nextInt(schedulerMaxBackoffMillis));
JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
// Triggers:job have an N:1 relationship but the job properties must remain constant between both, which does not
// allow us to keep track of additional properties for reminder events. By reusing the same job key, we either
// encounter an exception that the job already exists and cannot add it to the scheduler or have to overwrite the
// original job properties with the reminder event schedule. Thus, we differentiate the job and trigger key from the
// original event.
JobKey newJobKey = new JobKey(origJobKey.getName() + createSuffixForJobTrigger(status.getEventTimeMillis()),
origJobKey.getGroup());
try {
if (!this.schedulerService.getScheduler().checkExists(origJobKey)) {
log.warn("Attempting to set a reminder for a job that does not exist in the scheduler. Key: {}", origJobKey);
log.warn("Skipping setting a reminder for a job that does not exist in the scheduler. Key: {}", origJobKey);
this.jobDoesNotExistInSchedulerCount.inc();
return;
}
JobDetailImpl jobDetail = (JobDetailImpl) updatePropsInJobDetail(origJobKey, cronExpression,
JobKey reminderJobKey = constructReminderJobKey(origJobKey, status.getEventTimeMillis());
JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, reminderJobKey, cronExpression,
status.getEventTimeMillis(), originalEventTimeMillis);
// Create a new trigger that is set to fire at the minimum reminder wait time calculated
Trigger trigger = JobScheduler.createTriggerForJob(newJobKey,
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey,
(Properties) jobDetail.getJobDataMap().get(GobblinServiceJobScheduler.PROPERTIES_KEY), Optional.absent());
log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {} in {} "
+ "millis", flowAction, originalEventTimeMillis, status.getEventTimeMillis(), trigger.getNextFireTime());
this.schedulerService.getScheduler().scheduleJob(jobDetail, trigger);
// TODO: remove this comment once we've confirmed this function works
log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {}",
flowAction, originalEventTimeMillis, status.getEventTimeMillis());
this.schedulerService.getScheduler().scheduleJob(jobDetail, reminderTrigger);
log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - SCHEDULED REMINDER for event {} in {} millis",
flowAction, originalEventTimeMillis, status.getEventTimeMillis(), trigger.getNextFireTime());
flowAction, originalEventTimeMillis, status.getEventTimeMillis(), reminderTrigger.getNextFireTime());
} catch (SchedulerException e) {
log.warn("Failed to add job reminder due to SchedulerException for job {} trigger event {}. Exception: {}",
origJobKey, status.getEventTimeMillis(), e);
Expand All @@ -207,18 +201,35 @@ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbit
}

/**
* Helper function used to extract JobDetail for job identified by the key and update the Properties map to contain
* the cron scheduler for the reminder event and information about the event to revisit
* @param key
* This function constructs the JobKey for a reminder event given the original JobKey. Although multiple triggers can
* be created for one job, they are required to maintain the same jobProps for all triggers. This does not allow us
* to keep track of additional properties needed for reminder events, so we create a unique job and must differentiate
* the keys from the main job.
* @param originalJobKey
* @param eventToRevisitMillis
* @return
*/
protected JobKey constructReminderJobKey(JobKey originalJobKey, long eventToRevisitMillis) {
return new JobKey(originalJobKey.getName() + createSuffixForJobTrigger(eventToRevisitMillis),
originalJobKey.getGroup());
}

/**
* Helper function used to extract JobDetail for job identified by the originalKey and update it be associated with
* the event to revisit. It will update the jobKey to the reminderKey provides and the Properties map to
* contain the cron scheduler for the reminder event and information about the event to revisit
* @param originalKey
* @param reminderKey
* @param cronExpression
* @param reminderTimestampMillis
* @param originalEventTimeMillis
* @return
* @throws SchedulerException
*/
protected JobDetail updatePropsInJobDetail(JobKey key, String cronExpression, long reminderTimestampMillis,
long originalEventTimeMillis) throws SchedulerException {
JobDetailImpl jobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(key);
protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, JobKey reminderKey,
String cronExpression, long reminderTimestampMillis, long originalEventTimeMillis) throws SchedulerException {
JobDetailImpl jobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(originalKey);
jobDetail.setKey(reminderKey);
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap = updatePropsInJobDataMap(jobDataMap, cronExpression, reminderTimestampMillis, originalEventTimeMillis);
jobDetail.setJobDataMap(jobDataMap);
Expand Down

0 comments on commit e2af88a

Please sign in to comment.