Skip to content

Commit

Permalink
Begin scheduler implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 2, 2024
1 parent 4ea7830 commit d3f10e5
Show file tree
Hide file tree
Showing 14 changed files with 630 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package org.polypheny.db.workflow.dag.activities;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
import org.polypheny.db.workflow.dag.variables.WritableVariableStore;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
Expand Down Expand Up @@ -81,6 +85,16 @@ default void updateVariables( List<AlgDataType> inTypes, Map<String, SettingValu
*/
void reset();

default DataStateMerger getDataStateMerger() {
// typically depends on the activity type
return DataStateMerger.AND;
}


default ControlStateMerger overrideControlStateMerger() {
return null; // typically depends on the activity config -> we return null
}

enum PortType {
ANY,
REL,
Expand Down Expand Up @@ -121,4 +135,124 @@ enum ActivityCategory {
// more granular categories are also thinkable
}


enum DataStateMerger {
AND( DataStateMerger::andMerger ),
OR( DataStateMerger::orMerger );

private final Function<List<EdgeState>, Boolean> merger;


DataStateMerger( Function<List<EdgeState>, Boolean> merger ) {
this.merger = merger;
}


/**
* Computes whether an activity is NOT aborted based on its data edge states.
*
* @param dataEdges the EdgeState of all data inputs of an activity
* @return false if the data edge states result in an abort, true otherwise.
*/
public boolean merge( List<EdgeState> dataEdges ) {
return merger.apply( dataEdges );
}


private static boolean andMerger( List<EdgeState> dataEdges ) {
// abort if any dataEdge is inactive
return !dataEdges.contains( EdgeState.INACTIVE );
}


private static boolean orMerger( List<EdgeState> dataEdges ) {
// only abort if all dataEdges are inactive. Useful for merging activities
if ( dataEdges.isEmpty() ) {
return true;
}
return !dataEdges.stream().allMatch( state -> state == EdgeState.INACTIVE );
}
}


enum ControlStateMerger {
/**
* Corresponds to ANDing all success control edges and ORing all fail control edges
* The merged result is therefore only ACTIVE if all success control edges are active and at least
* one of the fail control edges (if present) is active.
*/
AND_OR( ControlStateMerger::andOrMerger ),

/**
* Corresponds to ANDing all control edges.
* The merged result is only ACTIVE, if all control edges are ACTIVE.
*/
AND_AND( ControlStateMerger::andAndMerger );

private final BiFunction<List<EdgeState>, List<EdgeState>, EdgeState> merger;


ControlStateMerger( BiFunction<List<EdgeState>, List<EdgeState>, EdgeState> merger ) {
this.merger = merger;
}


/**
* Merges the states of all incoming control edges of an activity into a single state.
* This state can be interpreted as:
* <ul>
* <li>{@link EdgeState#INACTIVE}: trigger an abort</li>
* <li>{@link EdgeState#IDLE}: wait for more control edges to become active or inactive</li>
* <li>{@link EdgeState#ACTIVE}: the activity is ready to be executed</li>
* </ul>
*
* @param successEdges the EdgeStates of all onSuccess control inputs of the activity
* @param failEdges the EdgeStates of all onFail control inputs of the activity
* @return the EdgeState resulting from the merge of all input edges
*/
public EdgeState merge( List<EdgeState> successEdges, List<EdgeState> failEdges ) {
return merger.apply( successEdges, failEdges );
}


private static EdgeState andOrMerger( List<EdgeState> successEdges, List<EdgeState> failEdges ) {
// ANDing all successEdges and ORing all failEdges

if ( successEdges.contains( EdgeState.INACTIVE ) ) {
return EdgeState.INACTIVE;
}

if ( !failEdges.isEmpty() ) {
if ( failEdges.stream().allMatch( state -> state == EdgeState.INACTIVE ) ) {
return EdgeState.INACTIVE;
}

if ( !failEdges.contains( EdgeState.ACTIVE ) ) {
return EdgeState.IDLE;
}
}

if ( successEdges.contains( EdgeState.IDLE ) ) {
return EdgeState.IDLE;
}
return EdgeState.ACTIVE;
}


private static EdgeState andAndMerger( List<EdgeState> successEdges, List<EdgeState> failEdges ) {
List<EdgeState> allEdges = new ArrayList<>();
allEdges.addAll( successEdges );
allEdges.addAll( failEdges );

if ( allEdges.contains( EdgeState.INACTIVE ) ) {
return EdgeState.INACTIVE;
}
if ( allEdges.contains( EdgeState.IDLE ) ) {
return EdgeState.IDLE;
}
return EdgeState.ACTIVE;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import lombok.Getter;
import lombok.Setter;
import org.polypheny.db.workflow.dag.activities.Activity.ControlStateMerger;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
import org.polypheny.db.workflow.dag.variables.VariableStore;
import org.polypheny.db.workflow.models.ActivityConfigModel;
Expand Down Expand Up @@ -88,6 +89,15 @@ public ActivityModel toModel( boolean includeState ) {
}


public ControlStateMerger getControlStateMerger() {
ControlStateMerger merger = activity.overrideControlStateMerger();
if ( merger == null ) {
return config.getControlStateMerger();
}
return merger;
}


public static ActivityWrapper fromModel( ActivityModel model ) {
return new ActivityWrapper( model.getId(), ActivityRegistry.activityFromType( model.getType() ), model.getType(), model.getSettings(), model.getConfig(), model.getRendering() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ void execute() throws ExecutorException {

@Override
public void interrupt() {
super.interrupt();
if ( ctx != null ) {
ctx.setInterrupted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class Executor implements Callable<Void> {

final StorageManager sm;
final Workflow workflow;
boolean isInterrupted;


// TODO: add reference to monitor for monitoring the progress of activities
Expand All @@ -52,16 +53,21 @@ protected Executor( StorageManager sm, Workflow workflow ) {

abstract void execute() throws ExecutorException;


/**
* Tries to halt the execution in a best-effort manner.
* Only when call() returns or throws an exception is the execution really terminated.
*/
public abstract void interrupt();
public void interrupt() {
isInterrupted = true;
}


@Override
public Void call() throws ExecutorException {
execute();
if ( !isInterrupted ) {
execute();
}
return null;
}

Expand Down Expand Up @@ -110,11 +116,20 @@ CheckpointReader getReader( ActivityWrapper target, int toPort ) {

public static class ExecutorException extends Exception {

// TODO: implement ExecutorException
public ExecutorException( String message ) {
super( message );
}


public ExecutorException( Throwable cause ) {
super( cause );
}


public ExecutorException( String message, Throwable cause ) {
super( message, cause );
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void execute() throws ExecutorException {

@Override
public void interrupt() {
super.interrupt();
throw new NotImplementedException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ void execute() throws ExecutorException {

@Override
public void interrupt() {
super.interrupt();
if ( !hasDetectedAbort && executor != null ) {
executor.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void execute() throws ExecutorException {

@Override
public void interrupt() {
super.interrupt();
if ( ctx != null ) {
ctx.setInterrupted();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2019-2024 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.workflow.engine.scheduler;

import java.util.Set;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Value;
import org.polypheny.db.workflow.engine.execution.Executor.ExecutorException;

@Value
@AllArgsConstructor
public class ExecutionResult {

ExecutionSubmission submission;
ExecutorException exception;


public ExecutionResult( ExecutionSubmission submission ) {
this.submission = submission;
exception = null;
}


public Set<UUID> getActivities() {
return submission.getActivities();
}


public UUID getSessionId() {
return submission.getSessionId();
}


public boolean isSuccess() {
return exception == null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019-2024 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.workflow.engine.scheduler;

import java.util.Set;
import java.util.UUID;
import lombok.Value;
import org.polypheny.db.workflow.engine.execution.Executor;
import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction;

@Value
public class ExecutionSubmission {

CommonTransaction commonType;
Executor executor;
Set<UUID> activities;
UUID sessionId;

}
Loading

0 comments on commit d3f10e5

Please sign in to comment.