Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Scheduler lease determination store configuration
// Common prefix for the lease arbiter config keys below; prefixed keys resolve to "MysqlMultiActiveLeaseArbiter.<name>"
public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";
// Key / default pair for the constants table
// NOTE(review): the default VALUE is also built from the prefix, i.e. it evaluates to
// "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store". If this string is used verbatim as a
// SQL table name, the dot would be read as a schema qualifier - confirm the prefix is intended in the default
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_multi_active_scheduler_constants_store";
// Key / default pair for the lease determination table
// NOTE(review): same concern as above, the default evaluates to
// "MysqlMultiActiveLeaseArbiter.gobblin_scheduler_lease_determination_store"
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_scheduler_lease_determination_store";
// Unlike the other keys in this group, these two are NOT prefixed with MYSQL_LEASE_ARBITER_PREFIX
// (presumably they name per-event properties rather than arbiter settings - TODO confirm against usages)
public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = "eventToRevisitTimestampMillis";
public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = "triggerEventTimestampMillis";
// Key / default pair for epsilon; default is 5000 millis (5 seconds)
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
// Note: linger should be on the order of seconds even though we measure in millis
// Key / default pair for linger; default is 30000 millis (30 seconds)
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
// Key / default pair for the maximum backoff; default is 5000 millis (5 seconds)
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;

// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ServiceConfigKeys {
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
// Key for the multi-active scheduler "enabled" flag; no default constant is declared next to it in this section
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveScheduler.enabled";
// If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@
"type" : "string",
"doc" : "flow execution id for the dag action",
"compliance" : "NONE"
}, {
"name" : "dagAction",
"type": {
"type": "enum",
"name": "DagActionValue",
"symbols": [
"KILL",
"RESUME",
"LAUNCH"
],
"symbolDocs": {
"KILL": "Kill the flow corresponding to this dag",
"RESUME": "Resume or start a new flow corresponding to this dag",
"LAUNCH": "Launch a new execution of the flow corresponding to this dag"
}
},
"doc" : "type of dag action",
"compliance" : "NONE"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,26 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import lombok.Data;


public interface DagActionStore {
enum DagActionValue {
KILL,
RESUME
enum FlowActionType {
Comment thread
umustafi marked this conversation as resolved.
KILL, // Kill invoked through API call
RESUME, // Resume flow invoked through API call
LAUNCH, // Launch new flow execution invoked adhoc or through scheduled trigger
RETRY, // Invoked through DagManager for flows configured to allow retries
CANCEL, // Invoked through DagManager if flow has been stuck in Orchestrated state for a while
ADVANCE // Launch next step in multi-hop dag
}

@Getter
@EqualsAndHashCode
@Data
class DagAction {
String flowGroup;
String flowName;
String flowExecutionId;
DagActionValue dagActionValue;
public DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
this.flowGroup = flowGroup;
this.flowName = flowName;
this.flowExecutionId = flowExecutionId;
this.dagActionValue = dagActionValue;
}
final String flowGroup;
final String flowName;
final String flowExecutionId;
final FlowActionType flowActionType;
}


Expand All @@ -51,40 +48,28 @@ public DagAction(String flowGroup, String flowName, String flowExecutionId, DagA
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @param flowActionType the value of the dag action
* @throws IOException
*/
boolean exists(String flowGroup, String flowName, String flowExecutionId) throws IOException, SQLException;
boolean exists(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException;

/**
* Persist the dag action in {@link DagActionStore} for durability
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @param dagActionValue the value of the dag action
* @param flowActionType the value of the dag action
* @throws IOException
*/
void addDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) throws IOException;
void addDagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException;

/**
* delete the dag action from {@link DagActionStore}
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @param dagAction the {@link DagAction} containing all information needed to identify the dag and the specific action value
* @throws IOException
* @return true if we successfully delete one record, return false if the record does not exist
*/
boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException;

/***
* Retrieve action value by the flow group, flow name and flow execution id from the {@link DagActionStore}.
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @throws IOException Exception in retrieving the {@link DagAction}.
* @throws SpecNotFoundException If {@link DagAction} being retrieved is not present in store.
*/
DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException, SpecNotFoundException,
SQLException;
boolean deleteDagAction(DagAction dagAction) throws IOException;

/***
* Get all {@link DagAction}s from the {@link DagActionStore}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime.api;

import java.io.IOException;

import lombok.Data;


/**
* This interface defines a generic approach to a non-blocking, multiple active thread or host system, in which one or
* more active participants compete to take responsibility for a particular flow's event. The type of flow event in
* question does not impact the algorithm other than to uniquely identify the flow event. Each participant uses the
* interface to initiate an attempt at ownership over the flow event and receives a response indicating the status of
* the attempt.
*
* At a high level the lease arbiter works as follows:
* 1. Multiple participants independently learn of a flow action event to act upon
* 2. Each participant attempts to acquire rights or `a lease` to be the sole participant acting on the event by
* calling the {@link #tryAcquireLease} method below and receives the resulting status. The status indicates whether
* this participant has
* a) {@link LeaseObtainedStatus} -> this participant will attempt to carry out the required action before the lease
* expires
* b) {@link LeasedToAnotherStatus} -> another will attempt to carry out the required action before the lease expires
* c) {@link NoLongerLeasingStatus} -> flow event no longer needs to be acted upon (terminal state)
* 3. If another participant has acquired the lease before this one could, then the present participant must check back
* in at the time of lease expiry to see if it needs to attempt the lease again [status (b) above].
* 4. Once the participant which acquired the lease completes its work on the flow event, it calls
* {@link #recordLeaseSuccess} to indicate to all other participants that the flow event no longer needs to be acted
* upon [status (c) above]
*/
public interface MultiActiveLeaseArbiter {
/**
* This method attempts to insert an entry into store for a particular flow action event if one does not already
* exist in the store for the flow action or has expired. Regardless of the outcome it also reads the lease
* acquisition timestamp of the entry for that flow action event (it could have pre-existed in the table or been newly
* added by the previous write). Based on the transaction results, it will return {@link LeaseAttemptStatus} to
* determine the next action.
* @param flowAction uniquely identifies the flow and the present action upon it
* @param eventTimeMillis is the time this flow action was triggered
* @return the {@link LeaseAttemptStatus} describing the outcome of this attempt
* @throws IOException declared for implementations; the concrete failure conditions are not defined by this interface
*/
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException;

/**
* This method is used to indicate the owner of the lease has successfully completed required actions while holding
* the lease of the flow action event. It marks the lease as "no longer leasing", if the eventTimeMillis and
* leaseAcquisitionTimeMillis values have not changed since this owner acquired the lease (indicating the lease did
* not expire).
* @param status the {@link LeaseObtainedStatus} this owner holds for the lease it is reporting as completed
* @return true if successfully updated, indicating no further actions need to be taken regarding this event.
* false if the lease could not be updated properly; the caller should continue seeking to acquire the lease
* as if none of the actions it already accomplished count
* @throws IOException declared for implementations; the concrete failure conditions are not defined by this interface
*/
boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;

/**
* Class used to encapsulate status of lease acquisition attempt and derivations should contain information specific to
* the status that results.
*/
abstract class LeaseAttemptStatus {}

/**
* Status indicating the flow event no longer needs to be acted upon (terminal state). Carries no additional data.
*/
class NoLongerLeasingStatus extends LeaseAttemptStatus {}

/**
* Status indicating the calling participant acquired the lease for the event in question. The class contains the
* `eventTimestamp` associated with the lease as well as the time the caller obtained the lease or
* `leaseAcquisitionTimestamp`.
*/
@Data
class LeaseObtainedStatus extends LeaseAttemptStatus {
// Flow action the lease was obtained for
private final DagActionStore.DagAction flowAction;
// Event time the lease is associated with (presumably epoch millis, matching eventTimeMillis - TODO confirm)
private final long eventTimestamp;
// Time the caller obtained the lease (presumably epoch millis - TODO confirm)
private final long leaseAcquisitionTimestamp;
}

/**
* Status indicating this flow action event already has a valid lease owned by another participant.
* `eventTimeMillis` is the timestamp the lease is associated with, which may be a different timestamp for the same flow
* action corresponding to the same instance of the event or a distinct one.
* `minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if
* the lease has completed or expired
*/
@Data
class LeasedToAnotherStatus extends LeaseAttemptStatus {
// Flow action whose lease is held by another participant
private final DagActionStore.DagAction flowAction;
// Timestamp the existing lease is associated with
private final long eventTimeMillis;
// Minimum time to wait before checking back on the lease
private final long minimumLingerDurationMillis;
}
}
Loading