Skip to content

Commit 01cac22

Browse files
committed
Edit TooSoonToRerunSameFlowException exception handling
1 parent 7d2b49f commit 01cac22

File tree

4 files changed

+18
-14
lines changed

4 files changed

+18
-14
lines changed

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java

+14
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,22 @@ public class TooSoonToRerunSameFlowException extends RuntimeException {
2727
@Getter
2828
private final FlowSpec flowSpec;
2929

30+
/**
31+
* Account for unwrapping within @{link FlowCatalog#updateOrAddSpecHelper}`s `CallbackResult` error handling for `SpecCatalogListener`s
32+
* @return `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException
33+
*/
34+
public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
35+
return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
36+
}
37+
3038
public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
3139
super("Lease already occupied by another recent execution of this flow: " + flowSpec);
3240
this.flowSpec = flowSpec;
3341
}
42+
43+
/** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */
44+
private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
45+
super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
46+
this.flowSpec = flowSpec;
47+
}
3448
}

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import org.apache.gobblin.runtime.api.SpecSearchObject;
5858
import org.apache.gobblin.runtime.api.SpecSerDe;
5959
import org.apache.gobblin.runtime.api.SpecStore;
60-
import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
6160
import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
6261
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
6362
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -393,12 +392,7 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
393392
// If flow fails compilation, the result will have a non-empty string with the error
394393
if (!response.getValue().getFailures().isEmpty()) {
395394
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
396-
Throwable error = entry.getValue().getError();
397-
if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) {
398-
throw (TooSoonToRerunSameFlowException) error.getCause();
399-
} else {
400-
throw error.getCause();
401-
}
395+
throw entry.getValue().getError().getCause();
402396
}
403397
}
404398
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java

-4
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,6 @@ else if (leaseValidityStatus == 2) {
362362
}
363363
}
364364

365-
/*
366-
Determines if a lease can be acquired for the given flow. A lease is acquirable if
367-
no existing lease record exists in arbiter table or the record is older then epsilon time
368-
*/
369365
@Override
370366
public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException {
371367
Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Properties;
2525
import java.util.concurrent.TimeUnit;
2626

27-
import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

@@ -54,6 +53,7 @@
5453
import org.apache.gobblin.runtime.api.SpecCatalogListener;
5554
import org.apache.gobblin.runtime.api.SpecProducer;
5655
import org.apache.gobblin.runtime.api.TopologySpec;
56+
import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
5757
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
5858
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
5959
import org.apache.gobblin.service.modules.flow.FlowUtils;
@@ -139,7 +139,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
139139

140140
/*
141141
enforces that a similar adhoc flow is not launching,
142-
else throw TooSoonToRerunSameFlowException
142+
else throw {@link TooSoonToRerunSameFlowException}
143143
*/
144144
private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) {
145145
if (!flowSpec.isScheduled()) {
@@ -151,7 +151,7 @@ private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) {
151151
try {
152152
if (dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
153153
_log.warn("Another recent adhoc flow execution found for " + flowGroup + "." + flowName);
154-
throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
154+
throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
155155
}
156156
} catch (IOException exception) {
157157
_log.error("Unable to check whether similar flow exists " + flowGroup + "." + flowName);

0 commit comments

Comments
 (0)