Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Separate object models into domain model and DTO #2702

Merged
Merged 15 commits on Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,14 @@ configure(allprojects - project(':conductor-grpc')) {
}
}
}

// Apply Spotless Groovy formatting to the modules that currently contain Spock tests.
// NOTE(review): per the PR discussion, this could be extended to all java projects;
// only the modules that currently have Spock tests are listed for now.
["cassandra-persistence", "core", "redis-concurrency-limit", "test-harness"].each {
    configure(project(":conductor-$it")) {
        spotless {
            groovy {
                // Keep Groovy import order consistent with the Java convention used elsewhere.
                importOrder('java', 'javax', 'org', 'com.netflix', '', '\\#com.netflix', '\\#')
                licenseHeaderFile("$rootDir/licenseheader.txt")
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -27,15 +27,15 @@
import com.netflix.conductor.cassandra.config.CassandraProperties;
import com.netflix.conductor.cassandra.util.Statements;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
Expand All @@ -56,7 +56,6 @@
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_PARTITIONS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_TASKS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.WORKFLOW_ID_KEY;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;

@Trace
public class CassandraExecutionDAO extends CassandraBaseDAO
Expand Down Expand Up @@ -172,11 +171,11 @@ public CassandraExecutionDAO(
}

@Override
public List<Task> getPendingTasksByWorkflow(String taskName, String workflowId) {
List<Task> tasks = getTasksForWorkflow(workflowId);
public List<TaskModel> getPendingTasksByWorkflow(String taskName, String workflowId) {
List<TaskModel> tasks = getTasksForWorkflow(workflowId);
return tasks.stream()
.filter(task -> taskName.equals(task.getTaskType()))
.filter(task -> IN_PROGRESS.equals(task.getStatus()))
.filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus()))
.collect(Collectors.toList());
}

Expand All @@ -185,7 +184,7 @@ public List<Task> getPendingTasksByWorkflow(String taskName, String workflowId)
* Conductor
*/
@Override
public List<Task> getTasks(String taskType, String startKey, int count) {
public List<TaskModel> getTasks(String taskType, String startKey, int count) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
Expand All @@ -198,7 +197,7 @@ public List<Task> getTasks(String taskType, String startKey, int count) {
* @param tasks tasks to be created
*/
@Override
public List<Task> createTasks(List<Task> tasks) {
public List<TaskModel> createTasks(List<TaskModel> tasks) {
validateTasks(tasks);
String workflowId = tasks.get(0).getWorkflowInstanceId();
try {
Expand Down Expand Up @@ -259,7 +258,7 @@ public List<Task> createTasks(List<Task> tasks) {
}

@Override
public void updateTask(Task task) {
public void updateTask(TaskModel task) {
try {
// TODO: calculate the shard number the task belongs to
String taskPayload = toJson(task);
Expand All @@ -276,7 +275,7 @@ public void updateTask(Task task) {
&& task.getTaskDefinition().get().concurrencyLimit() > 0) {
if (task.getStatus().isTerminal()) {
removeTaskFromLimit(task);
} else if (task.getStatus() == IN_PROGRESS) {
} else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
addTaskToLimit(task);
}
}
Expand All @@ -296,7 +295,7 @@ public void updateTask(Task task) {
* Conductor
*/
@Override
public boolean exceedsLimit(Task task) {
public boolean exceedsLimit(TaskModel task) {
Optional<TaskDef> taskDefinition = task.getTaskDefinition();
if (taskDefinition.isEmpty()) {
return false;
Expand Down Expand Up @@ -342,7 +341,7 @@ public boolean exceedsLimit(Task task) {

@Override
public boolean removeTask(String taskId) {
Task task = getTask(taskId);
TaskModel task = getTask(taskId);
if (task == null) {
LOGGER.warn("No such task found by id {}", taskId);
return false;
Expand All @@ -351,7 +350,7 @@ public boolean removeTask(String taskId) {
}

@Override
public Task getTask(String taskId) {
public TaskModel getTask(String taskId) {
try {
String workflowId = lookupWorkflowIdFromTaskId(taskId);
if (workflowId == null) {
Expand All @@ -366,7 +365,8 @@ public Task getTask(String taskId) {
return Optional.ofNullable(resultSet.one())
.map(
row -> {
Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
TaskModel task =
readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
recordCassandraDaoRequests(
"getTask", task.getTaskType(), task.getWorkflowType());
recordCassandraDaoPayloadSize(
Expand All @@ -388,7 +388,7 @@ public Task getTask(String taskId) {
}

@Override
public List<Task> getTasks(List<String> taskIds) {
public List<TaskModel> getTasks(List<String> taskIds) {
Preconditions.checkNotNull(taskIds);
Preconditions.checkArgument(taskIds.size() > 0, "Task ids list cannot be empty");
String workflowId = lookupWorkflowIdFromTaskId(taskIds.get(0));
Expand All @@ -405,20 +405,20 @@ public List<Task> getTasks(List<String> taskIds) {
* Conductor
*/
@Override
public List<Task> getPendingTasksForTaskType(String taskType) {
public List<TaskModel> getPendingTasksForTaskType(String taskType) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}

@Override
public List<Task> getTasksForWorkflow(String workflowId) {
public List<TaskModel> getTasksForWorkflow(String workflowId) {
return getWorkflow(workflowId, true).getTasks();
}

@Override
public String createWorkflow(Workflow workflow) {
public String createWorkflow(WorkflowModel workflow) {
try {
List<Task> tasks = workflow.getTasks();
List<TaskModel> tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);

Expand All @@ -441,9 +441,9 @@ public String createWorkflow(Workflow workflow) {
}

@Override
public String updateWorkflow(Workflow workflow) {
public String updateWorkflow(WorkflowModel workflow) {
try {
List<Task> tasks = workflow.getTasks();
List<TaskModel> tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);
recordCassandraDaoRequests("updateWorkflow", "n/a", workflow.getWorkflowName());
Expand All @@ -465,7 +465,7 @@ public String updateWorkflow(Workflow workflow) {

@Override
public boolean removeWorkflow(String workflowId) {
Workflow workflow = getWorkflow(workflowId, true);
WorkflowModel workflow = getWorkflow(workflowId, true);
boolean removed = false;
// TODO: calculate number of shards and iterate
if (workflow != null) {
Expand Down Expand Up @@ -508,21 +508,21 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
}

@Override
public Workflow getWorkflow(String workflowId) {
public WorkflowModel getWorkflow(String workflowId) {
return getWorkflow(workflowId, true);
}

@Override
public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Workflow workflow = null;
public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
WorkflowModel workflow = null;
try {
ResultSet resultSet;
if (includeTasks) {
resultSet =
session.execute(
selectWorkflowWithTasksStatement.bind(
UUID.fromString(workflowId), DEFAULT_SHARD_ID));
List<Task> tasks = new ArrayList<>();
List<TaskModel> tasks = new ArrayList<>();

List<Row> rows = resultSet.all();
if (rows.size() == 0) {
Expand All @@ -532,9 +532,9 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
for (Row row : rows) {
String entityKey = row.getString(ENTITY_KEY);
if (ENTITY_TYPE_WORKFLOW.equals(entityKey)) {
workflow = readValue(row.getString(PAYLOAD_KEY), Workflow.class);
workflow = readValue(row.getString(PAYLOAD_KEY), WorkflowModel.class);
} else if (ENTITY_TYPE_TASK.equals(entityKey)) {
Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
TaskModel task = readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
tasks.add(task);
} else {
throw new ApplicationException(
Expand All @@ -547,7 +547,7 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {

if (workflow != null) {
recordCassandraDaoRequests("getWorkflow", "n/a", workflow.getWorkflowName());
tasks.sort(Comparator.comparingInt(Task::getSeq));
tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
workflow.setTasks(tasks);
}
} else {
Expand All @@ -557,10 +557,10 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Optional.ofNullable(resultSet.one())
.map(
row -> {
Workflow wf =
WorkflowModel wf =
readValue(
row.getString(PAYLOAD_KEY),
Workflow.class);
WorkflowModel.class);
recordCassandraDaoRequests(
"getWorkflow", "n/a", wf.getWorkflowName());
return wf;
Expand Down Expand Up @@ -598,7 +598,7 @@ public List<String> getRunningWorkflowIds(String workflowName, int version) {
* Conductor
*/
@Override
public List<Workflow> getPendingWorkflowsByType(String workflowName, int version) {
public List<WorkflowModel> getPendingWorkflowsByType(String workflowName, int version) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
Expand Down Expand Up @@ -628,7 +628,8 @@ public long getInProgressTaskCount(String taskDefName) {
* Conductor
*/
@Override
public List<Workflow> getWorkflowsByType(String workflowName, Long startTime, Long endTime) {
public List<WorkflowModel> getWorkflowsByType(
String workflowName, Long startTime, Long endTime) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
Expand All @@ -638,7 +639,7 @@ public List<Workflow> getWorkflowsByType(String workflowName, Long startTime, Lo
* Conductor
*/
@Override
public List<Workflow> getWorkflowsByCorrelationId(
public List<WorkflowModel> getWorkflowsByCorrelationId(
String workflowName, String correlationId, boolean includeTasks) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
Expand Down Expand Up @@ -741,7 +742,7 @@ List<EventExecution> getEventExecutions(
}

@Override
public void addTaskToLimit(Task task) {
public void addTaskToLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"addTaskToLimit", task.getTaskType(), task.getWorkflowType());
Expand Down Expand Up @@ -770,7 +771,7 @@ public void addTaskToLimit(Task task) {
}

@Override
public void removeTaskFromLimit(Task task) {
public void removeTaskFromLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskFromLimit", task.getTaskType(), task.getWorkflowType());
Expand All @@ -797,7 +798,7 @@ public void removeTaskFromLimit(Task task) {
}
}

private boolean removeTask(Task task) {
private boolean removeTask(TaskModel task) {
// TODO: calculate shard number based on seq and maxTasksPerShard
try {
// get total tasks for this workflow
Expand Down Expand Up @@ -834,7 +835,7 @@ private boolean removeTask(Task task) {
}
}

private void removeTaskLookup(Task task) {
private void removeTaskLookup(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskLookup", task.getTaskType(), task.getWorkflowType());
Expand All @@ -854,7 +855,7 @@ private void removeTaskLookup(Task task) {
}

@VisibleForTesting
void validateTasks(List<Task> tasks) {
void validateTasks(List<TaskModel> tasks) {
Preconditions.checkNotNull(tasks, "Tasks object cannot be null");
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks object cannot be empty");
tasks.forEach(
Expand All @@ -868,7 +869,7 @@ void validateTasks(List<Task> tasks) {
});

String workflowId = tasks.get(0).getWorkflowInstanceId();
Optional<Task> optionalTask =
Optional<TaskModel> optionalTask =
tasks.stream()
.filter(task -> !workflowId.equals(task.getWorkflowInstanceId()))
.findAny();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Loading