Skip to content

Commit 8d7a028

Browse files
committed
Begin with implementation of Executors
1 parent a63bdd3 commit 8d7a028

32 files changed

+1065
-171
lines changed

plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import java.util.List;
2020
import java.util.UUID;
21-
import org.polypheny.db.workflow.dag.activities.Activity;
21+
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
2222
import org.polypheny.db.workflow.dag.edges.Edge;
2323
import org.polypheny.db.workflow.models.EdgeModel;
2424
import org.polypheny.db.workflow.models.WorkflowConfigModel;
@@ -29,7 +29,9 @@
2929
*/
3030
public interface Workflow {
3131

32-
List<Activity> getActivities(); // TODO: change return type to Map<UUID, Activity> ?
32+
List<ActivityWrapper> getActivities(); // TODO: change return type to Map<UUID, Activity> ?
33+
34+
ActivityWrapper getActivity( UUID activityId );
3335

3436
/**
3537
* Get all edges of this workflow as list with arbitrary order.
@@ -47,7 +49,7 @@ public interface Workflow {
4749
*/
4850
List<Edge> getEdges( UUID from, UUID to );
4951

50-
List<Edge> getEdges( Activity from, Activity to );
52+
List<Edge> getEdges( ActivityWrapper from, ActivityWrapper to );
5153

5254
List<Edge> getInEdges( UUID target );
5355

@@ -69,7 +71,7 @@ public interface Workflow {
6971
*/
7072
void setState( WorkflowState state );
7173

72-
void addActivity( Activity activity );
74+
void addActivity( ActivityWrapper activity );
7375

7476
void deleteActivity( UUID activityId );
7577

plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import lombok.Setter;
2727
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
2828
import org.polypheny.db.util.Pair;
29-
import org.polypheny.db.workflow.dag.activities.Activity;
29+
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
3030
import org.polypheny.db.workflow.dag.edges.Edge;
3131
import org.polypheny.db.workflow.models.ActivityModel;
3232
import org.polypheny.db.workflow.models.EdgeModel;
@@ -35,7 +35,7 @@
3535

3636
public class WorkflowImpl implements Workflow {
3737

38-
private final Map<UUID, Activity> activities;
38+
private final Map<UUID, ActivityWrapper> activities;
3939
private final Map<Pair<UUID, UUID>, List<Edge>> edges;
4040
private final WorkflowConfigModel config;
4141
@Getter
@@ -50,7 +50,7 @@ public WorkflowImpl() {
5050
}
5151

5252

53-
private WorkflowImpl( Map<UUID, Activity> activities, Map<Pair<UUID, UUID>, List<Edge>> edges, WorkflowConfigModel config ) {
53+
private WorkflowImpl( Map<UUID, ActivityWrapper> activities, Map<Pair<UUID, UUID>, List<Edge>> edges, WorkflowConfigModel config ) {
5454
this.activities = activities;
5555
this.edges = edges;
5656
this.config = config;
@@ -59,11 +59,11 @@ private WorkflowImpl( Map<UUID, Activity> activities, Map<Pair<UUID, UUID>, List
5959

6060
public static Workflow fromModel( WorkflowModel model ) {
6161

62-
Map<UUID, Activity> activities = new ConcurrentHashMap<>();
62+
Map<UUID, ActivityWrapper> activities = new ConcurrentHashMap<>();
6363
Map<Pair<UUID, UUID>, List<Edge>> edges = new ConcurrentHashMap<>();
6464

6565
for ( ActivityModel a : model.getActivities() ) {
66-
activities.put( a.getId(), Activity.fromModel( a ) );
66+
activities.put( a.getId(), ActivityWrapper.fromModel( a ) );
6767
}
6868
for ( EdgeModel e : model.getEdges() ) {
6969
Pair<UUID, UUID> key = Pair.of( e.getFromId(), e.getToId() );
@@ -76,11 +76,17 @@ public static Workflow fromModel( WorkflowModel model ) {
7676

7777

7878
@Override
79-
public List<Activity> getActivities() {
79+
public List<ActivityWrapper> getActivities() {
8080
return new ArrayList<>( activities.values() );
8181
}
8282

8383

84+
@Override
85+
public ActivityWrapper getActivity( UUID activityId ) {
86+
return activities.get( activityId );
87+
}
88+
89+
8490
@Override
8591
public List<Edge> getEdges() {
8692
return edges.values()
@@ -97,7 +103,7 @@ public List<Edge> getEdges( UUID from, UUID to ) {
97103

98104

99105
@Override
100-
public List<Edge> getEdges( Activity from, Activity to ) {
106+
public List<Edge> getEdges( ActivityWrapper from, ActivityWrapper to ) {
101107
return getEdges( from.getId(), to.getId() );
102108
}
103109

@@ -133,7 +139,7 @@ public WorkflowConfigModel getConfig() {
133139

134140

135141
@Override
136-
public void addActivity( Activity activity ) {
142+
public void addActivity( ActivityWrapper activity ) {
137143
if ( activities.containsKey( activity.getId() ) ) {
138144
throw new GenericRuntimeException( "Cannot add activity instance that is already part of this workflow." );
139145
}

plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/Activity.java

+38-45
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,18 @@
1616

1717
package org.polypheny.db.workflow.dag.activities;
1818

19-
import com.fasterxml.jackson.databind.JsonNode;
2019
import java.util.List;
2120
import java.util.Map;
2221
import java.util.Optional;
23-
import java.util.UUID;
2422
import org.polypheny.db.algebra.type.AlgDataType;
23+
import org.polypheny.db.catalog.logistic.DataModel;
2524
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
25+
import org.polypheny.db.workflow.dag.variables.WritableVariableStore;
2626
import org.polypheny.db.workflow.engine.execution.ExecutionContext;
2727
import org.polypheny.db.workflow.engine.storage.CheckpointReader;
28-
import org.polypheny.db.workflow.models.ActivityConfigModel;
29-
import org.polypheny.db.workflow.models.ActivityModel;
30-
import org.polypheny.db.workflow.models.RenderModel;
3128

3229
public interface Activity {
3330

34-
String getType();
35-
36-
UUID getId();
37-
38-
ActivityState getState();
39-
40-
/**
41-
* Changes the state of this activity to the specified state.
42-
* After initialization, this should never be done by the activity itself.
43-
* The state is typically changed by the scheduler.
44-
*
45-
* @param state the new state of this activity
46-
*/
47-
void setState( ActivityState state );
48-
4931
/**
5032
* This method computes the output tuple-types by considering (a preview of) input types and settings.
5133
* If the input types or settings are not available or cannot be validated yet, the output type is set to an empty {@link Optional}
@@ -61,32 +43,39 @@ public interface Activity {
6143
*/
6244
List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>> inTypes, Map<String, Optional<SettingValue>> settings ) throws ActivityException;
6345

64-
void execute( List<CheckpointReader> inputs, Map<String, SettingValue> settings, ExecutionContext ctx ) throws Exception; // default execution method. TODO: introduce execution context to track progress, abort, inputs, outputs...
65-
66-
void updateSettings( Map<String, JsonNode> settings );
67-
68-
ActivityConfigModel getConfig();
6946

70-
void setConfig( ActivityConfigModel config );
71-
72-
RenderModel getRendering();
47+
/**
48+
* This method is called just before execution starts and can be used to write variables based on input tuple types and settings.
49+
* To be able to update variables while having access to the input data, the activity should instead implement {@link VariableWriter}.
50+
*
51+
* @param inTypes a list of {@link AlgDataType} representing the input tuple types.
52+
* @param settings a map of setting keys to {@link SettingValue} representing the settings.
53+
* @param variables a WritableVariableStore to be used for updating any variable values.
54+
*/
55+
default void updateVariables( List<AlgDataType> inTypes, Map<String, SettingValue> settings, WritableVariableStore variables ) {
56+
}
7357

74-
void setRendering( RenderModel rendering );
58+
// settings do NOT include values from the updateVariables step.
7559

76-
ActivityDef getDef();
60+
/**
61+
* Execute this activity.
62+
* Any input CheckpointReaders are provided and expected to be closed by the caller.
63+
* CheckpointWriters for any outputs are created from the ExecutionContext.
64+
* The settings do not incorporate any changes to variables from {@code updateVariables()}.
65+
*
66+
* @param inputs a list of input readers for each input specified by the annotation.
67+
* @param settings the instantiated setting values, according to the specified settings annotations
68+
* @param ctx ExecutionContext to be used for creating checkpoints, updating progress and periodically checking for an abort
69+
* @throws Exception in case the execution fails at any point
70+
*/
71+
void execute( List<CheckpointReader> inputs, Map<String, SettingValue> settings, ExecutionContext ctx ) throws Exception; // default execution method
7772

7873
/**
7974
* Reset any execution-specific state of this activity.
8075
* It is guaranteed to be called before execution starts.
8176
*/
8277
void reset();
8378

84-
ActivityModel toModel( boolean includeState );
85-
86-
static Activity fromModel( ActivityModel model ) {
87-
return ActivityRegistry.activityFromModel( model );
88-
}
89-
9079
enum PortType {
9180
ANY,
9281
REL,
@@ -102,17 +91,21 @@ public boolean canReadFrom( PortType other ) {
10291
public boolean canWriteTo( PortType other ) {
10392
return this == other || other == ANY;
10493
}
105-
}
10694

10795

108-
enum ActivityState {
109-
IDLE,
110-
QUEUED,
111-
EXECUTING,
112-
SKIPPED, // => execution was aborted
113-
FAILED,
114-
FINISHED,
115-
SAVED // => finished + checkpoint created
96+
/**
97+
* Returns the corresponding DataModel enum.
98+
* In case of ANY, DataModel.RELATIONAL is returned.
99+
*
100+
* @return the corresponding DataModel
101+
*/
102+
public DataModel getDataModel() {
103+
return switch ( this ) {
104+
case ANY, REL -> DataModel.RELATIONAL;
105+
case DOC -> DataModel.DOCUMENT;
106+
case LPG -> DataModel.GRAPH;
107+
};
108+
}
116109
}
117110

118111

plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityException.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
@Getter
2222
public class ActivityException extends Exception {
2323

24-
private final Activity activity;
24+
private final ActivityWrapper activity;
2525

2626

27-
public ActivityException( String message, Activity activity ) {
27+
public ActivityException( String message, ActivityWrapper activity ) {
2828
super( message );
2929
this.activity = activity;
3030
}
@@ -42,7 +42,7 @@ public static class InvalidSettingException extends ActivityException {
4242
private final String settingKey;
4343

4444

45-
public InvalidSettingException( String message, Activity activity, String settingKey ) {
45+
public InvalidSettingException( String message, ActivityWrapper activity, String settingKey ) {
4646
super( message, activity );
4747
this.settingKey = settingKey;
4848
}
@@ -61,7 +61,7 @@ public static class InvalidInputException extends ActivityException {
6161
private final int index;
6262

6363

64-
public InvalidInputException( String message, Activity activity, int index ) {
64+
public InvalidInputException( String message, ActivityWrapper activity, int index ) {
6565
super( message, activity );
6666
this.index = index;
6767
}

plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityRegistry.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.polypheny.db.workflow.dag.annotations.Group;
3838
import org.polypheny.db.workflow.dag.settings.SettingDef;
3939
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
40-
import org.polypheny.db.workflow.models.ActivityModel;
4140
import org.reflections.Reflections;
4241

4342
public class ActivityRegistry {
@@ -69,13 +68,13 @@ public static ActivityDef get( String activityType ) {
6968
}
7069

7170

72-
public static Activity activityFromModel( ActivityModel model ) {
73-
ActivityDef def = get( model.getType() );
71+
public static Activity activityFromType( String activityType ) {
72+
ActivityDef def = get( activityType );
7473
try {
75-
Constructor<? extends Activity> constructor = def.getActivityClass().getConstructor( ActivityModel.class );
76-
return constructor.newInstance( model );
74+
Constructor<? extends Activity> constructor = def.getActivityClass().getConstructor();
75+
return constructor.newInstance();
7776
} catch ( InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e ) {
78-
throw new RuntimeException( "Encountered problem during instantiation for type: " + model.getType() );
77+
throw new RuntimeException( "Encountered problem during activity instantiation for type: " + activityType );
7978
}
8079
}
8180

0 commit comments

Comments
 (0)