diff --git a/build.gradle b/build.gradle
index aef8a30b0f..01ad5a9a0d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -184,3 +184,14 @@ configure(allprojects - project(':conductor-grpc')) {
}
}
}
+
+["cassandra-persistence", "core", "redis-concurrency-limit", "test-harness"].each {
+ configure(project(":conductor-$it")) {
+ spotless {
+ groovy {
+ importOrder('java', 'javax', 'org', 'com.netflix', '', '\\#com.netflix', '\\#')
+ licenseHeaderFile("$rootDir/licenseheader.txt")
+ }
+ }
+ }
+}
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java
index 352e5cfec1..2d2725f3e3 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java
index 19286cad45..28d3eee972 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java
index 70664c1694..c576be8e91 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java
index ea0b12de9f..ce797cea9e 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
index 74e2016da8..4341186d70 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -27,15 +27,15 @@
import com.netflix.conductor.cassandra.config.CassandraProperties;
import com.netflix.conductor.cassandra.util.Statements;
import com.netflix.conductor.common.metadata.events.EventExecution;
-import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
@@ -56,7 +56,6 @@
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_PARTITIONS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_TASKS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.WORKFLOW_ID_KEY;
-import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
@Trace
public class CassandraExecutionDAO extends CassandraBaseDAO
@@ -172,11 +171,11 @@ public CassandraExecutionDAO(
}
@Override
- public List getPendingTasksByWorkflow(String taskName, String workflowId) {
- List tasks = getTasksForWorkflow(workflowId);
+ public List getPendingTasksByWorkflow(String taskName, String workflowId) {
+ List tasks = getTasksForWorkflow(workflowId);
return tasks.stream()
.filter(task -> taskName.equals(task.getTaskType()))
- .filter(task -> IN_PROGRESS.equals(task.getStatus()))
+ .filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus()))
.collect(Collectors.toList());
}
@@ -185,7 +184,7 @@ public List getPendingTasksByWorkflow(String taskName, String workflowId)
* Conductor
*/
@Override
- public List getTasks(String taskType, String startKey, int count) {
+ public List getTasks(String taskType, String startKey, int count) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@@ -198,7 +197,7 @@ public List getTasks(String taskType, String startKey, int count) {
* @param tasks tasks to be created
*/
@Override
- public List createTasks(List tasks) {
+ public List createTasks(List tasks) {
validateTasks(tasks);
String workflowId = tasks.get(0).getWorkflowInstanceId();
try {
@@ -259,7 +258,7 @@ public List createTasks(List tasks) {
}
@Override
- public void updateTask(Task task) {
+ public void updateTask(TaskModel task) {
try {
// TODO: calculate the shard number the task belongs to
String taskPayload = toJson(task);
@@ -276,7 +275,7 @@ public void updateTask(Task task) {
&& task.getTaskDefinition().get().concurrencyLimit() > 0) {
if (task.getStatus().isTerminal()) {
removeTaskFromLimit(task);
- } else if (task.getStatus() == IN_PROGRESS) {
+ } else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
addTaskToLimit(task);
}
}
@@ -296,7 +295,7 @@ public void updateTask(Task task) {
* Conductor
*/
@Override
- public boolean exceedsLimit(Task task) {
+ public boolean exceedsLimit(TaskModel task) {
Optional taskDefinition = task.getTaskDefinition();
if (taskDefinition.isEmpty()) {
return false;
@@ -342,7 +341,7 @@ public boolean exceedsLimit(Task task) {
@Override
public boolean removeTask(String taskId) {
- Task task = getTask(taskId);
+ TaskModel task = getTask(taskId);
if (task == null) {
LOGGER.warn("No such task found by id {}", taskId);
return false;
@@ -351,7 +350,7 @@ public boolean removeTask(String taskId) {
}
@Override
- public Task getTask(String taskId) {
+ public TaskModel getTask(String taskId) {
try {
String workflowId = lookupWorkflowIdFromTaskId(taskId);
if (workflowId == null) {
@@ -366,7 +365,8 @@ public Task getTask(String taskId) {
return Optional.ofNullable(resultSet.one())
.map(
row -> {
- Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
+ TaskModel task =
+ readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
recordCassandraDaoRequests(
"getTask", task.getTaskType(), task.getWorkflowType());
recordCassandraDaoPayloadSize(
@@ -388,7 +388,7 @@ public Task getTask(String taskId) {
}
@Override
- public List getTasks(List taskIds) {
+ public List getTasks(List taskIds) {
Preconditions.checkNotNull(taskIds);
Preconditions.checkArgument(taskIds.size() > 0, "Task ids list cannot be empty");
String workflowId = lookupWorkflowIdFromTaskId(taskIds.get(0));
@@ -405,20 +405,20 @@ public List getTasks(List taskIds) {
* Conductor
*/
@Override
- public List getPendingTasksForTaskType(String taskType) {
+ public List getPendingTasksForTaskType(String taskType) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@Override
- public List getTasksForWorkflow(String workflowId) {
+ public List getTasksForWorkflow(String workflowId) {
return getWorkflow(workflowId, true).getTasks();
}
@Override
- public String createWorkflow(Workflow workflow) {
+ public String createWorkflow(WorkflowModel workflow) {
try {
- List tasks = workflow.getTasks();
+ List tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);
@@ -441,9 +441,9 @@ public String createWorkflow(Workflow workflow) {
}
@Override
- public String updateWorkflow(Workflow workflow) {
+ public String updateWorkflow(WorkflowModel workflow) {
try {
- List tasks = workflow.getTasks();
+ List tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);
recordCassandraDaoRequests("updateWorkflow", "n/a", workflow.getWorkflowName());
@@ -465,7 +465,7 @@ public String updateWorkflow(Workflow workflow) {
@Override
public boolean removeWorkflow(String workflowId) {
- Workflow workflow = getWorkflow(workflowId, true);
+ WorkflowModel workflow = getWorkflow(workflowId, true);
boolean removed = false;
// TODO: calculate number of shards and iterate
if (workflow != null) {
@@ -508,13 +508,13 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
}
@Override
- public Workflow getWorkflow(String workflowId) {
+ public WorkflowModel getWorkflow(String workflowId) {
return getWorkflow(workflowId, true);
}
@Override
- public Workflow getWorkflow(String workflowId, boolean includeTasks) {
- Workflow workflow = null;
+ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
+ WorkflowModel workflow = null;
try {
ResultSet resultSet;
if (includeTasks) {
@@ -522,7 +522,7 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
session.execute(
selectWorkflowWithTasksStatement.bind(
UUID.fromString(workflowId), DEFAULT_SHARD_ID));
- List tasks = new ArrayList<>();
+ List tasks = new ArrayList<>();
List rows = resultSet.all();
if (rows.size() == 0) {
@@ -532,9 +532,9 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
for (Row row : rows) {
String entityKey = row.getString(ENTITY_KEY);
if (ENTITY_TYPE_WORKFLOW.equals(entityKey)) {
- workflow = readValue(row.getString(PAYLOAD_KEY), Workflow.class);
+ workflow = readValue(row.getString(PAYLOAD_KEY), WorkflowModel.class);
} else if (ENTITY_TYPE_TASK.equals(entityKey)) {
- Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
+ TaskModel task = readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
tasks.add(task);
} else {
throw new ApplicationException(
@@ -547,7 +547,7 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
if (workflow != null) {
recordCassandraDaoRequests("getWorkflow", "n/a", workflow.getWorkflowName());
- tasks.sort(Comparator.comparingInt(Task::getSeq));
+ tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
workflow.setTasks(tasks);
}
} else {
@@ -557,10 +557,10 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Optional.ofNullable(resultSet.one())
.map(
row -> {
- Workflow wf =
+ WorkflowModel wf =
readValue(
row.getString(PAYLOAD_KEY),
- Workflow.class);
+ WorkflowModel.class);
recordCassandraDaoRequests(
"getWorkflow", "n/a", wf.getWorkflowName());
return wf;
@@ -598,7 +598,7 @@ public List getRunningWorkflowIds(String workflowName, int version) {
* Conductor
*/
@Override
- public List getPendingWorkflowsByType(String workflowName, int version) {
+ public List getPendingWorkflowsByType(String workflowName, int version) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@@ -628,7 +628,8 @@ public long getInProgressTaskCount(String taskDefName) {
* Conductor
*/
@Override
- public List getWorkflowsByType(String workflowName, Long startTime, Long endTime) {
+ public List getWorkflowsByType(
+ String workflowName, Long startTime, Long endTime) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@@ -638,7 +639,7 @@ public List getWorkflowsByType(String workflowName, Long startTime, Lo
* Conductor
*/
@Override
- public List getWorkflowsByCorrelationId(
+ public List getWorkflowsByCorrelationId(
String workflowName, String correlationId, boolean includeTasks) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
@@ -741,7 +742,7 @@ List getEventExecutions(
}
@Override
- public void addTaskToLimit(Task task) {
+ public void addTaskToLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"addTaskToLimit", task.getTaskType(), task.getWorkflowType());
@@ -770,7 +771,7 @@ public void addTaskToLimit(Task task) {
}
@Override
- public void removeTaskFromLimit(Task task) {
+ public void removeTaskFromLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskFromLimit", task.getTaskType(), task.getWorkflowType());
@@ -797,7 +798,7 @@ public void removeTaskFromLimit(Task task) {
}
}
- private boolean removeTask(Task task) {
+ private boolean removeTask(TaskModel task) {
// TODO: calculate shard number based on seq and maxTasksPerShard
try {
// get total tasks for this workflow
@@ -834,7 +835,7 @@ private boolean removeTask(Task task) {
}
}
- private void removeTaskLookup(Task task) {
+ private void removeTaskLookup(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskLookup", task.getTaskType(), task.getWorkflowType());
@@ -854,7 +855,7 @@ private void removeTaskLookup(Task task) {
}
@VisibleForTesting
- void validateTasks(List tasks) {
+ void validateTasks(List tasks) {
Preconditions.checkNotNull(tasks, "Tasks object cannot be null");
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks object cannot be empty");
tasks.forEach(
@@ -868,7 +869,7 @@ void validateTasks(List tasks) {
});
String workflowId = tasks.get(0).getWorkflowInstanceId();
- Optional optionalTask =
+ Optional optionalTask =
tasks.stream()
.filter(task -> !workflowId.equals(task.getWorkflowInstanceId()))
.findAny();
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java
index a9f9fe44c0..10bbe2dd51 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java
index f5eb9f7dfe..473c23132c 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java
index 5c538c41e7..a6ea122538 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy
index 912d36c65c..214f3722de 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy
@@ -1,20 +1,20 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
import com.netflix.conductor.common.metadata.events.EventExecution
import com.netflix.conductor.common.metadata.events.EventHandler
+
import spock.lang.Subject
class CassandraEventHandlerDAOSpec extends CassandraSpec {
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy
index 11727500ae..2a3b81c390 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy
@@ -1,26 +1,26 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License" you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
import com.netflix.conductor.common.metadata.events.EventExecution
-import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.workflow.WorkflowDef
import com.netflix.conductor.common.metadata.workflow.WorkflowTask
-import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.core.exception.ApplicationException
import com.netflix.conductor.core.utils.IDGenerator
+import com.netflix.conductor.model.TaskModel
+import com.netflix.conductor.model.WorkflowModel
+
import spock.lang.Subject
import static com.netflix.conductor.common.metadata.events.EventExecution.Status.COMPLETED
@@ -40,8 +40,8 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
def tasks = []
// create tasks for a workflow and add to list
- Task task1 = new Task(workflowInstanceId: 'uuid', taskId: 'task1id', referenceTaskName: 'task1')
- Task task2 = new Task(workflowInstanceId: 'uuid', taskId: 'task2id', referenceTaskName: 'task2')
+ TaskModel task1 = new TaskModel(workflowInstanceId: 'uuid', taskId: 'task1id', referenceTaskName: 'task1')
+ TaskModel task2 = new TaskModel(workflowInstanceId: 'uuid', taskId: 'task2id', referenceTaskName: 'task2')
tasks << task1 << task2
when:
@@ -52,7 +52,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
and:
// add a task from a different workflow to the list
- Task task3 = new Task(workflowInstanceId: 'other-uuid', taskId: 'task3id', referenceTaskName: 'task3')
+ TaskModel task3 = new TaskModel(workflowInstanceId: 'other-uuid', taskId: 'task3id', referenceTaskName: 'task3')
tasks << task3
when:
@@ -69,11 +69,11 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
WorkflowDef workflowDef = new WorkflowDef()
workflowDef.name = "def1"
workflowDef.setVersion(1)
- Workflow workflow = new Workflow()
+ WorkflowModel workflow = new WorkflowModel()
workflow.setWorkflowDefinition(workflowDef)
workflow.setWorkflowId(workflowId)
workflow.setInput(new HashMap<>())
- workflow.setStatus(Workflow.WorkflowStatus.RUNNING)
+ workflow.setStatus(WorkflowModel.Status.RUNNING)
workflow.setCreateTime(System.currentTimeMillis())
when:
@@ -85,14 +85,14 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
when:
// read the workflow from the datastore
- Workflow found = executionDAO.getWorkflow(workflowId)
+ WorkflowModel found = executionDAO.getWorkflow(workflowId)
then:
workflow == found
and:
// update the workflow
- workflow.setStatus(Workflow.WorkflowStatus.COMPLETED)
+ workflow.setStatus(WorkflowModel.Status.COMPLETED)
executionDAO.updateWorkflow(workflow)
when:
@@ -120,18 +120,18 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
- Workflow workflow = new Workflow(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: Workflow.WorkflowStatus.RUNNING, createTime: System.currentTimeMillis())
+ WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)
and: 'create tasks for this workflow'
- Task task1 = new Task(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task2 = new Task(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task3 = new Task(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task1 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task2 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task3 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
def taskList = [task1, task2, task3]
when: 'add the tasks to the datastore'
- List tasks = executionDAO.createTasks(taskList)
+ List tasks = executionDAO.createTasks(taskList)
then:
tasks != null
@@ -177,7 +177,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
fetchedTasks != null && fetchedTasks.size() == 3
when: 'read workflow with tasks'
- Workflow found = executionDAO.getWorkflow(workflowId, true)
+ WorkflowModel found = executionDAO.getWorkflow(workflowId, true)
then:
found != null
@@ -192,21 +192,21 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
- Workflow workflow = new Workflow(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: Workflow.WorkflowStatus.RUNNING, createTime: System.currentTimeMillis())
+ WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)
and: 'create tasks for this workflow'
- Task task1 = new Task(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task2 = new Task(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task3 = new Task(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task1 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task2 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task3 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
and: 'add the tasks to the datastore'
executionDAO.createTasks([task1, task2, task3])
and: 'change the status of those tasks'
- task1.setStatus(Task.Status.IN_PROGRESS)
- task2.setStatus(Task.Status.COMPLETED)
- task3.setStatus(Task.Status.FAILED)
+ task1.setStatus(TaskModel.Status.IN_PROGRESS)
+ task2.setStatus(TaskModel.Status.COMPLETED)
+ task3.setStatus(TaskModel.Status.FAILED)
when: 'update the tasks'
executionDAO.updateTask(task1)
@@ -214,12 +214,12 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
executionDAO.updateTask(task3)
then:
- executionDAO.getTask(task1.taskId).status == Task.Status.IN_PROGRESS
- executionDAO.getTask(task2.taskId).status == Task.Status.COMPLETED
- executionDAO.getTask(task3.taskId).status == Task.Status.FAILED
+ executionDAO.getTask(task1.taskId).status == TaskModel.Status.IN_PROGRESS
+ executionDAO.getTask(task2.taskId).status == TaskModel.Status.COMPLETED
+ executionDAO.getTask(task3.taskId).status == TaskModel.Status.FAILED
when: 'get pending tasks for the workflow'
- List pendingTasks = executionDAO.getPendingTasksByWorkflow(task1.getTaskType(), workflowId)
+ List pendingTasks = executionDAO.getPendingTasksByWorkflow(task1.getTaskType(), workflowId)
then:
pendingTasks != null && pendingTasks.size() == 1
@@ -230,13 +230,13 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
- Workflow workflow = new Workflow(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: Workflow.WorkflowStatus.RUNNING, createTime: System.currentTimeMillis())
+ WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)
and: 'create tasks for this workflow'
- Task task1 = new Task(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task2 = new Task(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task3 = new Task(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task1 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task2 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task3 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
and: 'add the tasks to the datastore'
executionDAO.createTasks([task1, task2, task3])
@@ -284,23 +284,23 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
WorkflowTask workflowTask = new WorkflowTask(taskDefinition: taskDef)
workflowTask.setTaskDefinition(taskDef)
- Task task = new Task()
+ TaskModel task = new TaskModel()
task.taskDefName = taskDefName
task.taskId = taskId
task.workflowInstanceId = IDGenerator.generate()
task.setWorkflowTask(workflowTask)
task.setTaskType("test_task")
task.setWorkflowType("test_workflow")
- task.setStatus(Task.Status.SCHEDULED)
+ task.setStatus(TaskModel.Status.SCHEDULED)
- Task newTask = new Task()
+ TaskModel newTask = new TaskModel()
newTask.setTaskDefName(taskDefName)
newTask.setTaskId(IDGenerator.generate())
newTask.setWorkflowInstanceId(IDGenerator.generate())
newTask.setWorkflowTask(workflowTask)
newTask.setTaskType("test_task")
newTask.setWorkflowType("test_workflow")
- newTask.setStatus(Task.Status.SCHEDULED)
+ newTask.setStatus(TaskModel.Status.SCHEDULED)
when: // no tasks are IN_PROGRESS
executionDAO.addTaskToLimit(task)
@@ -309,7 +309,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
!executionDAO.exceedsLimit(task)
when: // set a task to IN_PROGRESS
- task.setStatus(Task.Status.IN_PROGRESS)
+ task.setStatus(TaskModel.Status.IN_PROGRESS)
executionDAO.addTaskToLimit(task)
then: // same task is checked
@@ -319,14 +319,14 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
executionDAO.exceedsLimit(newTask)
when: // set IN_PROGRESS task to COMPLETED
- task.setStatus(Task.Status.COMPLETED)
+ task.setStatus(TaskModel.Status.COMPLETED)
executionDAO.removeTaskFromLimit(task)
then: // check new task again
!executionDAO.exceedsLimit(newTask)
when: // set new task to IN_PROGRESS
- newTask.setStatus(Task.Status.IN_PROGRESS)
+ newTask.setStatus(TaskModel.Status.IN_PROGRESS)
executionDAO.addTaskToLimit(newTask)
then: // check new task again
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy
index 27049c2c21..5c06efe550 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy
@@ -1,20 +1,20 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.workflow.WorkflowDef
+
import spock.lang.Subject
class CassandraMetadataDAOSpec extends CassandraSpec {
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy
index d9531f2b36..a5393210bb 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy
@@ -1,34 +1,35 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
-import com.datastax.driver.core.ConsistencyLevel
-import com.datastax.driver.core.Session
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.netflix.conductor.cassandra.config.CassandraProperties
-import com.netflix.conductor.cassandra.util.Statements
-import com.netflix.conductor.common.config.TestObjectMapperConfiguration
-import groovy.transform.PackageScope
+import java.time.Duration
+
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import org.testcontainers.containers.CassandraContainer
import org.testcontainers.spock.Testcontainers
+
+import com.netflix.conductor.cassandra.config.CassandraProperties
+import com.netflix.conductor.cassandra.util.Statements
+import com.netflix.conductor.common.config.TestObjectMapperConfiguration
+
+import com.datastax.driver.core.ConsistencyLevel
+import com.datastax.driver.core.Session
+import com.fasterxml.jackson.databind.ObjectMapper
+import groovy.transform.PackageScope
import spock.lang.Shared
import spock.lang.Specification
-import java.time.Duration
-
@ContextConfiguration(classes = [TestObjectMapperConfiguration.class])
@Testcontainers
@PackageScope
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy
index f674688b9e..f826a36208 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy
@@ -1,16 +1,15 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.util
import spock.lang.Specification
diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
index 3e7ca3699b..88700d2b83 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -248,12 +248,12 @@ public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType
MetricsContainer.recordTaskResultPayloadSize(taskType, taskResultSize);
long payloadSizeThreshold =
- conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024;
+ conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024L;
if (taskResultSize > payloadSizeThreshold) {
if (!conductorClientConfiguration.isExternalPayloadStorageEnabled()
|| taskResultSize
> conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB()
- * 1024) {
+ * 1024L) {
taskResult.setReasonForIncompletion(
String.format(
"The TaskResult payload size: %d is greater than the permissible %d MB",
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
index d2957f62b6..82a6a1af21 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
index cf4e5c246c..d449176203 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
index dc9d410820..946ab90ed4 100644
--- a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
+++ b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java
index dbbcfbd670..4b5f22c6b2 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,10 +17,10 @@
import java.util.concurrent.CompletableFuture;
import com.netflix.conductor.common.metadata.events.EventExecution;
-import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.common.run.TaskSummary;
+import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.IndexDAO;
@@ -34,18 +34,18 @@ public class NoopIndexDAO implements IndexDAO {
public void setup() {}
@Override
- public void indexWorkflow(Workflow workflow) {}
+ public void indexWorkflow(WorkflowSummary workflowSummary) {}
@Override
- public CompletableFuture asyncIndexWorkflow(Workflow workflow) {
+ public CompletableFuture asyncIndexWorkflow(WorkflowSummary workflowSummary) {
return CompletableFuture.completedFuture(null);
}
@Override
- public void indexTask(Task task) {}
+ public void indexTask(TaskSummary taskSummary) {}
@Override
- public CompletableFuture asyncIndexTask(Task task) {
+ public CompletableFuture asyncIndexTask(TaskSummary taskSummary) {
return CompletableFuture.completedFuture(null);
}
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java
index 0835d50d0d..b9feec8de2 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java
index e80bfcd2da..69b50d6a38 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -20,10 +20,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.WorkflowModel;
public class ArchivingWithTTLWorkflowStatusListener implements WorkflowStatusListener {
@@ -75,7 +75,7 @@ public void shutdownExecutorService() {
}
@Override
- public void onWorkflowCompleted(Workflow workflow) {
+ public void onWorkflowCompleted(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on completion ", workflow.getWorkflowId());
if (delayArchiveSeconds > 0) {
scheduledThreadPoolExecutor.schedule(
@@ -90,7 +90,7 @@ public void onWorkflowCompleted(Workflow workflow) {
}
@Override
- public void onWorkflowTerminated(Workflow workflow) {
+ public void onWorkflowTerminated(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on termination", workflow.getWorkflowId());
if (delayArchiveSeconds > 0) {
scheduledThreadPoolExecutor.schedule(
@@ -108,10 +108,10 @@ private class DelayArchiveWorkflow implements Runnable {
private final String workflowId;
private final String workflowName;
- private final Workflow.WorkflowStatus status;
+ private final WorkflowModel.Status status;
private final ExecutionDAOFacade executionDAOFacade;
- DelayArchiveWorkflow(Workflow workflow, ExecutionDAOFacade executionDAOFacade) {
+ DelayArchiveWorkflow(WorkflowModel workflow, ExecutionDAOFacade executionDAOFacade) {
this.workflowId = workflow.getWorkflowId();
this.workflowName = workflow.getWorkflowName();
this.status = workflow.getStatus();
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java
index 482e63d739..c98c0c173b 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,8 +17,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
@Configuration
@EnableConfigurationProperties(ArchivingWorkflowListenerProperties.class)
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java
index 90076089ff..dfd57d3017 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java
index f84c99f705..f1fe98cca8 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -15,10 +15,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.WorkflowModel;
/**
* Provides default implementation of workflow archiving immediately after workflow is completed or
@@ -37,14 +37,14 @@ public ArchivingWorkflowStatusListener(ExecutionDAOFacade executionDAOFacade) {
}
@Override
- public void onWorkflowCompleted(Workflow workflow) {
+ public void onWorkflowCompleted(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on completion ", workflow.getWorkflowId());
this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true);
Monitors.recordWorkflowArchived(workflow.getWorkflowName(), workflow.getStatus());
}
@Override
- public void onWorkflowTerminated(Workflow workflow) {
+ public void onWorkflowTerminated(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on termination", workflow.getWorkflowId());
this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true);
Monitors.recordWorkflowArchived(workflow.getWorkflowName(), workflow.getStatus());
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java
index 45060f322d..c11ef06103 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,11 +17,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
+import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.dao.QueueDAO;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,6 +36,7 @@ public class ConductorQueueStatusPublisher implements WorkflowStatusListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConductorQueueStatusPublisher.class);
private final QueueDAO queueDAO;
+ private final ModelMapper modelMapper;
private final ObjectMapper objectMapper;
private final String successStatusQueue;
@@ -43,9 +45,11 @@ public class ConductorQueueStatusPublisher implements WorkflowStatusListener {
public ConductorQueueStatusPublisher(
QueueDAO queueDAO,
+ ModelMapper modelMapper,
ObjectMapper objectMapper,
ConductorQueueStatusPublisherProperties properties) {
this.queueDAO = queueDAO;
+ this.modelMapper = modelMapper;
this.objectMapper = objectMapper;
this.successStatusQueue = properties.getSuccessQueue();
this.failureStatusQueue = properties.getFailureQueue();
@@ -53,26 +57,26 @@ public ConductorQueueStatusPublisher(
}
@Override
- public void onWorkflowCompleted(Workflow workflow) {
+ public void onWorkflowCompleted(WorkflowModel workflow) {
LOGGER.info("Publishing callback of workflow {} on completion ", workflow.getWorkflowId());
queueDAO.push(successStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}
@Override
- public void onWorkflowTerminated(Workflow workflow) {
+ public void onWorkflowTerminated(WorkflowModel workflow) {
LOGGER.info("Publishing callback of workflow {} on termination", workflow.getWorkflowId());
queueDAO.push(failureStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}
@Override
- public void onWorkflowFinalized(Workflow workflow) {
+ public void onWorkflowFinalized(WorkflowModel workflow) {
LOGGER.info("Publishing callback of workflow {} on finalization", workflow.getWorkflowId());
queueDAO.push(finalizeStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}
- private Message workflowToMessage(Workflow workflow) {
+ private Message workflowToMessage(WorkflowModel workflow) {
String jsonWfSummary;
- WorkflowSummary summary = new WorkflowSummary(workflow);
+ WorkflowSummary summary = new WorkflowSummary(modelMapper.getWorkflow(workflow));
try {
jsonWfSummary = objectMapper.writeValueAsString(summary);
} catch (JsonProcessingException e) {
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java
index 13d8d0a934..12dda6c444 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,6 +17,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.dao.QueueDAO;
@@ -32,8 +33,9 @@ public class ConductorQueueStatusPublisherConfiguration {
@Bean
public WorkflowStatusListener getWorkflowStatusListener(
QueueDAO queueDAO,
+ ModelMapper modelMapper,
ConductorQueueStatusPublisherProperties properties,
ObjectMapper objectMapper) {
- return new ConductorQueueStatusPublisher(queueDAO, objectMapper, properties);
+ return new ConductorQueueStatusPublisher(queueDAO, modelMapper, objectMapper, properties);
}
}
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java
index e04d2fc2f0..ea9a53f743 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
index 8ff19a0617..e4603791de 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
index 6de5573159..412bbb8cb9 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
@@ -32,7 +32,6 @@
import com.amazonaws.services.sqs.AmazonSQSClient;
import rx.Scheduler;
-@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Configuration
@EnableConfigurationProperties(SQSEventQueueProperties.class)
@ConditionalOnProperty(name = "conductor.event-queues.sqs.enabled", havingValue = "true")
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java b/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java
index eb9d7dd97d..e79bb6ebca 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -38,7 +38,9 @@
/**
* An implementation of {@link ExternalPayloadStorage} using AWS S3 for storing large JSON payload
- * data. The S3 client assumes that access to S3 is configured on the instance.
+ * data.
+ *
+ *
NOTE: The S3 client assumes that access to S3 is configured on the instance.
*
* @see DefaultAWSCredentialsProviderChain
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java b/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java
index bb9ca31132..a52ec750cb 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java
index 35a9a15e39..670f260638 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java
index 2fce5ac87f..f0f42eaf4d 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -21,22 +21,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
+import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.metadata.tasks.Task.Status;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.Utils;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@@ -79,12 +74,12 @@ public HttpTask(
}
@Override
- public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
Object request = task.getInputData().get(requestParameter);
task.setWorkerId(Utils.getServerId());
if (request == null) {
task.setReasonForIncompletion(MISSING_REQUEST);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
@@ -93,14 +88,14 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
String reason =
"Missing HTTP URI. See documentation for HttpTask for required input parameters";
task.setReasonForIncompletion(reason);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
if (input.getMethod() == null) {
String reason = "No HTTP method specified";
task.setReasonForIncompletion(reason);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
@@ -113,9 +108,9 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
task.getTaskId());
if (response.statusCode > 199 && response.statusCode < 300) {
if (isAsyncComplete(task)) {
- task.setStatus(Status.IN_PROGRESS);
+ task.setStatus(TaskModel.Status.IN_PROGRESS);
} else {
- task.setStatus(Status.COMPLETED);
+ task.setStatus(TaskModel.Status.COMPLETED);
}
} else {
if (response.body != null) {
@@ -123,7 +118,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
} else {
task.setReasonForIncompletion("No response from the remote service");
}
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
}
//noinspection ConstantConditions
if (response != null) {
@@ -139,7 +134,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
input.getVipAddress(),
task.getWorkflowInstanceId(),
e);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
task.setReasonForIncompletion(
"Failed to invoke " + getTaskType() + " task due to: " + e);
task.getOutputData().put("response", e.toString());
@@ -206,13 +201,13 @@ private Object extractBody(String responseBody) {
}
@Override
- public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public boolean execute(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
return false;
}
@Override
- public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
- task.setStatus(Status.CANCELED);
+ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
+ task.setStatus(TaskModel.Status.CANCELED);
}
@Override
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java
index 665be8caab..95c648b079 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java
index 9ec9f2f43d..26520edaf4 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -22,10 +22,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -58,7 +58,7 @@ public JsonJqTransform(ObjectMapper objectMapper) {
}
@Override
- public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
final Map taskInput = task.getInputData();
final Map taskOutput = task.getOutputData();
@@ -67,7 +67,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
if (queryExpression == null) {
task.setReasonForIncompletion(
"Missing '" + QUERY_EXPRESSION_PARAMETER + "' in input parameters");
- task.setStatus(Task.Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
@@ -79,7 +79,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
final List result = query.apply(childScope, input);
- task.setStatus(Task.Status.COMPLETED);
+ task.setStatus(TaskModel.Status.COMPLETED);
if (result == null) {
taskOutput.put(OUTPUT_RESULT, null);
taskOutput.put(OUTPUT_RESULT_LIST, null);
@@ -96,7 +96,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
task.getTaskId(),
workflow.getWorkflowId(),
e);
- task.setStatus(Task.Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
final String message = extractFirstValidMessage(e);
task.setReasonForIncompletion(message);
taskOutput.put(OUTPUT_ERROR, message);
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java
index db442a8452..4095af478e 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java
index 21f779ca6a..8ec91a0396 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -34,11 +34,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.Utils;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@@ -76,7 +76,7 @@ public KafkaPublishTask(KafkaProducerManager clientManager, ObjectMapper objectM
}
@Override
- public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
long taskStartMillis = Instant.now().toEpochMilli();
task.setWorkerId(Utils.getServerId());
@@ -109,9 +109,9 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
try {
recordMetaDataFuture.get();
if (isAsyncComplete(task)) {
- task.setStatus(Task.Status.IN_PROGRESS);
+ task.setStatus(TaskModel.Status.IN_PROGRESS);
} else {
- task.setStatus(Task.Status.COMPLETED);
+ task.setStatus(TaskModel.Status.COMPLETED);
}
long timeTakenToCompleteTask = Instant.now().toEpochMilli() - taskStartMillis;
LOGGER.debug("Published message {}, Time taken {}", input, timeTakenToCompleteTask);
@@ -133,9 +133,9 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
}
}
- private void markTaskAsFailed(Task task, String reasonForIncompletion) {
+ private void markTaskAsFailed(TaskModel task, String reasonForIncompletion) {
task.setReasonForIncompletion(reasonForIncompletion);
- task.setStatus(Task.Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
}
/**
@@ -195,13 +195,13 @@ Object getKey(Input input) {
}
@Override
- public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public boolean execute(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
return false;
}
@Override
- public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
- task.setStatus(Task.Status.CANCELED);
+ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
+ task.setStatus(TaskModel.Status.CANCELED);
}
@Override
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java
index 3d58db5be5..9a89e86deb 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -19,24 +19,22 @@
import org.mockito.Mockito;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.contribs.listener.archive.ArchivingWorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
+import com.netflix.conductor.model.WorkflowModel;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.*;
/** @author pavel.halabala */
public class ArchivingWorkflowStatusListenerTest {
- Workflow workflow;
+ WorkflowModel workflow;
ExecutionDAOFacade executionDAOFacade;
ArchivingWorkflowStatusListener listener;
@Before
public void before() {
- workflow = new Workflow();
+ workflow = new WorkflowModel();
WorkflowDef def = new WorkflowDef();
def.setName("name1");
def.setVersion(1);
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
index 1b6c619db7..f661e62b33 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
@@ -30,6 +30,7 @@
import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
+import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.events.queue.Message;
@@ -63,7 +64,7 @@ public class DefaultEventQueueProcessorTest {
@Autowired private ObjectMapper objectMapper;
private static final List messages = new LinkedList<>();
- private static final List updatedTasks = new LinkedList<>();
+ private static final List updatedTasks = new LinkedList<>();
@Before
public void init() {
@@ -129,11 +130,11 @@ public static void setup() {
doAnswer(
(Answer)
invocation -> {
- updatedTasks.add(invocation.getArgument(0, Task.class));
+ updatedTasks.add(invocation.getArgument(0, TaskResult.class));
return null;
})
.when(executionService)
- .updateTask(any(Task.class));
+ .updateTask(any(TaskResult.class));
}
@Test
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java
index 480e97588b..31e5ef2d45 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java
index f87eb6022b..065b492353 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -12,343 +12,356 @@
*/
package com.netflix.conductor.contribs.tasks.http;
-import org.junit.Ignore;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.*;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.model.MediaType;
+import org.testcontainers.containers.MockServerContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.netflix.conductor.common.metadata.tasks.TaskType;
+import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
+import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
+import com.netflix.conductor.core.execution.DeciderService;
+import com.netflix.conductor.core.execution.WorkflowExecutor;
+import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
+import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
+import com.netflix.conductor.core.utils.ParametersUtils;
+import com.netflix.conductor.dao.MetadataDAO;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
@SuppressWarnings("unchecked")
-@Ignore // Test causes "OutOfMemoryError" error during build
+// @Ignore // Test causes "OutOfMemoryError" error during build
public class HttpTaskTest {
- // private static final String ERROR_RESPONSE = "Something went wrong!";
- // private static final String TEXT_RESPONSE = "Text Response";
- // private static final double NUM_RESPONSE = 42.42d;
- //
- // private HttpTask httpTask;
- // private WorkflowExecutor workflowExecutor;
- // private final Workflow workflow = new Workflow();
- //
- // private static final ObjectMapper objectMapper = new ObjectMapper();
- // private static String JSON_RESPONSE;
- //
- // @ClassRule
- // public static MockServerContainer mockServer = new MockServerContainer(
- // DockerImageName.parse("mockserver/mockserver"));
- //
- // @BeforeClass
- // public static void init() throws Exception {
- // Map map = new HashMap<>();
- // map.put("key", "value1");
- // map.put("num", 42);
- // map.put("SomeKey", null);
- // JSON_RESPONSE = objectMapper.writeValueAsString(map);
- //
- // final TypeReference